Python & Airflow 学习笔记_PythonOperator 和 BranchPythonOpera

这篇文章主要是在教学如何在 ariflow 中建立一个可以执行 python function 的 operator,同时也会介绍如何使用 BranchPythonOperator 来进行任务分支的串接,如果有问题欢迎留言讨论

一、PythonOperator

上一篇有提到 BashOperator 这个东西,Bash 主要是在 linux 下命令的一个功能,而 PythonOperator 顾名思义就是一个可以执行 python function 的一个 operator 了,在这个 operator 当中提供了 python_callable 参数,用来连结要执行的 python function,下面为建立範例的步骤

(一)、import 套件

import requestsfrom airflow.decorators import dagfrom airflow.operators.empty import EmptyOperatorfrom airflow.operators.python import PythonOperator, BranchPythonOperatorfrom airflow.utils.trigger_rule import TriggerRulefrom datetime import datetime

(二)、建立 task id class

这边是这篇文章比较特殊的部分,由于后面会介绍到 BranchPythonOperator,所以笔者这边利用 class 来储存本次会用到的所有 task id,这样在撰写程式上面比较快速也可以防止手残打错字

class TaskID:    start_task_id = "start_task"    end_task_id = "end_task"    crawler_task_id = "crawler_task"    success_task_id = "crawl_success"    failed_task_id = "crawl_failed"

(三)、建立简易 python 爬虫

这边撰写了一个简易的爬虫,爬取目标为 批踢踢实业坊 的热门看板,不做任何解析,只单纯判断 status_code 是否为 200,并印出 html code

def crawl_ptt():    url = "https://www.ptt.cc/bbs/index.html"    res = requests.get(url=url)    if res.status_code == 200:        print(res.text)

(四)、建立 DAG

可以看到在 PythonOperator 当中将第三步写的 crawl_ptt 函式丢进去 python_callable 这个参数里面,这样当 airflow 执行到这个 operator 的时候就会自动地去执行 crawl_ptt 里面的内容

@dag(start_date=datetime.today(), tags=['user'])def python_operator_demo_dag():    start_task = EmptyOperator(task_id=TaskID.start_task_id)    crawl_task = PythonOperator(task_id=TaskID.crawler_task_id,                                python_callable=crawl_ptt)    end_task = EmptyOperator(task_id=TaskID.end_task_id)    start_task >> crawl_task >> end_task    create_python_dag = python_operator_demo_dag()

(五)、成果展示

这边直接进入到 crawl_task 这个 operator 的执行 Log 当中察看结果,如果不清楚怎么查看 Log 的人,可以参考 这篇文章 第三点
http://img2.58codes.com/2024/201440249TSwjRYGHk.jpg

二、BranchPythonOperator

有时候根据程式执行的结果不同,会需要处理不同的事情,例如爬虫成功时应该做资料写入,失败时应该做 log 输出 or 通知使用整等程序,airflow 提供了 BranchPythonOperator 来替我们完成这项工作,下面为简易的使用方式

P.S. import 套件以及 task id class 的部分会沿用上面範例做使用,下面不再赘述

(一)、建立爬虫函式

可以看到在下面的函式当中,根据爬虫执行的情况正常与否,会回传出不同的 task id 让 airflow 知道该执行哪个 operator

def crawl_ptt_branch():    url = "https://www.ptt.cc/bbs/index.html"    res = requests.get(url=url)    if res.status_code == 200:        return TaskID.success_task_id    else:        return TaskID.failed_task_id

(二)、建立成功与失败函式

下面为简单的範例,主要用在 demo 当中,没有写很複杂的程式,单纯印出字串提示使用者,在 dag 当中会利用 PythonOperator 进行串接使用

def success_crawl():    print("爬取成功")def failed_crawl():    print("爬取失败")

(三)、建立 DAG

在 BranchPythonOperator 之后有可能会被执行的 PythonOperator,可以利用 [] 包起来表示,airflow 会自动根据回传出来的 task_id 进行判断要执行哪个 operator,在下面的成果展示执行纪录当中,可以看到 failed operator 为粉红色的 skipped 状态,只有执行到 success operator 内的函式

P.S. end_task 内有一个参数 trigger_rule 为触发条件,本次设定为没有失败就会触发,后续会发一篇文章介绍使用方式

@dag(start_date=datetime.today(), tags=['user'])def branch_operator_demo_dag():    start_task = EmptyOperator(task_id=TaskID.start_task_id)    crawl_task = BranchPythonOperator(task_id=TaskID.crawler_task_id,                                      python_callable=crawl_ptt_branch)    success_task = PythonOperator(task_id=TaskID.success_task_id,                                  python_callable=success_crawl)    failed_task = PythonOperator(task_id=TaskID.failed_task_id,                                 python_callable=failed_crawl)    end_task = EmptyOperator(task_id=TaskID.end_task_id,                             trigger_rule=TriggerRule.NONE_FAILED)    start_task >> crawl_task >> [success_task, failed_task] >> end_task    create_branch_python_dag = branch_operator_demo_dag()

(四)、成果展示

流程图http://img2.58codes.com/2024/20144024QxQTSDLjQM.jpg执行结果http://img2.58codes.com/2024/20144024OnTTLQNpL0.jpg执行纪录 (请忽略前面测试时的纪录,成功纪录会使用红色框框标注)http://img2.58codes.com/2024/20144024LJgCid5wIN.jpg

关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章