Skip to content

Commit 78c092b

Browse files
committed
add refs for parent streams
1 parent 970eb3f commit 78c092b

File tree

2 files changed

+197
-1
lines changed

2 files changed

+197
-1
lines changed

airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,47 @@ def _deduplicate_manifest(self) -> None:
140140
self._prepare_definitions()
141141
# replace duplicates with references, if any
142142
self._handle_duplicates(self._collect_duplicates())
143+
# replace parent streams with $refs
144+
# self._replace_parent_streams_with_refs()
143145
# clean dangling fields after resolving $refs
144146
self._clean_dangling_fields()
145147
except Exception as e:
146148
raise ManifestNormalizationException(str(e))
147-
149+
150+
def _replace_parent_streams_with_refs(self) -> None:
    """
    Replace inline parent stream definitions with `$ref` pointers to top-level streams.

    For each stream in the normalized manifest, `retriever.partition_router` is inspected
    (either a single router or a list of routers). For every router whose `type` is
    `SubstreamPartitionRouter`, each entry of `parent_stream_configs` whose `stream`
    value is a dict deeply equal to one of the top-level streams is rewritten in place to
    `{"$ref": "#/streams/<index>"}`, where `<index>` is the position of the first
    matching stream in the manifest's stream list.

    Anything that does not have the expected shape (a non-dict stream, retriever, router,
    parent config or parent stream, or a missing / `None` stream list or
    `parent_stream_configs`) is skipped rather than raising.
    """
    import copy

    streams = self._normalized_manifest.get(STREAMS_TAG) or []
    # Compare against a snapshot taken before any rewrite, so that replacements made
    # earlier in this pass do not change what later parent definitions are compared to.
    snapshots = [copy.deepcopy(stream) for stream in streams]

    for stream in streams:
        if not isinstance(stream, dict):
            continue
        retriever = stream.get("retriever")
        if not isinstance(retriever, dict):
            continue
        partition_router = retriever.get("partition_router")
        if not partition_router:
            continue

        # A retriever may declare one router or a list of routers; handle both uniformly.
        if isinstance(partition_router, list):
            routers = partition_router
        else:
            routers = [partition_router]

        for router in routers:
            if not isinstance(router, dict):
                continue
            if router.get("type") != "SubstreamPartitionRouter":
                continue

            for parent_config in router.get("parent_stream_configs") or []:
                if not isinstance(parent_config, dict):
                    continue
                parent_stream = parent_config.get("stream")
                # Only inline (dict) definitions are candidates for replacement.
                if not isinstance(parent_stream, dict):
                    continue

                for index, candidate in enumerate(snapshots):
                    if parent_stream == candidate:
                        parent_config["stream"] = {"$ref": f"#/streams/{index}"}
                        break
183+
148184
def _clean_dangling_fields(self) -> None:
149185
"""
150186
Clean the manifest by removing unused definitions and schemas.

unit_tests/sources/declarative/parsers/test_manifest_normalizer.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,163 @@ def test_clean_dangling_fields_removes_empty_sections() -> None:
202202
normalizer = ManifestNormalizer(manifest, schema)
203203
normalizer._clean_dangling_fields()
204204
assert normalizer._normalized_manifest == expected
205+
206+
207+
def test_replace_parent_streams_with_refs_replaces_with_ref():
    """
    An inline parent stream that is identical to a top-level stream must be rewritten as a
    `$ref` pointing at that stream's index in `streams`.
    """

    def parent_config(stream, partition_field):
        # One `parent_stream_configs` entry; only the stream and partition field vary.
        return {
            "stream": stream,
            "type": "ParentStreamConfig",
            "parent_key": "id",
            "partition_field": partition_field,
        }

    def child_stream(parent_configs):
        # Stream "C" with a single SubstreamPartitionRouter holding the given parent configs.
        return {
            "name": "C",
            "type": "DeclarativeStream",
            "retriever": {
                "partition_router": {
                    "type": "SubstreamPartitionRouter",
                    "parent_stream_configs": parent_configs,
                }
            },
        }

    stream_a = {"name": "A", "type": "DeclarativeStream", "retriever": {}}
    stream_b = {"name": "B", "type": "DeclarativeStream", "retriever": {}}

    # Input: the child stream embeds copies of both top-level streams.
    manifest = {
        "streams": [
            stream_a,
            stream_b,
            child_stream(
                [
                    parent_config(stream_a.copy(), "aid"),
                    parent_config(stream_b.copy(), "bid"),
                ]
            ),
        ]
    }
    # Output: each embedded copy is replaced by a reference to its index.
    expected = {
        "streams": [
            stream_a,
            stream_b,
            child_stream(
                [
                    parent_config({"$ref": "#/streams/0"}, "aid"),
                    parent_config({"$ref": "#/streams/1"}, "bid"),
                ]
            ),
        ]
    }

    normalizer = ManifestNormalizer(manifest, _get_declarative_component_schema())
    normalizer._replace_parent_streams_with_refs()

    assert normalizer._normalized_manifest == expected
255+
256+
257+
def test_replace_parent_streams_with_refs_no_match():
    """
    An inline parent stream that is not equal to any top-level stream must be left
    untouched.
    """

    def child_stream(parent_stream):
        # Stream "C" whose only parent config embeds the given stream definition.
        return {
            "name": "C",
            "type": "DeclarativeStream",
            "retriever": {
                "partition_router": {
                    "type": "SubstreamPartitionRouter",
                    "parent_stream_configs": [
                        {
                            "stream": parent_stream,
                            "type": "ParentStreamConfig",
                            "parent_key": "id",
                            "partition_field": "aid",
                        },
                    ],
                }
            },
        }

    stream_a = {"name": "A", "type": "DeclarativeStream", "retriever": {}}
    # Differs from every top-level stream, so it has nothing to be referenced against.
    unrelated_stream = {"name": "X", "type": "DeclarativeStream", "retriever": {"foo": 1}}

    manifest = {"streams": [stream_a, child_stream(unrelated_stream.copy())]}
    expected = {"streams": [stream_a, child_stream(unrelated_stream.copy())]}

    normalizer = ManifestNormalizer(manifest, _get_declarative_component_schema())
    normalizer._replace_parent_streams_with_refs()

    assert normalizer._normalized_manifest == expected
301+
302+
303+
def test_replace_parent_streams_with_refs_handles_multiple_partition_routers():
    """
    When `partition_router` is a list, the parent configs of every router in the list
    must be processed.
    """

    def substream_router(stream, partition_field):
        # A SubstreamPartitionRouter with exactly one parent config.
        return {
            "type": "SubstreamPartitionRouter",
            "parent_stream_configs": [
                {
                    "stream": stream,
                    "type": "ParentStreamConfig",
                    "parent_key": "id",
                    "partition_field": partition_field,
                },
            ],
        }

    def child_stream(routers):
        # Stream "C" whose retriever declares its partition routers as a list.
        return {
            "name": "C",
            "type": "DeclarativeStream",
            "retriever": {"partition_router": routers},
        }

    stream_a = {"name": "A", "type": "DeclarativeStream", "retriever": {}}
    stream_b = {"name": "B", "type": "DeclarativeStream", "retriever": {}}

    # Input: one router per parent, each embedding a copy of a top-level stream.
    manifest = {
        "streams": [
            stream_a,
            stream_b,
            child_stream(
                [
                    substream_router(stream_a.copy(), "aid"),
                    substream_router(stream_b.copy(), "bid"),
                ]
            ),
        ]
    }
    # Output: both routers have their embedded stream replaced by a reference.
    expected = {
        "streams": [
            stream_a,
            stream_b,
            child_stream(
                [
                    substream_router({"$ref": "#/streams/0"}, "aid"),
                    substream_router({"$ref": "#/streams/1"}, "bid"),
                ]
            ),
        ]
    }

    normalizer = ManifestNormalizer(manifest, _get_declarative_component_schema())
    normalizer._replace_parent_streams_with_refs()

    assert normalizer._normalized_manifest == expected

0 commit comments

Comments
 (0)