这篇文章主要是在讨论在 Schedule 的设定以及 Trigger Rule,如果有问题欢迎留言讨论
一、任务排程 Schedule_Interval
在 airflow 当中,我们可以透过设定 DAG 的 schedule_interval
属性来让指定的 DAG 于指定的时间自动执行,而 官方文件 上有提供了一些预设的设定参数,如下图
另外也可以透过 这个网站 来快速的察看自己想要设定任务的执行时间,只要将网页上产出的 crontab 直接以字串型态贴到 schedule_interval 内即可
下面附上範例程式
@dag(start_date=datetime.today(), tags=['user'], schedule_interval="@daily")def python_operator_demo_dag(): start_task = EmptyOperator(task_id=TaskID.start_task_id) crawl_task = PythonOperator(task_id=TaskID.crawler_task_id, python_callable=crawl_ptt) end_task = EmptyOperator(task_id=TaskID.end_task_id) start_task >> crawl_task >> end_taskcreate_python_dag = python_operator_demo_dag()
上述程式码可以看到我们在 dag 的 decorator 当中设定了 schedule_interval 参数,让任务于每天的 00:00 自动执行一次,对于使用建立 DAG 物件方式的朋友也同样要设定相同的属性才会有自动执行的作用
二、触发条件 Trigger Rule
在 这篇文章 里面有提到 Trigger Rule 这个参数,这个参数主要是用来控制 Operator 的触发条件,直接沿用前篇文章的範例做讲解
在下面的範例当中,可以看到为了让 end_task 会被执行,笔者这边用了 NONE_FAILED 条件,表示当没有任务失败的时候,都会被执行到,确保有一个完整的运行流程,当然 EmptyOperator 本身并不会执行任何事情,就单纯只是写个流程图好看用的
@dag(start_date=datetime.today(), tags=['user'])def branch_operator_demo_dag(): start_task = EmptyOperator(task_id=TaskID.start_task_id) crawl_task = BranchPythonOperator(task_id=TaskID.crawler_task_id, python_callable=crawl_ptt_branch) success_task = PythonOperator(task_id=TaskID.success_task_id, python_callable=success_crawl) failed_task = PythonOperator(task_id=TaskID.failed_task_id, python_callable=failed_crawl) end_task = EmptyOperator(task_id=TaskID.end_task_id, trigger_rule=TriggerRule.NONE_FAILED) start_task >> crawl_task >> [success_task, failed_task] >> end_task
为了避免打错字我们可以利用 airflow 套件本身提供的 class 进行 trigger_rule
的设定,直接利用下方程式进行 importfrom airflow.utils.trigger_rule import TriggerRule
进去套件里面查看,可以发现 TriggerRule 这个 class 有提供一些预设的触发条件,下方程式码为 airflow
套件内部的程式码,可以去 这个网站 进行下载以及查看
class TriggerRule(str, Enum): ALL_SUCCESS = 'all_success' ALL_FAILED = 'all_failed' ALL_DONE = 'all_done' ONE_SUCCESS = 'one_success' ONE_FAILED = 'one_failed' NONE_FAILED = 'none_failed' NONE_FAILED_OR_SKIPPED = 'none_failed_or_skipped' NONE_SKIPPED = 'none_skipped' DUMMY = 'dummy' ALWAYS = 'always' NONE_FAILED_MIN_ONE_SUCCESS = "none_failed_min_one_success" ALL_SKIPPED = 'all_skipped'
基本上预设的条件意思都是很简单的英文,直接进行英翻中即可对应,至于使用方式则是直接 import 这个 class 后直接坐使用即可,如下範例
end_task = EmptyOperator(task_id=TaskID.end_task_id, trigger_rule=**TriggerRule.NONE_FAILED**)