Airflow 動手玩:(七)Airflow Best Practices


這篇會聊聊 Airflow Best Practices,雖然說是 Best Practices,但是工程不像科學,工程只有適不適合,所以大家並不一定要完全採納,只要適合的就是好的。

Tips

使用 Default Arguments

大多數人第一次寫 DAG,最早碰到的應該就是 Default Arguments,使用 Default Arguments 可以大大減少不斷地在 Task 重複同樣的設定參數。

default_args = {
    'owner': 'someone',
    'depends_on_past': False,
    'start_date': datetime(2020, 2, 24),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'end_date': datetime(2020, 2, 29),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
}

使用 params

雖然在實作中沒講到,但是透過 params 可以傳遞自定義的參數進去 Task,而這個參數如果在很多 Task 都重複時,放在 params 就很方便,因為不用每個 Task 都重複一遍。在 Task 中,我們可以從 context.params 拿到這些參數。

dag = DAG(
    ...,
    params={
        "s3_bucket": "test"
    }
}

使用 Variables,但有限制的使用

一樣在實作中沒講到,但是我們可以在 WebServer 的 Admin → Variables 頁面看到,這裡可以使用 key value 形式創造 Variable。標題寫有限制的使用是因為 value 也可以是 json 的格式,所以我們可以用一個 Variable 就設定好一個 DAG 所需的全部參數。

Variable 頁面

然後在 DAG 裡面,我們可以透過 Variable.get("some key") 拿到 Variable,習慣上還會把 Variable 當成 params,因為像上面的 s3_bucket 可能就會隨環境而變。

dag = DAG(
    ...,
    params=Variable.get("test", deserialize_json=True)
)

使用 Connections 儲存敏感資料

如果從第一篇看過來,應該會覺得用 Connections 儲存連線資訊很正常,但如果還不了解 Airflow 的功能,可能會想要透過環境參數傳連線資訊,所以這裡還是提醒一下用 Connections 儲存敏感資料,除了可以用 Connections 連線之外,如果想在 Task 中拿到敏感資訊也很容易。

from airflow.hooks.base_hook import BaseHook
redis_password = BaseHook.get_connection('redis_default').password

使用 context manager

上面幾點都是有關設定的,接下來看看有關 coding 的。每個 Task 我們都要設定 dag=dag,有時 Task 一多就可能忘記,所以 Airflow 也提供了一個方便的功能 context manager,讓在 context manager 下的 Task 都屬於同一個 DAG。

default_args = {...}
with DAG(dag_id='test', default_args=args, schedule_interval='0/1 0 * * *') as dag:
    task_1 = BashOperator(
        task_id='task_1',
    bash_command='echo "say hi"')

    task_2 = DummyOperator(task_id='task_2')

    task_1 >> task_2

同一個 level 的 Task 使用 List 表示

使用 List 表示,可以簡化程式碼。

get_timestamp >> branching >> [store_in_redis, skip]

使用 List 排列 DAG

總結

感謝大家看到這邊,總算完成這次的寫作松了,可以開瓶慶祝了🍾🍾🍾。不過最重要的還是希望在這七篇中可以更了解如何使用 Airflow!謝謝大家!

資料來源

#Airflow #Data Pipeline #Data Cleaning #ETL
Airflow 動手玩
Airflow 是由 Airbnb 開源的 Data Pipeline 軟體,在 2019 年成為了 Apache 軟體基金會 Top-Level 的專案,這個系列文會介紹從如何用 Airflow 管理我們 Data Pipeline 到如何部署在 Kubernetes 上。






Related Posts

[MTR04] W2 D4 JS  判斷式 if/else

[MTR04] W2 D4 JS 判斷式 if/else

[14] 值 - 數字

[14] 值 - 數字

七天打造自己的 Google Map 應用入門 - Day04

七天打造自己的 Google Map 應用入門 - Day04

原子習慣:法則 3 - 讓行動輕而易舉

原子習慣:法則 3 - 讓行動輕而易舉

建立映像檔使用Dockerfile

建立映像檔使用Dockerfile

透過製作 Babel-plugin 初訪 AST

透過製作 Babel-plugin 初訪 AST



Comments