Python & Airflow 学习笔记_Schedule 以及 Trigger Rule

这篇文章主要是在讨论在 Schedule 的设定以及 Trigger Rule,如果有问题欢迎留言讨论

一、任务排程 Schedule_Interval

在 airflow 当中,我们可以透过设定 DAG 的 schedule_interval 属性来让指定的 DAG 于指定的时间自动执行,而 官方文件 上有提供了一些预设的设定参数,如下图
http://img2.58codes.com/2024/20144024ZOewA8SPZ4.jpg

另外也可以透过 这个网站 来快速的察看自己想要设定任务的执行时间,只要将网页上产出的 crontab 直接以字串型态贴到 schedule_interval 内即可

下面附上範例程式

@dag(start_date=datetime.today(), tags=['user'], schedule_interval="@daily")def python_operator_demo_dag():    start_task = EmptyOperator(task_id=TaskID.start_task_id)    crawl_task = PythonOperator(task_id=TaskID.crawler_task_id,                                python_callable=crawl_ptt)    end_task = EmptyOperator(task_id=TaskID.end_task_id)    start_task >> crawl_task >> end_taskcreate_python_dag = python_operator_demo_dag()

上述程式码可以看到我们在 dag 的 decorator 当中设定了 schedule_interval 参数,让任务于每天的 00:00 自动执行一次,对于使用建立 DAG 物件方式的朋友也同样要设定相同的属性才会有自动执行的作用

http://img2.58codes.com/2024/201440242z9G2U2Jou.jpg

二、触发条件 Trigger Rule

在 这篇文章 里面有提到 Trigger Rule 这个参数,这个参数主要是用来控制 Operator 的触发条件,直接沿用前篇文章的範例做讲解

在下面的範例当中,可以看到为了让 end_task 会被执行,笔者这边用了 NONE_FAILED 条件,表示当没有任务失败的时候,都会被执行到,确保有一个完整的运行流程,当然 EmptyOperator 本身并不会执行任何事情,就单纯只是写个流程图好看用的

@dag(start_date=datetime.today(), tags=['user'])def branch_operator_demo_dag():    start_task = EmptyOperator(task_id=TaskID.start_task_id)    crawl_task = BranchPythonOperator(task_id=TaskID.crawler_task_id,                                      python_callable=crawl_ptt_branch)    success_task = PythonOperator(task_id=TaskID.success_task_id,                                  python_callable=success_crawl)    failed_task = PythonOperator(task_id=TaskID.failed_task_id,                                 python_callable=failed_crawl)    end_task = EmptyOperator(task_id=TaskID.end_task_id,                             trigger_rule=TriggerRule.NONE_FAILED)    start_task >> crawl_task >> [success_task, failed_task] >> end_task

为了避免打错字我们可以利用 airflow 套件本身提供的 class 进行 trigger_rule 的设定,直接利用下方程式进行 import
from airflow.utils.trigger_rule import TriggerRule

进去套件里面查看,可以发现 TriggerRule 这个 class 有提供一些预设的触发条件,下方程式码为 airflow 套件内部的程式码,可以去 这个网站 进行下载以及查看

class TriggerRule(str, Enum):    ALL_SUCCESS = 'all_success'    ALL_FAILED = 'all_failed'    ALL_DONE = 'all_done'    ONE_SUCCESS = 'one_success'    ONE_FAILED = 'one_failed'    NONE_FAILED = 'none_failed'    NONE_FAILED_OR_SKIPPED = 'none_failed_or_skipped'    NONE_SKIPPED = 'none_skipped'    DUMMY = 'dummy'    ALWAYS = 'always'    NONE_FAILED_MIN_ONE_SUCCESS = "none_failed_min_one_success"    ALL_SKIPPED = 'all_skipped'

基本上预设的条件意思都是很简单的英文,直接进行英翻中即可对应,至于使用方式则是直接 import 这个 class 后直接坐使用即可,如下範例

end_task = EmptyOperator(task_id=TaskID.end_task_id,                         trigger_rule=**TriggerRule.NONE_FAILED**)

关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章