Si trabajas en la parte del Backend de una aplicación, es probable que hayas tenido que lidiar con migraciones en algún momento. En este post, vamos a ver cómo podemos hacer migraciones en Python
de una manera sencilla, automatizada y sin dolor.
SQLAlchemy y Alembic
Aunque es posible operar con bases de datos directamente sin necesidad de ningún ORM, SQLAlchemy es una librería muy popular en Python
para trabajar con bases de datos relacionales. Con ella vamos a poder definir nuestros modelos y que estos sean la representación de nuestras tablas en la base de datos.
De igual forma y casi como una extensión de SQLAlchemy
, Alembic es una herramienta que nos permite gestionar nuestras bases de datos de una manera sencilla y automatizada. Con ella vamos a poder crear migraciones tanto de forma manual como de forma automática, y aplicarlas a nuestras bases de datos.
Vamos a ver lo que podría ser un caso “real” muy simplificado.
Dado que el artículo va a ser algo largo y para que no se vaya madre, no voy a cubrir posibles fallos en la aplicación, vamos a cubrir solo los ‘happy paths’.
¡De igual forma me estoy dejando fuera un montón de tests, tanto unitarios como de integración, pero recordad que los tests van primero siempre!
Primera iteración: Gestionar usuarios
Imaginemos que queremos desarrollar una aplicación para la gestión de usuarios, en este caso que nos ocupa vamos a utilizar FastAPI
como punto de entrada para nuestra aplicación, y vamos a utilizar Postgres
como nuestro motor de base de datos.
Dicho todo esto empecemos como siempre por los tests, vamos a definir un test lo más desde el exterior posible que nos permita definir como nos gustaría consumir nuestra aplicación:
from http.client import CREATED, OK
from expects import equal, expect
from fastapi.testclient import TestClient
from main import app
class TestUsers:
def test_create_user(self) -> None:
client = TestClient(app)
name = "yes"
age = 38
payload = {"name": name, "age": age}
response = client.post("/api/v1/users", json=payload)
expect(response.status_code).to(equal(CREATED))
user_id = response.json()["id"]
response = client.get(f"/api/v1/users/{user_id}")
expect(response.status_code).to(equal(OK))
user = response.json()
expect(user).to(equal({"id": user_id, "name": name, "age": age}))
Desde el punto de vista del usuario hemos definido un par de endpoints que:
POST /api/v1/users
para crear un usuario.- Retorna un 201 - CREATED con el
id
del usuario creado.
- Retorna un 201 - CREATED con el
GET /api/v1/users/{user_id}
para obtener un usuario a partir de suid
.- Retorna un 200 - OK con la información del usuario.
Definamos nuestra API
De afuera hacia adentro, vamos a implementar nuestra capa Delivery
:
from http.client import CREATED
from fastapi import APIRouter, Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from src.common.settings import settings
from src.delivery.api.v1.users.created_user_response import CreatedUserResponse
from src.delivery.api.v1.users.user_request import UserRequest
from src.delivery.api.v1.users.user_response import UserResponse
from src.domain.command import CommandHandler
from src.domain.query import QueryHandler
from src.domain.users.user import User
from src.domain.users.users_repository import UsersRepository
from src.infrastructure.postgres.postgres_users_repository import (
PostgresUsersRepository,
)
from src.use_cases.commands.create_user_command import (
CreateUserCommand,
CreateUserCommandHandler,
)
from src.use_cases.queries.get_user_query_handler import (
GetUserQuery,
GetUserQueryHandler,
)
users_router: APIRouter = APIRouter()
async def _get_users_repository() -> UsersRepository:
engine = create_engine(f"postgresql://{settings.db_dsn}")
session = Session(engine)
return PostgresUsersRepository(session)
async def _create_user_command_handler(
repository: UsersRepository = Depends(_get_users_repository),
) -> CommandHandler:
return CreateUserCommandHandler(repository)
async def _get_user_query_handler(
repository: UsersRepository = Depends(_get_users_repository),
) -> QueryHandler:
return GetUserQueryHandler(repository)
@users_router.post("/users", status_code=CREATED)
def _create(
user_request: UserRequest,
handler: CommandHandler = Depends(_create_user_command_handler),
) -> CreatedUserResponse:
name = user_request.name
age = user_request.age
command = CreateUserCommand(name, age)
response = handler.execute(command)
user_id = response.message()
return CreatedUserResponse(id=user_id)
@users_router.get("/users/{id}", response_model=UserResponse)
def _get(
id: str, handler: QueryHandler = Depends(_get_user_query_handler)
) -> UserResponse:
command = GetUserQuery(id)
response = handler.execute(command)
user: User = response.message()
return UserResponse(id=user.user_id, name=user.name, age=user.age)
En ella básicamente:
- Definimos nuestros
endpoints
. - Usando la inyección de dependencias de
FastAPI
, a cada uno de ellos le inyectamos su caso de uso correspondiente. - A los casos de uso se les inyecta a su vez el repositorio de usuarios.
- Por último implementamos nuestros endpoints, en los que principalmente se ejecuta el caso de uso y se lee la respuesta.
Definimos nuestros casos de uso
Seguimos moviéndonos hacia el core de nuestra aplicación, ahora en la capa de Use Cases
.
Primero implementamos el caso de uso para crear usuarios:
from uuid import uuid4
from src.domain.command import Command, CommandHandler
from src.domain.users.user import User
from src.domain.users.users_repository import UsersRepository
class CreateUserCommand(Command):
def __init__(self, name: str, age: int) -> None:
super().__init__()
self.name = name
self.age = age
class CreateUserCommandResponse:
def __init__(self, user_id: str):
self.user_id = user_id
def message(self) -> str:
return self.user_id
class CreateUserCommandHandler(CommandHandler):
def __init__(self, repository: UsersRepository) -> None:
self.repository = repository
def execute(self, command: CreateUserCommand) -> CreateUserCommandResponse:
user_id = uuid4()
user = User(user_id=user_id.hex, name=command.name, age=command.age)
self.repository.save(user)
return CreateUserCommandResponse(user_id=user_id.hex)
Y a continuación el caso de uso para obtener usuarios a partir de su id
:
from src.domain.query import Query, QueryHandler
from src.domain.users.user import User
from src.domain.users.users_repository import UsersRepository
class GetUserQuery(Query):
def __init__(self, user_id: str) -> None:
self.user_id = user_id
class GetUserQueryResponse:
def __init__(self, user: User) -> None:
self.user = user
def message(self) -> User:
return self.user
class GetUserQueryHandler(QueryHandler):
def __init__(self, repository: UsersRepository) -> None:
self.repository = repository
def execute(self, query: GetUserQuery) -> GetUserQueryResponse:
user = self.repository.get(query.user_id)
return GetUserQueryResponse(user)
En ambos casos la lógica es muy similar:
- Recibe la información de la capa de
Delivery
. - La adapta para la capa de
Infraestructura
. - Llama al repositorio de usuarios.
- Retorna una respuesta.
Definimos nuestra capa de Infraestructura
Seguimos bajando capas en nuestra aplicación, en esta capa de Infrastructure
vamos a implementar nuestro repositorio de usuarios que habla
directamente con la base de datos.
Para ello vamos a crear un par de métodos para salvar y obtener usuarios:
from sqlalchemy import select
from sqlalchemy.orm import Session
from src.domain.users.user import User
from src.domain.users.users_repository import UsersRepository
class PostgresUsersRepository(UsersRepository):
def __init__(self, session: Session) -> None:
self.session = session
def save(self, user: User) -> None:
self.session.add(user)
self.session.commit()
def get(self, id: str) -> User:
stmt = select(User).where(User.user_id == id)
return self.session.execute(stmt).scalar_one()
Definimos nuestra capa de Dominio
Y por último llegamos a la parte que tiene más relación con el post.
Como decíamos al principio SQLAlchemy
nos permite definir nuestros modelos y que estos sean la representación de nuestras tablas en la base de datos. Además Alembic
se apoyará en estos modelos para crear las migraciones basándose en todo el metadata que obtiene de las mismas.
Definimos por tanto una clase User
que va a ser la representación de nuestra tabla users
en la base de datos:
from datetime import datetime
from sqlalchemy import String, func
from sqlalchemy.orm import Mapped, mapped_column
from src.domain.users.base import Base
class User(Base):
__tablename__ = "users"
user_id: Mapped[str] = mapped_column(
String(36), primary_key=True, autoincrement=False
)
name: Mapped[str] = mapped_column(String(30))
created_at: Mapped[datetime] = mapped_column(insert_default=func.now())
Instalemos Alembic
Lo primero de todo es instalar Alembic
e inicializarlo con el comando alembic init alembic
. Este comando nos creará
una carpeta llamada alembic
en la raíz de nuestro proyecto donde irán las migraciones y un fichero llamado alembic.ini
.
Configuremos Alembic
Con todo listo, ahora solo necesitamos configurar Alembic
para que sea capaz de conectarse
a nuestra base de datos para leer el estado actual y ser capaz de generar las migraciones necesarias.
Para ello tenemos que editar el archivo de configuración alembic.ini
y cambiar el valor de la clave sqlalchemy.url
:
sqlalchemy.url = postgresql://user:%(DB_PASSWORD)s@%(DB_HOST)s/users
Dado que vamos a querer correr las migraciones tanto en local (para poder lanzar nuestros tests de integración y aceptación) como en producción cuando la aplicación se despliegue y antes de arrancar corra las migraciones, necesitamos hacer que el host de nuestra base de datos sea dinámico.
Por suerte es posible usar interpolación en este tipo de ficheros, %(DB_HOST)s
, y setear su
valor dinámicamente en otro de los ficheros de Alembic
llamado env.py
:
db_host = os.getenv("DB_HOST", "<PRODUCTION_DB_HOST>")
db_password = os.getenv("DB_PASSWORD", "<PRODUCTION_DB_PASSWORD>")
config.set_main_option("DB_HOST", db_host)
config.set_main_option("DB_PASSWORD", db_password)
De igual forma, si queremos que Alembic
sea capaz de auto-generar las migraciones, necesitamos cambiar la siguiente línea
en el fichero env.py
:
# target_metadata = None
target_metadata = Base.metadata
Siendo Base
una clase de nuestra aplicación de la que extienden todos nuestros modelos de datos (class User(Base):
):
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
De esta forma Alembic
es capaz de leer nuestros modelos, interpretarlos y autogenerar las migraciones.
Generemos nuestra primera migración
Para generar nuestra primera migración solo necesitamos acceso a una base de datos vacía (setear el valor en el DB_HOST
)
y ejecutar el siguiente comando: alembic revision --autogenerate -m "first_migration"
Esto va a generar un fichero en la carpeta alembic/versions
con el siguiente contenido:
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = '474663a34795'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table('users',
sa.Column('user_id', sa.String(length=36), autoincrement=False, nullable=False),
sa.Column('name', sa.String(length=30), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('user_id')
)
def downgrade() -> None:
op.drop_table('users')
Como podemos ver Alembic
ha generado un fichero con dos métodos, upgrade
y downgrade
. El primero se encarga de
la migración creación de las tablas, el segundo de borrarlas en caso de necesitar hacer rollback.
Ejecutemos nuestras migraciones
Una vez con las migraciones creadas solo necesitaríamos ejecutar nuestra aplicación y que esta se encargue de aplicarlas antes de inicializarse.
Para ellos vamos a modificar el punto de entrada de nuestra aplicación, en el caso de FastAPI
es incluso más sencillo, ya que permite pasarle un método lifespan
que hace las veces de wrapper permitiéndonos controlar que se ejecuta antes y después de que nuestra aplicación arranque y se pare:
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from time import sleep
from fastapi import FastAPI
from alembic import command
from alembic.config import Config
from src.delivery.api.v1.users.users_router import users_router
def run_sql_migrations() -> None:
alembic_cfg = Config("alembic.ini")
command.upgrade(alembic_cfg, "head")
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator:
run_sql_migrations()
yield
# Graceful shutdown
sleep(5) # wait for the app to finish processing requests
app = FastAPI(lifespan=lifespan)
app.include_router(prefix="api/v1", router=users_router)
A partir de ahora cuando nuestra app se inicie, Alembic
va a crear la tabla users
en nuestra base de datos haciendo
uso de nuestra primera migración.
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> 474663a34795, first_migration
Segunda iteracion, los usuarios ahora tienen edad.
En este momento en el que nuestra aplicación es capaz de aplicar automáticamente en su arranque las migraciónes pendientes, si las hubiera, sea ha vuelvo trivial hacer cambios en nuestros modelos.
Imaginemos que queremos añadir una nueva columna a nuestra tabla de usuarios, la edad.
Primero modificamos nuestro modelo añadiendo la nueva columna:
from datetime import datetime
from sqlalchemy import Numeric, String, func
from sqlalchemy.orm import Mapped, mapped_column
from src.domain.users.base import Base
class User(Base):
__tablename__ = "users"
user_id: Mapped[str] = mapped_column(
String(36), primary_key=True, autoincrement=False
)
name: Mapped[str] = mapped_column(String(30))
age: Mapped[int] = mapped_column(Numeric)
created_at: Mapped[datetime] = mapped_column(insert_default=func.now())
A continuación generamos una nueva migración: alembic revision --autogenerate -m "add_age_column"
:
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = '34a2b02d2fc3'
down_revision: Union[str, None] = '474663a34795'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column('users', sa.Column('age', sa.Numeric(), nullable=False))
def downgrade() -> None:
op.drop_column('users', 'age')
Podemos ver como ha sido capaz de generar la creación de la columna con sus tipos, como de hacer el rollback.
Y por último desplegamos nuestra aplicación:
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running upgrade 34a2b02d2fc3, add_age_column
En tres sencillos pasos hemos sido capaces de cambiar el esquema de nuestra base de datos.
Conclusiones
Como todo en esta vida, nada es perfecto. Y por tanto esta aproximación, bastante sencilla, es solo una posible solución.
Si analizamos un poco los pros y los contras de esta solución podemos ver que:
PROS
- Fácil de usar e iterar.
- La aplicación es capaz de aplicar las migraciones automáticamente.
- Es una muy buena primera aproximación, que no compromete a futuro y que es fácil de cambiar.
- La lógica ya está separada, simplemente habría que llevar la ejecución de
Alembic
a otro sitio fuera de la aplicación.
- La lógica ya está separada, simplemente habría que llevar la ejecución de
CONTRAS
- Si las migraciones se vuelven muy lentas puede ser un problema.
- Estamos acoplando nuestro despliegue de código al de datos.
Os dejo aquí todo el código disponible por si lo queréis mirar con calma
!Espero que os haya gustado el post, nos vemos!