Skip to content

Commit f396439

Browse files
authored
fix(low-code): Add parent state migration from global state (#322)
1 parent e38f914 commit f396439

File tree

3 files changed

+37
-12
lines changed

3 files changed

+37
-12
lines changed

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -299,23 +299,33 @@ def set_initial_state(self, stream_state: StreamState) -> None:
299299

300300
def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState:
301301
"""
302-
Migrate the child stream state to the parent stream's state format.
302+
Migrate the child or global stream state into the parent stream's state format.
303303
304-
This method converts the global or child state into a format compatible with parent
305-
streams. The migration occurs only for parent streams with incremental dependencies.
306-
The method filters out per-partition states and retains only the global state in the
307-
format `{cursor_field: cursor_value}`.
304+
This method converts the child stream state—or, if present, the global state—into a format that is
305+
compatible with parent streams that use incremental synchronization. The migration occurs only for
306+
parent streams with incremental dependencies. It filters out per-partition states and retains only the
307+
global state in the form {cursor_field: cursor_value}.
308+
309+
The method supports multiple input formats:
310+
- A simple global state, e.g.:
311+
{"updated_at": "2023-05-27T00:00:00Z"}
312+
- A state object that contains a "state" key (which is assumed to hold the global state), e.g.:
313+
{"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
314+
In this case, the migration uses the first value from the "state" dictionary.
315+
- Any per-partition state formats or other non-simple structures are ignored during migration.
308316
309317
Args:
310318
stream_state (StreamState): The state to migrate. Expected formats include:
311319
- {"updated_at": "2023-05-27T00:00:00Z"}
312-
- {"states": [...] } (ignored during migration)
320+
- {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
321+
(In this format, only the first global state value is used, and per-partition states are ignored.)
313322
314323
Returns:
315324
StreamState: A migrated state for parent streams in the format:
316325
{
317326
"parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
318327
}
328+
where each parent stream with an incremental dependency is assigned its corresponding cursor value.
319329
320330
Example:
321331
Input: {"updated_at": "2023-05-27T00:00:00Z"}
@@ -326,11 +336,15 @@ def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> Str
326336
substream_state_values = list(stream_state.values())
327337
substream_state = substream_state_values[0] if substream_state_values else {}
328338

329-
# Ignore per-partition states or invalid formats
339+
# The state is not a simple {cursor_field: cursor_value} mapping (e.g. per-partition or global format).
330340
if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1:
331-
return {}
341+
# If a global state is present under the key "state", use its first value.
342+
if "state" in stream_state and isinstance(stream_state["state"], dict):
343+
substream_state = list(stream_state["state"].values())[0]
344+
else:
345+
return {}
332346

333-
# Copy child state to parent streams with incremental dependencies
347+
# Build the parent state for all parent streams with incremental dependencies.
334348
parent_state = {}
335349
if substream_state:
336350
for parent_config in self.parent_stream_configs:

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1450,8 +1450,19 @@ def test_incremental_parent_state(
14501450
},
14511451
STATE_MIGRATION_GLOBAL_EXPECTED_STATE,
14521452
),
1453+
(
1454+
{
1455+
"state": {"created_at": PARTITION_SYNC_START_TIME},
1456+
},
1457+
STATE_MIGRATION_EXPECTED_STATE,
1458+
),
1459+
],
1460+
ids=[
1461+
"legacy_python_format",
1462+
"low_code_per_partition_state",
1463+
"low_code_global_format",
1464+
"global_state_no_parent",
14531465
],
1454-
ids=["legacy_python_format", "low_code_per_partition_state", "low_code_global_format"],
14551466
)
14561467
def test_incremental_parent_state_migration(
14571468
test_name, manifest, mock_requests, expected_records, initial_state, expected_state

unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ def test_substream_partition_router_invalid_parent_record_type():
415415
# Case 3: Initial state with global `state`, migrated to the parent stream's cursor
416416
(
417417
{"state": {"updated": "2023-05-27T00:00:00Z"}},
418-
{},
418+
{"parent_stream_cursor": "2023-05-27T00:00:00Z"},
419419
),
420420
# Case 4: Initial state with per-partition `states`, no migration expected
421421
(
@@ -471,7 +471,7 @@ def test_substream_partition_router_invalid_parent_record_type():
471471
"use_global_cursor": True,
472472
"state": {"updated": "2023-05-27T00:00:00Z"},
473473
},
474-
{},
474+
{"parent_stream_cursor": "2023-05-27T00:00:00Z"},
475475
),
476476
],
477477
ids=[

0 commit comments

Comments
 (0)