Skip to content

Commit 4d546be

Browse files
fix: Respect explicit use_cache: false for parent streams in declarative sources
This change modifies the `_initialize_cache_for_parent_streams` method to respect explicit `use_cache: false` settings in the manifest while still defaulting to `True` for parent streams that don't specify a value.

This is important for APIs that use scroll-based pagination (like Intercom's `/companies/scroll` endpoint), where caching must be disabled because the same `scroll_param` is returned in pagination responses, causing duplicate records and infinite pagination loops.

Fixes: airbytehq/oncall#8346
Co-Authored-By: unknown <>
1 parent daf7d48 commit 4d546be

File tree

4 files changed

+288
-30
lines changed

4 files changed

+288
-30
lines changed

airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -341,29 +341,47 @@ def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStrea
341341
def _initialize_cache_for_parent_streams(
342342
stream_configs: List[Dict[str, Any]],
343343
) -> List[Dict[str, Any]]:
344+
"""Enable caching for parent streams unless explicitly disabled.
345+
346+
Caching is enabled by default for parent streams to optimize performance when the same
347+
parent data is needed by multiple child streams. However, explicit `use_cache: false`
348+
settings are respected for streams that cannot use caching (e.g., scroll-based pagination
349+
APIs where caching causes duplicate records).
350+
"""
344351
parent_streams = set()
345352

353+
def _should_enable_cache(requester: Dict[str, Any]) -> bool:
354+
"""Return False only if use_cache is explicitly set to False."""
355+
return requester.get("use_cache") is not False
356+
357+
def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
358+
"""Set use_cache to True only if not explicitly disabled."""
359+
if _should_enable_cache(requester):
360+
requester["use_cache"] = True
361+
346362
def update_with_cache_parent_configs(
347363
parent_configs: list[dict[str, Any]],
348364
) -> None:
349365
for parent_config in parent_configs:
350366
parent_streams.add(parent_config["stream"]["name"])
351367
if parent_config["stream"]["type"] == "StateDelegatingStream":
352-
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
353-
"use_cache"
354-
] = True
355-
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
356-
"use_cache"
357-
] = True
368+
_set_cache_if_not_disabled(
369+
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
370+
)
371+
_set_cache_if_not_disabled(
372+
parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
373+
)
358374
else:
359-
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
375+
_set_cache_if_not_disabled(
376+
parent_config["stream"]["retriever"]["requester"]
377+
)
360378

361379
for stream_config in stream_configs:
362380
if stream_config.get("incremental_sync", {}).get("parent_stream"):
363381
parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
364-
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
365-
"use_cache"
366-
] = True
382+
_set_cache_if_not_disabled(
383+
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
384+
)
367385

368386
elif stream_config.get("retriever", {}).get("partition_router", {}):
369387
partition_router = stream_config["retriever"]["partition_router"]
@@ -380,14 +398,14 @@ def update_with_cache_parent_configs(
380398
for stream_config in stream_configs:
381399
if stream_config["name"] in parent_streams:
382400
if stream_config["type"] == "StateDelegatingStream":
383-
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
384-
True
401+
_set_cache_if_not_disabled(
402+
stream_config["full_refresh_stream"]["retriever"]["requester"]
385403
)
386-
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
387-
True
404+
_set_cache_if_not_disabled(
405+
stream_config["incremental_stream"]["retriever"]["requester"]
388406
)
389407
else:
390-
stream_config["retriever"]["requester"]["use_cache"] = True
408+
_set_cache_if_not_disabled(stream_config["retriever"]["requester"])
391409
return stream_configs
392410

393411
def spec(self, logger: logging.Logger) -> ConnectorSpecification:

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -424,29 +424,47 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
424424
def _initialize_cache_for_parent_streams(
425425
stream_configs: List[Dict[str, Any]],
426426
) -> List[Dict[str, Any]]:
427+
"""Enable caching for parent streams unless explicitly disabled.
428+
429+
Caching is enabled by default for parent streams to optimize performance when the same
430+
parent data is needed by multiple child streams. However, explicit `use_cache: false`
431+
settings are respected for streams that cannot use caching (e.g., scroll-based pagination
432+
APIs where caching causes duplicate records).
433+
"""
427434
parent_streams = set()
428435

436+
def _should_enable_cache(requester: Dict[str, Any]) -> bool:
437+
"""Return False only if use_cache is explicitly set to False."""
438+
return requester.get("use_cache") is not False
439+
440+
def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
441+
"""Set use_cache to True only if not explicitly disabled."""
442+
if _should_enable_cache(requester):
443+
requester["use_cache"] = True
444+
429445
def update_with_cache_parent_configs(
430446
parent_configs: list[dict[str, Any]],
431447
) -> None:
432448
for parent_config in parent_configs:
433449
parent_streams.add(parent_config["stream"]["name"])
434450
if parent_config["stream"]["type"] == "StateDelegatingStream":
435-
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
436-
"use_cache"
437-
] = True
438-
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
439-
"use_cache"
440-
] = True
451+
_set_cache_if_not_disabled(
452+
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
453+
)
454+
_set_cache_if_not_disabled(
455+
parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
456+
)
441457
else:
442-
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
458+
_set_cache_if_not_disabled(
459+
parent_config["stream"]["retriever"]["requester"]
460+
)
443461

444462
for stream_config in stream_configs:
445463
if stream_config.get("incremental_sync", {}).get("parent_stream"):
446464
parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
447-
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
448-
"use_cache"
449-
] = True
465+
_set_cache_if_not_disabled(
466+
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
467+
)
450468

451469
elif stream_config.get("retriever", {}).get("partition_router", {}):
452470
partition_router = stream_config["retriever"]["partition_router"]
@@ -463,14 +481,14 @@ def update_with_cache_parent_configs(
463481
for stream_config in stream_configs:
464482
if stream_config["name"] in parent_streams:
465483
if stream_config["type"] == "StateDelegatingStream":
466-
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
467-
True
484+
_set_cache_if_not_disabled(
485+
stream_config["full_refresh_stream"]["retriever"]["requester"]
468486
)
469-
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
470-
True
487+
_set_cache_if_not_disabled(
488+
stream_config["incremental_stream"]["retriever"]["requester"]
471489
)
472490
else:
473-
stream_config["retriever"]["requester"]["use_cache"] = True
491+
_set_cache_if_not_disabled(stream_config["retriever"]["requester"])
474492
return stream_configs
475493

476494
def spec(self, logger: logging.Logger) -> ConnectorSpecification:

unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,6 +1652,112 @@ def test_only_parent_streams_use_cache():
16521652
assert not get_retriever(streams[2]).requester.use_cache
16531653

16541654

1655+
def test_parent_stream_respects_explicit_use_cache_false():
1656+
"""Test that explicit use_cache: false is respected for parent streams.
1657+
1658+
This is important for APIs that use scroll-based pagination (like Intercom's /companies/scroll
1659+
endpoint), where caching must be disabled because the same scroll_param is returned in
1660+
pagination responses, causing duplicate records and infinite pagination loops.
1661+
"""
1662+
# Parent stream with explicit use_cache: false
1663+
companies_stream = {
1664+
"type": "DeclarativeStream",
1665+
"$parameters": {
1666+
"name": "companies",
1667+
"primary_key": "id",
1668+
"url_base": "https://api.intercom.io/",
1669+
},
1670+
"schema_loader": {
1671+
"name": "{{ parameters.stream_name }}",
1672+
"file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
1673+
},
1674+
"retriever": {
1675+
"paginator": {
1676+
"type": "DefaultPaginator",
1677+
"page_token_option": {"type": "RequestPath"},
1678+
"pagination_strategy": {
1679+
"type": "CursorPagination",
1680+
"cursor_value": "{{ response.get('scroll_param') }}",
1681+
"page_size": 100,
1682+
},
1683+
},
1684+
"requester": {
1685+
"path": "companies/scroll",
1686+
"use_cache": False, # Explicitly disabled for scroll-based pagination
1687+
"authenticator": {
1688+
"type": "BearerAuthenticator",
1689+
"api_token": "{{ config['api_key'] }}",
1690+
},
1691+
},
1692+
"record_selector": {"extractor": {"type": "DpathExtractor", "field_path": ["data"]}},
1693+
},
1694+
}
1695+
1696+
manifest = {
1697+
"version": "0.29.3",
1698+
"definitions": {},
1699+
"streams": [
1700+
deepcopy(companies_stream),
1701+
{
1702+
"type": "DeclarativeStream",
1703+
"$parameters": {
1704+
"name": "company_segments",
1705+
"primary_key": "id",
1706+
"url_base": "https://api.intercom.io/",
1707+
},
1708+
"schema_loader": {
1709+
"name": "{{ parameters.stream_name }}",
1710+
"file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
1711+
},
1712+
"retriever": {
1713+
"paginator": {"type": "NoPagination"},
1714+
"requester": {
1715+
"path": "companies/{{ stream_partition.parent_id }}/segments",
1716+
"authenticator": {
1717+
"type": "BearerAuthenticator",
1718+
"api_token": "{{ config['api_key'] }}",
1719+
},
1720+
},
1721+
"record_selector": {
1722+
"extractor": {"type": "DpathExtractor", "field_path": ["data"]}
1723+
},
1724+
"partition_router": {
1725+
"parent_stream_configs": [
1726+
{
1727+
"parent_key": "id",
1728+
"partition_field": "parent_id",
1729+
"stream": deepcopy(companies_stream),
1730+
}
1731+
],
1732+
"type": "SubstreamPartitionRouter",
1733+
},
1734+
},
1735+
},
1736+
],
1737+
"check": {"type": "CheckStream", "stream_names": ["companies"]},
1738+
}
1739+
source = ManifestDeclarativeSource(source_config=manifest)
1740+
1741+
streams = source.streams({})
1742+
assert len(streams) == 2
1743+
1744+
# Main stream with explicit use_cache: false should remain false (parent for substream)
1745+
assert streams[0].name == "companies"
1746+
# use_cache should remain False because it was explicitly set to False
1747+
assert not get_retriever(streams[0]).requester.use_cache
1748+
1749+
# Substream
1750+
assert streams[1].name == "company_segments"
1751+
1752+
# Parent stream created for substream should also respect use_cache: false
1753+
stream_slicer = streams[1]._stream_partition_generator._stream_slicer
1754+
assert stream_slicer.parent_stream_configs[0].stream.name == "companies"
1755+
# The parent stream in the substream config should also have use_cache: false
1756+
assert not stream_slicer.parent_stream_configs[
1757+
0
1758+
].stream._stream_partition_generator._partition_factory._retriever.requester.use_cache
1759+
1760+
16551761
def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMessage]:
16561762
catalog = ConfiguredAirbyteCatalog(
16571763
streams=[

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3931,6 +3931,122 @@ def test_only_parent_streams_use_cache():
39313931
)
39323932

39333933

3934+
def test_parent_stream_respects_explicit_use_cache_false():
3935+
"""Test that explicit use_cache: false is respected for parent streams.
3936+
3937+
This is important for APIs that use scroll-based pagination (like Intercom's /companies/scroll
3938+
endpoint), where caching must be disabled because the same scroll_param is returned in
3939+
pagination responses, causing duplicate records and infinite pagination loops.
3940+
"""
3941+
# Parent stream with explicit use_cache: false
3942+
companies_stream = {
3943+
"type": "DeclarativeStream",
3944+
"$parameters": {
3945+
"name": "companies",
3946+
"primary_key": "id",
3947+
"url_base": "https://api.intercom.io/",
3948+
},
3949+
"schema_loader": {
3950+
"name": "{{ parameters.stream_name }}",
3951+
"file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
3952+
},
3953+
"retriever": {
3954+
"paginator": {
3955+
"type": "DefaultPaginator",
3956+
"page_token_option": {"type": "RequestPath"},
3957+
"pagination_strategy": {
3958+
"type": "CursorPagination",
3959+
"cursor_value": "{{ response.get('scroll_param') }}",
3960+
"page_size": 100,
3961+
},
3962+
},
3963+
"requester": {
3964+
"path": "companies/scroll",
3965+
"use_cache": False, # Explicitly disabled for scroll-based pagination
3966+
"authenticator": {
3967+
"type": "BearerAuthenticator",
3968+
"api_token": "{{ config['api_key'] }}",
3969+
},
3970+
},
3971+
"record_selector": {"extractor": {"type": "DpathExtractor", "field_path": ["data"]}},
3972+
},
3973+
}
3974+
3975+
manifest = {
3976+
"version": "0.29.3",
3977+
"definitions": {},
3978+
"streams": [
3979+
deepcopy(companies_stream),
3980+
{
3981+
"type": "DeclarativeStream",
3982+
"$parameters": {
3983+
"name": "company_segments",
3984+
"primary_key": "id",
3985+
"url_base": "https://api.intercom.io/",
3986+
},
3987+
"schema_loader": {
3988+
"name": "{{ parameters.stream_name }}",
3989+
"file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
3990+
},
3991+
"retriever": {
3992+
"paginator": {"type": "NoPagination"},
3993+
"requester": {
3994+
"path": "companies/{{ stream_partition.parent_id }}/segments",
3995+
"authenticator": {
3996+
"type": "BearerAuthenticator",
3997+
"api_token": "{{ config['api_key'] }}",
3998+
},
3999+
},
4000+
"record_selector": {
4001+
"extractor": {"type": "DpathExtractor", "field_path": ["data"]}
4002+
},
4003+
"partition_router": {
4004+
"parent_stream_configs": [
4005+
{
4006+
"parent_key": "id",
4007+
"partition_field": "parent_id",
4008+
"stream": deepcopy(companies_stream),
4009+
}
4010+
],
4011+
"type": "SubstreamPartitionRouter",
4012+
},
4013+
},
4014+
},
4015+
],
4016+
"check": {"type": "CheckStream", "stream_names": ["companies"]},
4017+
}
4018+
source = ConcurrentDeclarativeSource(
4019+
source_config=manifest, config={}, catalog=create_catalog("lists"), state=None
4020+
)
4021+
4022+
streams = source.streams({})
4023+
assert len(streams) == 2
4024+
4025+
# Main stream with explicit use_cache: false should remain false (parent for substream)
4026+
stream_0 = streams[0]
4027+
assert stream_0.name == "companies"
4028+
assert isinstance(stream_0, DefaultStream)
4029+
# use_cache should remain False because it was explicitly set to False
4030+
assert (
4031+
not stream_0._stream_partition_generator._partition_factory._retriever.requester.use_cache
4032+
)
4033+
4034+
# Substream
4035+
stream_1 = streams[1]
4036+
assert stream_1.name == "company_segments"
4037+
assert isinstance(stream_1, DefaultStream)
4038+
4039+
# Parent stream created for substream should also respect use_cache: false
4040+
assert (
4041+
stream_1._stream_partition_generator._stream_slicer.parent_stream_configs[0].stream.name
4042+
== "companies"
4043+
)
4044+
# The parent stream in the substream config should also have use_cache: false
4045+
assert not stream_1._stream_partition_generator._stream_slicer.parent_stream_configs[
4046+
0
4047+
].stream._stream_partition_generator._partition_factory._retriever.requester.use_cache
4048+
4049+
39344050
def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMessage]:
39354051
source = ConcurrentDeclarativeSource(
39364052
source_config=manifest, config={}, catalog=create_catalog("lists"), state=None

0 commit comments

Comments
 (0)