Python & Airflow 学习笔记_SQLAlchemyOperator

在 Airflow 当中有提供 PostgresOperator 这个东西,他可以直接对资料库进行 SQL 指令的执行,不过对于已经用习惯 ORM 方式操作资料库的人,实在是有点难受,笔者这边有在网路上找到有大神写了一个 SQLAlchemyOperator 可以在 Airflow 当中进行 sqlalchemy 的操作与使用,下方为简易的说明

一、参考连结与资料库连线

参考连结:
https://sorokin.engineer/posts/en/apache_airflow_sqlalchemy_operator.html资料库设定:
这篇文章 第六点

程式仅用于参考、教学用途,若有问题,请留言告知,非常感谢

二、建立 SQLAlchemyOperator

(一)、专案目录架构

附上简单的目录架构

models 用来存放 SQLAlchemy 定义好的资料表结构operators 用来存放客製的 operator,例如本次使用的 SQLAlchemyOperator
http://img2.58codes.com/2024/201440246LZjuUzxes.jpg

(二)、SQLAlchemyOperator 撰写

运作逻辑:

利用 airflow 内提供的 PostgresHook 建立一个取得 session 的函式继承 PythonOperator 并于 init 当中继承所有属性以及添加 conn_id 属性修改 execute_callable 函式使其在呼叫 python function 的时候,会传入一个名为 session 的物件
"""ref:https://sorokin.engineer/posts/en/apache_airflow_sqlalchemy_operator.html"""from airflow.operators.python import PythonOperatorfrom airflow.utils.decorators import apply_defaultsfrom sqlalchemy.orm import sessionmaker, Sessionfrom airflow.hooks.postgres_hook import PostgresHookdef get_session(conn_id: str) -> Session:    hook = PostgresHook(postgres_conn_id=conn_id)    engine = hook.get_sqlalchemy_engine()    return sessionmaker(bind=engine)()class SQLAlchemyOperator(PythonOperator):    @apply_defaults    def __init__(            self,            conn_id: str,            *args, **kwargs):        self.conn_id = conn_id        super().__init__(*args, **kwargs)    def execute_callable(self):        session = get_session(self.conn_id)        try:            result = self.python_callable(*self.op_args,                                           session=session,                                           **self.op_kwargs)        except Exception:            session.rollback()            raise        session.commit()        return result

三、实际操作

(一)、import 套件

备注: Exhibitions 为透过 sqlalchemy 建立的资料表,不清楚的人可以参考 这篇文章

from airflow.decorators import dagfrom operators.sqlalchemy_operator import SQLAlchemyOperatorfrom airflow.utils.trigger_rule import TriggerRulefrom airflow.operators.empty import EmptyOperatorfrom datetime import datetimefrom sqlalchemy.orm import Sessionfrom models.exhibitions import Exhibitions, Basefrom sqlalchemy import inspect

(二)、定义会使用到的名称

class Settings:    conn_id = "postgres_connection"    start_task_id = "start_task"    end_task_id = "end_task"    create_table_task_id = "create_table"

(三)、建立 create_table 函式

我们可以看到,在参数的部分有準备了一个 session 变数,并且提示为 Session 物件,此函式接到 session 后即会开始执行建立资料表的动作,这部分就不赘述了

def create_table(session: Session):    insp = inspect(session.get_bind())    if not insp.has_table(Exhibitions.__tablename__):        Base.metadata.tables[Exhibitions.__tablename__].create(            session.get_bind())        print("资料表建立成功")

(四)、建立 dag 并呼叫 create_table 函式

透过使用 SQLAlchemyOperator 建立 operator 在呼叫指定函式时,会自动传入一个 session 物件,所以在对应的函示当中必须準备一个名为 session 的参数,或是使用 **kwargs 来接,才不会造成错误

@dag(start_date=datetime.today(), tags=['user'])def create_table_dag():    start_task = EmptyOperator(task_id=Settings.start_task_id)    create_table_task = SQLAlchemyOperator(task_id=Settings.create_table_task_id,                                           python_callable=create_table,                                           conn_id=Settings.conn_id,                                           trigger_rule=TriggerRule.ALWAYS)    end_task = EmptyOperator(task_id=Settings.end_task_id,                             trigger_rule=TriggerRule.ALWAYS)    start_task >> create_table_task >> end_taskcreate_create_table_dag = create_table_dag()

关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章