Skip to content

Commit 1af2264

Browse files
author
maxime.c
committed
refactor regarding async stuff
1 parent d9d09f0 commit 1af2264

File tree

2 files changed

+52
-60
lines changed

2 files changed

+52
-60
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,13 @@
9494
ClientSideIncrementalRecordFilterDecorator,
9595
)
9696
from airbyte_cdk.sources.declarative.incremental import (
97-
ChildPartitionResumableFullRefreshCursor,
9897
ConcurrentCursorFactory,
9998
ConcurrentPerPartitionCursor,
10099
CursorFactory,
101100
DatetimeBasedCursor,
102101
DeclarativeCursor,
103102
GlobalSubstreamCursor,
104-
PerPartitionCursor,
105103
PerPartitionWithGlobalCursor,
106-
ResumableFullRefreshCursor,
107104
)
108105
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
109106
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
@@ -446,10 +443,6 @@
446443
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
447444
ZipfileDecoder as ZipfileDecoderModel,
448445
)
449-
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
450-
COMPONENTS_MODULE_NAME,
451-
SDM_COMPONENTS_MODULE_NAME,
452-
)
453446
from airbyte_cdk.sources.declarative.partition_routers import (
454447
CartesianProductStreamSlicer,
455448
GroupingPartitionRouter,
@@ -508,7 +501,7 @@
508501
RequestOptionsProvider,
509502
)
510503
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
511-
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
504+
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
512505
from airbyte_cdk.sources.declarative.resolvers import (
513506
ComponentMappingDefinition,
514507
ConfigComponentsResolver,
@@ -1941,10 +1934,10 @@ def create_declarative_stream(
19411934

19421935
primary_key = model.primary_key.__root__ if model.primary_key else None
19431936

1944-
stream_slicer = self._build_stream_slicer_from_partition_router(
1937+
partition_router = self._build_stream_slicer_from_partition_router(
19451938
model.retriever, config, stream_name=model.name
19461939
)
1947-
concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config)
1940+
concurrent_cursor = self._build_concurrent_cursor(model, partition_router, config)
19481941

19491942
if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
19501943
cursor_model = model.incremental_sync
@@ -2019,7 +2012,9 @@ def create_declarative_stream(
20192012
primary_key=primary_key,
20202013
stream_slicer=combined_slicers,
20212014
request_options_provider=request_options_provider,
2022-
stop_condition_cursor=concurrent_cursor if self._is_stop_condition_on_cursor(model) else None,
2015+
stop_condition_cursor=concurrent_cursor
2016+
if self._is_stop_condition_on_cursor(model)
2017+
else None,
20232018
client_side_incremental_sync={"cursor": concurrent_cursor}
20242019
if self._is_client_side_filtering_enabled(model)
20252020
else None,
@@ -2055,7 +2050,10 @@ def create_declarative_stream(
20552050
schema_loader = DefaultSchemaLoader(config=config, parameters=options)
20562051

20572052
if (
2058-
(isinstance(combined_slicers, PartitionRouter) or isinstance(concurrent_cursor, ConcurrentCursor))
2053+
(
2054+
isinstance(combined_slicers, PartitionRouter)
2055+
or isinstance(concurrent_cursor, ConcurrentCursor)
2056+
)
20592057
and not is_parent
20602058
and not self._emit_connector_builder_messages
20612059
):
@@ -2067,7 +2065,9 @@ def create_declarative_stream(
20672065
# We specifically exclude parent streams here because SubstreamPartitionRouter has not been updated yet
20682066
# We specifically exclude Connector Builder stuff for now as Brian is working on this anyway
20692067

2068+
stream_name = model.name or ""
20702069
stream_slicer = concurrent_cursor
2070+
cursor = FinalStateCursor(stream_name, None, self._message_repository)
20712071
if isinstance(retriever, AsyncRetriever):
20722072
# The AsyncRetriever only ever worked with a cursor from the concurrent package. Hence, the method
20732073
# `_build_incremental_cursor` which we would usually think would return only declarative stuff has a
@@ -2076,10 +2076,13 @@ def create_declarative_stream(
20762076
# We can't re-use `concurrent_cursor` because it is a different instance than the one passed in
20772077
# AsyncJobPartitionRouter.
20782078
stream_slicer = retriever.stream_slicer
2079+
if isinstance(combined_slicers, Cursor):
2080+
cursor = combined_slicers
20792081
elif isinstance(combined_slicers, PartitionRouter):
20802082
stream_slicer = combined_slicers
2083+
else:
2084+
cursor = concurrent_cursor
20812085

2082-
stream_name = model.name or ""
20832086
partition_generator = StreamSlicerPartitionGenerator(
20842087
DeclarativePartitionFactory(
20852088
stream_name,
@@ -2089,16 +2092,17 @@ def create_declarative_stream(
20892092
),
20902093
stream_slicer,
20912094
)
2092-
cursor = concurrent_cursor if concurrent_cursor else FinalStateCursor(stream_name, None, self._message_repository)
2095+
20932096
return DefaultStream(
20942097
partition_generator=partition_generator,
20952098
name=stream_name,
20962099
json_schema=schema_loader.get_json_schema,
20972100
primary_key=get_primary_key_from_stream(primary_key),
2098-
cursor_field=cursor.cursor_field.cursor_field_key if hasattr(cursor, "cursor_field") else "", # FIXME we should have the cursor field has part of the interface of cursor,
2099-
# FIXME we should have the cursor field has part of the interface of cursor
2101+
cursor_field=cursor.cursor_field.cursor_field_key
2102+
if hasattr(cursor, "cursor_field")
2103+
else "", # FIXME we should have the cursor field as part of the interface of cursor
21002104
logger=logging.getLogger(f"airbyte.{stream_name}"),
2101-
# FIXME this is a breaking change compared to the old implementation,
2105+
# FIXME this is a breaking change compared to the old implementation, which used the source name instead
21022106
cursor=cursor,
21032107
)
21042108

@@ -2121,18 +2125,18 @@ def create_declarative_stream(
21212125
parameters=model.parameters or {},
21222126
)
21232127

2124-
def _is_stop_condition_on_cursor(self, model):
2128+
def _is_stop_condition_on_cursor(self, model: DeclarativeStreamModel) -> bool:
21252129
return (
2126-
model.incremental_sync
2127-
and hasattr(model.incremental_sync, "is_data_feed")
2128-
and model.incremental_sync.is_data_feed
2130+
model.incremental_sync
2131+
and hasattr(model.incremental_sync, "is_data_feed")
2132+
and model.incremental_sync.is_data_feed
21292133
)
21302134

2131-
def _is_client_side_filtering_enabled(self, model):
2135+
def _is_client_side_filtering_enabled(self, model: DeclarativeStreamModel) -> bool:
21322136
client_side_filtering_enabled = (
2133-
model.incremental_sync
2134-
and hasattr(model.incremental_sync, "is_client_side_incremental")
2135-
and model.incremental_sync.is_client_side_incremental
2137+
model.incremental_sync
2138+
and hasattr(model.incremental_sync, "is_client_side_incremental")
2139+
and model.incremental_sync.is_client_side_incremental
21362140
)
21372141
return client_side_filtering_enabled
21382142

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -373,38 +373,33 @@ def test_full_config_stream():
373373

374374
assert isinstance(retriever.record_selector.extractor, DpathExtractor)
375375
assert isinstance(retriever.record_selector.extractor.decoder, JsonDecoder)
376-
assert [
377-
fp.eval(input_config) for fp in retriever.record_selector.extractor._field_path
378-
] == ["lists"]
376+
assert [fp.eval(input_config) for fp in retriever.record_selector.extractor._field_path] == [
377+
"lists"
378+
]
379379

380380
assert isinstance(retriever.record_selector.record_filter, RecordFilter)
381381
assert (
382-
retriever.record_selector.record_filter._filter_interpolator.condition
383-
== "{{ record['id'] > stream_state['id'] }}"
382+
retriever.record_selector.record_filter._filter_interpolator.condition
383+
== "{{ record['id'] > stream_state['id'] }}"
384384
)
385385

386386
assert isinstance(retriever.paginator, DefaultPaginator)
387387
assert isinstance(retriever.paginator.decoder, PaginationDecoderDecorator)
388388
assert retriever.paginator.page_size_option.field_name.eval(input_config) == "page_size"
389-
assert (
390-
retriever.paginator.page_size_option.inject_into
391-
== RequestOptionType.request_parameter
392-
)
389+
assert retriever.paginator.page_size_option.inject_into == RequestOptionType.request_parameter
393390
assert isinstance(retriever.paginator.page_token_option, RequestPath)
394391
assert retriever.paginator.url_base.string == "https://api.sendgrid.com/v3/"
395392
assert retriever.paginator.url_base.default == "https://api.sendgrid.com/v3/"
396393

397394
assert isinstance(retriever.paginator.pagination_strategy, CursorPaginationStrategy)
398-
assert isinstance(
399-
retriever.paginator.pagination_strategy.decoder, PaginationDecoderDecorator
400-
)
395+
assert isinstance(retriever.paginator.pagination_strategy.decoder, PaginationDecoderDecorator)
401396
assert (
402-
retriever.paginator.pagination_strategy._cursor_value.string
403-
== "{{ response._metadata.next }}"
397+
retriever.paginator.pagination_strategy._cursor_value.string
398+
== "{{ response._metadata.next }}"
404399
)
405400
assert (
406-
retriever.paginator.pagination_strategy._cursor_value.default
407-
== "{{ response._metadata.next }}"
401+
retriever.paginator.pagination_strategy._cursor_value.default
402+
== "{{ response._metadata.next }}"
408403
)
409404
assert retriever.paginator.pagination_strategy.page_size == 10
410405

@@ -416,24 +411,20 @@ def test_full_config_stream():
416411

417412
assert isinstance(retriever.request_option_provider, DatetimeBasedRequestOptionsProvider)
418413
assert (
419-
retriever.request_option_provider.start_time_option.inject_into
420-
== RequestOptionType.request_parameter
414+
retriever.request_option_provider.start_time_option.inject_into
415+
== RequestOptionType.request_parameter
421416
)
422417
assert (
423-
retriever.request_option_provider.start_time_option.field_name.eval(
424-
config=input_config
425-
)
426-
== "after"
418+
retriever.request_option_provider.start_time_option.field_name.eval(config=input_config)
419+
== "after"
427420
)
428421
assert (
429-
retriever.request_option_provider.end_time_option.inject_into
430-
== RequestOptionType.request_parameter
422+
retriever.request_option_provider.end_time_option.inject_into
423+
== RequestOptionType.request_parameter
431424
)
432425
assert (
433-
retriever.request_option_provider.end_time_option.field_name.eval(
434-
config=input_config
435-
)
436-
== "before"
426+
retriever.request_option_provider.end_time_option.field_name.eval(config=input_config)
427+
== "before"
437428
)
438429
assert retriever.request_option_provider._partition_field_start.string == "start_time"
439430
assert retriever.request_option_provider._partition_field_end.string == "end_time"
@@ -444,9 +435,7 @@ def test_full_config_stream():
444435
assert isinstance(
445436
retriever.requester.request_options_provider, InterpolatedRequestOptionsProvider
446437
)
447-
assert (
448-
retriever.requester.request_options_provider.request_parameters.get("unit") == "day"
449-
)
438+
assert retriever.requester.request_options_provider.request_parameters.get("unit") == "day"
450439

451440
checker = factory.create_component(
452441
model_type=CheckStreamModel, component_definition=manifest["check"], config=input_config
@@ -1118,7 +1107,8 @@ def test_incremental_data_feed():
11181107
)
11191108

11201109
assert isinstance(
1121-
get_retriever(stream).paginator.pagination_strategy, StopConditionPaginationStrategyDecorator
1110+
get_retriever(stream).paginator.pagination_strategy,
1111+
StopConditionPaginationStrategyDecorator,
11221112
)
11231113

11241114

@@ -4149,9 +4139,7 @@ def test_simple_retriever_with_query_properties():
41494139
assert property_chunking.property_limit_type == PropertyLimitType.property_count
41504140
assert property_chunking.property_limit == 3
41514141

4152-
merge_strategy = (
4153-
retriever.additional_query_properties.property_chunking.record_merge_strategy
4154-
)
4142+
merge_strategy = retriever.additional_query_properties.property_chunking.record_merge_strategy
41554143
assert isinstance(merge_strategy, GroupByKey)
41564144
assert merge_strategy.key == ["id"]
41574145

0 commit comments

Comments
 (0)