Python & Airflow 学习笔记_建立简易 Dag

这边记录了该如何建立一个简易的 Dag,如果有错误或更好的写法,欢迎留言讨论

一、Dag 定义

就笔者从官方文件的理解,每个 Dag 可以代表是一个要执行的任务,而每个 Dag 里面会有许多个 Operator 可以进行各个流程该做的事情,简单来讲可以理解成 Task 底下有多个 Sub Task,而这些 Sub Task 串连起来就变成一个 Dag

二、利用 with 建立 Dag

(一)、import 套件

from airflow import DAGfrom airflow.operators.empty import EmptyOperatorfrom airflow.operators.bash_operator import BashOperatorfrom datetime import datetime

在 airflow 当中提供许多种 Operator 给我们进行各种功能操作,详细种类的 Operator 可以参考 这个网址 本文主要使用 EmptyOperator 以及 BashOperator 来进行範例撰写

(二)、EmptyOperator 以及 BashOperator

EmptyOperator:顾名思义,就是一个空的流程,常用来製作开头 or 结束BashOperator:可以协助我们对电脑发出 Linux 指令补充:每个 Operator 都会有 task_id 属性,此属性可以指定该流程的名称http://img2.58codes.com/2024/20144024dpYaPorocl.png
start_task = EmptyOperator(task_id="start_task")end_task = EmptyOperator(task_id="end_task")first_task = BashOperator(task_id="first_task",                          bash_command=f"echo execute time: {datetime.now()}")

(三)、利用 with 建立 Dag

按照如下图的範例,我们可以建立一个名为 first_dag 的任务,并且指派 tags 属性,方便我们在 UI 上进行查询

dag = DAG(dag_id="first_dag", tags=['user'], start_date=datetime.today())with dag:    start_task = EmptyOperator(task_id="start_task")    end_task = EmptyOperator(task_id="end_task")    first_task = BashOperator(task_id="first_task",                              bash_command=f"echo execute time: {datetime.now()}")

下图为透过上述程式码建立出的 Dag,如果是使用官方的 docker-compose.yaml 建立环境的话,会出现许多範例 Dag可以善用 tags 来进行过滤
注意,此时还没有将任务流程进行串联
http://img2.58codes.com/2024/20144024HDAbQYf19d.jpg

(四)、串联 Operator

我们可以透过 >> 符号来将各个 Operator 进行串联,下方为完整程式码,串联完成后,即可回到 UI 介面进行重新整理,后面的步骤会教学如何运行任务以及查看 Log

from airflow import DAGfrom airflow.operators.empty import EmptyOperatorfrom airflow.operators.bash_operator import BashOperatorfrom datetime import datetimedag = DAG(dag_id="first_dag", tags=['user'], start_date=datetime.today())with dag:    start_task = EmptyOperator(task_id="start_task")    end_task = EmptyOperator(task_id="end_task")    first_task = BashOperator(task_id="first_task",                              bash_command=f"echo execute time: {datetime.now()}")    start_task >> first_task >> end_task

三、执行 Dag 及查看 Log

(一)、执行 Dag

透过 UI 将 Dag 设定为 unpause 状态点选右方三角形的执行符号,选择 Trigger DAG 选项
http://img2.58codes.com/2024/20144024xLKa9My0Vf.jpg之后就可以在 Runs 栏位看到该 Dag 的运行状态,从左至右分别为 queued、success、running 以及 failed,而上面的数字代表着数量
http://img2.58codes.com/2024/20144024GwwwG9hYLJ.jpg

(二)、查看 Log

在 home 页面点选 Dag 可以查看该 Dag 的详细状况左下角红色框框部分会列出串联在该 Dag 内的 Operator,按照上面的程式码依序为 start_task > first_task > end_task
http://img2.58codes.com/2024/201440244YlkN277S6.jpg沿着该图表往右方看,可以看到一些有颜色的正方形方块,成功为绿色,失败则为红色点选该方块后右手边会出现详细资料,可以在上方发现 Log 选项,点选即可查看 该 Operator 当次的执行纪录的 Log
http://img2.58codes.com/2024/20144024FDhmhFqJo9.jpg按照上面的範例程式,我们会让该 Operator 在系统中印出 execute time: {datetime.now()}
http://img2.58codes.com/2024/20144024GLEuhjcYOQ.jpg

四、使用 decorator 建立 Dag

在 airflow 2.x 版后,提供了利用 decorator 的方式来建立 Dag,下方为简易範例
特别注意:
使用此方法建立 Dag 后,需要于外部将该 Dag 的函式丢给一个变数,airflow 才可以抓到该 Dag

from airflow.decorators import dagfrom airflow.operators.empty import EmptyOperatorfrom airflow.operators.bash_operator import BashOperatorfrom datetime import datetime@dag(tags=['user'], start_date=datetime.today())def simple_dag():    start_task = EmptyOperator(task_id="start_task")    end_task = EmptyOperator(task_id="end_task")    first_task = BashOperator(task_id="first_task",                              bash_command=f"echo execute time: {datetime.now()}")    start_task >> first_task >> end_taskmy_dag = simple_dag()

关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章