网站 免费认证,中国建设银行安徽分行网站,哪个网站教做ppt,怎么选择网站开发公司在Django项目中#xff0c;如何集成使用Celery框架来完成一些异步任务以及定时任务呢#xff1f;
1. 安装
pip install celery # celery框架
pip install django-celery-beat # celery定时任务使用
pip install django-celery-results # celery存储结果使用2. Django集成…在Django项目中如何集成使用Celery框架来完成一些异步任务以及定时任务呢
1. 安装
pip install celery # celery框架
pip install django-celery-beat # celery定时任务使用
pip install django-celery-results # celery存储结果使用2. Django集成celery
在 settings.py 配置文件中增加如下配置项
INSTALLED_APPS [...celery,django_celery_beat,django_celery_results
]以下是celery的相关配置
# 时区配置
CELERY_TIMEZONE Asia/Shanghai
CELERY_ENABLE_UTC False
DJANGO_CELERY_BEAT_TZ_AWARE False# broker backend 配置使用rabbitmq作为中间件
CELERY_BROKER_URL amqp://devops:devops123127.0.0.1:5672/alarm
CELERY_RESULT_BACKEND amqp://devops:devops123127.0.0.1:5672/alarm# 使用django_celery_beat动态配置任务
CELERY_BEAT_SCHEDULER django_celery_beat.schedulers:DatabaseScheduler# celery序列化和反序列化配置
CELERY_TASK_SERIALIZER pickle
CELERY_RESULT_SERIALIZER pickle
CELERY_ACCEPT_CONTENT [pickle, json]# 下面配置项在有些情况下可以防止死锁 非常重要
CELERYD_FORCE_EXECV True
# 任务结果存储的过期时间默认1天过期。如果beat开启Celery每天会自动清除设为0存储结果永不过期。
CELERY_RESULT_EXPIRES 60 * 60 * 24
# 每个worker执行1000次任务后死掉会自动重启worker防止任务占用太多内存导致内存泄漏
CELERY_MAX_TASKS_PER_CHILD 1000
# 禁用所有速度限制如果网络资源有限不建议开足马力。
CELERY_DISABLE_RATE_LIMITS True
# 单个任务的运行时间限制否则会被杀死
CELERYD_TASK_TIME_LIMIT 60 * 60
CELERY_TASK_RETRY 2# celery 队列配置
from kombu import Exchange, Queue
# consumer_arguments设置队列的优先级
CELERY_TASK_QUEUES (Queue(alarm_queue, Exchange(alarm_exchange), routing_keyalarm_email, consumer_arguments{x-priority: 5}),Queue(alarm_queue, Exchange(alarm_exchange), routing_keyalarm_phone, consumer_arguments{x-priority: 8}),Queue(calcu_queue, Exchange(calcu_exchange), routing_keycalcu_feature, consumer_arguments{x-priority: 10})
)CELERY_TASK_ROUTES {alarm.tasks.call_phone: {queue: alarm_queue, routing_key: alarm_phone},alarm.tasks.send_email: {queue: alarm_queue, routing_key: alarm_email},calcu.tasks.execute_calcu: {queue: calcu_queue, routing_key: calcu_feature},
}在 settings.py 同级目录下新增一个 celery.py 文件
from __future__ import absolute_import, unicode_literals # absolute_import: 使用python的库而不是项目目录下的文件
import os
from celery import Celery
from django.conf import settingsos.environ.setdefault(DJANGO_SETTINGS_MODULE, celerymq.settings)
celery_app Celery(celerymq)celery_app.config_from_object(django.conf:settings, namespaceCELERY)
celery_app.autodiscover_tasks(settings.INSTALLED_APPS)在App中增加一个 tasks.py 文件用于实现异步任务
import time
from celery import shared_taskshared_task(ignore_resultTrue)
def execute_calcu(dataframe):print(fexecute celery task: calcu feature)time.sleep(10) # 这里写比较耗时的逻辑print(fexecute calcu feature task run over)在其他文件逻辑中进行异步调用
execute_calcu.delay(dataframe)项目启动后如果有异步任务进来可以在 RabbitMQ 监控平台看到队列信息 http://127.0.0.1:15672/。
3. 启动命令
启动 worker 去消费数据。
# 启动worker
celery worker -A celerymq -l INFO -n alarm_queue -Q alarm_queue -P eventlet# 启动beat
celery beat -A celerymq -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler在linux中启动worker的话可以去掉 -P eventlet 参数。
另外定时任务推荐使用 django-admin 来下发。