diff --git a/dags/kids_first/dbt_bash.py b/dags/kids_first/dbt_bash.py
new file mode 100644
index 0000000..db14d3a
--- /dev/null
+++ b/dags/kids_first/dbt_bash.py
@@ -0,0 +1,60 @@
+from datetime import datetime, timedelta
+from airflow.sdk import Variable
+import os
+
+# The DAG object; we'll need this to instantiate a DAG
+from airflow.models.dag import DAG
+
+# Operators; we need this to operate!
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
+
+with DAG(
+    "dbt_bash_status",
+    # These args will get passed on to each operator
+    # You can override them on a per-task basis during operator initialization
+    default_args={
+        "depends_on_past": False,
+        "email": ["airflow@example.com"],
+        "email_on_failure": False,
+        "email_on_retry": False,
+        "retries": 1,
+        "retry_delay": timedelta(minutes=5),
+        # 'queue': 'bash_queue',
+        # 'pool': 'backfill',
+        # 'priority_weight': 10,
+        # 'end_date': datetime(2016, 1, 1),
+        # 'wait_for_downstream': False,
+        # 'sla': timedelta(hours=2),
+        # 'execution_timeout': timedelta(seconds=300),
+        # 'on_failure_callback': some_function,  # or list of functions
+        # 'on_success_callback': some_other_function,  # or list of functions
+        # 'on_retry_callback': another_function,  # or list of functions
+        # 'sla_miss_callback': yet_another_function,  # or list of functions
+        # 'on_skipped_callback': another_function,  # or list of functions
+        # 'trigger_rule': 'all_success'
+    },
+    description="Provide dbt information",
+    schedule=timedelta(days=1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["POC"],
+) as dag:
+
+    # t1 and t2 are examples of tasks created by instantiating operators
+    t1 = BashOperator(
+        task_id="dbt_version",
+        bash_command=f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt --version",
+    )
+
+    def check_for_warehouse_host_var():
+        warehouse_host = Variable.get("INCLUDEWAREHOUSE_HOST", default=None)
+        if warehouse_host:
+            print("INCLUDEWAREHOUSE_HOST exists!")
+        else:
+            print("INCLUDEWAREHOUSE_HOST variable is not set.")
+
+    t2 = PythonOperator(
+        task_id="check_for_warehouse_host_var",
+        python_callable=check_for_warehouse_host_var,
+    )
diff --git a/dags/kids_first/example_study.py b/dags/kids_first/example_study.py
new file mode 100644
index 0000000..ded91f3
--- /dev/null
+++ b/dags/kids_first/example_study.py
@@ -0,0 +1,35 @@
+from airflow.sdk import Variable
+
+from cosmos import (
+    DbtDag,
+    ProjectConfig,
+    ProfileConfig,
+    ExecutionConfig,
+    RenderConfig,
+)
+from cosmos.profiles import PostgresUserPasswordProfileMapping
+
+profile_config = ProfileConfig(
+    profile_name=Variable.get("DBT_PROFILE_NAME"),
+    target_name="prd",
+    profile_mapping=PostgresUserPasswordProfileMapping(
+        conn_id="postgres_dev_svc",
+        profile_args={"schema": "prd"},
+    ),
+)
+
+example_study_dag = DbtDag(
+    project_config=ProjectConfig(
+        Variable.get("DBT_PROJECT_DIR"),
+        install_dbt_deps=True,
+    ),
+    profile_config=profile_config,
+    execution_config=ExecutionConfig(
+        dbt_executable_path=Variable.get("DBT_EXECUTABLE_PATH"),
+    ),
+    render_config=RenderConfig(select=["config.meta.study:kf_example_study"]),
+    # normal dag parameters
+    schedule="@daily",
+    dag_id="kf_example_study",
+    tags=["POC", "Kids First"],
+)
diff --git a/dbt_project/models/kids_first/example_study/_kf_example_study__models.yml b/dbt_project/models/kids_first/example_study/_kf_example_study__models.yml
new file mode 100644
index 0000000..3320f5d
--- /dev/null
+++ b/dbt_project/models/kids_first/example_study/_kf_example_study__models.yml
@@ -0,0 +1,45 @@
+---
+version: 2
+
+models:
+  - name: my_first_dbt_model
+    description: "A starter dbt model"
+    config:
+      meta:
+        study: kf_example_study
+    columns:
+      - name: id
+        description: "The primary key for this table"
+        data_tests:
+          - unique
+          - not_null
+
+  - name: my_second_dbt_model
+    description: "A starter dbt model"
+    config:
+      meta:
+        study: kf_example_study
+    columns:
+      - name: id
+        description: "The primary key for this table"
+        data_tests:
+          - unique
+          - not_null
+
+  - name: my_third_dbt_model
+    description: "A model that maps values in `alphabet_grouping` to
+      `my_second_dbt_model`"
+    config:
+      meta:
+        study: kf_example_study
+    columns:
+      - name: id
+        description: "The primary key for this table"
+        data_tests:
+          #- unique
+          - not_null
+      - name: letter
+        description: "The letter in the alphabet"
+        data_tests:
+          - unique
+          - not_null
diff --git a/dbt_project/models/kids_first/example_study/my_first_dbt_model.sql b/dbt_project/models/kids_first/example_study/my_first_dbt_model.sql
new file mode 100644
index 0000000..5b51873
--- /dev/null
+++ b/dbt_project/models/kids_first/example_study/my_first_dbt_model.sql
@@ -0,0 +1,27 @@
+
+/*
+    Welcome to your first dbt model!
+    Did you know that you can also configure models directly within SQL files?
+    This will override configurations stated in dbt_project.yml
+
+    Try changing "table" to "view" below
+*/
+
+{{ config(materialized='table', schema='kf_example') }}
+
+with source_data as (
+
+    select 1 as id
+    union all
+    select null as id
+
+)
+
+select *
+from source_data
+
+/*
+    The line below removes records with null `id` values
+*/
+
+where id is not null
diff --git a/dbt_project/models/kids_first/example_study/my_second_dbt_model.sql b/dbt_project/models/kids_first/example_study/my_second_dbt_model.sql
new file mode 100644
index 0000000..3fdf3f1
--- /dev/null
+++ b/dbt_project/models/kids_first/example_study/my_second_dbt_model.sql
@@ -0,0 +1,6 @@
+{{ config(materialized='table', schema='kf_example') }}
+-- Use the `ref` function to select from other models
+
+select *
+from {{ ref('my_first_dbt_model') }}
+where id = 1
diff --git a/dbt_project/models/kids_first/example_study/my_third_dbt_model.sql b/dbt_project/models/kids_first/example_study/my_third_dbt_model.sql
new file mode 100644
index 0000000..f4006d1
--- /dev/null
+++ b/dbt_project/models/kids_first/example_study/my_third_dbt_model.sql
@@ -0,0 +1,7 @@
+{{ config(materialized='table', schema='kf_example') }}
+
+select
+    id,
+    letter
+from {{ ref('my_second_dbt_model') }}
+left join {{ ref('alphabet_grouping') }} on id = letter_grouping
\ No newline at end of file
diff --git a/dbt_project/profiles.yml b/dbt_project/profiles.yml
index ee7c9a7..a4d580e 100644
--- a/dbt_project/profiles.yml
+++ b/dbt_project/profiles.yml
@@ -1,6 +1,6 @@
 ---
 include_dbt_sandbox:
-  target: ci
+  target: dev
   outputs:
     ci:
       type: postgres
@@ -20,3 +20,22 @@ include_dbt_sandbox:
       threads: 4
       type: postgres
       user: "{{ env_var('INCLUDEWAREHOUSE_USERNAME') }}"
+    qa:
+      type: postgres
+      host: "{{ env_var('INCLUDEWAREHOUSE_HOST') }}"
+      user: "{{ env_var('INCLUDEWAREHOUSE_SCV_USERNAME') }}"
+      password: "{{ env_var('INCLUDEWAREHOUSE_SCV_PASSWORD') }}"
+      port: 5432
+      dbname: postgres
+      schema: qa
+      threads: 4
+    prd:
+      type: postgres
+      host: "{{ env_var('INCLUDEWAREHOUSE_HOST') }}"
+      user: "{{ env_var('INCLUDEWAREHOUSE_SCV_USERNAME') }}"
+      password: "{{ env_var('INCLUDEWAREHOUSE_SCV_PASSWORD') }}"
+      port: 5432
+      dbname: postgres
+      schema: prd
+      threads: 4
+
diff --git a/dbt_project/seeds/_seeds.yml b/dbt_project/seeds/_seeds.yml
index e69de29..3ea41e7 100644
--- a/dbt_project/seeds/_seeds.yml
+++ b/dbt_project/seeds/_seeds.yml
@@ -0,0 +1,12 @@
+seeds:
+  - name: alphabet_grouping
+    config:
+      schema: kf
+    description: |
+      An example seed file that groups letters of the alphabet. This seed is
+      used by the Kids First example study models.
+    columns:
+      - name: letter
+        description: A letter of the alphabet
+      - name: letter_grouping
+        description: The grouping that the letter belongs to
\ No newline at end of file
diff --git a/dbt_project/seeds/alphabet_grouping.csv b/dbt_project/seeds/alphabet_grouping.csv
new file mode 100644
index 0000000..5dd554f
--- /dev/null
+++ b/dbt_project/seeds/alphabet_grouping.csv
@@ -0,0 +1,27 @@
+letter,letter_grouping
+a,1
+b,1
+c,2
+d,2
+e,3
+f,3
+g,3
+h,4
+i,4
+j,4
+k,5
+l,5
+m,6
+n,6
+o,7
+p,7
+q,7
+r,8
+s,8
+t,8
+u,9
+v,9
+w,9
+x,10
+y,10
+z,10
\ No newline at end of file
diff --git a/docs/guides/running-models-in-airflow.md b/docs/guides/running-models-in-airflow.md
new file mode 100644
index 0000000..8168cc4
--- /dev/null
+++ b/docs/guides/running-models-in-airflow.md
@@ -0,0 +1,146 @@
+# How to Run dbt in Airflow
+
+This guide describes how dbt models are run in Airflow and the file
+configuration that is needed to run those models.
+
+## How Airflow Runs dbt Models
+
+The Airflow instance used to orchestrate data pipelines is hosted in AWS using
+Managed Workflows for Apache Airflow (MWAA). MWAA provides a simple way to
+provision the Airflow instance as well as its workers. As of this writing, the
+MWAA instance used by these workflows runs Airflow version 3.0.6 and Python
+version 3.12.
+
+To run dbt models, we use
+[`cosmos`](https://astronomer.github.io/astronomer-cosmos/index.html).
+`cosmos` is a Python library for Airflow that provides operators that run dbt
+and convert sets of dbt models within a dbt project into DAGs. While an older
+approach might run dbt through a bash command in an Airflow `BashOperator`,
+the `cosmos` `DbtDag` class materializes individual models, and those models'
+tests, as individual tasks.
+
+For a dbt model - or set of models - to be picked up by Airflow and run, those
+models need to be referenced by a DAG.
+
+### Getting files into MWAA
+
+When pull requests are merged into the main branch of this repository, a
+GitHub Action is triggered to mirror the contents of this repository into the
+S3 bucket that MWAA uses to store models and DAGs.
+
+## How to write a DAG file
+
+A DAG that runs dbt models has three important parts that are needed for those
+models to run:
+
+1. imports
+2. profile configuration
+3. the model DAG
+
+### Example DAG
+
+An example DAG is below. Its components are described in the following
+sections. The purpose of this DAG is to run the models in the study
+`kf_example_study` every day.
+
+```python
+from airflow.sdk import Variable
+
+from cosmos import (
+    DbtDag,
+    ProjectConfig,
+    ProfileConfig,
+    ExecutionConfig,
+    RenderConfig,
+)
+from cosmos.profiles import PostgresUserPasswordProfileMapping
+
+profile_config = ProfileConfig(
+    profile_name=Variable.get("DBT_PROFILE_NAME"),
+    target_name="prd",
+    profile_mapping=PostgresUserPasswordProfileMapping(
+        conn_id="postgres_dev_svc",
+        profile_args={"schema": "prd"},
+    ),
+)
+
+example_study_dag = DbtDag(
+    project_config=ProjectConfig(
+        Variable.get("DBT_PROJECT_DIR"),
+        install_dbt_deps=True,
+    ),
+    profile_config=profile_config,
+    execution_config=ExecutionConfig(
+        dbt_executable_path=Variable.get("DBT_EXECUTABLE_PATH"),
+    ),
+    render_config=RenderConfig(select=["config.meta.study:kf_example_study"]),
+    # normal dag parameters
+    schedule="@daily",
+    dag_id="kf_example_study",
+    tags=["POC", "Kids First"],
+)
+```
+
+### Imports
+
+This section contains all of the packages and modules needed for the DAG file
+to run. The most important imports come from cosmos: the `DbtDag` class and
+the appropriate cosmos `Config` classes. Information about the different
+configuration modules used by cosmos is
+[here](https://astronomer.github.io/astronomer-cosmos/configuration/index.html).
+
+At a minimum, cosmos requires the following:
+
+```python
+from cosmos import (
+    DbtDag,
+    ProjectConfig,
+    ProfileConfig,
+    ExecutionConfig,
+    RenderConfig,
+)
+from cosmos.profiles import PostgresUserPasswordProfileMapping
+```
+
+In the example, `from airflow.sdk import Variable` is used to read the Airflow
+Variables that configure the commands in the DAG.
+
+### `profile_config`
+
+The `profile_config` is used by cosmos to identify the profile used by dbt
+commands. The value for `profile_name` should come from the `DBT_PROFILE_NAME`
+variable, as shown in the example. Acceptable values for `target_name` are
+`qa` and `prd`.
+
+The `profile_mapping` takes advantage of an Airflow connection to connect to
+the warehouse. Pass it the indicated connection id and make sure to set the
+`schema` appropriately for the `target_name`.
+
+### The model DAG
+
+The model DAG's Python object can be named whatever makes the most sense for
+the DAG.
+
+[`project_config`](https://astronomer.github.io/astronomer-cosmos/configuration/project-config.html),
+[`profile_config`](https://astronomer.github.io/astronomer-cosmos/profiles/index.html),
+and [`execution_config`](https://astronomer.github.io/astronomer-cosmos/configuration/execution-config.html)
+will look identical between DAGs and should follow this example. More
+information about each of these configurations is available at the links.
+
+[`render_config`](https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html)
+identifies which dbt model(s) should be run and follows dbt's `--select`
+behavior. More information about using `RenderConfig` to select models can be
+found [here](https://astronomer.github.io/astronomer-cosmos/configuration/selecting-excluding.html).
+A short sketch of alternative selectors appears at the end of this section.
+
+The rest of the DAG's parameters are familiar from other Airflow DAGs:
+
+* `schedule` indicates how often the DAG should be run.
+* `dag_id` is the identifier for the DAG within Airflow and should follow the
+  form `{program}_{study}`.
+* `tags` is used to tag the DAG and should at minimum include a tag for the
+  program the DAG corresponds to.
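+
+As an illustrative sketch only (the excluded model name is hypothetical, and
+this assumes a cosmos version whose selector syntax accepts plain node names
+alongside `config.meta` selectors), a `RenderConfig` can combine `select` and
+`exclude` lists:
+
+```python
+from cosmos import RenderConfig
+
+# Sketch: select every model tagged with the example study's meta value,
+# but skip one named model. The excluded model name is hypothetical.
+render_config = RenderConfig(
+    select=["config.meta.study:kf_example_study"],
+    exclude=["my_third_dbt_model"],
+)
+```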
+
+## Required components of models
+
+For models to be picked up by Airflow, all that is required is that they carry
+the filtering criteria used in the `select` of the DAG's `RenderConfig`.
+
+In this example, the `study` property under the models' `meta` config was
+created and used to filter models, as sketched below.
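+
+As a minimal sketch, a new model would be included in the `kf_example_study`
+DAG by setting the same `meta` property in its properties file. The model name
+below is hypothetical; `_kf_example_study__models.yml` in this repository
+shows the real entries:
+
+```yaml
+models:
+  - name: my_new_dbt_model  # hypothetical model
+    config:
+      meta:
+        # matches RenderConfig(select=["config.meta.study:kf_example_study"])
+        study: kf_example_study
+```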