跳转至

Flask 用 Celery 实现后台任务

约 75 个字 42 行代码 预计阅读时间 1 分钟

安装软件包

pip install celery redis

集成

配置类
1
2
3
4
5
6
7
class FlaskPlaygroundConfig:
    # ...
    CELERY = dict(
        broker_url="redis://localhost",
        result_backend="redis://localhost",
        task_ignore_result=False,
    )
ext.py
# ...
def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app
app.py
1
2
3
app = Flask(__name__)
app.config.from_object(FlaskPlaygroundConfig)
celery_app = celery_init_app(app)
celery_app.py
1
2
3
4
5
6
7
from app import app


celery_app = app.extensions['celery']

if __name__ == '__main__':
    celery_app.start()
启动 Celery
1
2
3
celery -A celery_app.celery_app worker --loglevel=INFO
# Windows 下添加 --pool=solo
celery -A celery_app.celery_app worker --loglevel=INFO --pool=solo

定义后台任务

1
2
3
@shared_task(ignore_result=False)
def set_route_point_area_task(route_point_id: int):
    # ...

传参尽量最小化,不能传实体类。

使用后台任务

task_result = set_route_point_area_task.delay(i.id)

可以获取 task_result.id 以便后续追踪任务状态。

查询任务状态

假如上面的任务 ID 记为 result_id

1
2
3
4
5
result = AsyncResult(result_id)
# 可用方法
result.ready()      # 是否完成
result.successful() # 是否执行成功
result.result       # 返回值,需在 result.ready() 为真时查看