Skip to content

Commit 37b03c1

Browse files
authored
Move XComOperatorLink to serialized_objects as only API server uses it (apache#47674)
1 parent 535feab commit 37b03c1

File tree

3 files changed

+38
-40
lines changed

3 files changed

+38
-40
lines changed

airflow/serialization/serialized_objects.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
)
4949
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
5050
from airflow.models.taskinstancekey import TaskInstanceKey
51+
from airflow.models.xcom import BaseXCom
5152
from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg
5253
from airflow.providers_manager import ProvidersManager
5354
from airflow.sdk.definitions.asset import (
@@ -63,7 +64,6 @@
6364
BaseAsset,
6465
)
6566
from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
66-
from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink
6767
from airflow.sdk.definitions.mappedoperator import MappedOperator
6868
from airflow.sdk.definitions.param import Param, ParamsDict
6969
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
@@ -88,6 +88,7 @@
8888
)
8989
from airflow.utils.db import LazySelectSequence
9090
from airflow.utils.docs import get_docs_url
91+
from airflow.utils.log.logging_mixin import LoggingMixin
9192
from airflow.utils.module_loading import import_string, qualname
9293
from airflow.utils.operator_resources import Resources
9394
from airflow.utils.timezone import from_timestamp, parse_timezone
@@ -1991,3 +1992,38 @@ def get_run_data_interval(self, run: DagRun) -> DataInterval:
19911992
access_control: Mapping[str, Mapping[str, Collection[str]] | Collection[str]] | None = pydantic.Field(
19921993
init=False, default=None
19931994
)
1995+
1996+
1997+
@attrs.define()
1998+
class XComOperatorLink(LoggingMixin):
1999+
"""A generic operator link class that can retrieve link only using XCOMs. Used while deserializing operators."""
2000+
2001+
name: str
2002+
xcom_key: str
2003+
2004+
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
2005+
"""
2006+
Retrieve the link from the XComs.
2007+
2008+
:param operator: The Airflow operator object this link is associated to.
2009+
:param ti_key: TaskInstance ID to return link for.
2010+
:return: link to external system, but by pulling it from XComs
2011+
"""
2012+
self.log.info(
2013+
"Attempting to retrieve link from XComs with key: %s for task id: %s", self.xcom_key, ti_key
2014+
)
2015+
value = BaseXCom.get_one(
2016+
key=self.xcom_key,
2017+
run_id=ti_key.run_id,
2018+
dag_id=ti_key.dag_id,
2019+
task_id=ti_key.task_id,
2020+
map_index=ti_key.map_index,
2021+
)
2022+
if not value:
2023+
self.log.debug(
2024+
"No link with name: %s present in XCom as key: %s, returning empty link",
2025+
self.name,
2026+
self.xcom_key,
2027+
)
2028+
return ""
2029+
return value

task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,49 +22,11 @@
2222

2323
import attrs
2424

25-
from airflow.models.xcom import BaseXCom
26-
from airflow.utils.log.logging_mixin import LoggingMixin
27-
2825
if TYPE_CHECKING:
2926
from airflow.models.baseoperator import BaseOperator
3027
from airflow.models.taskinstancekey import TaskInstanceKey
3128

3229

33-
@attrs.define()
34-
class XComOperatorLink(LoggingMixin):
35-
"""A generic operator link class that can retrieve link only using XCOMs. Used while deserializing operators."""
36-
37-
name: str
38-
xcom_key: str
39-
40-
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
41-
"""
42-
Retrieve the link from the XComs.
43-
44-
:param operator: The Airflow operator object this link is associated to.
45-
:param ti_key: TaskInstance ID to return link for.
46-
:return: link to external system, but by pulling it from XComs
47-
"""
48-
self.log.info(
49-
"Attempting to retrieve link from XComs with key: %s for task id: %s", self.xcom_key, ti_key
50-
)
51-
value = BaseXCom.get_one(
52-
key=self.xcom_key,
53-
run_id=ti_key.run_id,
54-
dag_id=ti_key.dag_id,
55-
task_id=ti_key.task_id,
56-
map_index=ti_key.map_index,
57-
)
58-
if not value:
59-
self.log.debug(
60-
"No link with name: %s present in XCom as key: %s, returning empty link",
61-
self.name,
62-
self.xcom_key,
63-
)
64-
return ""
65-
return value
66-
67-
6830
@attrs.define()
6931
class BaseOperatorLink(metaclass=ABCMeta):
7032
"""Abstract base class that defines how we get an operator link."""

tests/serialization/test_dag_serialization.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@
6767
from airflow.providers.standard.operators.bash import BashOperator
6868
from airflow.providers.standard.sensors.bash import BashSensor
6969
from airflow.sdk.definitions.asset import Asset
70-
from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink
7170
from airflow.sdk.definitions.param import Param, ParamsDict
7271
from airflow.security import permissions
7372
from airflow.serialization.enums import Encoding
@@ -76,6 +75,7 @@
7675
BaseSerialization,
7776
SerializedBaseOperator,
7877
SerializedDAG,
78+
XComOperatorLink,
7979
)
8080
from airflow.task.priority_strategy import _DownstreamPriorityWeightStrategy
8181
from airflow.timetables.simple import NullTimetable, OnceTimetable

0 commit comments

Comments
 (0)