|
29 | 29 | from airflow.models.taskinstance import TaskInstance |
30 | 30 | from airflow.models.xcom import XComModel |
31 | 31 | from airflow.providers.standard.operators.empty import EmptyOperator |
32 | | -from airflow.sdk import DAG |
| 32 | +from airflow.sdk import DAG, AssetAlias |
33 | 33 | from airflow.sdk.bases.xcom import BaseXCom |
34 | 34 | from airflow.sdk.execution_time.xcom import resolve_xcom_backend |
35 | 35 | from airflow.utils.session import provide_session |
@@ -194,6 +194,22 @@ def test_custom_xcom_deserialize(self, params: str, expected_value: int | str, t |
194 | 194 | assert response.status_code == 200 |
195 | 195 | assert response.json()["value"] == expected_value |
196 | 196 |
|
| 197 | + @pytest.mark.parametrize( |
| 198 | + "xcom_value, expected_return", |
| 199 | + [ |
| 200 | + pytest.param(42, 42, id="jsonable"), |
| 201 | + pytest.param(AssetAlias("x"), "AssetAlias(name='x', group='asset')", id="nonjsonable"), |
| 202 | + pytest.param([42, AssetAlias("x")], [42, "AssetAlias(name='x', group='asset')"], id="nested"), |
| 203 | + ], |
| 204 | + ) |
| 205 | + def test_stringify_false(self, test_client, xcom_value, expected_return): |
| 206 | + XComModel.set(TEST_XCOM_KEY, xcom_value, dag_id=TEST_DAG_ID, task_id=TEST_TASK_ID, run_id=run_id) |
| 207 | + |
| 208 | + url = f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}" |
| 209 | + response = test_client.get(url, params={"deserialize": True, "stringify": False}) |
| 210 | + assert response.status_code == 200 |
| 211 | + assert response.json()["value"] == expected_return |
| 212 | + |
197 | 213 |
|
198 | 214 | class TestGetXComEntries(TestXComEndpoint): |
199 | 215 | @pytest.fixture(autouse=True) |
|
0 commit comments