Airflow 動手玩:(二)動手寫 DAG


在上一篇中簡介了 Airflow 如何操作,接下來我們要更深入的認識 DAG,並且讓 Airflow 執行我們自己寫的 DAG,在開始前,請大家記得把 WebServer 及 Scheduler 先開起來。

DAG & Task

一個 DAG 是由多個 Tasks 組成,每個 Task 是分開執行的,Task 是 Airflow 執行基礎單位。

DAG

在 Graph View 頁面,可以看到每個 DAG 都是由多個 Tasks 組成,所以代表我們待會在寫 DAG 時,除了宣告 DAG 物件代表不同的 DAG,同時也會宣告很多 Task 物件來組成 DAG 要執行的內容。
Graph View

Task

在 Tree View 頁面的右邊,可以看 Task 的狀態有很多種,如果仔細觀察的話,每個 Task 都是要先經過 queued 的狀態,才會進到 Running 的狀態,而每個 Task 之間又存在著相依的關係,像是上圖中的 run_after_loop 要等 runme_0, runme_1 及 runme_2 都成功執行後,才會開始執行,這代表每一個 Task 都是分開執行的,因為每個 Task 都是分開進入 queue 的。

每個 Task 都是分開執行的,也意味著,即使把 Task 寫在同一份 Python file 裡,也無法透過 Global variable 傳遞 Task 執行的結果,因為每一次執行的 context 是不一樣的。那不同的 Task 之間要如何傳遞執行中產生的變數呢?Airflow 提供了 XCom 功能,將執行的結果存在 Database 裡,供後面執行的 Task 調用前面 Task 執行的結果,在待會的範例中,也會示範如何在 Task 裡面將想傳遞的變數存入 XCom
Task 的不同狀態

SubDAG

除了 DAG 及 Task 之外,Airflow 還提供了另一種 DAG 稱作 SubDAG,顧名思義,SubDAG 代表在 DAG 底下,所以也可以用於組成 DAG,但為什麼要有 SubDAG 呢?想像一下,如果有一組不斷重複的 Tasks 出現在 DAG 中,像是下面的 section tasks 1~5 重複了兩次,有沒有辦法用模組化的方式,讓我們不用重複好幾次這段 Code 呢?
重複的 Section tasks。圖片來源自 Airflow Document。
方法就是透過 SubDAG,像是下面這張圖將 section tasks 包裝成 SubDAG。在這次的範例中雖然不會介紹 SubDAG 如何實作,有興趣的話可以至 Airflow Document 查看如何使用。

用 SubDAG 將 section 模組化。圖片來源自 Airflow Document。

實作 DAG

回到 WebServer 或是 Scheduler 的 Terminal 查看,會發現他們都有一行 log 是 INFO - Searching for files in {AIRFLOW_HOME}/dags,代表 Airflow 會在 {AIRFLOW_HOME}/dags 找尋使用者寫的 DAG,所以我們先回到 AIRFLOW_HOME 建立 dags 資料夾。

> mkdir ~/airflow/dags
> touch ~/airflow/dags/my_dag.py

目標

這次實作的 DAG 是取得時間戳後,存入 XCom,然後判斷時間戳是不是偶數,如果是的話就存入 redis,如果不是的話就不做任何事。所以在開始之前,我們要先安裝好 redis,以及在 airflow 中使用 redis 的相依套件。

> brew install redis
> pip install apache-airflow[redis]

安裝好 redis 後,可以透過 redis-server 開始 redis。

> redis-server
20741:C 25 Feb 2020 00:25:21.690 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
20741:C 25 Feb 2020 00:25:21.690 # Redis version=5.0.7, bits=64, commit=00000000, modified=0, pid=20741, just started
20741:C 25 Feb 2020 00:25:21.690 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
20741:M 25 Feb 2020 00:25:21.691 * Increased maximum number of open files to 10032 (it was originally set to 256).
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 5.0.7 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 20741
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

20741:M 25 Feb 2020 00:25:21.692 # Server initialized
20741:M 25 Feb 2020 00:25:21.692 * DB loaded from disk: 0.000 seconds
20741:M 25 Feb 2020 00:25:21.692 * Ready to accept connections

Default Arguments

Default Arguments 待會會透過 DAG 傳遞給每個 Task,我們也可以在 Task 裡面再次指定不同的值,但透過 Default Arguments,我們統一傳遞個預設參數,對於開發會比較方便。

default_args = {
    'owner': 'someone', # DAG 擁有者的名稱,如上一篇說明的,通常是負責實作這個 DAG 的人員名稱
    'depends_on_past': False, # 每一次執行的 Task 是否會依賴於上次執行的 Task,如果是 False 的話,代表上次的 Task 如果執行失敗,這次的 Task 就不會繼續執行
    'start_date': datetime(2020, 2, 24), # Task 從哪個日期後開始可以被 Scheduler 排入排程
    'email': ['airflow@example.com'], # 如果 Task 執行失敗的話,要寄信給哪些人的 email
    'email_on_failure': False, # 如果 Task 執行失敗的話,是否寄信
    'email_on_retry': False, # 如果 Task 重試的話,是否寄信
    'retries': 1, # 最多重試的次數
    'retry_delay': timedelta(minutes=5), # 每次重試中間的間隔
    # 'end_date': datetime(2020, 2, 29), # Task 從哪個日期後,開始不被 Scheduler 放入排程
    # 'execution_timeout': timedelta(seconds=300), # Task 執行時間的上限
    # 'on_failure_callback': some_function, # Task 執行失敗時,呼叫的 function
    # 'on_success_callback': some_other_function, # Task 執行成功時,呼叫的 function
    # 'on_retry_callback': another_function, # Task 重試時,呼叫的 function
}

DAG Object

在創建 DAG Object 時,要注意每個 dag_id 都是唯一的,schedule_interval 可以設定 DAG 多久執行一次。

dag = DAG(
    dag_id='my_dag',
    description='my dag',
    default_args=default_args,
    schedule_interval='*/1 * * * *'
)

Operators

做好 DAG 的基礎設定後,我們就可以決定要用哪些 Tasks 組成這個 DAG 的內容,這裡要介紹一個新的名詞是 Operator,Operator 用來定義 Task,有些 Task 的功能是執行 Bash 指令、有些則是執行 Python,當然也可以透過 Python Operator 執行全部的事,但透過不同功能的 Operator 來定義 Task,在實作上會更容易些。

第一個 Task 是取得時間戳,並把結果放入 XCom ,這個 Task 透過 BashOperator 執行 bash 指令完成,在 BashOperator 設定 xcom_push=True,可以將 bash 執行的成果放入 XCom

get_timestamp = BashOperator(
    task_id='get_timestamp',
    bash_command='date +%s',
    xcom_push=True,
    dag=dag
)

有了時間戳之後,接下來要從 XCom 拿到時間戳,並判斷要執行寫入 redis,還是什麼都不做。這裡的功能比較簡單,可以在 Python Operator 裡用 if-else 就能完成,但是有時程式比較複雜,if-else 判斷後,後面可能還要好幾個步驟才能完成,這時單純用 Python Operator 就不合適了,所以這裡介紹 BranchPythonOperator,在 BranchPythonOperator 裡,我們可以透過回傳的文字,決定下一個 Task 要執行什麼。像是如果時間戳是偶數,回傳 store_in_redis,代表執行 store_in_redis 的 Task,反之則執行 skip 的 Task。

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=lambda **context: 'store_in_redis' if int(context['task_instance'].xcom_pull(task_ids='get_timestamp')) % 2 == 0 else 'skip',
    provide_context=True,
    dag=dag,
)

接下來我們先實作把資料存進 redis,這部分我們就可以使用 PythonOperator 了,不過我們要怎麼連到 redis 呢?還記得昨天在介紹 Connections 頁面時,Airflow 已經幫我們創建了很多 Connection 預設的 Credential 嗎?先回到 Connections 頁面尋找 redis_default 的 Connection,這時我們可以點選左邊像筆的圖示標籤,進去修改 redis 的 host。

redis_default 編輯頁面

回到程式,我們可以透過 RedisHook 讀取 redis 的 credential 並跟 redis 進行連線,Airflow 透過這樣的方式,讓我們把程式邏輯跟 Credential 分開,所以我們不再需要透過像是環境變數去設置連線參數。

def set_last_timestamp_in_redis(**context):
    timestamp = context['task_instance'].xcom_pull(task_ids='get_timestamp')
    redis = RedisHook(redis_conn_id='redis_default').get_conn()
    redis.set('last_timestamp', timestamp)


store_in_redis = PythonOperator(
    task_id='store_in_redis',
    python_callable=set_last_timestamp_in_redis,
    provide_context=True,
    dag=dag
)

什麼事情都不做的情況,Airflow 提供了 DummyOperator,讓我們可以將什麼事情都不做的情況,也設置成一個 Operator。

skip = DummyOperator(
    task_id='skip',
    dag=dag
)

最後,我們雖然宣告了各種 Task,但 Airflow 這時還是不知道怎麼串接我們的 Task,這裏 Airflow 提供了很簡單的方式進行串接。

get_timestamp >> branching >> [store_in_redis, skip]

在這裡我們將 store_in_redisskip 用一個 list 包起來,代表這兩個 Task 都在 branching 之後,但 store_in_redisskip 之間沒有先後關係。

my_dag Graph View

完整程式碼

from datetime import datetime, timedelta


from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.contrib.hooks.redis_hook import RedisHook
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'someone',
    'depends_on_past': False,
    'start_date': datetime(2020, 2, 24),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'end_date': datetime(2020, 2, 29),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
}


dag = DAG(
    dag_id='my_dag',
    description='my dag',
    default_args=default_args,
    schedule_interval='*/1 * * * *'
)


get_timestamp = BashOperator(
    task_id='get_timestamp',
    bash_command='date +%s',
    xcom_push=True,
    dag=dag
)

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=lambda **context: 'store_in_redis' if int(context['task_instance'].xcom_pull(task_ids='get_timestamp')) % 2 == 0 else 'skip',
    provide_context=True,
    dag=dag,
)


def set_last_timestamp_in_redis(**context):
    timestamp = context['task_instance'].xcom_pull(task_ids='get_timestamp')
    redis = RedisHook(redis_conn_id='redis_default').get_conn()
    redis.set('last_timestamp', timestamp)


store_in_redis = PythonOperator(
    task_id='store_in_redis',
    python_callable=set_last_timestamp_in_redis,
    provide_context=True,
    dag=dag
)


skip = DummyOperator(
    task_id='skip',
    dag=dag
)

get_timestamp >> branching >> [store_in_redis, skip]

Trigger Rules

原本用一份 Script 完成 Data Pipeline 的方式,我們可以用 if-else 判斷前面的執行結果,決定接下來要執行哪段程式,但 Airflow 透過 Task 將一次完整的 Data Pipeline 分開執行,那我們要如何透過前面 Tasks 的狀態,來判斷後面的 Tasks 要不要執行呢?答案是 Trigger Rules,Airflow 可以幫每個 Task 設定 Trigger Rule,讓 Scheduler 判斷要不要將某個 Task 放入排程,所以我們就不需要在 Task 裡,再自己用 if-else 判斷了。在每個 Task 裡,我們都可以透過 trigger_rule 表示我們想設置的 Trigger Rule。

  • all_success: 這是預設的 Trigger Rule,代表某個 Task 的上游 Tasks 的狀態都要是成功,才會執行這個 Task。以第一張圖 run_after_loop 為例,代表 runeme_0, runme_1 及 runme_2 都要成功,才會執行。
  • all_failed: 與 all_success 相反,表示上游 Tasks 的狀態都是失敗時執行,這可以用於處理 exception 狀態。
  • all_done: 代表只要上游 Tasks 完成,不管它們的狀態是成功、失敗或是 skipped,都會執行。
  • one_failed: 上游 Tasks 其中一個失敗就執行,這個 Trigger Rule 不會等上游 Tasks 都完成才執行,而是只要有失敗就立即執行。
  • one_success: 與 one_failed 相反,上游 Tasks 其中一個成功就立即執行。
  • none_failed: none_failedall_success 的差異是,all_success 要上游所有 Tasks 都成功,none_failed 則是上游 Tasks 的狀態都是成功或是 skipped
  • none_skipped: 上游 Tasks 的狀態是成功或是失敗時執行。

總結

今天介紹了 DAG & Task 之間的關係,並且實作了我們的第一個 DAG,在 Task 的參數中我們看到 Airflow 提供了多種情況的處理,像是 on_failure_callback。Airflow 同時也提供了不同功能的 Operator,讓我們可以更方便的定義我們的 Data Pipeline。在連線上,Airflow 透過 Hook 的設計,讓我們可以將程式碼與 Credential 分開。最後在 Task 與 Task 之間多種複雜的關係,Airflow 也提供了 Trigger Rules 設定。明天會進一部探討,如何實作自己的 Operator 與 Hook。

#Airflow #Data Pipeline #Data Cleaning #ETL





Airflow 動手玩
Airflow 是由 Airbnb 開源的 Data Pipeline 軟體,在 2019 年成為了 Apache 軟體基金會 Top-Level 的專案,這個系列文會介紹從如何用 Airflow 管理我們 Data Pipeline 到如何部署在 Kubernetes 上。

留言討論