Celery 提供任务链结的功能,字面上的意思,就是将任务一个一个串联在一起,下面的叙述 or 範例如果有错误,欢迎留言讨论!!
一、建立任务
from celery import Celeryfrom datetime import datetimeapp = Celery("task", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")@app.taskdef hello_world(name: str): if name: return f"hello {name}" elif not name: raise ValueError("请输入名字")@app.taskdef show_time(name: str): return f"{name} 您好,现在时间为 {datetime.now()}"
二、链结任务
我们可以透过 apply_async 在送出任务的时候,使用 link 参数来进行任务的串联
from task import hello_world, show_timeres = hello_world.apply_async(("nick",), link=show_time.s())
可以看到在 link 后面我们接了一个 show_time 的任务,其中 show_time 任务会需要一个名为 name 的参数,celery 会自动将 hello_world 这个任务执行完 return 的结果自动带入到后面的任务中,下图为执行结果
三、于链结任务中带入参数
这边要特别注意的是,只要前一个任务有回传资料,后面一个任务若要再输入参数的话,就必须在第二个任务当中多增加一个接收参数,否则会跳错
我们可以看到下面这段程式,在 link 任务的时候有多输入了一个参数,但此时我们还没将 show_time 做更改,因此会跳出错误讯息告知你多了一个参数
from task import hello_world, show_timeres = hello_world.apply_async(("nick",), link=show_time.s("andy"))
我们简单将 show_time 做一下更改,并重启 worker,然后再利用刚刚那段程式送一次任务
注意: 任务有任何的修改,worker 都需要重新启动,才会正常被读取修改后的任务
@app.taskdef show_time(name: str, name_2: str): return f"{name} 以及 {name_2} 您好,现在时间为 {datetime.now()}"
四、使用 list 建立任务鍊
link 除了接收单个 task 外,也接受 list 型态,可以将多个任务利用 list 串联,我们先多建立一个任务,另外要注意的是,如果前一个任务有 return 值回 worker,celery 一定会将该 value 带入下一个 link task 当中,因此若确定下一个任务没有要接收任何参数,还是加上 *args or **kwargs
来接收,避免跳错
@app.taskdef show_message(*args, **kwargs): return "非常欢迎您使用 Celery"
利用下面的程式送出任务
from task import hello_world, show_time, show_messagetask_list = [show_time.s("andy"), show_message.s()]res = hello_world.apply_async(("nick",), link=task_list)
下图为测试结果
五、link_error
在 celery 当中也有提供 link_error 来做为当错误发生时,该进行的任务,该参数使用方式同 link,但笔者在进行测试的时候发现有问题,有时候会跳出来,有时候则不会,或是会产生一些奇怪的错误,上网搜寻了一下,发现好像是已知问题,若后续官方有将此 bug 修复或是有查到其他资料,会在上来补充,如果有知道的大神希望能留言帮小弟补充一下,会再将文章修改,非常感谢!!