这边记录了该如何建立一个简易的 Dag,如果有错误或更好的写法,欢迎留言讨论
一、Dag 定义
就笔者从官方文件的理解,每个 Dag 可以代表是一个要执行的任务,而每个 Dag 里面会有许多个 Operator 可以进行各个流程该做的事情,简单来讲可以理解成 Task 底下有多个 Sub Task,而这些 Sub Task 串连起来就变成一个 Dag
二、利用 with 建立 Dag
(一)、import 套件
from airflow import DAGfrom airflow.operators.empty import EmptyOperatorfrom airflow.operators.bash_operator import BashOperatorfrom datetime import datetime
在 airflow 当中提供许多种 Operator 给我们进行各种功能操作,详细种类的 Operator 可以参考 这个网址 本文主要使用 EmptyOperator 以及 BashOperator 来进行範例撰写
(二)、EmptyOperator 以及 BashOperator
EmptyOperator:顾名思义,就是一个空的流程,常用来製作开头 or 结束BashOperator:可以协助我们对电脑发出 Linux 指令补充:每个 Operator 都会有 task_id 属性,此属性可以指定该流程的名称
start_task = EmptyOperator(task_id="start_task")end_task = EmptyOperator(task_id="end_task")first_task = BashOperator(task_id="first_task", bash_command=f"echo execute time: {datetime.now()}")
(三)、利用 with 建立 Dag
按照如下图的範例,我们可以建立一个名为 first_dag 的任务,并且指派 tags 属性,方便我们在 UI 上进行查询
dag = DAG(dag_id="first_dag", tags=['user'], start_date=datetime.today())with dag: start_task = EmptyOperator(task_id="start_task") end_task = EmptyOperator(task_id="end_task") first_task = BashOperator(task_id="first_task", bash_command=f"echo execute time: {datetime.now()}")
下图为透过上述程式码建立出的 Dag,如果是使用官方的 docker-compose.yaml 建立环境的话,会出现许多範例 Dag可以善用 tags 来进行过滤
注意,此时还没有将任务流程进行串联
(四)、串联 Operator
我们可以透过 >>
符号来将各个 Operator 进行串联,下方为完整程式码,串联完成后,即可回到 UI 介面进行重新整理,后面的步骤会教学如何运行任务以及查看 Log
from airflow import DAGfrom airflow.operators.empty import EmptyOperatorfrom airflow.operators.bash_operator import BashOperatorfrom datetime import datetimedag = DAG(dag_id="first_dag", tags=['user'], start_date=datetime.today())with dag: start_task = EmptyOperator(task_id="start_task") end_task = EmptyOperator(task_id="end_task") first_task = BashOperator(task_id="first_task", bash_command=f"echo execute time: {datetime.now()}") start_task >> first_task >> end_task
三、执行 Dag 及查看 Log
(一)、执行 Dag
透过 UI 将 Dag 设定为 unpause 状态点选右方三角形的执行符号,选择Trigger DAG
选项

(二)、查看 Log
在 home 页面点选 Dag 可以查看该 Dag 的详细状况左下角红色框框部分会列出串联在该 Dag 内的 Operator,按照上面的程式码依序为start_task > first_task > end_task


execute time: {datetime.now()}

四、使用 decorator 建立 Dag
在 airflow 2.x 版后,提供了利用 decorator 的方式来建立 Dag,下方为简易範例
特别注意:
使用此方法建立 Dag 后,需要于外部将该 Dag 的函式丢给一个变数,airflow 才可以抓到该 Dag
from airflow.decorators import dagfrom airflow.operators.empty import EmptyOperatorfrom airflow.operators.bash_operator import BashOperatorfrom datetime import datetime@dag(tags=['user'], start_date=datetime.today())def simple_dag(): start_task = EmptyOperator(task_id="start_task") end_task = EmptyOperator(task_id="end_task") first_task = BashOperator(task_id="first_task", bash_command=f"echo execute time: {datetime.now()}") start_task >> first_task >> end_taskmy_dag = simple_dag()