diff --git a/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py index 0f452f97e..818ce0179 100644 --- a/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py @@ -341,29 +341,41 @@ def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStrea def _initialize_cache_for_parent_streams( stream_configs: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: + """Enable caching for parent streams unless explicitly disabled. + + Caching is enabled by default for parent streams to optimize performance when the same + parent data is needed by multiple child streams. However, explicit `use_cache: false` + settings are respected for streams that cannot use caching (e.g., scroll-based pagination + APIs where caching causes duplicate records). + """ parent_streams = set() + def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None: + """Set use_cache to True only if not explicitly disabled.""" + if requester.get("use_cache") is not False: + requester["use_cache"] = True + def update_with_cache_parent_configs( parent_configs: list[dict[str, Any]], ) -> None: for parent_config in parent_configs: parent_streams.add(parent_config["stream"]["name"]) if parent_config["stream"]["type"] == "StateDelegatingStream": - parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][ - "use_cache" - ] = True - parent_config["stream"]["incremental_stream"]["retriever"]["requester"][ - "use_cache" - ] = True + _set_cache_if_not_disabled( + parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"] + ) + _set_cache_if_not_disabled( + parent_config["stream"]["incremental_stream"]["retriever"]["requester"] + ) else: - parent_config["stream"]["retriever"]["requester"]["use_cache"] = True + _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"]) for stream_config in stream_configs: if stream_config.get("incremental_sync", {}).get("parent_stream"): parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"]) - stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][ - "use_cache" - ] = True + _set_cache_if_not_disabled( + stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"] + ) elif stream_config.get("retriever", {}).get("partition_router", {}): partition_router = stream_config["retriever"]["partition_router"] @@ -380,14 +392,14 @@ def update_with_cache_parent_configs( for stream_config in stream_configs: if stream_config["name"] in parent_streams: if stream_config["type"] == "StateDelegatingStream": - stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = ( - True + _set_cache_if_not_disabled( + stream_config["full_refresh_stream"]["retriever"]["requester"] ) - stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = ( - True + _set_cache_if_not_disabled( + stream_config["incremental_stream"]["retriever"]["requester"] ) else: - stream_config["retriever"]["requester"]["use_cache"] = True + _set_cache_if_not_disabled(stream_config["retriever"]["requester"]) return stream_configs def spec(self, logger: logging.Logger) -> ConnectorSpecification: diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 781bb64d1..45fe6aa2d 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -424,29 +424,41 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i def _initialize_cache_for_parent_streams( stream_configs: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: + """Enable caching for parent streams unless explicitly disabled. + + Caching is enabled by default for parent streams to optimize performance when the same + parent data is needed by multiple child streams. However, explicit `use_cache: false` + settings are respected for streams that cannot use caching (e.g., scroll-based pagination + APIs where caching causes duplicate records). + """ parent_streams = set() + def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None: + """Set use_cache to True only if not explicitly disabled.""" + if requester.get("use_cache") is not False: + requester["use_cache"] = True + def update_with_cache_parent_configs( parent_configs: list[dict[str, Any]], ) -> None: for parent_config in parent_configs: parent_streams.add(parent_config["stream"]["name"]) if parent_config["stream"]["type"] == "StateDelegatingStream": - parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][ - "use_cache" - ] = True - parent_config["stream"]["incremental_stream"]["retriever"]["requester"][ - "use_cache" - ] = True + _set_cache_if_not_disabled( + parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"] + ) + _set_cache_if_not_disabled( + parent_config["stream"]["incremental_stream"]["retriever"]["requester"] + ) else: - parent_config["stream"]["retriever"]["requester"]["use_cache"] = True + _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"]) for stream_config in stream_configs: if stream_config.get("incremental_sync", {}).get("parent_stream"): parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"]) - stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][ - "use_cache" - ] = True + _set_cache_if_not_disabled( + stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"] + ) elif stream_config.get("retriever", {}).get("partition_router", {}): partition_router = stream_config["retriever"]["partition_router"] @@ -463,14 +475,14 @@ def update_with_cache_parent_configs( for stream_config in stream_configs: if stream_config["name"] in parent_streams: if stream_config["type"] == "StateDelegatingStream": - stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = ( - True + _set_cache_if_not_disabled( + stream_config["full_refresh_stream"]["retriever"]["requester"] ) - stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = ( - True + _set_cache_if_not_disabled( + stream_config["incremental_stream"]["retriever"]["requester"] ) else: - stream_config["retriever"]["requester"]["use_cache"] = True + _set_cache_if_not_disabled(stream_config["retriever"]["requester"]) return stream_configs def spec(self, logger: logging.Logger) -> ConnectorSpecification: diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index f521186d0..e04a82c0d 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2286,7 +2286,7 @@ definitions: - "$ref": "#/definitions/CustomErrorHandler" use_cache: title: Use Cache - description: Enables stream requests caching. This field is automatically set by the CDK. + description: Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. Only set this to false if you are certain that caching should be disabled, as it may negatively impact performance when the same data is needed multiple times (e.g., for scroll-based pagination APIs where caching causes duplicate records). type: boolean default: false $parameters: diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index ab4335fe6..8bc130a35 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -1652,6 +1652,112 @@ def test_only_parent_streams_use_cache(): assert not get_retriever(streams[2]).requester.use_cache +def test_parent_stream_respects_explicit_use_cache_false(): + """Test that explicit use_cache: false is respected for parent streams. + + This is important for APIs that use scroll-based pagination (like Intercom's /companies/scroll + endpoint), where caching must be disabled because the same scroll_param is returned in + pagination responses, causing duplicate records and infinite pagination loops. + """ + # Parent stream with explicit use_cache: false + companies_stream = { + "type": "DeclarativeStream", + "$parameters": { + "name": "companies", + "primary_key": "id", + "url_base": "https://api.intercom.io/", + }, + "schema_loader": { + "name": "{{ parameters.stream_name }}", + "file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_token_option": {"type": "RequestPath"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response.get('scroll_param') }}", + "page_size": 100, + }, + }, + "requester": { + "path": "companies/scroll", + "use_cache": False, # Explicitly disabled for scroll-based pagination + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": {"extractor": {"type": "DpathExtractor", "field_path": ["data"]}}, + }, + } + + manifest = { + "version": "0.29.3", + "definitions": {}, + "streams": [ + deepcopy(companies_stream), + { + "type": "DeclarativeStream", + "$parameters": { + "name": "company_segments", + "primary_key": "id", + "url_base": "https://api.intercom.io/", + }, + "schema_loader": { + "name": "{{ parameters.stream_name }}", + "file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml", + }, + "retriever": { + "paginator": {"type": "NoPagination"}, + "requester": { + "path": "companies/{{ stream_partition.parent_id }}/segments", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "extractor": {"type": "DpathExtractor", "field_path": ["data"]} + }, + "partition_router": { + "parent_stream_configs": [ + { + "parent_key": "id", + "partition_field": "parent_id", + "stream": deepcopy(companies_stream), + } + ], + "type": "SubstreamPartitionRouter", + }, + }, + }, + ], + "check": {"type": "CheckStream", "stream_names": ["companies"]}, + } + source = ManifestDeclarativeSource(source_config=manifest) + + streams = source.streams({}) + assert len(streams) == 2 + + # Main stream with explicit use_cache: false should remain false (parent for substream) + assert streams[0].name == "companies" + # use_cache should remain False because it was explicitly set to False + assert not get_retriever(streams[0]).requester.use_cache + + # Substream + assert streams[1].name == "company_segments" + + # Parent stream created for substream should also respect use_cache: false + stream_slicer = streams[1]._stream_partition_generator._stream_slicer + assert stream_slicer.parent_stream_configs[0].stream.name == "companies" + # The parent stream in the substream config should also have use_cache: false + assert not stream_slicer.parent_stream_configs[ + 0 + ].stream._stream_partition_generator._partition_factory._retriever.requester.use_cache + + def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMessage]: catalog = ConfiguredAirbyteCatalog( streams=[ diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 8df16b463..bde6c35b1 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -3931,6 +3931,122 @@ def test_only_parent_streams_use_cache(): ) +def test_parent_stream_respects_explicit_use_cache_false(): + """Test that explicit use_cache: false is respected for parent streams. + + This is important for APIs that use scroll-based pagination (like Intercom's /companies/scroll + endpoint), where caching must be disabled because the same scroll_param is returned in + pagination responses, causing duplicate records and infinite pagination loops. + """ + # Parent stream with explicit use_cache: false + companies_stream = { + "type": "DeclarativeStream", + "$parameters": { + "name": "companies", + "primary_key": "id", + "url_base": "https://api.intercom.io/", + }, + "schema_loader": { + "name": "{{ parameters.stream_name }}", + "file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_token_option": {"type": "RequestPath"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response.get('scroll_param') }}", + "page_size": 100, + }, + }, + "requester": { + "path": "companies/scroll", + "use_cache": False, # Explicitly disabled for scroll-based pagination + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": {"extractor": {"type": "DpathExtractor", "field_path": ["data"]}}, + }, + } + + manifest = { + "version": "0.29.3", + "definitions": {}, + "streams": [ + deepcopy(companies_stream), + { + "type": "DeclarativeStream", + "$parameters": { + "name": "company_segments", + "primary_key": "id", + "url_base": "https://api.intercom.io/", + }, + "schema_loader": { + "name": "{{ parameters.stream_name }}", + "file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml", + }, + "retriever": { + "paginator": {"type": "NoPagination"}, + "requester": { + "path": "companies/{{ stream_partition.parent_id }}/segments", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "extractor": {"type": "DpathExtractor", "field_path": ["data"]} + }, + "partition_router": { + "parent_stream_configs": [ + { + "parent_key": "id", + "partition_field": "parent_id", + "stream": deepcopy(companies_stream), + } + ], + "type": "SubstreamPartitionRouter", + }, + }, + }, + ], + "check": {"type": "CheckStream", "stream_names": ["companies"]}, + } + source = ConcurrentDeclarativeSource( + source_config=manifest, config={}, catalog=create_catalog("lists"), state=None + ) + + streams = source.streams({}) + assert len(streams) == 2 + + # Main stream with explicit use_cache: false should remain false (parent for substream) + stream_0 = streams[0] + assert stream_0.name == "companies" + assert isinstance(stream_0, DefaultStream) + # use_cache should remain False because it was explicitly set to False + assert ( + not stream_0._stream_partition_generator._partition_factory._retriever.requester.use_cache + ) + + # Substream + stream_1 = streams[1] + assert stream_1.name == "company_segments" + assert isinstance(stream_1, DefaultStream) + + # Parent stream created for substream should also respect use_cache: false + assert ( + stream_1._stream_partition_generator._stream_slicer.parent_stream_configs[0].stream.name + == "companies" + ) + # The parent stream in the substream config should also have use_cache: false + assert not stream_1._stream_partition_generator._stream_slicer.parent_stream_configs[ + 0 + ].stream._stream_partition_generator._partition_factory._retriever.requester.use_cache + + def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMessage]: source = ConcurrentDeclarativeSource( source_config=manifest, config={}, catalog=create_catalog("lists"), state=None