Tareas en segundo plano con Celery

Si su aplicación tiene una tarea de larga duración, como el procesamiento de algunos datos cargados o el envío de correo electrónico, no querrá esperar a que termine durante una solicitud. En su lugar, utilice una cola de tareas para enviar los datos necesarios a otro proceso que ejecutará la tarea en segundo plano mientras la solicitud vuelve inmediatamente.

Celery es una potente cola de tareas que se puede utilizar tanto para tareas simples en segundo plano como para complejos programas y planificaciones multietapa. Esta guía te mostrará cómo configurar Celery utilizando Flask. Lea la guía de Celery Primeros pasos con Celery para aprender a utilizar Celery.

El repositorio de Flask contiene un ejemplo basado en la información de esta pagina, que también muestra cómo utilizar JavaScript para enviar tareas y sondear el progreso y los resultados.

Instalar

Instala Celery desde PyPI, por ejemplo utilizando pip:

$ pip install celery

Integra Celery con Flask

Puedes usar Celery sin ninguna integración con Flask, pero es conveniente configurarlo a través de la configuración de Flask, y dejar que las tareas accedan a la aplicación de Flask.

Celery utiliza ideas similares a Flask, con un objeto de aplicación Celery que tiene tareas de configuración y registro. Mientras creas una app Flask, utiliza el siguiente código para crear y configurar también una app Celery.

from celery import Celery, Task

def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app

Esto crea y devuelve un objeto de aplicación Celery. Celery configuration se toma de la llave CELERY en la configuración de Flask. La aplicación Celery se establece por defecto, de forma que se vea durante cada petición. La subclase Task ejecuta automáticamente funciones de tarea con un contexto de aplicación Flask activo, para que los servicios como las conexiones a bases de datos estén disponibles.

Aquí hay un example.py básico que configura Celery para usar Redis para la comunicación. Habilitamos un backend de resultados, pero ignoramos los resultados por defecto. Esto nos permite almacenar los resultados sólo para las tareas en las que nos importa el resultado.

from flask import Flask

app = Flask(__name__)
app.config.from_mapping(
    CELERY=dict(
        broker_url="redis://localhost",
        result_backend="redis://localhost",
        task_ignore_result=True,
    ),
)
celery_app = celery_init_app(app)

Apunta el comando celery worker a esto y encontrará el objeto celery_app.

$ celery -A example worker --loglevel INFO

También puedes ejecutar el comando celery beat para ejecutar tareas de forma programada. Consulta la documentación de Celery para obtener más información acerca de la definición de horarios.

$ celery -A example beat --loglevel INFO

Fábrica de aplicaciones

Cuando uses el patrón de fábrica de aplicaciones Flask, llama a la función celery_init_app dentro de la fábrica. Establece app.extensions["celery"] al objeto de la aplicación Celery, que se puede utilizar para obtener la aplicación Celery de la aplicación Flask devuelta por la fábrica.

def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)
    return app

Para usar los comandos celery, Celery necesita un objeto app, pero eso ya no está disponible directamente. Crea un archivo make_celery.py que llame a la fábrica de aplicaciones Flask y obtenga la aplicación Celery de la aplicación Flask devuelta.

from example import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"]

Apunta el comando celery a este archivo.

$ celery -A make_celery worker --loglevel INFO
$ celery -A make_celery beat --loglevel INFO

Definición de Tareas

El uso de @celery_app.task para decorar funciones de tareas requiere el acceso al objeto celery_app, que no estará disponible cuando se utiliza el patrón de fábrica. También significa que las tareas decoradas están vinculadas a las instancias específicas de Flask y Celery app, lo que podría ser un problema durante las pruebas si se cambia la configuración para una prueba.

En su lugar, utiliza el decorador @shared_task de Celery. Esto crea objetos de tarea que accederán a cualquiera que sea la «aplicación actual», que es un concepto similar a blueprint de Flask y el contexto de la aplicación. Esta es la razón por la que llamamos celery_app.set_default() arriba.

Aquí hay una tarea de ejemplo que suma dos números y devuelve el resultado.

from celery import shared_task

@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
    return a + b

Anteriormente, hemos configurado Celery para ignorar los resultados de la tarea por defecto. Como queremos conocer el valor de retorno de esta tarea, configuramos ignore_result=False. Por otra parte, una tarea que no necesita un resultado, como el envío de un correo electrónico, no requiere esto.

Llamada a Tareas

La función decorada se convierte en un objeto tarea con métodos para llamarla en segundo plano. La forma más sencilla es utilizar el método delay(*args, **kwargs). Ve la documentación de Celery para más métodos.

Un Celery worker debe estar en ejecución para ejecutar la tarea. El inicio de un worker se muestra en las secciones anteriores.

from flask import request

@app.post("/add")
def start_add() -> dict[str, object]:
    a = request.form.get("a", type=int)
    b = request.form.get("b", type=int)
    result = add_together.delay(a, b)
    return {"result_id": result.id}

La ruta no obtiene el resultado de la tarea de inmediato. Eso iría en contra del objetivo al bloquear la respuesta. En su lugar, devolvemos el id de resultado de la tarea en ejecución, que podemos utilizar más tarde para obtener el resultado.

Obtener Resultados

Para obtener el resultado de la tarea que iniciamos anteriormente, añadiremos otra ruta que toma el identificador de resultado que devolvimos antes. Devolvemos si la tarea está terminada (lista), si terminó con éxito, y cuál fue el valor de retorno (o error) si está terminada.

from celery.result import AsyncResult

@app.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
    result = AsyncResult(id)
    return {
        "ready": result.ready(),
        "successful": result.successful(),
        "value": result.result if result.ready() else None,
    }

Ahora puede iniciar la tarea utilizando la primera ruta, y luego sondear el resultado utilizando la segunda ruta. Esto evita que los request workers de Flask se bloqueen esperando a que las tareas terminen.

El repositorio de Flask contiene un ejemplo usando JavaScript para enviar tareas y sondear el progreso y los resultados.

Pasar datos a las tareas

La tarea «add» anterior tomó dos enteros como argumentos. Para pasar argumentos a las tareas, Celery tiene que serializarlos a un formato que pueda pasar a otros procesos. Por lo tanto, no se recomienda pasar objetos complejos. Por ejemplo, sería imposible pasar un objeto modelo SQLAlchemy, ya que ese objeto probablemente no es serializable y está ligado a la sesión que lo consultó.

Pase la cantidad mínima de datos necesaria para obtener o recrear cualquier dato complejo dentro de la tarea. Consideremos una tarea que se ejecutará cuando el usuario conectado solicite un archivo de sus datos. La petición de Flask conoce el usuario conectado, y tiene el objeto usuario consultado desde la base de datos. Para ello, consulta la base de datos con un identificador determinado, por lo que la tarea puede hacer lo mismo. Pasa el id del usuario en lugar del objeto usuario.

@shared_task
def generate_user_archive(user_id: str) -> None:
    user = db.session.get(User, user_id)
    ...

generate_user_archive.delay(current_user.id)