Airflow 動手玩:(四)淺談 Airflow 架構


前幾篇實作 DAG & Operator,大家應該可以感受到 Airflow 透過 Operator 讓 Data Pipeline 更有彈性,我們可以透過不同功能的 Operator 跟 Trigger Rule 實作 DAG,甚至實作自己的 Operator。接下來這篇會帶大家瞭解 Airflow 如何透過架構安排讓部屬更有彈性!

架構

在聊聊架構之前,大家可以先想想前幾天我們是如何啟動 Airflow 的?沒錯,就是 WebServer & Scheduler,所以 Airflow 的架構至少有這兩部分。

WebServer 是單純的 UI 介面,但卻可以看到跟 Scheduler 一樣的 DAG、Connections 等等,這代表 WebServer 跟 Scheduler 之間一定連到同一個儲存介面,所以才會拿到一樣的值,這個儲存介面就是 Metadata Database。

回想第一篇,我們在做 airflow initdb 時有產生一個 SQLite 的資料庫 airflow.db,如果連進去看可以發現有幾個 Table 紀錄著我們熟悉的資訊,像是 DAG 資訊的 dag,每一次 Task 運行資訊的 task_instance 等等。

> sqlite3 ~/airflow/airflow.db
SQLite version 3.28.0 2019-04-15 14:49:49
Enter ".help" for usage hints.
sqlite> .tables
alembic_version        job                    slot_pool
chart                  known_event            task_fail
connection             known_event_type       task_instance
dag                    kube_resource_version  task_reschedule
dag_pickle             kube_worker_uuid       users
dag_run                log                    variable
dag_tag                serialized_dag         xcom
import_error           sla_miss

接下來可以看一下 Airflow 的架構圖,主要分成四個部分 WebServer, Scheduler, Executor 以及 Worker,我們分別來看看這四個 Components。

Airflow 架構,圖片來源:https://medium.com/@dustinstansbury/understanding-apache-airflows-key-concepts-a96efed52b1a

WebServer

WebServer 顧名思義,就是提供網頁功能的伺服器,WebServer 是可以單獨運作的,所以可以被分開部署,但如果只有 WebServer 而沒有下面三個 Components,也就只能查看 DAG、Task 及 Connections 等一些資訊而已。

Scheduler

Scheduler 負責排程,這裡的排程並不是直接指定哪個 Task 給哪個 Worker 運行,而是從 Metadata Database 中找尋 DAG 跟 Task 的狀態,並判斷是否將哪些 Task 傳送給 Executor 安排執行。

Executor

Executor 是一個 Queue Process,從 Scheduler 接收要執行的 Task,並將這些資訊存進 Queue,並從 Queue 中取出 Task 安排給閒置的 Worker 執行。如果仔細觀察 WebServer 或是 Scheduler 運行時的資訊,會發現 Terminal 有時會印出 INFO - Using executor SequentialExecutor,代表我們使用的是 SequentialExecutorSequentialExecutor 通常用於 debug,一次只能運行一個 Task,也是唯一可以跟 SQLite 配合的 Executor,因為 SQLite 不支援多個連線。

除了 SequentialExecutor 還有其他幾個 Executor,可以在 Airflow 的 Source Code 看到,像是 LocalExecutorCeleryExecutorKubernetesExecutor 等。

  • LocalExecutor:單機版的 Executor,雖然是單機版,但還是可以透過 MultiProcess 同時運行多個 Worker。
  • CeleryExecutor:使用 Celery 分散式的 Task Queue 當作 Executor,所以可以在多台電腦同時運行多個 Worker。
  • KubernetesExecutor:使用 Kubernetes 當作 Executor,所以每個 Task 都會被包裝成一個 Pod 運行,下下篇我們要將 Airflow 架設在 Kuberentes 上就會用到 KubernetesExecutor

Worker

Worker 負責實際執行Task。

Task 狀態

最後來看一下,Task 在 Airflow 裡從開始到結束會有哪些狀態。

  • No status: 當我們手動 Trigger DAG 或是 Scheduler 排程 DAG 後,這時 DAG 的 Task 會先被創造成 Task Instance 並寫進 Database ,這時 Task 的狀態就是 no status。
  • Scheduled: 當 Scheduler 確認某個 Task 需要被執行時,這時 Task 的狀態就會變成 Scheduled,像是當我們使用 BranchPythonOperator,執行結束後,Scheduler 會就依照執行的結果決定下一個 Task,這時那個 Task 的狀態就會是 Scheduled。
  • Queued: 當 Scheduler 把確定要執行的 Task 發送給 Executor 時,相當於把 Task 放入 Queue 裡,所以 Task 的狀態會變成 Queued。
  • Running: 當 Executor 把 Task 發送給閒置的 Worker 時,Task 的狀態就會變成 Running。
  • 最後依據 Workere 執行的結果,Executor 會把 Task 標示成 Success 或是 Failed。

Task 狀態流程,圖片來源:https://airflow.apache.org/docs/stable/concepts.html#task-lifecycle

總結

今天我們看了 Airflow 的架構圖,以及 Task 在 Airflow 中執行的各種狀態,對這兩點有了基礎的瞭解後,未來如果我們部署的 Airflow 有問題,相信大家能更快定位出問題可能出在哪。像是 Task 已經到了 Queued 的狀態了,卻一直遲遲沒有進到 Running,這時候我們就可以看看是不是 Worker 哪裡出問題了。
下一篇會帶大家看過 Airflow 的設定檔,讓大家知道 Airflow 有哪些可調控的部分。

資料來源

#Airflow #Data Pipeline #Data Cleaning #ETL







你可能感興趣的文章

[RESOLVED] SQL With Keyword Error

[RESOLVED] SQL With Keyword Error

What a Bunch of ...  -  Bunching Estimation Methods

What a Bunch of ... - Bunching Estimation Methods

Google Chrome開發者工具教學 - Application

Google Chrome開發者工具教學 - Application






留言討論