Skip to content

Commit 5f1d743

Browse files
fix: Add circular reference detection to schema_helpers._expand_refs()
Fixes a ValueError: Circular reference detected error that occurs when serpyco-rs tries to serialize connector specs with circular patterns. The _expand_refs() function now tracks visited references during recursive expansion to prevent infinite loops when schemas contain circular references. This resolves issue airbytehq/oncall#9688 for the Facebook Marketing connector and any other connectors with similar circular reference patterns in their specs. Changes: - Add visited_refs parameter to _expand_refs() to track reference URLs - Skip expanding already-visited references to prevent infinite recursion - Remove references from visited set after expansion to allow reuse in different branches - Skip 'definitions' key during iteration to avoid re-expanding resolved definitions - Add unit tests for circular reference handling and simple reference expansion Co-Authored-By: unknown <>
1 parent 035264c commit 5f1d743

File tree

6 files changed

+71
-11
lines changed

6 files changed

+71
-11
lines changed

airbyte_cdk/manifest_migrations/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ This directory contains the logic and registry for manifest migrations in the Ai
2121
3. **Register the Migration:**
2222
- Open `migrations/registry.yaml`.
2323
- Add an entry under the appropriate version, or create a new version section if needed.
24-
- Version can be: "*", "==6.48.3", "~=1.2", ">=1.0.0,<2.0.0", "6.48.3"
24+
- Version can be: "\*", "==6.48.3", "~=1.2", ">=1.0.0,<2.0.0", "6.48.3"
2525
- Each migration entry should include:
2626
- `name`: The filename (without `.py`)
2727
- `order`: The order in which this migration should be applied for the version

airbyte_cdk/manifest_server/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,3 @@ This requires the `ddtrace` dependency, which is included in the `manifest-serve
175175
# Run with Datadog tracing enabled
176176
DD_ENABLED=true manifest-server start
177177
```
178-

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3746,7 +3746,7 @@ definitions:
37463746
properties:
37473747
type:
37483748
type: string
3749-
enum: [ PaginationReset ]
3749+
enum: [PaginationReset]
37503750
action:
37513751
type: string
37523752
enum:
@@ -3763,7 +3763,7 @@ definitions:
37633763
properties:
37643764
type:
37653765
type: string
3766-
enum: [ PaginationResetLimits ]
3766+
enum: [PaginationResetLimits]
37673767
number_of_records:
37683768
type: integer
37693769
GzipDecoder:

airbyte_cdk/sources/utils/schema_helpers.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,25 +82,35 @@ def get_ref_resolver_registry(schema: dict[str, Any]) -> Registry:
8282
)
8383

8484

85-
def _expand_refs(schema: Any, ref_resolver: Resolver) -> None:
85+
def _expand_refs(schema: Any, ref_resolver: Resolver, visited_refs: set[str] | None = None) -> None:
8686
"""Internal function to iterate over schema and replace all occurrences of $ref with their definitions. Recursive.
8787
8888
:param schema: schema that will be patched
89+
:param ref_resolver: resolver to look up references
90+
:param visited_refs: set of already visited reference URLs to detect circular references
8991
"""
92+
if visited_refs is None:
93+
visited_refs = set()
94+
9095
if isinstance(schema, MutableMapping):
9196
if "$ref" in schema:
9297
ref_url = schema.pop("$ref")
98+
99+
if ref_url in visited_refs:
100+
return
101+
102+
visited_refs.add(ref_url)
93103
definition = ref_resolver.lookup(ref_url).contents
94-
_expand_refs(
95-
definition, ref_resolver=ref_resolver
96-
) # expand refs in definitions as well
104+
_expand_refs(definition, ref_resolver=ref_resolver, visited_refs=visited_refs)
97105
schema.update(definition)
106+
visited_refs.remove(ref_url)
98107
else:
99108
for key, value in schema.items():
100-
_expand_refs(value, ref_resolver=ref_resolver)
109+
if key != "definitions":
110+
_expand_refs(value, ref_resolver=ref_resolver, visited_refs=visited_refs)
101111
elif isinstance(schema, List):
102112
for value in schema:
103-
_expand_refs(value, ref_resolver=ref_resolver)
113+
_expand_refs(value, ref_resolver=ref_resolver, visited_refs=visited_refs)
104114

105115

106116
def expand_refs(schema: Any) -> None:

cdk-migrations.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Rationale: Our current interface for CustomIncrementalSync was assuming that the
1414

1515
Migration steps: Ensures that you don't implement `Retriever.state` or relying on the field `SimpleRetriever.cursor`. For more information, see the point above.
1616

17-
Rationale: As mentioned above, the state has been moved outside the realm of the stream responsibilities. Therefore, it does not make sense for the retriever (which is a stream specific concept) to hold state information. This way, a connector developer wanting to implement a CustomRetriever will not have to bother about state management anymore.
17+
Rationale: As mentioned above, the state has been moved outside the realm of the stream responsibilities. Therefore, it does not make sense for the retriever (which is a stream specific concept) to hold state information. This way, a connector developer wanting to implement a CustomRetriever will not have to bother about state management anymore.
1818

1919
### Inheriting from Substream Partition Routing
2020

unit_tests/sources/utils/test_schema_helpers.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
InternalConfig,
2323
ResourceSchemaLoader,
2424
check_config_against_spec_or_exit,
25+
expand_refs,
2526
)
2627
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
2728

@@ -209,3 +210,53 @@ def test_shared_schemas_resolves_nested():
209210
def test_internal_config(limit, record_count, expected):
210211
config = InternalConfig(_limit=limit)
211212
assert config.is_limit_reached(record_count) == expected
213+
214+
215+
def test_expand_refs_handles_circular_references():
216+
schema = {
217+
"definitions": {
218+
"A": {
219+
"type": "object",
220+
"properties": {
221+
"b": {"$ref": "#/definitions/B"}
222+
}
223+
},
224+
"B": {
225+
"type": "object",
226+
"properties": {
227+
"a": {"$ref": "#/definitions/A"}
228+
}
229+
}
230+
},
231+
"type": "object",
232+
"properties": {
233+
"root": {"$ref": "#/definitions/A"}
234+
}
235+
}
236+
237+
expand_refs(schema)
238+
239+
assert "root" in schema["properties"]
240+
assert schema["properties"]["root"]["type"] == "object"
241+
assert "definitions" not in schema
242+
243+
244+
def test_expand_refs_expands_simple_refs():
245+
schema = {
246+
"definitions": {
247+
"StringType": {
248+
"type": "string",
249+
"minLength": 1
250+
}
251+
},
252+
"type": "object",
253+
"properties": {
254+
"name": {"$ref": "#/definitions/StringType"}
255+
}
256+
}
257+
258+
expand_refs(schema)
259+
260+
assert schema["properties"]["name"]["type"] == "string"
261+
assert schema["properties"]["name"]["minLength"] == 1
262+
assert "definitions" not in schema

0 commit comments

Comments
 (0)