# Airflow Task Design Guide

A concise reference for contributors designing, implementing, and reviewing tasks in SM2A projects.


## Airflow Task Design Goals

| # | Goal | What it means | Why it matters |
| - | - | - | - |
| 1 | **Explicit parameters** | Declare every input (e.g., `bucket: str`, `run_date: datetime`) as a named argument in the TaskFlow function signature. | Readers (and IDEs) know exactly what values the task needs; type hints support linting & autocompletion. |
| 2 | **Direct parameter access** | Pass scalar / simple objects directly; avoid wrapping them in catch‑all dicts or `**kwargs`. | Prevents "mystery meat" payloads and accidental hidden dependencies. |
| 3 | **Multiple named outputs** | Return a `dict` of discrete outputs via `return {"records": df, "count": len(df)}`, leveraging TaskFlow's multiple-return feature (implicit when returning a dict, or explicit with `@task(multiple_outputs=True)`). | Downstream tasks can pull *only* what they need and don't need to parse larger objects. |
| 4 | **TaskFlow‑first** | Define tasks with `@task` (TaskFlow) rather than classic operators when writing Python tasks. | Makes tasks testable with `pytest` and keeps DAGs readable. |
| 5 | **Separation of concerns** | Task functions orchestrate **data flow and execution**; computation and logic live in `util` functions/modules imported by the task. | Logic can be unit‑tested in isolation and reused in other tasks. |
| 6 | **Idempotency** | Tasks should safely re‑run without corrupting state; leverage run‑date‑based keys, checksums, or existence checks. | Supports retries & backfills. |

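Goal 6 can be sketched without any Airflow machinery. A minimal illustration (function names and the dict-backed store are hypothetical stand-ins for real object storage) of a run‑date‑based key combined with an existence check:

```python
def object_key(collection_id: str, run_date: str) -> str:
    """Deterministic key: the same logical run always targets the same object."""
    return f"{collection_id}/{run_date}/items.json"

def write_if_absent(store: dict, key: str, payload: str) -> bool:
    """Skip the write when the key already exists, so re-runs are safe."""
    if key in store:
        return False  # a retry or backfill hits this branch instead of duplicating data
    store[key] = payload
    return True
```

Running the task twice for the same `run_date` produces exactly one object; retries and backfills simply short-circuit at the existence check.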

## Recommended Patterns

### Minimal TaskFlow Example

```python
from airflow.decorators import dag, task
from pendulum import datetime
from utils.stac import generate_collection  # util function (external)
from utils.ingest import ingest_collection  # util function (external; path illustrative)

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    params={  # DAG‑level parameters, resolved at runtime via templating
        "collection_id": "sample-collection",
        "description": "Sample STAC collection generated via Airflow",
    },
    tags=["stac", "example"],
)
def stac_collection_dag():

    @task(multiple_outputs=True)
    def build_collection(collection_id: str, description: str) -> dict:
        """Generate a STAC collection body and return both body and ID."""
        collection_body = generate_collection(
            collection_id=collection_id,
            description=description,
        )  # heavy lifting happens in utils
        return {"collection_body": collection_body, "collection_id": collection_id}

    # Task invocation – explicit, templated args resolved from dag.params at runtime
    outputs = build_collection(
        collection_id="{{ params.collection_id }}",
        description="{{ params.description }}",
    )

    @task()
    def publish_collection(collection_body: dict):
        """Pass collection to ingestion API."""
        ingest_collection(collection_body)

    publish_collection(collection_body=outputs["collection_body"])

stac_collection_dag()
```

*Key takeaways:* explicit argument names, `multiple_outputs`, and util functions (`generate_collection`, `ingest_collection`) doing the heavy lifting.
| 65 | + |
| 66 | +### Multiple outputs with `@task(multiple_outputs=True)` |
| 67 | + |
| 68 | +Use when returning more than one value so Airflow stores each key as a separate XCom value: |
| 69 | + |
| 70 | +```python |
| 71 | +@task(multiple_outputs=True) |
| 72 | +def split_dataset(path: str) -> dict[str, str]: |
| 73 | + train, test = make_splits(path) |
| 74 | + return {"train_path": train, "test_path": test} |
| 75 | +``` |
| 76 | + |
| 77 | +Down‑stream tasks access exactly what they need: |
| 78 | + |
| 79 | +```python |
| 80 | +training_data, test_data = split_dataset(path="s3://bucket/data.csv") |
| 81 | +train_model(train_path=training_data) # only need training data from the first task |
| 82 | +test_model(model=train_model, test_path=test_data) # only need test data from the first task |
| 83 | +``` |

### Delegating compute to utils

```python
from airflow.decorators import task
from veda.utils.stac import generate_collection  # example util function

@task(multiple_outputs=True)
def build_collection(collection_id: str, description: str) -> dict:
    """Generate a STAC collection body and return both body and ID."""
    collection_body = generate_collection(
        collection_id=collection_id,
        description=description,
    )  # logic is contained in the util function
    return {"collection_body": collection_body, "collection_id": collection_id}
```
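Because the logic lives in a plain function, it can be unit-tested with `pytest` and no Airflow context. A sketch with an inline stand-in for `generate_collection` (the real util is external; the fields shown are illustrative):

```python
# Stand-in for utils.stac.generate_collection, inlined so the test is self-contained
def generate_collection(collection_id: str, description: str) -> dict:
    return {
        "type": "Collection",
        "id": collection_id,
        "description": description,
    }

# Plain pytest test: no DAG run, no scheduler, no task context required
def test_generate_collection():
    body = generate_collection("sample-collection", "demo")
    assert body["id"] == "sample-collection"
    assert body["type"] == "Collection"
```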


## Anti‑Patterns to Avoid

| Anti‑Pattern | Why to avoid |
| - | - |
| **Monolithic payloads**: packing multiple values into a single dict or JSON blob and passing it to the next task as one XCom | Downstream tasks must deserialize it and know the key names; creates incidental tight coupling between tasks. |
| **Hidden parameters**: accessing fields via `kwargs["ti"].xcom_pull()` (or similar) inside tasks | Hides dependencies; makes signatures lie; breaks static analysis & tests. |
| **Heavy logic in the DAG file**: performing data transformations directly in the DAG definition | Complicates refactors; hampers testability; increases DAG parse time. |
| **Non‑idempotent side effects**: writes or mutations that are not safe to repeat (e.g., blind appends with no run‑keyed paths or existence checks) | Retries/backfills can cause duplicated data or data loss. |


## Further Reading
* [Airflow 2 TaskFlow API docs](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html)
* "DAG writing best practices in Apache Airflow" – [Astronomer article](https://www.astronomer.io/docs/learn/dag-best-practices/)
* [Adding a DAG](docs/contributing/add_a_general_dag.md)