Migraciones: Ese mal necesario que no tiene que doler

Si trabajas en la parte del Backend de una aplicación, es probable que hayas tenido que lidiar con migraciones en algún momento. En este post, vamos a ver cómo podemos hacer migraciones en Python de una manera sencilla, automatizada y sin dolor.

SQLAlchemy y Alembic

Aunque es posible operar con bases de datos directamente sin necesidad de ningún ORM, SQLAlchemy es una librería muy popular en Python para trabajar con bases de datos relacionales. Con ella vamos a poder definir nuestros modelos y que estos sean la representación de nuestras tablas en la base de datos.

De igual forma y casi como una extensión de SQLAlchemy, Alembic es una herramienta que nos permite gestionar nuestras bases de datos de una manera sencilla y automatizada. Con ella vamos a poder crear migraciones tanto de forma manual como de forma automática, y aplicarlas a nuestras bases de datos.

Vamos a ver lo que podría ser un caso “real” muy simplificado.

Dado que el artículo va a ser algo largo y para que no se vaya madre, no voy a cubrir posibles fallos en la aplicación, vamos a cubrir solo los ‘happy paths’.

¡De igual forma me estoy dejando fuera un montón de tests, tanto unitarios como de integración, pero recordad que los tests van primero siempre!

Primera iteración: Gestionar usuarios

Imaginemos que queremos desarrollar una aplicación para la gestión de usuarios, en este caso que nos ocupa vamos a utilizar FastAPI como punto de entrada para nuestra aplicación, y vamos a utilizar Postgres como nuestro motor de base de datos.

Dicho todo esto empecemos como siempre por los tests, vamos a definir un test lo más desde el exterior posible que nos permita definir como nos gustaría consumir nuestra aplicación:

from http.client import CREATED, OK

from expects import equal, expect
from fastapi.testclient import TestClient

from main import app


class TestUsers:
    def test_create_user(self) -> None:
        client = TestClient(app)
        name = "yes"
        age = 38
        payload = {"name": name, "age": age}

        response = client.post("/api/v1/users", json=payload)

        expect(response.status_code).to(equal(CREATED))

        user_id = response.json()["id"]
        response = client.get(f"/api/v1/users/{user_id}")

        expect(response.status_code).to(equal(OK))
        user = response.json()
        expect(user).to(equal({"id": user_id, "name": name, "age": age}))

Desde el punto de vista del usuario hemos definido un par de endpoints que:

  • POST /api/v1/users para crear un usuario.
    • Retorna un 201 - CREATED con el id del usuario creado.
  • GET /api/v1/users/{user_id} para obtener un usuario a partir de su id.
    • Retorna un 200 - OK con la información del usuario.

Definamos nuestra API

De afuera hacia adentro, vamos a implementar nuestra capa Delivery:

from http.client import CREATED

from fastapi import APIRouter, Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.common.settings import settings
from src.delivery.api.v1.users.created_user_response import CreatedUserResponse
from src.delivery.api.v1.users.user_request import UserRequest
from src.delivery.api.v1.users.user_response import UserResponse
from src.domain.command import CommandHandler
from src.domain.query import QueryHandler
from src.domain.users.user import User
from src.domain.users.users_repository import UsersRepository
from src.infrastructure.postgres.postgres_users_repository import (
    PostgresUsersRepository,
)
from src.use_cases.commands.create_user_command import (
    CreateUserCommand,
    CreateUserCommandHandler,
)
from src.use_cases.queries.get_user_query_handler import (
    GetUserQuery,
    GetUserQueryHandler,
)

users_router: APIRouter = APIRouter()


async def _get_users_repository() -> UsersRepository:
    engine = create_engine(f"postgresql://{settings.db_dsn}")
    session = Session(engine)
    return PostgresUsersRepository(session)


async def _create_user_command_handler(
    repository: UsersRepository = Depends(_get_users_repository),
) -> CommandHandler:
    return CreateUserCommandHandler(repository)


async def _get_user_query_handler(
    repository: UsersRepository = Depends(_get_users_repository),
) -> QueryHandler:
    return GetUserQueryHandler(repository)


@users_router.post("/users", status_code=CREATED)
def _create(
    user_request: UserRequest,
    handler: CommandHandler = Depends(_create_user_command_handler),
) -> CreatedUserResponse:
    name = user_request.name
    age = user_request.age
    command = CreateUserCommand(name, age)
    response = handler.execute(command)
    user_id = response.message()
    return CreatedUserResponse(id=user_id)


@users_router.get("/users/{id}", response_model=UserResponse)
def _get(
    id: str, handler: QueryHandler = Depends(_get_user_query_handler)
) -> UserResponse:
    command = GetUserQuery(id)
    response = handler.execute(command)
    user: User = response.message()
    return UserResponse(id=user.user_id, name=user.name, age=user.age)

En ella básicamente:

  • Definimos nuestros endpoints.
  • Usando la inyección de dependencias de FastAPI, a cada uno de ellos le inyectamos su caso de uso correspondiente.
  • A los casos de uso se les inyecta a su vez el repositorio de usuarios.
  • Por último implementamos nuestros endpoints, en los que principalmente se ejecuta el caso de uso y se lee la respuesta.

Definimos nuestros casos de uso

Seguimos moviéndonos hacia el core de nuestra aplicación, ahora en la capa de Use Cases.

Primero implementamos el caso de uso para crear usuarios:

from uuid import uuid4

from src.domain.command import Command, CommandHandler
from src.domain.users.user import User
from src.domain.users.users_repository import UsersRepository


class CreateUserCommand(Command):
    def __init__(self, name: str, age: int) -> None:
        super().__init__()
        self.name = name
        self.age = age


class CreateUserCommandResponse:
    def __init__(self, user_id: str):
        self.user_id = user_id

    def message(self) -> str:
        return self.user_id


class CreateUserCommandHandler(CommandHandler):
    def __init__(self, repository: UsersRepository) -> None:
        self.repository = repository

    def execute(self, command: CreateUserCommand) -> CreateUserCommandResponse:
        user_id = uuid4()
        user = User(user_id=user_id.hex, name=command.name, age=command.age)
        self.repository.save(user)
        return CreateUserCommandResponse(user_id=user_id.hex)

Y a continuación el caso de uso para obtener usuarios a partir de su id:

from src.domain.query import Query, QueryHandler
from src.domain.users.user import User
from src.domain.users.users_repository import UsersRepository


class GetUserQuery(Query):
    def __init__(self, user_id: str) -> None:
        self.user_id = user_id


class GetUserQueryResponse:
    def __init__(self, user: User) -> None:
        self.user = user

    def message(self) -> User:
        return self.user


class GetUserQueryHandler(QueryHandler):
    def __init__(self, repository: UsersRepository) -> None:
        self.repository = repository

    def execute(self, query: GetUserQuery) -> GetUserQueryResponse:
        user = self.repository.get(query.user_id)
        return GetUserQueryResponse(user)

En ambos casos la lógica es muy similar:

  • Recibe la información de la capa de Delivery.
  • La adapta para la capa de Infraestructura.
  • Llama al repositorio de usuarios.
  • Retorna una respuesta.

Definimos nuestra capa de Infraestructura

Seguimos bajando capas en nuestra aplicación, en esta capa de Infrastructure vamos a implementar nuestro repositorio de usuarios que habla directamente con la base de datos.

Para ello vamos a crear un par de métodos para salvar y obtener usuarios:

from sqlalchemy import select
from sqlalchemy.orm import Session

from src.domain.users.user import User
from src.domain.users.users_repository import UsersRepository


class PostgresUsersRepository(UsersRepository):
    def __init__(self, session: Session) -> None:
        self.session = session

    def save(self, user: User) -> None:
        self.session.add(user)
        self.session.commit()

    def get(self, id: str) -> User:
        stmt = select(User).where(User.user_id == id)
        return self.session.execute(stmt).scalar_one()

Definimos nuestra capa de Dominio

Y por último llegamos a la parte que tiene más relación con el post.

Como decíamos al principio SQLAlchemy nos permite definir nuestros modelos y que estos sean la representación de nuestras tablas en la base de datos. Además Alembic se apoyará en estos modelos para crear las migraciones basándose en todo el metadata que obtiene de las mismas.

Definimos por tanto una clase User que va a ser la representación de nuestra tabla users en la base de datos:

from datetime import datetime

from sqlalchemy import String, func
from sqlalchemy.orm import Mapped, mapped_column

from src.domain.users.base import Base


class User(Base):
    __tablename__ = "users"
    user_id: Mapped[str] = mapped_column(
        String(36), primary_key=True, autoincrement=False
    )
    name: Mapped[str] = mapped_column(String(30))
    created_at: Mapped[datetime] = mapped_column(insert_default=func.now())

Instalemos Alembic

Lo primero de todo es instalar Alembic e inicializarlo con el comando alembic init alembic. Este comando nos creará una carpeta llamada alembic en la raíz de nuestro proyecto donde irán las migraciones y un fichero llamado alembic.ini.

Configuremos Alembic

Con todo listo, ahora solo necesitamos configurar Alembic para que sea capaz de conectarse a nuestra base de datos para leer el estado actual y ser capaz de generar las migraciones necesarias.

Para ello tenemos que editar el archivo de configuración alembic.ini y cambiar el valor de la clave sqlalchemy.url:

sqlalchemy.url = postgresql://user:%(DB_PASSWORD)s@%(DB_HOST)s/users

Dado que vamos a querer correr las migraciones tanto en local (para poder lanzar nuestros tests de integración y aceptación) como en producción cuando la aplicación se despliegue y antes de arrancar corra las migraciones, necesitamos hacer que el host de nuestra base de datos sea dinámico.

Por suerte es posible usar interpolación en este tipo de ficheros, %(DB_HOST)s, y setear su valor dinámicamente en otro de los ficheros de Alembic llamado env.py:

db_host = os.getenv("DB_HOST", "<PRODUCTION_DB_HOST>")
db_password = os.getenv("DB_PASSWORD", "<PRODUCTION_DB_PASSWORD>")
config.set_main_option("DB_HOST", db_host)
config.set_main_option("DB_PASSWORD", db_password)

De igual forma, si queremos que Alembic sea capaz de auto-generar las migraciones, necesitamos cambiar la siguiente línea en el fichero env.py:

# target_metadata = None
target_metadata = Base.metadata

Siendo Base una clase de nuestra aplicación de la que extienden todos nuestros modelos de datos (class User(Base):):

from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass

De esta forma Alembic es capaz de leer nuestros modelos, interpretarlos y autogenerar las migraciones.

Generemos nuestra primera migración

Para generar nuestra primera migración solo necesitamos acceso a una base de datos vacía (setear el valor en el DB_HOST) y ejecutar el siguiente comando: alembic revision --autogenerate -m "first_migration"

Esto va a generar un fichero en la carpeta alembic/versions con el siguiente contenido:

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


revision: str = '474663a34795'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.create_table('users',
    sa.Column('user_id', sa.String(length=36), autoincrement=False, nullable=False),
    sa.Column('name', sa.String(length=30), nullable=False),
    sa.Column('created_at', sa.DateTime(), nullable=False),
    sa.PrimaryKeyConstraint('user_id')
    )


def downgrade() -> None:
    op.drop_table('users')

Como podemos ver Alembic ha generado un fichero con dos métodos, upgrade y downgrade. El primero se encarga de la migración creación de las tablas, el segundo de borrarlas en caso de necesitar hacer rollback.

Ejecutemos nuestras migraciones

Una vez con las migraciones creadas solo necesitaríamos ejecutar nuestra aplicación y que esta se encargue de aplicarlas antes de inicializarse.

Para ellos vamos a modificar el punto de entrada de nuestra aplicación, en el caso de FastAPI es incluso más sencillo, ya que permite pasarle un método lifespan que hace las veces de wrapper permitiéndonos controlar que se ejecuta antes y después de que nuestra aplicación arranque y se pare:

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from time import sleep

from fastapi import FastAPI

from alembic import command
from alembic.config import Config
from src.delivery.api.v1.users.users_router import users_router


def run_sql_migrations() -> None:
    alembic_cfg = Config("alembic.ini")
    command.upgrade(alembic_cfg, "head")


@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator:
    run_sql_migrations()

    yield

    # Graceful shutdown
    sleep(5)  # wait for the app to finish processing requests


app = FastAPI(lifespan=lifespan)

app.include_router(prefix="api/v1", router=users_router)

A partir de ahora cuando nuestra app se inicie, Alembic va a crear la tabla users en nuestra base de datos haciendo uso de nuestra primera migración.

INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 474663a34795, first_migration

Segunda iteracion, los usuarios ahora tienen edad.

En este momento en el que nuestra aplicación es capaz de aplicar automáticamente en su arranque las migraciónes pendientes, si las hubiera, sea ha vuelvo trivial hacer cambios en nuestros modelos.

Imaginemos que queremos añadir una nueva columna a nuestra tabla de usuarios, la edad.

Primero modificamos nuestro modelo añadiendo la nueva columna:

from datetime import datetime

from sqlalchemy import Numeric, String, func
from sqlalchemy.orm import Mapped, mapped_column

from src.domain.users.base import Base


class User(Base):
    __tablename__ = "users"
    user_id: Mapped[str] = mapped_column(
        String(36), primary_key=True, autoincrement=False
    )
    name: Mapped[str] = mapped_column(String(30))
    age: Mapped[int] = mapped_column(Numeric)
    created_at: Mapped[datetime] = mapped_column(insert_default=func.now())

A continuación generamos una nueva migración: alembic revision --autogenerate -m "add_age_column":

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


revision: str = '34a2b02d2fc3'
down_revision: Union[str, None] = '474663a34795'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.add_column('users', sa.Column('age', sa.Numeric(), nullable=False))


def downgrade() -> None:
    op.drop_column('users', 'age')

Podemos ver como ha sido capaz de generar la creación de la columna con sus tipos, como de hacer el rollback.

Y por último desplegamos nuestra aplicación:

INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade 34a2b02d2fc3, add_age_column

En tres sencillos pasos hemos sido capaces de cambiar el esquema de nuestra base de datos.

Conclusiones

Como todo en esta vida, nada es perfecto. Y por tanto esta aproximación, bastante sencilla, es solo una posible solución.

Si analizamos un poco los pros y los contras de esta solución podemos ver que:

PROS

  • Fácil de usar e iterar.
  • La aplicación es capaz de aplicar las migraciones automáticamente.
  • Es una muy buena primera aproximación, que no compromete a futuro y que es fácil de cambiar.
    • La lógica ya está separada, simplemente habría que llevar la ejecución de Alembic a otro sitio fuera de la aplicación.

CONTRAS

  • Si las migraciones se vuelven muy lentas puede ser un problema.
  • Estamos acoplando nuestro despliegue de código al de datos.

Os dejo aquí todo el código disponible por si lo queréis mirar con calma

!Espero que os haya gustado el post, nos vemos!