Skip to content

Commit bc2a34e

Browse files
authored
feat: Removes stream_state interpolation from CDK (#320)
1 parent ab2c827 commit bc2a34e

21 files changed

+97
-320
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
2525
PerPartitionWithGlobalCursor,
2626
)
27-
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
2827
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
2928
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3029
ConcurrencyLevel as ConcurrencyLevelModel,
@@ -36,13 +35,11 @@
3635
ModelToComponentFactory,
3736
)
3837
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
39-
from airbyte_cdk.sources.declarative.requesters import HttpRequester
4038
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
4139
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
4240
DeclarativePartitionFactory,
4341
StreamSlicerPartitionGenerator,
4442
)
45-
from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields
4643
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
4744
from airbyte_cdk.sources.source import TState
4845
from airbyte_cdk.sources.streams import Stream
@@ -321,9 +318,6 @@ def _group_streams(
321318
incremental_sync_component_definition
322319
and incremental_sync_component_definition.get("type", "")
323320
== DatetimeBasedCursorModel.__name__
324-
and self._stream_supports_concurrent_partition_processing(
325-
declarative_stream=declarative_stream
326-
)
327321
and hasattr(declarative_stream.retriever, "stream_slicer")
328322
and isinstance(
329323
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
@@ -390,82 +384,13 @@ def _is_datetime_incremental_without_partition_routing(
390384
and bool(incremental_sync_component_definition)
391385
and incremental_sync_component_definition.get("type", "")
392386
== DatetimeBasedCursorModel.__name__
393-
and self._stream_supports_concurrent_partition_processing(
394-
declarative_stream=declarative_stream
395-
)
396387
and hasattr(declarative_stream.retriever, "stream_slicer")
397388
and (
398389
isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
399390
or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
400391
)
401392
)
402393

403-
def _stream_supports_concurrent_partition_processing(
404-
self, declarative_stream: DeclarativeStream
405-
) -> bool:
406-
"""
407-
Many connectors make use of stream_state during interpolation on a per-partition basis under the assumption that
408-
state is updated sequentially. Because the concurrent CDK engine processes different partitions in parallel,
409-
stream_state is no longer a thread-safe interpolation context. It would be a race condition because a cursor's
410-
stream_state can be updated in any order depending on which stream partitions finish first.
411-
412-
We should start to move away from depending on the value of stream_state for low-code components that operate
413-
per-partition, but we need to gate this otherwise some connectors will be blocked from publishing. See the
414-
cdk-migrations.md for the full list of connectors.
415-
"""
416-
417-
if isinstance(declarative_stream.retriever, SimpleRetriever) and isinstance(
418-
declarative_stream.retriever.requester, HttpRequester
419-
):
420-
http_requester = declarative_stream.retriever.requester
421-
if "stream_state" in http_requester._path.string:
422-
self.logger.warning(
423-
f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing"
424-
)
425-
return False
426-
427-
request_options_provider = http_requester._request_options_provider
428-
if request_options_provider.request_options_contain_stream_state():
429-
self.logger.warning(
430-
f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the HttpRequester which is not thread-safe. Defaulting to synchronous processing"
431-
)
432-
return False
433-
434-
record_selector = declarative_stream.retriever.record_selector
435-
if isinstance(record_selector, RecordSelector):
436-
if (
437-
record_selector.record_filter
438-
and not isinstance(
439-
record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
440-
)
441-
and "stream_state" in record_selector.record_filter.condition
442-
):
443-
self.logger.warning(
444-
f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the RecordFilter which is not thread-safe. Defaulting to synchronous processing"
445-
)
446-
return False
447-
448-
for add_fields in [
449-
transformation
450-
for transformation in record_selector.transformations
451-
if isinstance(transformation, AddFields)
452-
]:
453-
for field in add_fields.fields:
454-
if isinstance(field.value, str) and "stream_state" in field.value:
455-
self.logger.warning(
456-
f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing"
457-
)
458-
return False
459-
if (
460-
isinstance(field.value, InterpolatedString)
461-
and "stream_state" in field.value.string
462-
):
463-
self.logger.warning(
464-
f"Low-code stream '{declarative_stream.name}' uses interpolation of stream_state in the AddFields which is not thread-safe. Defaulting to synchronous processing"
465-
)
466-
return False
467-
return True
468-
469394
@staticmethod
470395
def _get_retriever(
471396
declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ definitions:
8282
- stream_interval
8383
- stream_partition
8484
- stream_slice
85-
- stream_state
8685
examples:
8786
- "{{ record['updates'] }}"
8887
- "{{ record['MetaData']['LastUpdatedTime'] }}"
@@ -1776,7 +1775,6 @@ definitions:
17761775
- stream_interval
17771776
- stream_partition
17781777
- stream_slice
1779-
- stream_state
17801778
examples:
17811779
- "/products"
17821780
- "/quotes/{{ stream_partition['id'] }}/quote_line_groups"
@@ -1826,7 +1824,6 @@ definitions:
18261824
- stream_interval
18271825
- stream_partition
18281826
- stream_slice
1829-
- stream_state
18301827
examples:
18311828
- |
18321829
[{"clause": {"type": "timestamp", "operator": 10, "parameters":
@@ -1844,7 +1841,6 @@ definitions:
18441841
- stream_interval
18451842
- stream_partition
18461843
- stream_slice
1847-
- stream_state
18481844
examples:
18491845
- sort_order: "ASC"
18501846
sort_field: "CREATED_AT"
@@ -1865,7 +1861,6 @@ definitions:
18651861
- stream_interval
18661862
- stream_partition
18671863
- stream_slice
1868-
- stream_state
18691864
examples:
18701865
- Output-Format: JSON
18711866
- Version: "{{ config['version'] }}"
@@ -1882,7 +1877,6 @@ definitions:
18821877
- stream_interval
18831878
- stream_partition
18841879
- stream_slice
1885-
- stream_state
18861880
examples:
18871881
- unit: "day"
18881882
- query: 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
@@ -2237,7 +2231,6 @@ definitions:
22372231
interpolation_context:
22382232
- config
22392233
- record
2240-
- stream_state
22412234
- stream_slice
22422235
new:
22432236
type: string
@@ -2251,7 +2244,6 @@ definitions:
22512244
interpolation_context:
22522245
- config
22532246
- record
2254-
- stream_state
22552247
- stream_slice
22562248
$parameters:
22572249
type: object
@@ -2901,7 +2893,6 @@ definitions:
29012893
- stream_interval
29022894
- stream_partition
29032895
- stream_slice
2904-
- stream_state
29052896
examples:
29062897
- "{{ record['created_at'] >= stream_interval['start_time'] }}"
29072898
- "{{ record.status in ['active', 'expired'] }}"
@@ -3689,12 +3680,6 @@ interpolation:
36893680
- title: stream_slice
36903681
description: This variable is deprecated. Use stream_interval or stream_partition instead.
36913682
type: object
3692-
- title: stream_state
3693-
description: The current state of the stream. The object's keys are defined by the incremental sync's cursor_field and the partition router's values.
3694-
type: object
3695-
examples:
3696-
- created_at: "2020-01-01 00:00:00.000+00:00"
3697-
- updated_at: "2020-01-02 00:00:00.000+00:00"
36983683
macros:
36993684
- title: now_utc
37003685
description: Returns the current date and time in the UTC timezone.

airbyte_cdk/sources/declarative/interpolation/jinja.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
from jinja2.exceptions import UndefinedError
1212
from jinja2.sandbox import SandboxedEnvironment
1313

14+
from airbyte_cdk.models import FailureType
1415
from airbyte_cdk.sources.declarative.interpolation.filters import filters
1516
from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation
1617
from airbyte_cdk.sources.declarative.interpolation.macros import macros
1718
from airbyte_cdk.sources.types import Config
19+
from airbyte_cdk.utils import AirbyteTracedException
1820

1921

2022
class StreamPartitionAccessEnvironment(SandboxedEnvironment):
@@ -36,6 +38,10 @@ def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool:
3638
"stream_partition": "stream_slice", # Use stream_partition to access partition router's values
3739
}
3840

41+
_UNSUPPORTED_INTERPOLATION_VARIABLES: Mapping[str, str] = {
42+
"stream_state": "`stream_state` is no longer supported for interpolation. We recommend using `stream_interval` instead. Please reference the CDK Migration Guide for more information.",
43+
}
44+
3945
# These extensions are not installed so they're not currently a problem,
4046
# but we're still explicitly removing them from the jinja context.
4147
# At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks
@@ -95,6 +101,13 @@ def eval(
95101
elif equivalent in context:
96102
context[alias] = context[equivalent]
97103

104+
for variable_name in _UNSUPPORTED_INTERPOLATION_VARIABLES:
105+
if variable_name in input_str:
106+
raise AirbyteTracedException(
107+
message=_UNSUPPORTED_INTERPOLATION_VARIABLES[variable_name],
108+
internal_message=_UNSUPPORTED_INTERPOLATION_VARIABLES[variable_name],
109+
failure_type=FailureType.config_error,
110+
)
98111
try:
99112
if isinstance(input_str, str):
100113
result = self._eval(input_str, context)

airbyte_cdk/sources/declarative/requesters/http_requester.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ def get_path(
123123
next_page_token: Optional[Mapping[str, Any]],
124124
) -> str:
125125
kwargs = {
126-
"stream_state": stream_state,
127126
"stream_slice": stream_slice,
128127
"next_page_token": next_page_token,
129128
}

airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
NestedMapping,
1111
)
1212
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
13-
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
13+
from airbyte_cdk.sources.types import Config, StreamSlice
1414

1515

1616
@dataclass
@@ -42,20 +42,17 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4242

4343
def eval_request_inputs(
4444
self,
45-
stream_state: Optional[StreamState] = None,
4645
stream_slice: Optional[StreamSlice] = None,
4746
next_page_token: Optional[Mapping[str, Any]] = None,
4847
) -> Mapping[str, Any]:
4948
"""
5049
Returns the request inputs to set on an outgoing HTTP request
5150
52-
:param stream_state: The stream state
5351
:param stream_slice: The stream slice
5452
:param next_page_token: The pagination token
5553
:return: The request inputs to set on an outgoing HTTP request
5654
"""
5755
kwargs = {
58-
"stream_state": stream_state,
5956
"stream_slice": stream_slice,
6057
"next_page_token": next_page_token,
6158
}

airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3737

3838
def eval_request_inputs(
3939
self,
40-
stream_state: Optional[StreamState] = None,
4140
stream_slice: Optional[StreamSlice] = None,
4241
next_page_token: Optional[Mapping[str, Any]] = None,
4342
valid_key_types: Optional[Tuple[Type[Any]]] = None,
@@ -46,15 +45,13 @@ def eval_request_inputs(
4645
"""
4746
Returns the request inputs to set on an outgoing HTTP request
4847
49-
:param stream_state: The stream state
5048
:param stream_slice: The stream slice
5149
:param next_page_token: The pagination token
5250
:param valid_key_types: A tuple of types that the interpolator should allow
5351
:param valid_value_types: A tuple of types that the interpolator should allow
5452
:return: The request inputs to set on an outgoing HTTP request
5553
"""
5654
kwargs = {
57-
"stream_state": stream_state,
5855
"stream_slice": stream_slice,
5956
"next_page_token": next_page_token,
6057
}

airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
from dataclasses import InitVar, dataclass, field
66
from typing import Any, Mapping, MutableMapping, Optional, Union
77

8-
from typing_extensions import deprecated
9-
108
from airbyte_cdk.sources.declarative.interpolation.interpolated_nested_mapping import NestedMapping
119
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_nested_request_input_provider import (
1210
InterpolatedNestedRequestInputProvider,
@@ -17,7 +15,6 @@
1715
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import (
1816
RequestOptionsProvider,
1917
)
20-
from airbyte_cdk.sources.source import ExperimentalClassWarning
2118
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
2219

2320
RequestInput = Union[str, Mapping[str, str]]
@@ -80,7 +77,6 @@ def get_request_params(
8077
next_page_token: Optional[Mapping[str, Any]] = None,
8178
) -> MutableMapping[str, Any]:
8279
interpolated_value = self._parameter_interpolator.eval_request_inputs(
83-
stream_state,
8480
stream_slice,
8581
next_page_token,
8682
valid_key_types=(str,),
@@ -97,9 +93,7 @@ def get_request_headers(
9793
stream_slice: Optional[StreamSlice] = None,
9894
next_page_token: Optional[Mapping[str, Any]] = None,
9995
) -> Mapping[str, Any]:
100-
return self._headers_interpolator.eval_request_inputs(
101-
stream_state, stream_slice, next_page_token
102-
)
96+
return self._headers_interpolator.eval_request_inputs(stream_slice, next_page_token)
10397

10498
def get_request_body_data(
10599
self,
@@ -109,7 +103,6 @@ def get_request_body_data(
109103
next_page_token: Optional[Mapping[str, Any]] = None,
110104
) -> Union[Mapping[str, Any], str]:
111105
return self._body_data_interpolator.eval_request_inputs(
112-
stream_state,
113106
stream_slice,
114107
next_page_token,
115108
valid_key_types=(str,),
@@ -123,42 +116,4 @@ def get_request_body_json(
123116
stream_slice: Optional[StreamSlice] = None,
124117
next_page_token: Optional[Mapping[str, Any]] = None,
125118
) -> Mapping[str, Any]:
126-
return self._body_json_interpolator.eval_request_inputs(
127-
stream_state, stream_slice, next_page_token
128-
)
129-
130-
@deprecated(
131-
"This class is temporary and used to incrementally deliver low-code to concurrent",
132-
category=ExperimentalClassWarning,
133-
)
134-
def request_options_contain_stream_state(self) -> bool:
135-
"""
136-
Temporary helper method used as we move low-code streams to the concurrent framework. This method determines if
137-
the InterpolatedRequestOptionsProvider has a dependency on a non-thread-safe interpolation context such as
138-
stream_state.
139-
"""
140-
141-
return (
142-
self._check_if_interpolation_uses_stream_state(self.request_parameters)
143-
or self._check_if_interpolation_uses_stream_state(self.request_headers)
144-
or self._check_if_interpolation_uses_stream_state(self.request_body_data)
145-
or self._check_if_interpolation_uses_stream_state(self.request_body_json)
146-
)
147-
148-
@staticmethod
149-
def _check_if_interpolation_uses_stream_state(
150-
request_input: Optional[Union[RequestInput, NestedMapping]],
151-
) -> bool:
152-
if not request_input:
153-
return False
154-
elif isinstance(request_input, str):
155-
return "stream_state" in request_input
156-
else:
157-
for key, val in request_input.items():
158-
# Covers the case of RequestInput in the form of a string or Mapping[str, str]. It also covers the case
159-
# of a NestedMapping where the value is a string.
160-
# Note: Doesn't account for nested mappings for request_body_json, but I don't see stream_state used in that way
161-
# in our code
162-
if "stream_state" in key or (isinstance(val, str) and "stream_state" in val):
163-
return True
164-
return False
119+
return self._body_json_interpolator.eval_request_inputs(stream_slice, next_page_token)

0 commit comments

Comments
 (0)