Skip to content

Commit f504ba7

Browse files
authored
fix(APDR): add asset events to partitioned DagRun (#61433)
* test: simplify test_partitioned_dag_run_with_customized_mapper * test(scheduler_job): improve APDR test cases to include consumed_asset_event validation * fix(APDR): add asset events to partitioned DagRun * fixup! fix(APDR): add asset events to partitioned DagRun * fixup! test: simplify test_partitioned_dag_run_with_customized_mapper * fixup! test(scheduler_job): improve APDR test cases to include consumed_asset_event validation * fixup! test(scheduler_job): improve APDR test cases to include consumed_asset_event validation * fixup! test(scheduler_job): improve APDR test cases to include consumed_asset_event validation
1 parent e55f579 commit f504ba7

File tree

2 files changed

+34
-37
lines changed

2 files changed

+34
-37
lines changed

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1848,6 +1848,13 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st
18481848
creating_job_id=self.job.id,
18491849
session=session,
18501850
)
1851+
asset_events = session.scalars(
1852+
select(AssetEvent).where(
1853+
PartitionedAssetKeyLog.asset_partition_dag_run_id == apdr.id,
1854+
PartitionedAssetKeyLog.asset_event_id == AssetEvent.id,
1855+
)
1856+
)
1857+
dag_run.consumed_asset_events.extend(asset_events)
18511858
session.flush()
18521859
apdr.created_dag_run_id = dag_run.id
18531860
session.flush()

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 27 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8810,46 +8810,15 @@ def test_partitioned_dag_run_with_customized_mapper(
88108810
runner = SchedulerJobRunner(
88118811
job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
88128812
)
8813-
8814-
with dag_maker(dag_id="asset-event-producer", schedule=None, session=session) as dag:
8815-
EmptyOperator(task_id="hi", outlets=[asset_1])
8816-
8817-
dr = dag_maker.create_dagrun(partition_key="this-is-not-key-1-before-mapped", session=session)
8818-
[ti] = dr.get_task_instances(session=session)
8819-
session.commit()
8820-
8821-
serialized_outlets = dag.get_task("hi").outlets
88228813
with custom_partition_mapper_patch():
8823-
TaskInstance.register_asset_changes_in_db(
8824-
ti=ti,
8825-
task_outlets=[o.asprofile() for o in serialized_outlets],
8826-
outlet_events=[],
8814+
apdr = _produce_and_register_asset_event(
8815+
dag_id="asset-event-producer",
8816+
asset=asset_1,
8817+
partition_key="this-is-not-key-1-before-mapped",
88278818
session=session,
8819+
dag_maker=dag_maker,
8820+
expected_partition_key="key-1",
88288821
)
8829-
session.commit()
8830-
8831-
event = session.scalar(
8832-
select(AssetEvent).where(
8833-
AssetEvent.source_dag_id == dag.dag_id,
8834-
AssetEvent.source_run_id == dr.run_id,
8835-
)
8836-
)
8837-
assert event is not None
8838-
assert event.partition_key == "this-is-not-key-1-before-mapped"
8839-
8840-
apdr = session.scalar(
8841-
select(AssetPartitionDagRun)
8842-
.join(
8843-
PartitionedAssetKeyLog,
8844-
PartitionedAssetKeyLog.asset_partition_dag_run_id == AssetPartitionDagRun.id,
8845-
)
8846-
.where(PartitionedAssetKeyLog.asset_event_id == event.id)
8847-
)
8848-
assert apdr is not None
8849-
assert apdr.created_dag_run_id is None
8850-
assert apdr.partition_key == "key-1"
8851-
8852-
with custom_partition_mapper_patch():
88538822
partition_dags = runner._create_dagruns_for_partitioned_asset_dags(session=session)
88548823
session.refresh(apdr)
88558824
# Since asset event for Asset(name="asset-2") with key "key-1" has not yet been created,
@@ -8858,6 +8827,13 @@ def test_partitioned_dag_run_with_customized_mapper(
88588827
assert len(partition_dags) == 1
88598828
assert partition_dags == {"asset-event-consumer"}
88608829

8830+
dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id))
8831+
assert dag_run is not None
8832+
asset_event = dag_run.consumed_asset_events[0]
8833+
assert asset_event.source_task_id == "hi"
8834+
assert asset_event.source_dag_id == "asset-event-producer"
8835+
assert asset_event.source_run_id == "test"
8836+
88618837

88628838
@pytest.mark.need_serialized_dag
88638839
@pytest.mark.usefixtures("clear_asset_partition_rows")
@@ -8931,6 +8907,13 @@ def test_consumer_dag_listen_to_two_partitioned_asset(
89318907
assert len(partition_dags) == 1
89328908
assert partition_dags == {"asset-event-consumer"}
89338909

8910+
dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id))
8911+
assert dag_run is not None
8912+
for asset_event in dag_run.consumed_asset_events:
8913+
assert asset_event.source_task_id == "hi"
8914+
assert "asset-event-producer-" in asset_event.source_dag_id
8915+
assert asset_event.source_run_id == "test"
8916+
89348917

89358918
@pytest.mark.need_serialized_dag
89368919
@pytest.mark.usefixtures("clear_asset_partition_rows")
@@ -8995,3 +8978,10 @@ def test_consumer_dag_listen_to_two_partitioned_asset_with_key_1_mapper(
89958978
assert apdr.created_dag_run_id is not None
89968979
assert len(partition_dags) == 1
89978980
assert partition_dags == {"asset-event-consumer"}
8981+
8982+
dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id))
8983+
assert dag_run is not None
8984+
for asset_event in dag_run.consumed_asset_events:
8985+
assert asset_event.source_task_id == "hi"
8986+
assert "asset-event-producer-" in asset_event.source_dag_id
8987+
assert asset_event.source_run_id == "test"

0 commit comments

Comments
 (0)