diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index ae0997315..1c5f6723d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -94,16 +94,13 @@ ClientSideIncrementalRecordFilterDecorator, ) from airbyte_cdk.sources.declarative.incremental import ( - ChildPartitionResumableFullRefreshCursor, ConcurrentCursorFactory, ConcurrentPerPartitionCursor, CursorFactory, DatetimeBasedCursor, DeclarativeCursor, GlobalSubstreamCursor, - PerPartitionCursor, PerPartitionWithGlobalCursor, - ResumableFullRefreshCursor, ) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping @@ -446,10 +443,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ZipfileDecoder as ZipfileDecoderModel, ) -from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import ( - COMPONENTS_MODULE_NAME, - SDM_COMPONENTS_MODULE_NAME, -) from airbyte_cdk.sources.declarative.partition_routers import ( CartesianProductStreamSlicer, GroupingPartitionRouter, @@ -508,7 +501,7 @@ RequestOptionsProvider, ) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath -from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester +from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.resolvers import ( ComponentMappingDefinition, ConfigComponentsResolver, @@ -617,6 +610,9 @@ ) from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream +from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import ( + StreamSlicer as ConcurrentStreamSlicer, +) from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( CustomFormatConcurrentStreamStateConverter, DateTimeStreamStateConverter, @@ -1933,29 +1929,7 @@ def create_datetime_based_cursor( def create_declarative_stream( self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any ) -> Union[DeclarativeStream, AbstractStream]: - # When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field - # components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the - # Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in - # the factory only support passing arguments to the component constructors, whereas this performs a merge of all slicers into one. - combined_slicers = self._merge_stream_slicers(model=model, config=config) - primary_key = model.primary_key.__root__ if model.primary_key else None - stop_condition_on_cursor = ( - model.incremental_sync - and hasattr(model.incremental_sync, "is_data_feed") - and model.incremental_sync.is_data_feed - ) - client_side_filtering_enabled = ( - model.incremental_sync - and hasattr(model.incremental_sync, "is_client_side_incremental") - and model.incremental_sync.is_client_side_incremental - ) - concurrent_cursor = None - if stop_condition_on_cursor or client_side_filtering_enabled: - stream_slicer = self._build_stream_slicer_from_partition_router( - model.retriever, config, stream_name=model.name - ) - concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config) if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel): cursor_model = model.incremental_sync @@ -2023,6 +1997,15 @@ def create_declarative_stream( model=model.file_uploader, config=config ) + # When constructing a declarative stream, we assemble the incremental_sync component and retriever's partition_router field + # components if they exist into a single CartesianProductStreamSlicer. This is then passed back as an argument when constructing the + # Retriever. This is done in the declarative stream not the retriever to support custom retrievers. The custom create methods in + # the factory only support passing arguments to the component constructors, whereas this performs a merge of all slicers into one. + combined_slicers = self._merge_stream_slicers(model=model, config=config) + partition_router = self._build_stream_slicer_from_partition_router( + model.retriever, config, stream_name=model.name + ) + concurrent_cursor = self._build_concurrent_cursor(model, partition_router, config) retriever = self._create_component_from_model( model=model.retriever, config=config, @@ -2030,9 +2013,11 @@ def create_declarative_stream( primary_key=primary_key, stream_slicer=combined_slicers, request_options_provider=request_options_provider, - stop_condition_cursor=concurrent_cursor, + stop_condition_cursor=concurrent_cursor + if self._is_stop_condition_on_cursor(model) + else None, client_side_incremental_sync={"cursor": concurrent_cursor} - if client_side_filtering_enabled + if self._is_client_side_filtering_enabled(model) else None, transformations=transformations, file_uploader=file_uploader, @@ -2066,18 +2051,41 @@ def create_declarative_stream( schema_loader = DefaultSchemaLoader(config=config, parameters=options) if ( - isinstance(combined_slicers, PartitionRouter) + ( + isinstance(combined_slicers, PartitionRouter) + or isinstance(concurrent_cursor, ConcurrentCursor) + ) and not self._emit_connector_builder_messages and not is_parent ): # We are starting to migrate streams to instantiate directly the DefaultStream instead of instantiating the # DeclarativeStream and assembling the DefaultStream from that. The plan is the following: # * Streams without partition router nor cursors and streams with only partition router. This is the `isinstance(combined_slicers, PartitionRouter)` condition as the first kind with have a SinglePartitionRouter - # * Streams without partition router but with cursor + # * Streams without partition router but with cursor. This is the `isinstance(concurrent_cursor, ConcurrentCursor)` condition # * Streams with both partition router and cursor # We specifically exclude parent streams here because SubstreamPartitionRouter has not been updated yet # We specifically exclude Connector Builder stuff for now as Brian is working on this anyway + stream_name = model.name or "" + stream_slicer: ConcurrentStreamSlicer = ( + concurrent_cursor if concurrent_cursor else SinglePartitionRouter(parameters={}) + ) + cursor: Cursor = FinalStateCursor(stream_name, None, self._message_repository) + if isinstance(retriever, AsyncRetriever): + # The AsyncRetriever only ever worked with a cursor from the concurrent package. Hence, the method + # `_build_incremental_cursor` which we would usually think would return only declarative stuff has a + # special clause and return a concurrent cursor. This stream slicer is passed to AsyncRetriever when + # built because the async retriever has a specific partition router which relies on this stream slicer. + # We can't re-use `concurrent_cursor` because it is a different instance than the one passed in + # AsyncJobPartitionRouter. + stream_slicer = retriever.stream_slicer + if isinstance(combined_slicers, Cursor): + cursor = combined_slicers + elif isinstance(combined_slicers, PartitionRouter): + stream_slicer = combined_slicers + elif concurrent_cursor: + cursor = concurrent_cursor + partition_generator = StreamSlicerPartitionGenerator( DeclarativePartitionFactory( stream_name, @@ -2085,18 +2093,19 @@ def create_declarative_stream( retriever, self._message_repository, ), - stream_slicer=combined_slicers, + stream_slicer=stream_slicer, ) return DefaultStream( partition_generator=partition_generator, name=stream_name, json_schema=schema_loader.get_json_schema, primary_key=get_primary_key_from_stream(primary_key), - cursor_field=None, - # FIXME we should have the cursor field has part of the interface of cursor + cursor_field=cursor.cursor_field.cursor_field_key + if hasattr(cursor, "cursor_field") + else "", # FIXME we should have the cursor field has part of the interface of cursor, logger=logging.getLogger(f"airbyte.{stream_name}"), - # FIXME this is a breaking change compared to the old implementation, - cursor=FinalStateCursor(stream_name, None, self._message_repository), + # FIXME this is a breaking change compared to the old implementation which used the source name instead + cursor=cursor, supports_file_transfer=hasattr(model, "file_uploader") and bool(model.file_uploader), ) @@ -2120,6 +2129,20 @@ def create_declarative_stream( parameters=model.parameters or {}, ) + def _is_stop_condition_on_cursor(self, model: DeclarativeStreamModel) -> bool: + return bool( + model.incremental_sync + and hasattr(model.incremental_sync, "is_data_feed") + and model.incremental_sync.is_data_feed + ) + + def _is_client_side_filtering_enabled(self, model: DeclarativeStreamModel) -> bool: + return bool( + model.incremental_sync + and hasattr(model.incremental_sync, "is_client_side_incremental") + and model.incremental_sync.is_client_side_incremental + ) + def _build_stream_slicer_from_partition_router( self, model: Union[ diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index aa8d0d781..b543354f7 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -350,102 +350,92 @@ def test_full_config_stream(): model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config ) - assert isinstance(stream, DeclarativeStream) - assert stream.primary_key == "id" + assert isinstance(stream, DefaultStream) assert stream.name == "lists" - assert stream._stream_cursor_field.string == "created" + assert stream.cursor_field == "created" - assert isinstance(stream.schema_loader, JsonFileSchemaLoader) - assert stream.schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.json" + schema_loader = get_schema_loader(stream) + assert isinstance(schema_loader, JsonFileSchemaLoader) + assert schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.json" - assert len(stream.retriever.record_selector.transformations) == 1 - add_fields = stream.retriever.record_selector.transformations[0] + retriever = get_retriever(stream) + assert len(retriever.record_selector.transformations) == 1 + add_fields = retriever.record_selector.transformations[0] assert isinstance(add_fields, AddFields) assert add_fields.fields[0].path == ["extra"] assert add_fields.fields[0].value.string == "{{ response.to_add }}" - assert isinstance(stream.retriever, SimpleRetriever) - assert stream.retriever.primary_key == stream.primary_key - assert stream.retriever.name == stream.name + assert isinstance(retriever, SimpleRetriever) + assert retriever.primary_key == "id" + assert retriever.name == stream.name - assert isinstance(stream.retriever.record_selector, RecordSelector) + assert isinstance(retriever.record_selector, RecordSelector) - assert isinstance(stream.retriever.record_selector.extractor, DpathExtractor) - assert isinstance(stream.retriever.record_selector.extractor.decoder, JsonDecoder) - assert [ - fp.eval(input_config) for fp in stream.retriever.record_selector.extractor._field_path - ] == ["lists"] + assert isinstance(retriever.record_selector.extractor, DpathExtractor) + assert isinstance(retriever.record_selector.extractor.decoder, JsonDecoder) + assert [fp.eval(input_config) for fp in retriever.record_selector.extractor._field_path] == [ + "lists" + ] - assert isinstance(stream.retriever.record_selector.record_filter, RecordFilter) + assert isinstance(retriever.record_selector.record_filter, RecordFilter) assert ( - stream.retriever.record_selector.record_filter._filter_interpolator.condition + retriever.record_selector.record_filter._filter_interpolator.condition == "{{ record['id'] > stream_state['id'] }}" ) - assert isinstance(stream.retriever.paginator, DefaultPaginator) - assert isinstance(stream.retriever.paginator.decoder, PaginationDecoderDecorator) - assert stream.retriever.paginator.page_size_option.field_name.eval(input_config) == "page_size" - assert ( - stream.retriever.paginator.page_size_option.inject_into - == RequestOptionType.request_parameter - ) - assert isinstance(stream.retriever.paginator.page_token_option, RequestPath) - assert stream.retriever.paginator.url_base.string == "https://api.sendgrid.com/v3/" - assert stream.retriever.paginator.url_base.default == "https://api.sendgrid.com/v3/" - - assert isinstance(stream.retriever.paginator.pagination_strategy, CursorPaginationStrategy) - assert isinstance( - stream.retriever.paginator.pagination_strategy.decoder, PaginationDecoderDecorator - ) + assert isinstance(retriever.paginator, DefaultPaginator) + assert isinstance(retriever.paginator.decoder, PaginationDecoderDecorator) + assert retriever.paginator.page_size_option.field_name.eval(input_config) == "page_size" + assert retriever.paginator.page_size_option.inject_into == RequestOptionType.request_parameter + assert isinstance(retriever.paginator.page_token_option, RequestPath) + assert retriever.paginator.url_base.string == "https://api.sendgrid.com/v3/" + assert retriever.paginator.url_base.default == "https://api.sendgrid.com/v3/" + + assert isinstance(retriever.paginator.pagination_strategy, CursorPaginationStrategy) + assert isinstance(retriever.paginator.pagination_strategy.decoder, PaginationDecoderDecorator) assert ( - stream.retriever.paginator.pagination_strategy._cursor_value.string + retriever.paginator.pagination_strategy._cursor_value.string == "{{ response._metadata.next }}" ) assert ( - stream.retriever.paginator.pagination_strategy._cursor_value.default + retriever.paginator.pagination_strategy._cursor_value.default == "{{ response._metadata.next }}" ) - assert stream.retriever.paginator.pagination_strategy.page_size == 10 + assert retriever.paginator.pagination_strategy.page_size == 10 - assert isinstance(stream.retriever.requester, HttpRequester) - assert stream.retriever.requester.http_method == HttpMethod.GET - assert stream.retriever.requester.name == stream.name - assert stream.retriever.requester._path.string == "{{ next_page_token['next_page_url'] }}" - assert stream.retriever.requester._path.default == "{{ next_page_token['next_page_url'] }}" + assert isinstance(retriever.requester, HttpRequester) + assert retriever.requester.http_method == HttpMethod.GET + assert retriever.requester.name == stream.name + assert retriever.requester._path.string == "{{ next_page_token['next_page_url'] }}" + assert retriever.requester._path.default == "{{ next_page_token['next_page_url'] }}" - assert isinstance(stream.retriever.request_option_provider, DatetimeBasedRequestOptionsProvider) + assert isinstance(retriever.request_option_provider, DatetimeBasedRequestOptionsProvider) assert ( - stream.retriever.request_option_provider.start_time_option.inject_into + retriever.request_option_provider.start_time_option.inject_into == RequestOptionType.request_parameter ) assert ( - stream.retriever.request_option_provider.start_time_option.field_name.eval( - config=input_config - ) + retriever.request_option_provider.start_time_option.field_name.eval(config=input_config) == "after" ) assert ( - stream.retriever.request_option_provider.end_time_option.inject_into + retriever.request_option_provider.end_time_option.inject_into == RequestOptionType.request_parameter ) assert ( - stream.retriever.request_option_provider.end_time_option.field_name.eval( - config=input_config - ) + retriever.request_option_provider.end_time_option.field_name.eval(config=input_config) == "before" ) - assert stream.retriever.request_option_provider._partition_field_start.string == "start_time" - assert stream.retriever.request_option_provider._partition_field_end.string == "end_time" + assert retriever.request_option_provider._partition_field_start.string == "start_time" + assert retriever.request_option_provider._partition_field_end.string == "end_time" - assert isinstance(stream.retriever.requester.authenticator, BearerAuthenticator) - assert stream.retriever.requester.authenticator.token_provider.get_token() == "verysecrettoken" + assert isinstance(retriever.requester.authenticator, BearerAuthenticator) + assert retriever.requester.authenticator.token_provider.get_token() == "verysecrettoken" assert isinstance( - stream.retriever.requester.request_options_provider, InterpolatedRequestOptionsProvider - ) - assert ( - stream.retriever.requester.request_options_provider.request_parameters.get("unit") == "day" + retriever.requester.request_options_provider, InterpolatedRequestOptionsProvider ) + assert retriever.requester.request_options_provider.request_parameters.get("unit") == "day" checker = factory.create_component( model_type=CheckStreamModel, component_definition=manifest["check"], config=input_config @@ -1117,7 +1107,8 @@ def test_incremental_data_feed(): ) assert isinstance( - stream.retriever.paginator.pagination_strategy, StopConditionPaginationStrategyDecorator + get_retriever(stream).paginator.pagination_strategy, + StopConditionPaginationStrategyDecorator, ) @@ -1198,11 +1189,12 @@ def test_client_side_incremental(): model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config ) + retriever = get_retriever(stream) assert isinstance( - stream.retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator + retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator ) - assert stream.retriever.record_selector.transform_before_filtering == True + assert get_retriever(stream).record_selector.transform_before_filtering == True def test_client_side_incremental_with_partition_router(): @@ -2440,8 +2432,8 @@ def test_default_schema_loader(self): "cursor_granularity": "PT0.000001S", }, None, - DatetimeBasedCursor, - DeclarativeStream, + ConcurrentCursor, + DefaultStream, id="test_create_simple_retriever_with_incremental", ), pytest.param( @@ -4130,7 +4122,8 @@ def test_simple_retriever_with_query_properties(): model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config ) - query_properties = stream.retriever.additional_query_properties + retriever = get_retriever(stream) + query_properties = retriever.additional_query_properties assert isinstance(query_properties, QueryProperties) assert query_properties.property_list == [ "first_name", @@ -4141,18 +4134,16 @@ def test_simple_retriever_with_query_properties(): ] assert query_properties.always_include_properties == ["id"] - property_chunking = stream.retriever.additional_query_properties.property_chunking + property_chunking = retriever.additional_query_properties.property_chunking assert isinstance(property_chunking, PropertyChunking) assert property_chunking.property_limit_type == PropertyLimitType.property_count assert property_chunking.property_limit == 3 - merge_strategy = ( - stream.retriever.additional_query_properties.property_chunking.record_merge_strategy - ) + merge_strategy = retriever.additional_query_properties.property_chunking.record_merge_strategy assert isinstance(merge_strategy, GroupByKey) assert merge_strategy.key == ["id"] - request_options_provider = stream.retriever.requester.request_options_provider + request_options_provider = retriever.requester.request_options_provider assert isinstance(request_options_provider, InterpolatedRequestOptionsProvider) # For a better developer experience we allow QueryProperties to be defined on the requester.request_parameters, # but it actually is leveraged by the SimpleRetriever which is why it is not included in the RequestOptionsProvider @@ -4232,27 +4223,28 @@ def test_simple_retriever_with_request_parameters_properties_from_endpoint(): model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config ) - query_properties = stream.retriever.additional_query_properties + retriever = get_retriever(stream) + query_properties = retriever.additional_query_properties assert isinstance(query_properties, QueryProperties) assert query_properties.always_include_properties is None - properties_from_endpoint = stream.retriever.additional_query_properties.property_list + properties_from_endpoint = retriever.additional_query_properties.property_list assert isinstance(properties_from_endpoint, PropertiesFromEndpoint) assert properties_from_endpoint.property_field_path == ["name"] properties_from_endpoint_retriever = ( - stream.retriever.additional_query_properties.property_list.retriever + retriever.additional_query_properties.property_list.retriever ) assert isinstance(properties_from_endpoint_retriever, SimpleRetriever) properties_from_endpoint_requester = ( - stream.retriever.additional_query_properties.property_list.retriever.requester + retriever.additional_query_properties.property_list.retriever.requester ) assert isinstance(properties_from_endpoint_requester, HttpRequester) assert properties_from_endpoint_requester.url_base == "https://api.hubapi.com" assert properties_from_endpoint_requester.path == "/properties/v2/dynamics/properties" - property_chunking = stream.retriever.additional_query_properties.property_chunking + property_chunking = retriever.additional_query_properties.property_chunking assert isinstance(property_chunking, PropertyChunking) assert property_chunking.property_limit_type == PropertyLimitType.property_count assert property_chunking.property_limit == 3 @@ -4320,22 +4312,23 @@ def test_simple_retriever_with_requester_properties_from_endpoint(): model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config ) - query_properties = stream.retriever.additional_query_properties + retriever = get_retriever(stream) + query_properties = retriever.additional_query_properties assert isinstance(query_properties, QueryProperties) assert query_properties.always_include_properties is None assert query_properties.property_chunking is None - properties_from_endpoint = stream.retriever.additional_query_properties.property_list + properties_from_endpoint = retriever.additional_query_properties.property_list assert isinstance(properties_from_endpoint, PropertiesFromEndpoint) assert properties_from_endpoint.property_field_path == ["name"] properties_from_endpoint_retriever = ( - stream.retriever.additional_query_properties.property_list.retriever + retriever.additional_query_properties.property_list.retriever ) assert isinstance(properties_from_endpoint_retriever, SimpleRetriever) properties_from_endpoint_requester = ( - stream.retriever.additional_query_properties.property_list.retriever.requester + retriever.additional_query_properties.property_list.retriever.requester ) assert isinstance(properties_from_endpoint_requester, HttpRequester) assert properties_from_endpoint_requester.url_base == "https://api.hubapi.com"