目录:
成都创新互联公司是一家专业提供海兴企业网站建设,专注与成都做网站、网站建设、H5高端网站建设、小程序制作等业务。10年已为海兴众多企业、政府机构等服务。创新互联专业网站制作公司优惠进行中。
1. Django集成Celery
2. 声明异步任务
3. 封装工具task_util.py
4. 单元测试test_task_util.py
5. 创建异步任务
6. 常见问题和解决方法
Celery是一个灵活可靠的分布式系统,用于异步任务调度,主要有3部分组成:消息中间件broker,任务执行单元worker,执行结果存储task result store。Celery使用第三方消息中间件redis,RabbitMQ等。
系统通常将一些耗时的操作任务提交给Celery去异步执行,典型架构示意图如下。本文详细介绍Django集成Celery配置方法和功能测试。
时序图如下:
示例代码:https://github.com/rickding/HelloPython/tree/master/hello_celery
├──__init__.py
├── settings.py
├── celery.py
├── tasks.py
├── util
│ └── task_util.py
├── tests
│ └── test_task_util.py
一,Django集成Celery
代码文件 | 功能要点 | |
Django集成Celery | requirements.txt | 安装Celery, Redis和工具包: celery == 4.2.1 flower == 0.9.2 redis == 3.2.0 eventlet == 0.24.1 |
celery.py | 配置Celery,依赖的消息中间件broker和后端backend地址配置在settings.py中集中维护。 | |
__init__.py | 配置项目加载celery.app | |
声明异步任务 | tasks.py | 声明Celery可调度的任务@shared_task |
封装工具task_util | task_util.py | 异步任务创建和分发 |
单元测试 | test_task_util.py | 测试异步任务创建和分发功能 |
创建异步任务 | views.py | 增加REST接口/chk/job |
1. 新建Django项目,运行:django-admin startproject hello_celery
2. 进到目录hello_celery,增加应用:python manage.py startapp app
项目的目录文件结构如下:
3. 安装Celery和依赖包,pip install celery >= 4.2.1,如果不是新建项目,注意版本兼容问题。
celery == 4.2.1
flower == 0.9.2
redis == 3.2.0
eventlet == 0.24.1
4. 增加celery.py,配置信息:
from__future__importabsolute_import,unicode_literals
importos
fromceleryimportCelery,platforms
fromdjango.confimportsettings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE','hello_celery.settings')
app = Celery(
'hello_celery',
include=['hello_celery.tasks'],
broker=settings.CELERY_BROKER,
backend=settings.CELERY_BACKEND
)
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings',namespace='CELERY')
app.conf.update(
CELERY_ACKS_LATE=True,
CELERY_ACCEPT_CONTENT=['pickle','json'],
CELERYD_FORCE_EXECV=True,
CELERYD_MAX_TASKS_PER_CHILD=500,
BROKER_HEARTBEAT=0,
)
# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600, # celery任务执行结果的超时时间,即结果在backend里的保存时间,单位s
)
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
platforms.C_FORCE_ROOT =True
5. 打开settings.py,配置BROKER和BACKEND地址:
CELERY_BROKER ='redis://127.0.0.1:6379/2'
CELERY_BACKEND ='redis://127.0.0.1:6379/3'
6. 打开__init__.py,增加代码:
from__future__importabsolute_import,unicode_literals
from.celeryimportappascelery_app
__all__ = ['celery_app']
二,增加tasks.py,声明异步任务
from__future__importabsolute_import,unicode_literals
importlogging
importjson
fromceleryimportshared_task
fromhello_celery.util.task_utilimportdispatch_task
log = logging.getLogger(__name__)
@shared_task
deftask(param_str):
log.info('task starts: %s, %s'% (type(param_str),param_str))
param_dict =None
try:
param_dict = json.loads(param_str)
exceptExceptionase:
log.warning('Exception when parse param: %s'%str(e))
log.info('parsed param: {}, {}'.format(type(param_dict),param_dict))
return'finished'
正确配置后,运行命令:celery -A hello_celery worker -l info -P eventlet,注意Win10环境中需要增加eventlet,Celery成功启动信息:
三,封装工具task_util.py
封装两个有用的工具函数,分别用于分发(创建)异步任务和获取任务信息:
importlogging
importjson
log = logging.getLogger(__name__)
defdispatch_task(task_func,param_dict):
param_json = json.dumps(param_dict)
try:
returntask_func.apply_async(
[param_json],
retry=True,
retry_policy={
'max_retries':1,
'interval_start':0,
'interval_step':0.2,
'interval_max':0.2,
},
)
exceptExceptionasex:
log.info(ex)
raise
defget_task_status(task_func,task_id):
t = task_func.AsyncResult(task_id)
status = t.state
progress =0
ifstatus ==u'SUCCESS':
progress =100
elifstatus ==u'FAILURE':
progress =0
elifstatus =='PROGRESS':
progress = t.info['progress']
return{'status': status,'progress': progress}
四,单元测试test_task_util.py
增加测试函数,创建一个任务任务并获取信息:
importlogging
fromdjango.testimportTestCase
fromhello_celery.tasksimporttask
fromhello_celery.util.task_utilimportdispatch_task,get_task_status
log = logging.getLogger(__name__)
classTasksTest(TestCase):
deftest_get_task_status(self):
t = dispatch_task(task,{'msg':'test_task'})
self.assertIsNotNone(t)
ret = get_task_status(task,t.id)
log.info('task status: %s,%s, %s'% (ret,t.id,str(task)))
self.assertIsNotNone(ret.get('status'))
运行python manage.py test,同时Celery将执行测试函数创建的任务:
五,创建异步任务
1. 在views.py中增加请求处理函数,创建一个异步执行的任务:
fromdjango.httpimportJsonResponse
fromhello_celery.tasksimportdo_task
defchk_job(req):
param_dict = {
'url': req.get_raw_uri(),
'path': req.get_full_path(),
}
job = do_task(param_dict)
returnJsonResponse({'code':0,'msg':'success','job': job.task_id})
2. 在urls.py中配置路由
fromdjango.urlsimportpath
fromapp.viewsimportchk_job
urlpatterns = [
path('',chk_job,name='chk'),
]
3. 运行命令启动服务:python manage.py runserver 0.0.0.0:8001
4. REST接口创建异步任务示例
六,常见问题和解决方法
1. 启动Celery:celery -A hello_celery worker -l info,运行出错:
Unrecoverable error: VersionMismatch('Redis transport requires redis-py versions 3.2.0 or later. You have 2.10.6',)
解决:指定Redis使用3.2.0或更高pip install redis>=3.2.0