
Implement AiiDA provenance tracking for Airflow via XCom backend and listeners#19

Draft
agoscinski wants to merge 3 commits into phase1 from phase1-xcom-backend

Conversation

@agoscinski
Contributor

@agoscinski agoscinski commented Nov 17, 2025

Add AiiDA provenance graph creation for Airflow DAG executions:

XCom Backend (src/airflow_provider_aiida/xcom/backend.py):

  • Custom XCom backend that creates AiiDA nodes during serialization/deserialization
  • WorkChainNode represents entire DAG run (dag_id + run_id)
  • CalcJobNode represents individual tasks (task_id + run_id + map_index)
  • Data nodes store XCom values as AiiDA-typed data
  • Establishes provenance links:
    • CALL_CALC: WorkChain → CalcJob (workflow calls calculation)
    • CREATE: CalcJob → Data (task produces output)
    • INPUT_CALC: Data → CalcJob (data flows to consuming task)
  • Smart link labeling:
    • For PythonOperator: extracts parameter names via inspect.signature()
    • For other operators: deterministic hash based on producer task info
  • Handles duplicate link prevention and stored node constraints
  • Monkey-patches link validation for stored nodes when necessary
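The link-labeling strategy above can be sketched with stdlib tools only. This is a minimal illustration of the idea, not the actual code in `backend.py`; `label_for_input` and `fallback_label` are hypothetical helper names:

```python
import hashlib
import inspect


def label_for_input(callable_obj, position):
    """For a PythonOperator callable, derive the link label from the
    parameter name at `position`, mirroring the inspect.signature() approach."""
    params = list(inspect.signature(callable_obj).parameters)
    return params[position]


def fallback_label(dag_id, task_id, map_index=-1):
    """For other operators: a deterministic label hashed from producer task info."""
    digest = hashlib.sha256(f"{dag_id}:{task_id}:{map_index}".encode()).hexdigest()[:12]
    return f"xcom_{digest}"


def add_numbers(x, y):
    return x + y


print(label_for_input(add_numbers, 1))          # → y
print(fallback_label("my_dag", "extract", 0))   # deterministic xcom_<hash> label
```

Because the hash is derived only from stable producer metadata, re-running the DAG yields the same labels, which keeps the provenance graph reproducible.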

Provenance Listener (src/airflow_provider_aiida/plugins/provenance_listener.py):

  • Airflow listener plugin that updates AiiDA node states in real-time
  • Hooks into DAG run lifecycle (success, failure, running)
  • Hooks into task instance lifecycle (success, failure, running)
  • Maps Airflow states to AiiDA ProcessStates:
    • QUEUED/SCHEDULED → CREATED
    • RUNNING → RUNNING
    • SUCCESS → FINISHED
    • FAILED → EXCEPTED
    • SKIPPED → KILLED
  • Creates nodes proactively if they don't exist (handles tasks starting before XCom)
  • Registered as ProvenanceListenerPlugin for automatic discovery
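The state mapping above can be expressed as a simple lookup table. String values stand in for `aiida.engine.ProcessState` members so this sketch runs without AiiDA installed; the real listener would use the enum directly:

```python
# Airflow DagRun/TaskInstance states mapped to AiiDA ProcessState values
# (string stand-ins for the aiida.engine.ProcessState enum members).
AIRFLOW_TO_AIIDA_STATE = {
    "queued": "created",
    "scheduled": "created",
    "running": "running",
    "success": "finished",
    "failed": "excepted",
    "skipped": "killed",
}


def map_state(airflow_state: str) -> str:
    """Translate an Airflow state name to its AiiDA process-state equivalent."""
    return AIRFLOW_TO_AIIDA_STATE[airflow_state.lower()]


print(map_state("SUCCESS"))  # → finished
```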

Common utilities for handling AiiDA nodes (src/airflow_provider_aiida/common/utils.py):

  • _get_or_create_workchain_node: Query by unique_id or create new WorkChainNode
  • _get_or_create_calcjob_node: Query by unique_id or create new CalcJobNode
  • _sanitize_link_label: Ensure AiiDA-compatible link labels (alphanumeric + underscore)
  • All new nodes initialized with ProcessState.CREATED
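The sanitization step can be approximated with a regex. This is a guess at the behavior (replace illegal characters with underscores, avoid a leading digit), not the exact implementation of `_sanitize_link_label`:

```python
import re


def sanitize_link_label(raw: str) -> str:
    """Make a string safe as an AiiDA link label: alphanumerics and
    underscores only, and never starting with a digit."""
    label = re.sub(r"\W", "_", raw)
    if label and label[0].isdigit():
        label = f"_{label}"
    return label or "_"


print(sanitize_link_label("my-task.output[0]"))  # → my_task_output_0_
```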

Caveats:

  • When deserializing, no information about the input key is available, so an
    educated guess must be made; this currently fails when dynamic task mapping is used
  • on_dag_run_running is not called in the test-run environment, so the
    WorkChainNode is created in the on_task_run_running function instead
  • Because Airflow gives no ordering guarantee between the listener callbacks (executed by the task instance) and the XCom backend calls (executed by the scheduler), the node-handling logic has to be duplicated in both the XCom backend and the listeners

TODO:

  • The WorkChainNode also needs to be created in the listener, because there is no guarantee that the listener runs before the XCom backend. Maybe remove the listeners entirely, since every DAG should call the deserialization function at least once.
  • Add tests for the atomization and EOS workflows; verify that a test run and a triggered run produce the same graph

Result: Complete AiiDA provenance graph mirroring Airflow DAG structure with
real-time state synchronization and proper data lineage tracking.
@agoscinski agoscinski deleted the branch phase1 November 17, 2025 14:29
@agoscinski agoscinski closed this Nov 17, 2025
@agoscinski agoscinski reopened this Nov 17, 2025
@agoscinski agoscinski mentioned this pull request Nov 18, 2025
