在 Airflow 当中有提供 PostgresOperator 这个东西,他可以直接对资料库进行 SQL 指令的执行,不过对于已经用习惯 ORM 方式操作资料库的人,实在是有点难受,笔者这边有在网路上找到有大神写了一个 SQLAlchemyOperator 可以在 Airflow 当中进行 sqlalchemy 的操作与使用,下方为简易的说明
一、参考连结与资料库连线
参考连结:https://sorokin.engineer/posts/en/apache_airflow_sqlalchemy_operator.html资料库设定:
这篇文章 第六点
程式仅用于参考、教学用途,若有问题,请留言告知,非常感谢
二、建立 SQLAlchemyOperator
(一)、专案目录架构
附上简单的目录架构
models 用来存放 SQLAlchemy 定义好的资料表结构operators 用来存放客製的 operator,例如本次使用的 SQLAlchemyOperator
(二)、SQLAlchemyOperator 撰写
运作逻辑:
利用 airflow 内提供的 PostgresHook 建立一个取得 session 的函式继承 PythonOperator 并于 init 当中继承所有属性以及添加 conn_id 属性修改execute_callable
函式使其在呼叫 python function 的时候,会传入一个名为 session 的物件"""ref:https://sorokin.engineer/posts/en/apache_airflow_sqlalchemy_operator.html"""from airflow.operators.python import PythonOperatorfrom airflow.utils.decorators import apply_defaultsfrom sqlalchemy.orm import sessionmaker, Sessionfrom airflow.hooks.postgres_hook import PostgresHookdef get_session(conn_id: str) -> Session: hook = PostgresHook(postgres_conn_id=conn_id) engine = hook.get_sqlalchemy_engine() return sessionmaker(bind=engine)()class SQLAlchemyOperator(PythonOperator): @apply_defaults def __init__( self, conn_id: str, *args, **kwargs): self.conn_id = conn_id super().__init__(*args, **kwargs) def execute_callable(self): session = get_session(self.conn_id) try: result = self.python_callable(*self.op_args, session=session, **self.op_kwargs) except Exception: session.rollback() raise session.commit() return result
三、实际操作
(一)、import 套件
备注: Exhibitions 为透过 sqlalchemy 建立的资料表,不清楚的人可以参考 这篇文章
from airflow.decorators import dagfrom operators.sqlalchemy_operator import SQLAlchemyOperatorfrom airflow.utils.trigger_rule import TriggerRulefrom airflow.operators.empty import EmptyOperatorfrom datetime import datetimefrom sqlalchemy.orm import Sessionfrom models.exhibitions import Exhibitions, Basefrom sqlalchemy import inspect
(二)、定义会使用到的名称
class Settings: conn_id = "postgres_connection" start_task_id = "start_task" end_task_id = "end_task" create_table_task_id = "create_table"
(三)、建立 create_table 函式
我们可以看到,在参数的部分有準备了一个 session 变数,并且提示为 Session 物件,此函式接到 session 后即会开始执行建立资料表的动作,这部分就不赘述了
def create_table(session: Session): insp = inspect(session.get_bind()) if not insp.has_table(Exhibitions.__tablename__): Base.metadata.tables[Exhibitions.__tablename__].create( session.get_bind()) print("资料表建立成功")
(四)、建立 dag 并呼叫 create_table 函式
透过使用 SQLAlchemyOperator 建立 operator 在呼叫指定函式时,会自动传入一个 session 物件,所以在对应的函示当中必须準备一个名为 session 的参数,或是使用 **kwargs
来接,才不会造成错误
@dag(start_date=datetime.today(), tags=['user'])def create_table_dag(): start_task = EmptyOperator(task_id=Settings.start_task_id) create_table_task = SQLAlchemyOperator(task_id=Settings.create_table_task_id, python_callable=create_table, conn_id=Settings.conn_id, trigger_rule=TriggerRule.ALWAYS) end_task = EmptyOperator(task_id=Settings.end_task_id, trigger_rule=TriggerRule.ALWAYS) start_task >> create_table_task >> end_taskcreate_create_table_dag = create_table_dag()