|
| 1 | +--- |
| 2 | +title: "Cloud Composer (Apache Airflow) 実用インフラTips" |
| 3 | +date: 2025/02/06 00:00:00 |
| 4 | +postid: a |
| 5 | +tag: |
| 6 | + - CloudComposer |
| 7 | + - Airflow |
| 8 | + - GoogleCloud |
| 9 | +category: |
| 10 | + - Infrastructure |
| 11 | +thumbnail: /images/20250206a/thumbnail.png |
| 12 | +author: 岸下優介 |
| 13 | +lede: "Apache Airflowはワークフロー管理サービスで、スケジュールされた時間に一連の処理を行ってくれる便利なサービスです。" |
| 14 | +--- |
| 15 | +<img src="/images/20250206a/image.png" alt="" width="1200" height="447" loading="lazy"> |
| 16 | + |
| 17 | +# はじめに |
| 18 | + |
| 19 | +Apache Airflowはワークフロー管理サービスで、スケジュールされた時間に一連の処理を行ってくれる便利なサービスです。ただ、運用には結構コストがかかるサービスです。 |
| 20 | + |
| 21 | +そのため、Google CloudではCloud Composer、AWSではAmazon Managed Workflows for Apache Airflow (Amazon MWAA)といった運用を楽にするためのマネージドなサービスが展開されています。 |
| 22 | + |
| 23 | +とは言え、マネージドなAirflowでさえも運用は難しく、色々な問題が起こりがちです。Cloud Composer自体、v1から始まり既にv3へと進化[^1]を遂げていることから、提供するGoogle側でも試行錯誤と努力の過程が見え透けてきます。 |
| 24 | + |
| 25 | +そこで本記事では、Cloud Composerを運用するプロジェクトにインフラ担当として2年ほど関わっきた中で経験的に得られたTipsを紹介します[^2]。 |
| 26 | + |
| 27 | +※本記事のTipsはCloud Composer v2をベースに話をしています。Airflow単体で当てはまる部分が多いとは思いますが、Cloud Composer v1やv3、Amazon MWAAでは一部当てはまらない可能性があることを考慮して読んで頂ける幸いです。 |
| 28 | + |
| 29 | +# Airflowのお約束事 |
| 30 | + |
| 31 | +Airflowにてワークフローを記述するDirected Acyclic Graph(DAG)は**決定的でべき等**でなければなりません。 |
| 32 | +以下の記事に詳しく説明があります。 |
| 33 | + |
| 34 | +https://cloud.google.com/blog/ja/products/data-analytics/optimize-cloud-composer-via-better-airflow-dags |
| 35 | + |
| 36 | +- 決定的:特定の入力によって常に同じ出力が生成される必要 |
| 37 | +- べき等:DAG を何度トリガーしても、毎回同じ効果 / 結果が得られなければならない |
| 38 | + |
| 39 | +Cloud Composerを運用してる中でDAGとして定義されたワークフローのタスクは結構よくわからないタイミングで失敗します。原因は内部的なネットワーク上の問題だったり、データベースへのアクセス過多だったり、どうしようもない場合がよくあります。 |
| 40 | + |
| 41 | +AirflowのDAGでは標準でリトライ機能が搭載されているので、**いつでもリトライを実施できるようにしておく**ためにも、決定的でべき等なDAGを定義しておくことが大切です。 |
| 42 | + |
| 43 | +# ベストプラクティス回り |
| 44 | + |
| 45 | +Airflowの公式ページにてベストプラクティスが公開されています。 |
| 46 | + |
| 47 | +https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html |
| 48 | + |
| 49 | +このべスプラ、結構大事なことが書いてあるのですが見落とされがちです。運用する中でどうしても守ってほしい2点についてここで紹介致します。 |
| 50 | + |
| 51 | +## Python Codeにおけるトップレベルでの処理 |
| 52 | + |
| 53 | +https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code |
| 54 | + |
| 55 | +書いてある通りなのですが、DAGを構成するPythonファイルのトップレベルで無意味な重い処理を書かないようにしましょう。 |
| 56 | + |
| 57 | +特に見落としがちなのが、`import`処理に関しても同様なので注意が必要です。以下のように`tensorflow`や`torch`など**Loadに時間のかかるライブラリ**はBad exampleの様なimportは避けて、Good exampleの様にタスク関数内で呼び出すのがべスプラになります。 |
| 58 | + |
| 59 | +```python Bad example |
| 60 | +import random |
| 61 | +import pendulum |
| 62 | +import pandas # Bad example |
| 63 | +import torch # Bad example |
| 64 | + |
| 65 | +with DAG( |
| 66 | + dag_id="example_python_operator", |
| 67 | + schedule=None, |
| 68 | +# ... |
| 69 | + |
| 70 | +@task() |
| 71 | +def do_stuff_with_pandas_and_torch(): |
| 72 | +# ... |
| 73 | +``` |
| 74 | + |
| 75 | +```python Good example |
| 76 | +import random |
| 77 | +import pendulum |
| 78 | + |
| 79 | +with DAG( |
| 80 | + dag_id="example_python_operator", |
| 81 | + schedule=None, |
| 82 | +# ... |
| 83 | + |
| 84 | +@task() |
| 85 | +def do_stuff_with_pandas_and_torch(): |
| 86 | + import pandas # Good example |
| 87 | + import torch # Good example |
| 88 | +# ... |
| 89 | +``` |
| 90 | + |
| 91 | +なぜこれが良くないかと言うと、Airflowでは各DAGを管理し、時間通りに実行できるようにスケジュールする管理役のSchedulerコンテナが存在しており、このコンテナが定期的にデプロイ済みのPythonファイルを実行可能かどうかを解析しています。そのため、トップレベルに重い処理があると解析時間が伸びてしまい、ワークフローのスケジュールに影響を及ぼしてしまいます。 |
| 92 | + |
| 93 | +> Airflow scheduler executes the code outside the Operator’s execute methods with the minimum interval of min_file_process_interval seconds. This is done in order to allow dynamic scheduling of the DAGs - where scheduling and dependencies might change over time and impact the next schedule of the DAG. Airflow scheduler tries to continuously make sure that what you have in DAGs is correctly reflected in scheduled tasks. |
| 94 | + |
| 95 | +## Airflow Variablesの呼び出し |
| 96 | + |
| 97 | +https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#airflow-variables |
| 98 | + |
| 99 | +こちらも前章と似たような内容ですが、トップレベルでAirflow Variablesの呼び出しはできるだけ少なくしましょう。理由としては、ネットワーク上の呼び出しやデータベースへのアクセス処理が伴うため、こちらもスケジューラの定期的な解析処理に影響を及ぼします。 |
| 100 | + |
| 101 | +> Using Airflow Variables yields network calls and database access, so their usage in top-level Python code for DAGs should be avoided as much as possible, as mentioned in the previous chapter, Top level Python Code. |
| 102 | + |
| 103 | +# コマンド回り |
| 104 | + |
| 105 | +## デプロイにはgcloud storage rsyncを使う |
| 106 | + |
| 107 | +Cloud ComposerにDAGをデプロイするには、環境が指定するGCS Bucketへファイルを配置する必要がありますが、いちいち手動でやっていると事故ることが多いです。そこで、GitHubやGitLabなどの本番用ブランチをSingle Source of TruthとしてJenkins、GitHub Actions、GitLab CIなどでデプロイ用のJobを用意すると思います。その際、`gcloud storage cp`でDAGをコピーするよりも`gcloud storage rsync`がお勧めです。 |
| 108 | + |
| 109 | +https://cloud.google.com/sdk/gcloud/reference/storage/rsync |
| 110 | + |
| 111 | +理由としては`gcloud storage rsync`はコピー元を正として、コピー先を同じ状態にします。そのため、本番ブランチから削除されたDAGのPythonファイルは削除され、運用中のDAGのみがディレクトリへ残ることになります。 |
| 112 | + |
| 113 | +もちろんDAG内で`schedule_interval`を`None`にすることでも無効化はできますが、常にAirflowの環境に運用中のワークフローのみを置きたい場合はこれできれいな状態を保つことができます。 |
| 114 | + |
| 115 | +## Airflow CLIを使う |
| 116 | + |
| 117 | +AirflowにはCLIが用意されており、例えばAirflow UI内での操作権限を制御(`Admin`や`Op`の付与)するためにコマンド操作が必要となります。これらのコマンド操作は`gcloud`コマンド経由でAirflowのCLIにアクセスすることができます。 |
| 118 | + |
| 119 | +https://cloud.google.com/composer/docs/composer-2/access-airflow-cli?hl=ja |
| 120 | + |
| 121 | +https://cloud.google.com/composer/docs/composer-2/airflow-rbac?hl=ja |
| 122 | + |
| 123 | +# リソース周り |
| 124 | + |
| 125 | +Cloud Composerは結構お金のかかるサービスなので、初期構築時にリソースをできるだけ抑えたくなります。ただ、抑えすぎるとAirflowが処理し切れなくなり、スケジュールされていたが実行されなかったゾンビタスクと呼ばれるエラーが発生します。 |
| 126 | + |
| 127 | +> ゾンビタスクは、実行されるはずであるが実行されていないタスクです。これは、タスクのプロセスが終了済みか応答していない場合、Airflow ワーカーが過負荷のためにタスク ステータスを時間内に報告しなかった場合、またはタスクが実行された VM がシャットダウンされた場合に発生します。Airflow はそのようなタスクを定期的に検出し、タスクの設定に応じて、失敗するか、再試行します。 |
| 128 | + |
| 129 | +https://cloud.google.com/composer/docs/composer-2/troubleshooting-dags?hl=ja#zombie-tasks |
| 130 | + |
| 131 | +自分もゾンビタスクには散々苦しめられましたので、その中で得られたTipsを紹介します。 |
| 132 | + |
| 133 | +## WorkerやSchedulerの初期値 |
| 134 | + |
| 135 | +Cloud Composer環境を構築する際、WorkerやSchedulerのCPUコア数やメモリを予め決める必要があります(もちろん、後から変更できます)。 |
| 136 | +その場合、実装予定のDAGの数をある程度想定し、それらDAGを並列に実行したい数やタスクを並列に実行したい数から以下のように逆算することができます。 |
| 137 | + |
| 138 | +<img src="/images/20250206a/Screen_Shot_2020-02-04_at_3.36.36_PM.max-1400x1400.png" alt="Screen_Shot_2020-02-04_at_3.36.36_PM.max-1400x1400.png" width="1200" height="1339" loading="lazy"> |
| 139 | + |
| 140 | +Airflowの各パラメータの決め方 |
| 141 | +画像引用元:https://cloud.google.com/blog/ja/products/data-analytics/scale-your-composer-environment-together-your-business |
| 142 | + |
| 143 | +## Schedulerのリソース侮るなかれ |
| 144 | + |
| 145 | +<img src="/images/20250206a/image_2.png" alt="image.png" width="1200" height="436" loading="lazy"> |
| 146 | + |
| 147 | +Schedulerのリソース使用状況 |
| 148 | +画像引用元: https://cloud.google.com/composer/docs/composer-2/optimize-environments?hl=ja |
| 149 | + |
| 150 | +ゾンビタスクが発生した際、ワークフローを実行する主体であるWorkerに関してはCPUやメモリの使用率に対して感度高く見るかと思います。もちろんWorkerのリソース不足に起因する場合が多いのですが、ワークフローのまとめ役であるSchedulerが起因している場合もあるので、CPUやメモリの使用率が高くなっていないか気を配ってあげてください。SchedulerのCPU使用率が常に上限に張り付いてしまい、DAGが予定通りスケジュールされなかった!みたいなケースを何度か見てきました。 |
| 151 | + |
| 152 | +もしお金に余裕がある場合は、SchedulerのPodを2台構成にするのもおススメです。予測できないタイミングでSchedulerの再起動(クラッシュ?)が発生し、1台構成がゆえにスケジューリングに失敗してしまったというパターンもありました。 |
| 153 | + |
| 154 | +## Airflowデータベースのクリーンアップを導入する |
| 155 | + |
| 156 | +実運用の中でAirflowデータベースへのアクセスが発生しないので見落とされがちなのですが、Airflowでは時間の経過とともに環境のAirflowデータベースに保存されるデータが増えていきます。蓄積されるデータは過去のDAG実行やタスクなどオペレーションに関連する情報で、ほとんどのケースで恒久的に必要となる情報ではありません。 |
| 157 | +そしてドキュメントには以下のように記載されており、データベースのストレージはなるべく圧迫しないようにしておく必要があるようです。 |
| 158 | + |
| 159 | +> - Airflow データベースのサイズが 16 GB を超える場合、環境を新しいバージョンにアップグレードすることはできません。 |
| 160 | +> - Airflow データベースのサイズが 20 GB を超える場合、スナップショットを作成することはできません。 |
| 161 | + |
| 162 | +また、これはサポートケースへの問い合わせの中でご教示頂いたのですが、データベースクリーンアップが**ゾンビタスク撲滅にある程度の効果を発揮する**とのことでした。 |
| 163 | +導入方法は以下で紹介されており、基本的にコピペしてデプロイするだけなので是非導入してみてください。 |
| 164 | + |
| 165 | +https://cloud.google.com/composer/docs/composer-2/cleanup-airflow-database?hl=ja |
| 166 | + |
| 167 | +## `/data`にはなるべくファイル放置しない |
| 168 | + |
| 169 | +タスクが生成して使用するデータを保存するのにCloud Composer環境のGCS Bucketには`data`というディレクトリが存在します。 |
| 170 | + |
| 171 | +https://cloud.google.com/composer/docs/composer-2/cloud-storage?hl=ja#folders_in_the_bucket |
| 172 | + |
| 173 | +Cloud Composerの全体アーキを見るとわかるのですが、このGCS BucketはGCS Fuseを介してSchedulerやWorkerのPodに中身が同期される仕組みとなっております。 |
| 174 | + |
| 175 | +<img src="/images/20250206a/image_3.png" alt="image.png" width="1200" height="1297" loading="lazy"> |
| 176 | + |
| 177 | +プライベートIP環境のアーキテクチャ |
| 178 | +画像引用元:https://cloud.google.com/composer/docs/composer-2/environment-architecture?hl=ja |
| 179 | + |
| 180 | +`/data`配下に関してもGCS FuseによってSchedulerやWorkerのPodへ同期される仕組みとなっており、過去に生成したファイルが大量に`/data`配下に残っていると同期のパフォーマンスに影響を及ぼす可能性があります。実際にGCS Fuseが大暴れして障害に発展したパターンも遭遇しました。 |
| 181 | + |
| 182 | +`/data`に配置したファイルは用が済み次第、削除するように心がけましょう。 |
| 183 | + |
| 184 | +# まとめ |
| 185 | + |
| 186 | +Cloud ComposerもといApache Airflowを運用する中で得られたTipsを主にインフラの観点で紹介しました。 |
| 187 | + |
| 188 | +これらのTipsがAirflow運用者の一助になれば幸いです。 |
| 189 | + |
| 190 | +アイキャッチ画像は[Google Cloud公式ページ](https://cloud.google.com/icons?hl=ja)、[Apache Airflow公式ページ](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+logos)からの引用させて頂きました。 |
| 191 | + |
| 192 | +[^1]: v1からv2へ去年移行したばっかりなのに... |
| 193 | +[^2]: 最近は比較的に安定して運用できてる感は出てきましたが、まだまだ発展途上な部分は多いというのが現実です。 |
0 commit comments