这篇文章主要是在教学如何在 ariflow 中建立一个可以执行 python function 的 operator,同时也会介绍如何使用 BranchPythonOperator 来进行任务分支的串接,如果有问题欢迎留言讨论
一、PythonOperator
上一篇有提到 BashOperator 这个东西,Bash 主要是在 linux 下命令的一个功能,而 PythonOperator 顾名思义就是一个可以执行 python function 的一个 operator 了,在这个 operator 当中提供了 python_callable
参数,用来连结要执行的 python function,下面为建立範例的步骤
(一)、import 套件
import requestsfrom airflow.decorators import dagfrom airflow.operators.empty import EmptyOperatorfrom airflow.operators.python import PythonOperator, BranchPythonOperatorfrom airflow.utils.trigger_rule import TriggerRulefrom datetime import datetime
(二)、建立 task id class
这边是这篇文章比较特殊的部分,由于后面会介绍到 BranchPythonOperator,所以笔者这边利用 class 来储存本次会用到的所有 task id,这样在撰写程式上面比较快速也可以防止手残打错字
class TaskID: start_task_id = "start_task" end_task_id = "end_task" crawler_task_id = "crawler_task" success_task_id = "crawl_success" failed_task_id = "crawl_failed"
(三)、建立简易 python 爬虫
这边撰写了一个简易的爬虫,爬取目标为 批踢踢实业坊 的热门看板,不做任何解析,只单纯判断 status_code 是否为 200,并印出 html code
def crawl_ptt(): url = "https://www.ptt.cc/bbs/index.html" res = requests.get(url=url) if res.status_code == 200: print(res.text)
(四)、建立 DAG
可以看到在 PythonOperator 当中将第三步写的 crawl_ptt 函式丢进去 python_callable 这个参数里面,这样当 airflow 执行到这个 operator 的时候就会自动地去执行 crawl_ptt 里面的内容
@dag(start_date=datetime.today(), tags=['user'])def python_operator_demo_dag(): start_task = EmptyOperator(task_id=TaskID.start_task_id) crawl_task = PythonOperator(task_id=TaskID.crawler_task_id, python_callable=crawl_ptt) end_task = EmptyOperator(task_id=TaskID.end_task_id) start_task >> crawl_task >> end_task create_python_dag = python_operator_demo_dag()
(五)、成果展示
这边直接进入到 crawl_task 这个 operator 的执行 Log 当中察看结果,如果不清楚怎么查看 Log 的人,可以参考 这篇文章 第三点
二、BranchPythonOperator
有时候根据程式执行的结果不同,会需要处理不同的事情,例如爬虫成功时应该做资料写入,失败时应该做 log 输出 or 通知使用整等程序,airflow 提供了 BranchPythonOperator 来替我们完成这项工作,下面为简易的使用方式
P.S. import 套件以及 task id class 的部分会沿用上面範例做使用,下面不再赘述
(一)、建立爬虫函式
可以看到在下面的函式当中,根据爬虫执行的情况正常与否,会回传出不同的 task id 让 airflow 知道该执行哪个 operator
def crawl_ptt_branch(): url = "https://www.ptt.cc/bbs/index.html" res = requests.get(url=url) if res.status_code == 200: return TaskID.success_task_id else: return TaskID.failed_task_id
(二)、建立成功与失败函式
下面为简单的範例,主要用在 demo 当中,没有写很複杂的程式,单纯印出字串提示使用者,在 dag 当中会利用 PythonOperator 进行串接使用
def success_crawl(): print("爬取成功")def failed_crawl(): print("爬取失败")
(三)、建立 DAG
在 BranchPythonOperator 之后有可能会被执行的 PythonOperator,可以利用 [] 包起来表示,airflow 会自动根据回传出来的 task_id 进行判断要执行哪个 operator,在下面的成果展示执行纪录当中,可以看到 failed operator 为粉红色的 skipped 状态,只有执行到 success operator 内的函式
P.S. end_task 内有一个参数 trigger_rule 为触发条件,本次设定为没有失败就会触发,后续会发一篇文章介绍使用方式
@dag(start_date=datetime.today(), tags=['user'])def branch_operator_demo_dag(): start_task = EmptyOperator(task_id=TaskID.start_task_id) crawl_task = BranchPythonOperator(task_id=TaskID.crawler_task_id, python_callable=crawl_ptt_branch) success_task = PythonOperator(task_id=TaskID.success_task_id, python_callable=success_crawl) failed_task = PythonOperator(task_id=TaskID.failed_task_id, python_callable=failed_crawl) end_task = EmptyOperator(task_id=TaskID.end_task_id, trigger_rule=TriggerRule.NONE_FAILED) start_task >> crawl_task >> [success_task, failed_task] >> end_task create_branch_python_dag = branch_operator_demo_dag()
(四)、成果展示
流程图

