Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,29 +341,45 @@ def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStrea
def _initialize_cache_for_parent_streams(
stream_configs: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Enable caching for parent streams unless explicitly disabled.

Caching is enabled by default for parent streams to optimize performance when the same
parent data is needed by multiple child streams. However, explicit `use_cache: false`
settings are respected for streams that cannot use caching (e.g., scroll-based pagination
APIs where caching causes duplicate records).
"""
parent_streams = set()

def _should_enable_cache(requester: Dict[str, Any]) -> bool:
"""Return False only if use_cache is explicitly set to False."""
return requester.get("use_cache") is not False

def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
"""Set use_cache to True only if not explicitly disabled."""
if _should_enable_cache(requester):
requester["use_cache"] = True

def update_with_cache_parent_configs(
parent_configs: list[dict[str, Any]],
) -> None:
for parent_config in parent_configs:
parent_streams.add(parent_config["stream"]["name"])
if parent_config["stream"]["type"] == "StateDelegatingStream":
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
"use_cache"
] = True
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
"use_cache"
] = True
_set_cache_if_not_disabled(
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
)
_set_cache_if_not_disabled(
parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
)
else:
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
_set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])

for stream_config in stream_configs:
if stream_config.get("incremental_sync", {}).get("parent_stream"):
parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
"use_cache"
] = True
_set_cache_if_not_disabled(
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
)

elif stream_config.get("retriever", {}).get("partition_router", {}):
partition_router = stream_config["retriever"]["partition_router"]
Expand All @@ -380,14 +396,14 @@ def update_with_cache_parent_configs(
for stream_config in stream_configs:
if stream_config["name"] in parent_streams:
if stream_config["type"] == "StateDelegatingStream":
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
True
_set_cache_if_not_disabled(
stream_config["full_refresh_stream"]["retriever"]["requester"]
)
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
True
_set_cache_if_not_disabled(
stream_config["incremental_stream"]["retriever"]["requester"]
)
else:
stream_config["retriever"]["requester"]["use_cache"] = True
_set_cache_if_not_disabled(stream_config["retriever"]["requester"])
return stream_configs

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
Expand Down
46 changes: 31 additions & 15 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,29 +424,45 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
def _initialize_cache_for_parent_streams(
stream_configs: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Enable caching for parent streams unless explicitly disabled.

Caching is enabled by default for parent streams to optimize performance when the same
parent data is needed by multiple child streams. However, explicit `use_cache: false`
settings are respected for streams that cannot use caching (e.g., scroll-based pagination
APIs where caching causes duplicate records).
"""
parent_streams = set()

def _should_enable_cache(requester: Dict[str, Any]) -> bool:
"""Return False only if use_cache is explicitly set to False."""
return requester.get("use_cache") is not False

def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
"""Set use_cache to True only if not explicitly disabled."""
if _should_enable_cache(requester):
requester["use_cache"] = True

def update_with_cache_parent_configs(
parent_configs: list[dict[str, Any]],
) -> None:
for parent_config in parent_configs:
parent_streams.add(parent_config["stream"]["name"])
if parent_config["stream"]["type"] == "StateDelegatingStream":
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
"use_cache"
] = True
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
"use_cache"
] = True
_set_cache_if_not_disabled(
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
)
_set_cache_if_not_disabled(
parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
)
else:
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
_set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])

for stream_config in stream_configs:
if stream_config.get("incremental_sync", {}).get("parent_stream"):
parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
"use_cache"
] = True
_set_cache_if_not_disabled(
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
)

elif stream_config.get("retriever", {}).get("partition_router", {}):
partition_router = stream_config["retriever"]["partition_router"]
Expand All @@ -463,14 +479,14 @@ def update_with_cache_parent_configs(
for stream_config in stream_configs:
if stream_config["name"] in parent_streams:
if stream_config["type"] == "StateDelegatingStream":
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
True
_set_cache_if_not_disabled(
stream_config["full_refresh_stream"]["retriever"]["requester"]
)
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
True
_set_cache_if_not_disabled(
stream_config["incremental_stream"]["retriever"]["requester"]
)
else:
stream_config["retriever"]["requester"]["use_cache"] = True
_set_cache_if_not_disabled(stream_config["retriever"]["requester"])
return stream_configs

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,112 @@ def test_only_parent_streams_use_cache():
assert not get_retriever(streams[2]).requester.use_cache


def test_parent_stream_respects_explicit_use_cache_false():
    """Test that explicit use_cache: false is respected for parent streams.

    This is important for APIs that use scroll-based pagination (like Intercom's /companies/scroll
    endpoint), where caching must be disabled because the same scroll_param is returned in
    pagination responses, causing duplicate records and infinite pagination loops.

    The stream appears twice in the manifest: once as a top-level stream and once embedded
    in a SubstreamPartitionRouter's parent_stream_configs; both copies must keep
    use_cache == False after the source's cache-initialization pass.
    """
    # Parent stream with explicit use_cache: false
    companies_stream = {
        "type": "DeclarativeStream",
        "$parameters": {
            "name": "companies",
            "primary_key": "id",
            "url_base": "https://api.intercom.io/",
        },
        "schema_loader": {
            "name": "{{ parameters.stream_name }}",
            "file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
        },
        "retriever": {
            # Scroll-style cursor pagination: the next-page token is read from the
            # response body's scroll_param and sent back via the request path.
            "paginator": {
                "type": "DefaultPaginator",
                "page_token_option": {"type": "RequestPath"},
                "pagination_strategy": {
                    "type": "CursorPagination",
                    "cursor_value": "{{ response.get('scroll_param') }}",
                    "page_size": 100,
                },
            },
            "requester": {
                "path": "companies/scroll",
                "use_cache": False,  # Explicitly disabled for scroll-based pagination
                "authenticator": {
                    "type": "BearerAuthenticator",
                    "api_token": "{{ config['api_key'] }}",
                },
            },
            "record_selector": {"extractor": {"type": "DpathExtractor", "field_path": ["data"]}},
        },
    }

    manifest = {
        "version": "0.29.3",
        "definitions": {},
        "streams": [
            # Top-level copy of the parent stream (deepcopy so later mutation of one
            # copy cannot leak into the other).
            deepcopy(companies_stream),
            {
                # Child stream partitioned over "companies", which makes "companies"
                # a parent stream and therefore a candidate for cache enablement.
                "type": "DeclarativeStream",
                "$parameters": {
                    "name": "company_segments",
                    "primary_key": "id",
                    "url_base": "https://api.intercom.io/",
                },
                "schema_loader": {
                    "name": "{{ parameters.stream_name }}",
                    "file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
                },
                "retriever": {
                    "paginator": {"type": "NoPagination"},
                    "requester": {
                        "path": "companies/{{ stream_partition.parent_id }}/segments",
                        "authenticator": {
                            "type": "BearerAuthenticator",
                            "api_token": "{{ config['api_key'] }}",
                        },
                    },
                    "record_selector": {
                        "extractor": {"type": "DpathExtractor", "field_path": ["data"]}
                    },
                    "partition_router": {
                        "parent_stream_configs": [
                            {
                                "parent_key": "id",
                                "partition_field": "parent_id",
                                "stream": deepcopy(companies_stream),
                            }
                        ],
                        "type": "SubstreamPartitionRouter",
                    },
                },
            },
        ],
        "check": {"type": "CheckStream", "stream_names": ["companies"]},
    }
    source = ManifestDeclarativeSource(source_config=manifest)

    streams = source.streams({})
    assert len(streams) == 2

    # Main stream with explicit use_cache: false should remain false (parent for substream)
    assert streams[0].name == "companies"
    # use_cache should remain False because it was explicitly set to False
    assert not get_retriever(streams[0]).requester.use_cache

    # Substream
    assert streams[1].name == "company_segments"

    # Parent stream created for substream should also respect use_cache: false
    # NOTE(review): this reaches through private attributes
    # (_stream_partition_generator, _partition_factory, _retriever) to inspect the
    # concrete requester; it will break if those internals are renamed.
    stream_slicer = streams[1]._stream_partition_generator._stream_slicer
    assert stream_slicer.parent_stream_configs[0].stream.name == "companies"
    # The parent stream in the substream config should also have use_cache: false
    assert not stream_slicer.parent_stream_configs[
        0
    ].stream._stream_partition_generator._partition_factory._retriever.requester.use_cache


def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMessage]:
catalog = ConfiguredAirbyteCatalog(
streams=[
Expand Down
116 changes: 116 additions & 0 deletions unit_tests/sources/declarative/test_concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3931,6 +3931,122 @@ def test_only_parent_streams_use_cache():
)


def test_parent_stream_respects_explicit_use_cache_false():
    """Test that explicit use_cache: false is respected for parent streams.

    This is important for APIs that use scroll-based pagination (like Intercom's /companies/scroll
    endpoint), where caching must be disabled because the same scroll_param is returned in
    pagination responses, causing duplicate records and infinite pagination loops.

    Concurrent-source counterpart of the ManifestDeclarativeSource test: the same manifest
    is run through ConcurrentDeclarativeSource and the resulting DefaultStream internals
    are inspected to confirm use_cache stays False on both copies of the parent stream.
    """
    # Parent stream with explicit use_cache: false
    companies_stream = {
        "type": "DeclarativeStream",
        "$parameters": {
            "name": "companies",
            "primary_key": "id",
            "url_base": "https://api.intercom.io/",
        },
        "schema_loader": {
            "name": "{{ parameters.stream_name }}",
            "file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
        },
        "retriever": {
            # Scroll-style cursor pagination: the next-page token is read from the
            # response body's scroll_param and sent back via the request path.
            "paginator": {
                "type": "DefaultPaginator",
                "page_token_option": {"type": "RequestPath"},
                "pagination_strategy": {
                    "type": "CursorPagination",
                    "cursor_value": "{{ response.get('scroll_param') }}",
                    "page_size": 100,
                },
            },
            "requester": {
                "path": "companies/scroll",
                "use_cache": False,  # Explicitly disabled for scroll-based pagination
                "authenticator": {
                    "type": "BearerAuthenticator",
                    "api_token": "{{ config['api_key'] }}",
                },
            },
            "record_selector": {"extractor": {"type": "DpathExtractor", "field_path": ["data"]}},
        },
    }

    manifest = {
        "version": "0.29.3",
        "definitions": {},
        "streams": [
            # Top-level copy of the parent stream (deepcopy so later mutation of one
            # copy cannot leak into the other).
            deepcopy(companies_stream),
            {
                # Child stream partitioned over "companies", which makes "companies"
                # a parent stream and therefore a candidate for cache enablement.
                "type": "DeclarativeStream",
                "$parameters": {
                    "name": "company_segments",
                    "primary_key": "id",
                    "url_base": "https://api.intercom.io/",
                },
                "schema_loader": {
                    "name": "{{ parameters.stream_name }}",
                    "file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
                },
                "retriever": {
                    "paginator": {"type": "NoPagination"},
                    "requester": {
                        "path": "companies/{{ stream_partition.parent_id }}/segments",
                        "authenticator": {
                            "type": "BearerAuthenticator",
                            "api_token": "{{ config['api_key'] }}",
                        },
                    },
                    "record_selector": {
                        "extractor": {"type": "DpathExtractor", "field_path": ["data"]}
                    },
                    "partition_router": {
                        "parent_stream_configs": [
                            {
                                "parent_key": "id",
                                "partition_field": "parent_id",
                                "stream": deepcopy(companies_stream),
                            }
                        ],
                        "type": "SubstreamPartitionRouter",
                    },
                },
            },
        ],
        "check": {"type": "CheckStream", "stream_names": ["companies"]},
    }
    source = ConcurrentDeclarativeSource(
        source_config=manifest, config={}, catalog=create_catalog("lists"), state=None
    )

    streams = source.streams({})
    assert len(streams) == 2

    # Main stream with explicit use_cache: false should remain false (parent for substream)
    stream_0 = streams[0]
    assert stream_0.name == "companies"
    assert isinstance(stream_0, DefaultStream)
    # use_cache should remain False because it was explicitly set to False
    # NOTE(review): this reaches through private attributes
    # (_stream_partition_generator, _partition_factory, _retriever) to inspect the
    # concrete requester; it will break if those internals are renamed.
    assert (
        not stream_0._stream_partition_generator._partition_factory._retriever.requester.use_cache
    )

    # Substream
    stream_1 = streams[1]
    assert stream_1.name == "company_segments"
    assert isinstance(stream_1, DefaultStream)

    # Parent stream created for substream should also respect use_cache: false
    assert (
        stream_1._stream_partition_generator._stream_slicer.parent_stream_configs[0].stream.name
        == "companies"
    )
    # The parent stream in the substream config should also have use_cache: false
    assert not stream_1._stream_partition_generator._stream_slicer.parent_stream_configs[
        0
    ].stream._stream_partition_generator._partition_factory._retriever.requester.use_cache


def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMessage]:
source = ConcurrentDeclarativeSource(
source_config=manifest, config={}, catalog=create_catalog("lists"), state=None
Expand Down
Loading