|
| 1 | +--- |
| 2 | +title: CloudComposer 始めました |
| 3 | +author: thimi0412 |
| 4 | +date: 2022-06-06 |
| 5 | +image: start-CloudComposer.png |
| 6 | +--- |
| 7 | + |
| 8 | +WEDでデータエンジニアをしている[thimi0412](https://twitter.com/thimi0412) こと清水です。 |
| 9 | + |
| 10 | +# 始めに |
| 11 | + |
| 12 | +WEDでは現在GKE上に[Airflow](https://airflow.apache.org/) の環境を構築し、[Embulk](https://www.embulk.org/) 使用してアプリケーションで使用しているCloudSQLから分析用のBigQueryにデータを転送しています。そして、自前のGEK上のAirflowからCloudComposerへの移行を現在行っている最中です。 |
| 13 | + |
| 14 | +今回はCloudComposerの作成と開発運用について紹介します。 |
| 15 | + |
| 16 | +# なぜCloudComposer |
| 17 | + |
| 18 | +移行の理由については以下の2点 |
| 19 | + |
| 20 | +- 自前でAirlfowの環境を作成し運用している、各リソースの設定等も必要となり運用のコストが上がってしまう |
| 21 | +- 今後他社とのデータ連携のプロジェクトの予定もあるので、Airflowを使用してデータ連携系タスクを管理したい |
| 22 | + |
| 23 | +# 構築 |
| 24 | + |
| 25 | +CloudComposerはv1とv2があり今回は新しいv2で作りました。(Airflowが2系なので使ってみたい&GKEがAutopilotモード等の理由) |
| 26 | + |
| 27 | +GCPのリソースは[Terraform](https://www.terraform.io/)で管理してるのでこんな感じでかけます。 |
| 28 | + |
| 29 | +ハマった点としてはGoogleのproviderのバージョンが古くて各設定値が使えないことがあったので、バージョンはなるべく新しいものを使うことをお勧めします。 |
| 30 | + |
| 31 | +`terraform apply` するとCloudComposerが作成されますが、10分以上待たされるので気長に待ちましょう。 |
| 32 | + |
| 33 | +```json |
| 34 | +resource "google_composer_environment" "analytics" { |
| 35 | + name = "analytics" |
| 36 | + project = var.project_id |
| 37 | + region = var.default_region |
| 38 | + |
| 39 | + config { |
| 40 | + software_config { |
| 41 | + # 2022/05/25: latest version |
| 42 | + image_version = "composer-2.0.13-airflow-2.2.5" |
| 43 | + |
| 44 | + pypi_packages = { |
| 45 | + boto3 = "" |
| 46 | + apache-airflow-providers-amazon = "" |
| 47 | + } |
| 48 | + } |
| 49 | + node_config { |
| 50 | + service_account = var.composer_sa_email |
| 51 | + } |
| 52 | + } |
| 53 | +} |
| 54 | +``` |
| 55 | + |
| 56 | +[https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/composer_environment](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/composer_environment) |
| 57 | + |
| 58 | +作成が完了するとGCPのコンソールで見るとこんな感じ。 |
| 59 | + |
| 60 | + |
| 61 | + |
| 62 | +AirflowウェブサーバーからAirflowの管理画面にいけます。 |
| 63 | + |
| 64 | + |
| 65 | + |
| 66 | +# DAGの管理 |
| 67 | + |
| 68 | +DAGはAirflowで使われるタスクの依存関係を整理して、どのように実行するか定義されているものです。 |
| 69 | + |
| 70 | +Pythonファイルで定義と処理を記述できます。 |
| 71 | + |
| 72 | +こんな感じで。 |
| 73 | + |
| 74 | +```python |
| 75 | +from datetime import timedelta |
| 76 | + |
| 77 | +import airflow |
| 78 | +from airflow import DAG |
| 79 | +from airflow.operators.python_operator import PythonOperator |
| 80 | + |
| 81 | +default_args = { |
| 82 | + "start_date": airflow.utils.dates.days_ago(0), |
| 83 | + "retries": 1, |
| 84 | + "retry_delay": timedelta(minutes=5), |
| 85 | +} |
| 86 | + |
| 87 | +def test(**context): |
| 88 | + print("Hello World") |
| 89 | + |
| 90 | +with DAG( |
| 91 | + "example_dag", |
| 92 | + default_args=default_args, |
| 93 | + description="test dag", |
| 94 | + schedule_interval=None, |
| 95 | + dagrun_timeout=timedelta(minutes=20), |
| 96 | +) as dag: |
| 97 | + task1 = PythonOperator( |
| 98 | + task_id="test", |
| 99 | + python_callable=test, |
| 100 | + provide_context=True, |
| 101 | + dag=dag, |
| 102 | + ) |
| 103 | +``` |
| 104 | + |
| 105 | +CloudComposerではGCS上に `dags/` 配下にファイルを配置すると読み取りを行ってくれて、DAGの追加更新が行えます。 |
| 106 | + |
| 107 | +GCS上にファイルをアップロードするだけなのでCIも簡単です。 |
| 108 | + |
| 109 | +WEDではGitHub Actionsを使用していて、Workload Identityを使用してGCSのファイルをアップロードしています。 |
| 110 | + |
| 111 | +GitHubのリポジトリはこのような構成になってます。 |
| 112 | + |
| 113 | +```yaml |
| 114 | +. |
| 115 | +├── README.md |
| 116 | +├── dags |
| 117 | +│ └── exapmle |
| 118 | +│ └── dag.py |
| 119 | +├── poetry.lock |
| 120 | +└── pyproject.toml |
| 121 | +``` |
| 122 | + |
| 123 | +staging環境とproduction環境それぞれ環境を作成していてstaging環境はPR作成時、production環境はmain branchにpushされたらGCSにファイルをアップロードするようになっています。 |
| 124 | + |
| 125 | +```yaml |
| 126 | +name: production gcs upload |
| 127 | + |
| 128 | +on: |
| 129 | + push: |
| 130 | + branches: |
| 131 | + - "main" |
| 132 | + paths: |
| 133 | + - "**.py" |
| 134 | + - "pyproject.toml" |
| 135 | + - ".github/workflows/production_gcs_upload.yml" |
| 136 | + |
| 137 | +jobs: |
| 138 | + upload: |
| 139 | + name: upload |
| 140 | + runs-on: ubuntu-latest |
| 141 | + permissions: |
| 142 | + contents: read |
| 143 | + id-token: write |
| 144 | + |
| 145 | + steps: |
| 146 | + - name: Check out code |
| 147 | + uses: actions/checkout@v3 |
| 148 | + |
| 149 | + - id: auth |
| 150 | + uses: google-github-actions/auth@v0 |
| 151 | + with: |
| 152 | + workload_identity_provider: ${{ secrets.WORKLOAD_IDENTITY_PROVIDER }} |
| 153 | + service_account: "<service_account>" |
| 154 | + - name: "Set up Cloud SDK" |
| 155 | + uses: "google-github-actions/setup-gcloud@v0" |
| 156 | + |
| 157 | + - id: upload-folder |
| 158 | + uses: google-github-actions/upload-cloud-storage@v0 |
| 159 | + with: |
| 160 | + path: ./dags/ |
| 161 | + destination: "<gcs_path>" |
| 162 | +``` |
| 163 | +
|
| 164 | +[https://cloud.google.com/blog/ja/products/identity-security/enabling-keyless-authentication-from-github-actions](https://cloud.google.com/blog/ja/products/identity-security/enabling-keyless-authentication-from-github-actions) |
| 165 | +
|
| 166 | +# 終わりに |
| 167 | +
|
| 168 | +まだデータ転送は現在移行中ですがデータ連携等のタスクは本番運用しています。データ転送の移行完了や運用面で気になったことやハマりどころなどあればまた記事を書こうと思っています。 |
0 commit comments