自我笔记 - django 系列 [任务列队celery篇]

django celery

celery是什么

一种让异步处理任务的工具主要应用情境为以下:任务调度 (例如: 寄信这种会比较需要等待的事情)定时任务

此篇主要解讲定时任务的部份

定时任务

执行环境* python 3.7.4* django 2.1.7* celery 4.4.7* django-redis 4.12.1 (本篇用redis当作中间人)

必须先安装redis服务,再进行以下

目录结构

目录结构    |   manage.py    \---ProjectName            asgi.py            settings.py            urls.py            wsgi.py            __init__.py            ★ celery.py    \---AppName        task.py (名称task为预设)
celery.py: 基本设定
    from __future__ import absolute_import, unicode_literals    import os    from celery import Celery, platforms    from celery.schedules import crontab    from datetime import timedelta    from kombu import Queue # 用于多任务列队    # django_DRF 为专案名称    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Django_DRF.settings')    # 使用redis作为中间人(broker)    # 使用redis作为定时任务存取地方(backend)    # 预设redis为本地127.0.0.1:6379/1(redis)    # ★★★ 此处须注意(若本地没有redis却设置redis预设 -> 则会显示10061连线错误)    app = Celery('Django_DRF', backend='redis://127.0.0.1:6379/14', broker='redis://127.0.0.1:6379/15')    # Using a string here means the worker don't have to serialize    # the configuration object to child processes.    # - namespace='CELERY' means all celery-related configuration keys    #   should have a `CELERY_` prefix.    app.config_from_object('django.conf:settings', namespace='CELERY')    # 预设任务名称为tasks用意为此,此行会加载已经有注册的app.task.py档案    app.autodiscover_tasks()    # 设定任务进入哪个列队 (在开启worker时,可以分别进行)    app.conf.task_routes = {        'AppName1.tasks.TF_postingJob': {            'queue': 'tasks_one'        },        'AppName2.tasks.MLO_dailyDataJob': {            'queue': 'tasks_two'        }    }    # 设定列队路由    app.conf.task_queues = (        Queue('tasks_one', routing_key='tasks_one'),        Queue('tasks_two', routing_key='tasks_two'),    )    # 允许root用户运行celery    platforms.C_FORCE_ROOT = True    # 有出错会显示于此    @app.task(bind=True)    def debug_task(self):        print('Request: {0!r}'.format(self.request))    # 固定的定时任务    app.conf.update(        CELERYBEAT_SCHEDULE = {            'JobName': {                            # 任务名称                'task': 'app.tasks.functionName',   # 执行的任务                'schedule':  timedelta(seconds=5),  # 週期多久一次                'args': ()                          # 此处可带参数            }        }    )
app.tasks.py: 任务编写位置
# celery指定的任务档案,只能叫做task.py# 于此加入所需要执行的程序from __future__ import absolute_importfrom celery import shared_taskfrom channels.layers import get_channel_layerfrom .timelyService import *@shared_task    def MLC_postingJob():        print('test')

如何开始执行

定时任务需在开启beat服务(borker)

于专案目录底下输入celery beat -A projectName -l info
http://img2.58codes.com/2024/20132538uKTX7xCJNw.png看到上图已经成功开始定时器了,任务也有成功发送,于redis(下图)可看见以下任务成功进行 排队、排队、排队 (很重要所以说三次),此时任务是没有被成功执行的。
http://img2.58codes.com/2024/20132538kkGXR0K6c7.png

开启worker执行broker下达的任务

于专案目录底下输入celery -A Django_DRF worker --pool=solo -l info
http://img2.58codes.com/2024/20132538gSJYwZFw2Q.png看到上图已经成功开启,若有任务需处理则会执行,执行结果会存于backend
http://img2.58codes.com/2024/201325382GWxNt5ll5.png若有执行时间较长的任务,可利用多列队解决,如下指令
# 1.开启CMD输入以下celery -A Django_DRF worker --pool=solo -l info -Q tasks_one # tasks_one 为该worker需要执行的列队内容。

关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章