Flask 用 Celery 实现后台任务
约 75 个字 42 行代码 预计阅读时间 1 分钟
安装软件包
集成
配置类 |
---|
| class FlaskPlaygroundConfig:
# ...
CELERY = dict(
broker_url="redis://localhost",
result_backend="redis://localhost",
task_ignore_result=False,
)
|
ext.py |
---|
| # ...
def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
|
app.py |
---|
| app = Flask(__name__)
app.config.from_object(FlaskPlaygroundConfig)
celery_app = celery_init_app(app)
|
celery_app.py |
---|
| from app import app
celery_app = app.extensions['celery']
if __name__ == '__main__':
celery_app.start()
|
启动 Celery |
---|
| celery -A celery_app.celery_app worker --loglevel=INFO
# Windows 下添加 --pool=solo
celery -A celery_app.celery_app worker --loglevel=INFO --pool=solo
|
定义后台任务
| @shared_task(ignore_result=False)
def set_route_point_area_task(route_point_id: int):
# ...
|
传参尽量最小化,不能传实体类。
使用后台任务
| task_result = set_route_point_area_task.delay(i.id)
|
可以获取 task_result.id
以便后续追踪任务状态。
查询任务状态
假如上面的任务 ID 记为 result_id
:
| result = AsyncResult(result_id)
# 可用方法
result.ready() # 是否完成
result.successful() # 是否执行成功
result.result # 返回值,需在 result.ready() 为真时查看
|