Inteligencia Artificial. Problema jarras de agua resuelto en Python

Hay un problema bastante conocido que puede resolverse mediante técnicas de IA, este es el problema de las jarras de agua. Es tan famoso que hasta en la película La Jungla de cristal tienen que resolverlo para impedir que una bomba explote.

El problema

Se tienen dos jarras, una con capacidad de 5 litros y otra con 3. Ninguna de ellas tiene marcas de medición. Tenemos un grifo que permite llenar las jarras de agua.

La solución consiste en conseguir que en la jarra de 5 litros hayan exactamente 4 litros de agua, y en la de 3 litros hayan 3.

Acciones posibles

Estas serían las acciones que podríamos ejecutar sobre las jarras de agua:

  • Llenar la grande
  • Llenar la pequeña
  • Vaciar la grande
  • Vaciar la pequeña
  • Traspasar grande a pequeña
  • Traspasar pequeña a grande

Solución

Este problema puede resolverse de forma directa con un árbol de búsqueda no informada. En primer lugar lo resolveremos usando Python con un algoritmo de búsqueda en amplitud y después con un algoritmo de búsqueda en profundidad para ver si hay diferencias.

Se mantienen dos listas, una para los nodos por visitar y otra para los nodos visitados, de esta forma, evitamos que el algoritmo caiga en ciclos porque la solución podría no encontrarse nunca.

Para los dos algoritmos, usaremos una clase llamada Nodo

class Nodo:
    def __init__(self, datos, hijos=None) -> None:
        self.datos = datos
        self.hijos = None
        self.padre = None
        self.coste = None
        self.set_hijos(hijos)

    def set_hijos(self, hijos) -> None:
        self.hijos = hijos

        if self.hijos != None:
            for h in self.hijos:
                h.padre = self

    def get_hijos(self):
        return self.hijos

    def get_padre(self):
        return self.padre

    def set_padre(self, padre) -> None:
        self.padre = padre

    def set_datos(self, datos) -> None:
        self.datos = datos

    def get_datos(self):
        return self.datos

    def set_coste(self, coste) -> None:
        self.coste = coste

    def get_coste(self):
        return self.coste

    def igual(self, nodo) -> bool:
        if self.get_datos() == nodo.get_datos():
            return True

        return False

    def en_lista(self, lista_nodos) -> bool:
        en_la_lista = False

        for n in lista_nodos:
            if self.igual(n):
                en_la_lista = True

        return en_la_lista

    def __str__(self) -> str:
        return str(self.get_datos())

Búsqueda en amplitud

La búsqueda en amplitud recorre el árbol por niveles siguiendo estos pasos:

  • Primero cogemos el nodo raíz y lo almacenamos en una lista de nodos frontera (nodos por visitar).
  • Cogemos un nodo de la lista de nodos frontera y miramos si es el objetivo, si lo es, hemos terminado y almacenaremos el nodo en la lista nodos visitados.
  • Seguidamente se visitan todos sus hijos del nodo seleccionado aplicando las operaciones definidas. Para cada hijo comprobamos que no se haya visitado en la lista de nodos visitados, si no lo está, lo añadimos a la lista de nodos frontera.
  • Volvemos a seleccionar un nodo de la lista de nodos frontera y repetimos el proceso hasta que encontremos el nodo meta.

Este algoritmo suele implementarse usando una cola FIFO (Primero en entrar, primero en salir), por eso, en el código fuente vemos que extraemos el primer elemento de la lista de nodos frontera usando nodos_frontera.pop(0).

Este sería el árbol que nos da el resultado:

El código fuente de este algoritmo es el siguiente:

 import Nodo


def buscar_solucion_BFS(estado_inicial, solucion):
    solucionado = False
    nodos_visitados = []
    nodos_frontera = []

    nodoInicial = Nodo(estado_inicial)
    nodos_frontera.append(nodoInicial)

    while (not solucionado) and len(nodos_frontera) != 0:
        nodo = nodos_frontera.pop(0)  # Extraer nodo a visitar
        nodos_visitados.append(nodo)  # Añadir a nodos visitados

        if nodo.get_datos() == solucion:
            solucionado = True

            # Devuelvo el último nodo (meta) que tiene un padre, que a su vez tiene otro padre etc y ese sería el camino a la solución
            return nodo
        else:  # Expandir hijos
            dato_nodo = nodo.get_datos()

            # Llenar grande
            hijo = [5, dato_nodo[1]]

            hijo_1 = Nodo(hijo)

            if not hijo_1.en_lista(nodos_visitados) and not hijo_1.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_1)

            # Llenar pequeña
            hijo = [dato_nodo[0], 3]

            hijo_2 = Nodo(hijo)

            if not hijo_2.en_lista(nodos_visitados) and not hijo_2.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_2)

            # Vaciar grande
            hijo = [0, dato_nodo[1]]

            hijo_3 = Nodo(hijo)

            if not hijo_3.en_lista(nodos_visitados) and not hijo_3.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_3)

            # Vaciar pequeña
            hijo = [dato_nodo[0], 0]

            hijo_4 = Nodo(hijo)

            if not hijo_4.en_lista(nodos_visitados) and not hijo_4.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_4)

            # Traspasar grande-pequeña
            actual_grande = dato_nodo[0]
            actual_peque = dato_nodo[1]
            fin_grande = actual_grande
            fin_peque = actual_peque

            puedo_traspasar = 3 - actual_peque

            if puedo_traspasar > 0:
                if actual_grande <= puedo_traspasar:                    
                    fin_peque = actual_peque + fin_grande
                    fin_grande = 0

                    if fin_peque > 3:
                        fin_peque = 3
                else:
                    fin_peque = puedo_traspasar + actual_peque
                    fin_grande = actual_grande - puedo_traspasar

            hijo = [fin_grande, fin_peque]

            hijo_5 = Nodo(hijo)

            if not hijo_5.en_lista(nodos_visitados) and not hijo_5.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_5)

            # Traspasar pequeña-grande
            actual_grande = dato_nodo[0]
            actual_peque = dato_nodo[1]
            fin_grande = actual_grande
            fin_peque = actual_peque

            puedo_traspasar = 5 - actual_grande

            if puedo_traspasar > 0:
                if actual_peque <= puedo_traspasar:                    
                    fin_grande = actual_grande + fin_peque
                    fin_peque = 0

                    if fin_grande > 5:
                        fin_grande = 5
                else:
                    fin_grande = puedo_traspasar + actual_grande
                    fin_peque = actual_peque - puedo_traspasar

            hijo = [fin_grande, fin_peque]

            hijo_6 = Nodo(hijo)

            if not hijo_6.en_lista(nodos_visitados) and not hijo_6.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_6)

            nodo.set_hijos([hijo_1, hijo_2, hijo_3, hijo_4, hijo_5, hijo_6])

Vemos la función buscar_solucion_BFS que se encarga de expandir el árbol usando las acciones permitidas y de mantener la lista de nodos frontera y de nodos visitados.

Por último, tenemos la función main que recoge el nodo solución que devolvió la función buscar_solucion_BFS y crea una lista de todos los padres de ese nodo, para al final, hacer un print con el camino resultante:

if __name__ == '__main__':
    estado_inicial = [0, 0]
    solucion = [4, 3]

    nodo_solucion = buscar_solucion_BFS(estado_inicial, solucion)

    # Creo una lista de todos los padres del nodo solución
    resultado = []
    nodo = nodo_solucion

    while nodo.get_padre() != None:
        resultado.append(nodo.get_datos())
        nodo = nodo.get_padre()

    resultado.append(estado_inicial)
    resultado.reverse()

    print(resultado)

El print del resultado nos mostraría lo siguiente:

[[0, 0], [5, 0], [2, 3], [2, 0], [0, 2], [5, 2], [4, 3]]

Como vemos, coincide con el camino encontrado en la imagen del árbol.

Búsqueda en profundidad

La búsqueda en profundidad es algo diferente a la búsqueda en amplitud. En lugar de ir visitando todos los nodos del mismo nivel, va descendiendo por una rama hasta la profundidad máxima, cuando llega al nodo más profundo, continúa con la siguiente rama.

Este algoritmo suele implementarse usando una cola LIFO (Último en entrar, primero en salir), por eso, en el código fuente vemos que extraemos el último elemento de la lista de nodos frontera usando nodos_frontera.pop().

Este sería el árbol que nos da el resultado:

En el código fuente de este algoritmo tenemos la función buscar_solucion_DFS que se encarga de expandir el árbol usando las acciones permitidas y de mantener la lista de nodos frontera y de nodos visitados. La diferencia con la función buscar_solucion_BFS del otro algoritmo es la forma en la que extraemos los nodos de la lista de nodos frontera:

from arbol import Nodo


def buscar_solucion_DFS(estado_inicial, solucion):
    solucionado = False
    nodos_visitados = []
    nodos_frontera = []

    nodoInicial = Nodo(estado_inicial)
    nodos_frontera.append(nodoInicial)

    while (not solucionado) and len(nodos_frontera) != 0:
        nodo = nodos_frontera.pop()  # Extraer nodo a visitar
        nodos_visitados.append(nodo)  # Añadir a nodos visitados

        if nodo.get_datos() == solucion:
            solucionado = True

            # Devuelvo el último nodo (meta) que tiene un padre, que a su vez tiene otro padre etc y ese sería el camino a la solución
            return nodo
        else:  # Expandir hijos
            dato_nodo = nodo.get_datos()

            # Llenar grande
            hijo = [5, dato_nodo[1]]

            hijo_1 = Nodo(hijo)

            if not hijo_1.en_lista(nodos_visitados) and not hijo_1.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_1)

            # Llenar pequeña
            hijo = [dato_nodo[0], 3]

            hijo_2 = Nodo(hijo)

            if not hijo_2.en_lista(nodos_visitados) and not hijo_2.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_2)

            # Vaciar grande
            hijo = [0, dato_nodo[1]]

            hijo_3 = Nodo(hijo)

            if not hijo_3.en_lista(nodos_visitados) and not hijo_3.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_3)

            # Vaciar pequeña
            hijo = [dato_nodo[0], 0]

            hijo_4 = Nodo(hijo)

            if not hijo_4.en_lista(nodos_visitados) and not hijo_4.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_4)

            # Traspasar grande-pequeña
            actual_grande = dato_nodo[0]
            actual_peque = dato_nodo[1]
            fin_grande = actual_grande
            fin_peque = actual_peque

            puedo_traspasar = 3 - actual_peque

            if puedo_traspasar > 0:
                if actual_grande <= puedo_traspasar:                    
                    fin_peque = actual_peque + fin_grande
                    fin_grande = 0

                    if fin_peque > 3:
                        fin_peque = 3
                else:
                    fin_peque = puedo_traspasar + actual_peque
                    fin_grande = actual_grande - puedo_traspasar

            hijo = [fin_grande, fin_peque]

            hijo_5 = Nodo(hijo)

            if not hijo_5.en_lista(nodos_visitados) and not hijo_5.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_5)

            # Traspasar pequeña-grande
            actual_grande = dato_nodo[0]
            actual_peque = dato_nodo[1]
            fin_grande = actual_grande
            fin_peque = actual_peque

            puedo_traspasar = 5 - actual_grande

            if puedo_traspasar > 0:
                if actual_peque <= puedo_traspasar:                    
                    fin_grande = actual_grande + fin_peque
                    fin_peque = 0

                    if fin_grande > 5:
                        fin_grande = 5
                else:
                    fin_grande = puedo_traspasar + actual_grande
                    fin_peque = actual_peque - puedo_traspasar

            hijo = [fin_grande, fin_peque]

            hijo_6 = Nodo(hijo)

            if not hijo_6.en_lista(nodos_visitados) and not hijo_6.en_lista(nodos_frontera):
                nodos_frontera.append(hijo_6)

            nodo.set_hijos([hijo_1, hijo_2, hijo_3, hijo_4, hijo_5, hijo_6])

Por último, tenemos la función main que recoge el nodo solución que devolvió la función buscar_solucion_DFS y crea una lista de todos los padres de ese nodo, para al final, hacer un print con el camino resultante:

if __name__ == '__main__':
    estado_inicial = [0, 0]
    solucion = [4, 3]

    nodo_solucion = buscar_solucion_DFS(estado_inicial, solucion)

    # Creo una lista de todos los padres del nodo solución
    resultado = []
    nodo = nodo_solucion

    while nodo.get_padre() != None:
        resultado.append(nodo.get_datos())
        nodo = nodo.get_padre()

    resultado.append(estado_inicial)
    resultado.reverse()

    print(resultado)

El print del resultado nos mostraría lo siguiente:

[[0, 0], [0, 3], [3, 0], [3, 3], [5, 1], [0, 1], [1, 0], [1, 3], [4, 0], [4, 3]]

Como vemos, coincide con el camino encontrado en la imagen del árbol, además, el resultado es visiblemente peor que el obtenido con la búsqueda en amplitud, lo que indica que la solución encontrada no es óptima.

Puedes ver el código completo en este repositorio de GitHub

Aprendiendo FastAPI con PostgreSQL

FastAPI

Sirva este artículo como tutorial básico para trabajar con FastAPI, He decidido usar este framework/ORM para un side project que estoy desarrollando y que trata temas como Kafka, Python, microservicios, webscrapping.

El artículo es un resumen en español de la documentación oficial de FastAPI (que está realmente bien detallada), pero orientado a usar una base de datos Postgre, en mi opinión la parte que le falta a la documentación. Por eso hago incapié en el tema de Alembic para la ejecución de las migraciones en la base de datos Postgre.

Aquí la Documentación oficial.

Las librerías necesarias para trabajar con FastAPI y Postgre son las siguientes:

alembic==1.4.3
click==7.1.2
fastapi==0.63.0
flake8==3.8.4
h11==0.11.0
kafka-python==2.0.2
Mako==1.1.3
MarkupSafe==1.1.1
mccabe==0.6.1
psycopg2==2.8.6
pycodestyle==2.6.0
pydantic==1.7.3
pydocstyle==5.1.1
pyflakes==2.2.0
pylama==7.7.1
python-dateutil==2.8.1
python-editor==1.0.4
six==1.15.0
snowballstemmer==2.0.0
SQLAlchemy==1.3.22
starlette==0.13.6
typing==3.7.4.3
uvicorn==0.13.2

NOTA: Puede que al instalar psycopg2 nos de un error, eso es porque falta instalar a nivel de sistema (no en el entorno virtual) libpq-dev. Para instalar:

sudo apt-get install libpq-dev

Estructura de un proyecto FastAPI

Tenemos la carpeta principal del proyecto y generamos una para todo lo relacionado con FastAPI (la llamamos fastapi_app), dentro de esa carpeta estarán los modelos etc:

carpeta_proyecto
	alembic
	fastapi_app
		__init__.py
		base.py
		crud.py
		database.py
		main.py
		models.py
		schemas.py
	env
	.gitignore
	README.md
	LICENSE.md
	requirements.txt
	alembic.ini

Detalle de cada fichero

A continuación describimos para qué sirve cada uno de los ficheros.

init.py

Este fichero estará vacío

base.py

Este fichero es necesario para ejecutar las migraciones automáticamente, en él solo tendremos que hacer un import de todos nuestros modelos:

from fastapi_app.models import *

Después, dentro de alembic indicaremos que este fichero es el que tiene los metadatos de los modelos.

database.py

Aquí estableceremos la conexión con la base de datos. Ejemplo de fichero:

"""coding=utf-8."""

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "postgresql://postgres:pass@host:port/db_name"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

Si usamos mysql, en el create_engine hay que añadir un parámetro más, sería así:

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)


models.py

Aquí pondremos nuestros modelos de base de datos. Este sería un ejemplo de este fichero:

"""coding=utf-8."""

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base


class User(Base):
    """User Class contains standard information for a User."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

schemas.py

Aquí estarán los modelos de Pydantic. Digamos que estarán las clases con los atributos que serán necesarios para crear o leer cuando se haga un GET. También se indica qué se devolverá cuando se cree un nuevo registro.

Ojo que algunos de los métodos pueden (y deben) tener herencia de otros.

Ejemplo de fichero:

"""coding=utf-8."""

from typing import List, Optional
from pydantic import BaseModel

class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int
    is_active: bool

    class Config:
        orm_mode = True

crud.py

Aquí estarán los métodos que serán visibles desde la API, usará las clases Pydantic definidas en el fichero schemas.py para saber qué atributos son necesarios dependiendo de qué método se invoque. Ejemplo:

"""coding=utf-8."""

from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

main.py

Integrar las partes que hemos comentado anteriormente. Al inicio del fichero, se pone una instrucción que crea los modelos en la base de datos, pero una vez creados, si añadimos o modificamos algún campo, esta instrucción no actualiza la tabla, de ahí el uso de alembic.

Ejemplo:

"""coding=utf-8."""

from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user

Alembic

Usamos Alembic para ejecutar las migraciones (cambios) en la base de datos. Alembic también tiene una estructura de carpetas propia, y en la raíz deja un fichero llamado alembic.ini para la configuración. La estructura de carpetas se genera sola inicializando alembic con el comando (ejecutar con el entorno virtual activado):

alembic init alembic

Configuración de Alembic

De momento, lo único que nos interesa de la configuración de alembic es la cadena de conexión a la base de datos, para configurarla modificamos esta línea del fichero:

sqlalchemy.url = postgresql://pass@host:port/db_name

Migraciones automáticas

Existe una forma de meter los cambios uno a uno a mano en la base de datos, pero lo más cómodo es hacer que los cambios se detecten automáticamente, para ello, anteriormente generamos dentro de la app de FastAPI un fichero con un import de todos los modelos de la base de datos, el fichero base.py.

Para que alembic tenga en cuenta los modelos, tenemos que ir al fichero env.py en el directorio de alembic e indicárselo:

import os,sys,inspect

current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir) 

from atlas_app.base import Base

target_metadata = Base.metadata

Generar las migraciones

Para generar las migraciones, con el entorno virtual activo ejecutamos este comando:

alembic revision --autogenerate -m "Added user table"

una vez hecho esto, hacemos que las migraciones persistan en la base de datos con el siguiente comando:

alembic upgrade head

Ejecutar la aplicación

Una vez generados los modelos, configurado cadenas de conexión etc, nuestra aplicación está lista para ejecutarse. Con el entorno virtual activo ejecutamos:

uvicorn atlas_app.main:app --reload

En la terminal nos aparecerá la url de la aplicación que por defecto será:

http://127.0.0.1:8000

si queremos acceder al Swagger que se monta automáticamente donde se listan los métodos de la API y pueden probarse, basta con dirigirnos a la url:

http://127.0.0.1:8000/docs

Este es el repositorio de GitHub del side project que he comentado antes. Donde estoy usando las herramientas que quiero aprender, como Kafka, FastAPI etc.

Conjetura de Collatz en Python. Recursivo vs iterativo

La conjetura de Collatz fue enunciada por el matemático Lothar Collatz en 1937 y todavía no ha sido demostrada. Esta conjetura dice que dado cualquier número entero positivo, aplicando estas dos reglas, acaba siendo siempre 1.

  • Si el número es par, se divide entre 2
  • Si el número es impar, se multiplica por 3 y se suma 1

Ejemplo partiendo de 6:

  • 6 es par, dividimos entre 2 = 3
  • 3 es impar, multiplicamos por 3 y sumamos 1 = 10
  • 10 es par, dividimos entre 2 = 5
  • 5 es impar, multiplicamos por 3 y sumamos 1 = 16
  • 16 es par, dividimos entre 2 = 8
  • 8 es par, dividimos entre 2 = 4
  • 4 es par, dividimos entre 2 = 2
  • 2 es par, dividimos entre 2 = 1

En la Wikipedia hay más información sobre la conjetura de Collatz.

Vamos a implementar un algoritmo con la conjetura de Collatz de dos formas distintas, una de forma recursiva y otra de forma iterativa.

Diferencias entre recursividad e iteración

La iteración consiste en repetir una y otra vez una o varias sentencias usando bucles for, while etc. La iteración es la más común a día de hoy, pero va perdiendo terreno a causa del auge de la programación funcional.

Hablamos de recursividad cuando una función se llama a sí misma una y otra vez. Es más elegante que la iterativa porque se usa menos código y es más fácil leerlo, aunque si no estamos acostumbrados, dificulta un poco la implementación de un método recursivo. Pero tenemos que tener en cuenta la siguiente frase:

Code is read many more times than written

El problema de la recursividad es que afecta al rendimiento, ya que al llamarse una función a sí misma, se mantienen los registros de las funciones en la memoria RAM y puede llegar a ocuparla por completo.

Algoritmo recursivo

Esta sería la implementación de la conjetura de Collatz de forma recursiva:

def calculaCollatz(pNumero):
    print('Numero actual: {0}'.format(pNumero))

    if pNumero != 1:
        if pNumero % 2 == 0:
            pNumero = pNumero // 2
        else:
            pNumero = pNumero * 3 + 1

        calculaCollatz(pNumero)

Algoritmo iterativo

Y esta de forma iterativa:

def calculaCollatzIterativo(pNumero):
    while pNumero > 1:
        print('Numero actual: {0}'.format(pNumero))

        if pNumero % 2 == 0:
            pNumero = pNumero // 2

        else:
            pNumero = pNumero * 3 + 1
    else:
        print('Numero actual: {0}'.format(pNumero))

Pruebas y resultados

Para hacer las pruebas con números muy muy grandes para que haya una diferencia real en los tiempos de cálculo he tenido que cambiar el nivel máximo de recursión de Python que por defecto es 999. Para cambiarlo, hacemos uso de la librería sys:

import sys
sys.setrecursionlimit(100000000)

Estos son los tiempos obtenidos tras varias ejecuciones:

Tiempo recursivo: 0.7219452857971191
Tiempo iterativo: 0.6483826637268066

Tiempo recursivo: 0.5863070487976074
Tiempo iterativo: 0.5835487842559814

Tiempo recursivo: 0.5863070487976074
Tiempo iterativo: 0.5835487842559814

Tiempo recursivo: 0.6134541034698486
Tiempo iterativo: 0.5413932800292969

Como vemos, la diferencia no es muy bestia, pero en un proceso de menos de un segundo ya es palpable. Para procesos muy costosos, la recursividad no es buena opción, ya que ocuparíamos toda la memoria y podríamos no obtener un resultado.

¿Por qué 0.1 + 0.2 – 0.3 no es 0 en Python?

Si nos preguntasen cuál sería el resultado de esta operación al ejecutarla en el intérprete de Python, pensaríamos sin duda alguna el resultado es 0 o 0.0, pero muy lejos de eso, el intérprete nos respondería que el resultado es 5.551115123125783e-17 ¿Por qué pasa esto?

No es un fallo en la matriz, el resultado es un número muy muy pequeño, pero sigue sin ser 0. El problema viene por la forma en la que se representan los números de punto flotante en el ordenador, ya que se hace como fracciones en base 2 (binarias).

Muchas fracciones decimales no pueden ser representadas en base 2, es lo que pasa con el 0.1. Por ejemplo, si sumamos 0.2 + 0.2 usando el intérprete de python el resultado sería 0.4:

pero si sumamos 0.1 + 0.2 el resultado no es 0.3, si no 0.30000000000000004

Viendo esto, ya sabemos que 0.1 + 0.2 – 0.3 no será 0:

¿Cómo podemos hacer que sea 0?

Tenemos que hacer uso de la librería decimal y cuidado porque tenemos que darle los números como strings.

from decimal import Decimal

print(
     Decimal('0.1') +
     Decimal('0.2') -
     Decimal('0.3')
)

El resultado usando Decimal sí sería 0.0.

Instalar OneDrive (Microsoft) en Linux Mint y Ubuntu

Siempre fui usuario de Linux, en concreto de Ubuntu, cuando empecé a trabajar de programador me compré un Mac y fui usuario de Mac OS durante años, hasta enero de este año. Tenía que renovar el portátil y ya no estaba dispuesto a pagar más por menos, así que me compré un portátil con Windows, llevo ya un par de años trabajando en remoto y en esta última empresa no me dieron portátil, así que trabajo con el mío personal y mi trabajo remunerado requiere de SO Windows (programador Dynamics 365 Business Central).

Pero en mi tiempo libre sigo estudiando y aprendiendo Python, así que decidí instalar Linux Mint en el otro disco duro del portátil. La operación fue un éxito y estoy encantado con el cambio, también me ayuda a desconectar del trabajo porque son dos entornos totalmente distintos.

El caso es que uso OneDrive para tener mis archivos disponibles en cualquier parte y en Linux no existe app de sincronización oficial, así que me puse a investigar y encontré en Github este proyecto que te permite tener en Linux tus archivos de OneDrive sincronizados.

He visto en muchos tutoriales en los que usando el apt-get install funciona correctamente, pero no ha sido mi caso, así que describo aquí los pasos que he seguido para que me funcione correctamente, siguiendo más o menos las instrucciones del repositorio GitHub. Instrucciones de instalación. Instrucciones de uso.

Dependencias necesarias antes de instalar OneDrive

He instalado estas referencias (alguna no las tenía):

sudo apt install build-essential
sudo apt install libcurl4-openssl-dev
sudo apt install libsqlite3-dev
sudo apt install pkg-config
sudo apt install git
sudo apt install curl
sudo apt install libnotify-dev

Luego he instalado el compilador DMD (después de instalar hay que activar el entorno):

curl -fsS https://dlang.org/install.sh | bash -s dmd

activamos el entorno ojo con la versión, ya que pueden haberla actualizado y la carpeta puede llamarse distinto a 2.094.1:

source ~/dlang/dmd-2.094.1/activate

Instalar OneDrive

Descargamos, compilamos e instalamos la herramienta:

git clone https://github.com/abraunegg/onedrive.git
cd onedrive
./configure
make clean; make;
sudo make install

Configuración

Una vez instalada, tenemos que asociarla a nuestra cuenta de Office 365, para ello, escribimos en la terminal:

onedrive

en la terminal aparecerá un enlace que tendremos que copiar y abrir en el navegador, o hacer click directamente en la terminal presionando la tecla Control+click de ratón.

Ahora tendremos que hacer login en nuestra cuenta y conceder permisos a la aplicación:

Si miramos la terminal, se ha quedado esperando a que pongamos una url, cuando nos hayamos logueado en la página de Microsoft, acabaremos en una web en blanco, copiamos la url y la pegamos en la terminal, si todo ha ido bien, nos aparecerá un mensaje confirmándolo.

Ejemplo para hacer el login:

[user@hostname ~]$ onedrive 
Authorize this app visiting:

https://login.microsoftonline.com/common/oauth2/v2.0/authorize?client_id=22c49a0d-d21c-4792-aed1-8f163c982546&scope=Files.ReadWrite%20Files.ReadWrite.all%20Sites.ReadWrite.All%20offline_access&response_type=code&redirect_uri=https://login.microsoftonline.com/common/oauth2/nativeclient

Enter the response uri: https://login.microsoftonline.com/common/oauth2/nativeclient?code=<redacted>

Application has been successfully authorised, however no additional command switches were provided.

Please use --help for further assistance in regards to running this application.

Uso

Antes de hacer la primera sincronización, comprobamos que los parámetros de configuración son los correctos (ruta de la carpeta OneDrive etc):

onedrive --display-config

Lanzamos la sincronización:

onedrive --synchronize

Si todo ha ido bien, veremos ya como se van descargando archivos a nuestra carpeta OneDrive.

Cambio en la configuración

Siempre podemos hacer cambios en la configuración, en mi caso, tengo activa la sincronización automática de las fotos del móvil y esta carpeta no quiero que esté actualizada en mi pc por el gran espacio que ocupa, este y otros parámetros se pueden cambiar en el propio fichero de configuración que se encuentra en: /home/usuario/onedrive/config

Por ejemplo, si no quisiésemos sincronización en la carpeta que yo he nombrado, había que cambiar esta línea:

skip_dir = "Álbum de cámara"

Esta línea puede añadirse tantas veces como se quiera si tenemos más de una carpeta que no queremos sincronizar.

Por último, cada vez que hagamos un cambio en la configuración, tenemos que ejecutar la sincronización con un parámetro más:

onedrive --synchronize --resync

Descargar vídeo o audio de Youtube usando Python

Esta aplicación hecha en PYthon permite descargar vídeos o audio de Youtube seleccionando la calidad. También se puede descargar solo el audio de un vídeo y puede hacerse link a link o en bucle usando un fichero.

Funcionamiento 🔧

Podemos descargar los vídeos uno a uno usando su link a Youtube o poner las urls en un fichero de texto y que la aplicación los descargue en bucle.

Al iniciar la aplicación nos pide que seleccionemos una de las dos opciones:

Descarga un solo link

Nos pedirá la url del vídeo:

Ahora tendremos que seleccionar una de estas tres opciones:

  • Descarga rápida de vídeo y audio: descarga el vídeo junto con el audio en baja calidad
  • Descargar vídeo seleccionando la calidad: nos mostrará las calidades disponibles en el vídeo para que seleccionemos una. Esta opción es la que más tarda porque tiene que descargar el vídeo por un lado y el audio por otro para mergearlo después. Este proceso es transparente para el usuario y se hace de forma automática.
  • Descargar audio: descarga únicamente el audio de la canción.

Descarga en bucle

Nos pedirá la ruta del fichero de texto que contiene los enlaces:

El fichero tendrá este formato:

Cuando el proceso acabe, veremos los vídeos o audios en la ruta configurada.

Funcionamiento 💻

Vamos a analizar por encima el código.

En la función principal simplemente mostramos un menú para pedir al usuario si desea descargar un solo vídeo o varios a través de un fichero de texto con enlaces. En cualquiera de las dos opciones el link se guarda en una lista.

if __name__ == "__main__":
    tarea = 0
    job = 0
    enlaces = []    
    parent_dir = cfg.get_ruta_descargas()  # Obtener ruta donde se guardan las descargas

    while tarea != '3':        
        enlaces.clear()
        tarea = menu_inicio()  # Mostrar menú inicial
        loop = False

        clear()

        # Llenar una lista con los enlaces a descargar
        if tarea == '1':  # Un solo vídeo
            enlaces.append(pide_url())            
        elif tarea == '2': # Fichero de texto
            loop = True
            ruta_fichero = input('Ruta fichero enlaces: ')
            enlaces = procesa_fichero(ruta_fichero)        
        
        # Seleccionar qué tipo de descarga hacer
        while job not in ['1', '2', '3']:
            clear()
            job = seleccionar_accion()

        [inicia_proceso_descarga(x, job, loop) for x in enlaces]

    print_proceso_terminado()

Del bloque de arriba, la línea que inicia el proceso de descarga es esta (se recorre la lista usando list comprehension y le pasa cada link a la función “inicia_proceso_descarga”):

[inicia_proceso_descarga(x, job, loop) for x in enlaces]

Este bloque de abajo es el que lee el fichero e inserta en la lista cada enlace:

def procesa_fichero(ruta) -> list:
    list_enlaces = []

    file_object = open(ruta, 'r')

    [list_enlaces.append(linea) for linea in file_object]    

    file_object.close()

    return list_enlaces

Esta parte es la que permite descargar el vídeo + audio de forma rápida (opción 1), descargar el vídeo y audio permitiendo seleccionar la calidad (opción 3) y descargar solo el audio (opción 2):

if job == '1':  # Vídeo y audio rápido
        video_y_audio.download(parent_dir + '/video')
        print_proceso_terminado()
    elif job == '3': # Solo audio
        ruta_fin = yt.streams.get_audio_only().download(parent_dir + '/audio')
        audioclip = AudioFileClip(ruta_fin)                
        audioclip.write_audiofile(audioclip.filename.replace('.mp4', '.mp3'))

        os.remove(audioclip.filename)

        print_proceso_terminado()
    elif job == '2':  # Descargar vídeo y audio seleccionando calidad    
        num_video_descargar = 1

        if not loop:
            print('')
            print('Seleccionar vídeo (1..{0})'.format(len(vids)))    

            for video in vids:
                print('    ({1}) - {0}'.format(video, contador))
                contador += 1

            print('')

            num_video_descargar = int(input('Nº vídeo: '))    

        num_video_descargar -= 1    

        nombre_video = vids[num_video_descargar].default_filename
        nombre_video_final = 'f_' + nombre_video
        vids[num_video_descargar].download(parent_dir + '/video')

        yt.streams.get_audio_only().download(parent_dir + '/audio')

        audioclip = AudioFileClip(parent_dir + '/audio/' + nombre_video)

        videoclip2 = VideoFileClip(parent_dir + '/video/' + nombre_video)
        videoclip2 = videoclip2.set_audio(audioclip)
        
        videoclip2.write_videofile(parent_dir + '/video/' + nombre_video_final)    

        os.remove(videoclip2.filename)
        os.remove(audioclip.filename)

La parte más enrevesada es la opción 2 (descargar vídeo y audio seleccionando la calidad) porque Youtube no tiene el vídeo + audio en el mismo archivo, lo que hace el programa es descargar el vídeo por un lado, el audio por otro y mergearlos en un solo archivo, después borra los ficheros de vídeo y audio que se descargaron por separado.

La parte del audio, también tiene un poco de miga, ya que el audio se descarga en formato mp4 y el programa lo convierte a mp3 borrando después el mp4 original.

Aquí está el repositorio de GitHub con todo el código. https://github.com/Dynam1co/Python_youtube_video_downloader

Generar .exe de un script Python que se ejecute en segundo plano al inicio de Windows

En ocasiones necesitamos que un script en Python sea usado por un cliente final, puede ser que este usuario no tenga el intérprete de Python instalado en el sistema o no tenga los conocimientos necesarios para ejecutar el programa desde la terminal o simplemente por comodidad siempre es más fácil hacer doble click en un .exe.

Generar un ejecutable de un script de Python es muy fácil y existen varias librerías capaces de hacerlo, la más famosa es PyInstaller pero deja de sernos útil si usamos librerías que no son del sistema ya que no es capaz de empaquetarlas y el ejecutable no funcionaría. Por eso mismo vamos a hablar de cx_Freeze.

El funcionamiento es más o menos el mismo que PyInstaller, pero la diferencia es que esta sí que permite generar ejecutables que usen librerías de terceros.

Lo primero que tenemos que hacer es instalarla (recomendable hacerlo en el entorno virtual del proyecto con pip):

$ pip install cx_Freeze

Ahora nos vamos a la ruta donde esté nuestro script para generar su ejecutable y ejecutamos el comando:

$ cxfreeze script.py --target-dir dist

Si queremos que se ejecute en segundo plano sin mostar la ventana de la termina, el comando sería el siguiente:

$ cxfreeze script.py --base-name=Win32GUI --target-dir dist

Cuando termine veremos que se ha creado un directorio llamado dist y dentro estará el ejecutable, si nuestro script se llamaba script.py el ejecutable se llamará script.exe. También habrá otros ficheros necesarios para la ejecución:

Configurarlo para que se inicie automáticamente con el sistema

En mi caso, mi programa debe iniciarse automáticamente con el sistema porque tiene que ir buscando contínuamente ficheros en una ruta sin que tenga que intervenir el usuairo. Hacerlo es muy fácil, solo tenemos que pulsar con el botón derecho del ratón sobre el .exe y crearlo como acceso directo en el escritorio:

Ahora desde la ventana ejecutar Tecla Windows+R escribimos: shell:startup

Se nos abrirá una ventana que (si lo hay) contendrá los programas que se ejecutan al inicio de Windows. Movemos ahí el acceso directo que habíamos creado en el escritorio:

Y listo, así de fácil es crear un ejecutable de un script Python que se ejecute al inicio y en segundo plano.

Mostrar notificaciones de Windows con Python

Es muy útil mostrar notificaciones estándares del sistema cuando se completa alguna tarea o se ejecuta un evento.

Hacerlo en Windows con Python es muy sencillo y solo tenemos que hacer uso de la librería win10toast. Aquí está toda la información sobre la librería.

Primero deberemos instalarla en nuestro entorno virtual:

pip install win10toast

Ya en nuestro script en Python, la importamos y hacemos uso de ella:

from win10toast import ToastNotifier

if __name__ == "__main__":
    toaster = ToastNotifier()

    toaster.show_toast(
        "Hello World!!!",
        "Notificación de 10 segundos con Python",
        icon_path="python_icon.ico",
        duration=10
    )

Yo he decidido usar un icono con el logo de Python, pero se puede usar la imagen por defecto sustituyendo “python_icon.ico” por “”.

La notificación se vería así:

En este repositorio de Git Hub está el código y el icono de Python.

Mostrar calendario en la terminal con Python

En este artículo veremos cómo mostrar el calendario del mes y año que queramos en la terminal usando Python.

No hace falta instalar ninguna librería, ya que usaremos una que viene en el sistema llamada calendar.

Este sería el código necesario:

import calendar

if __name__ == "__main__":
    year = int(input('Escribe el año: '))
    month = int(input('Escribe el mes: '))

    print('\n')
    print(calendar.month(year, month))

La salida de este script sería la siguiente:

Calendario mostrado en terminal

También podemos imprimir el calendario completo del año actual iterando sobre una lista de meses. En este caso, voy a poner un ejemplo que lo hace usando programación funcional por dar otro punto de vista. Si obviamos los imports y la declaración del main, mostramos el calendario completo en una sola línea de código:

import calendar
from datetime import datetime

if __name__ == "__main__":
    print(list(map(lambda x : print(calendar.month(datetime.now().year, x)), range(1, 13))))