Skip to content

Commit 8ba143f

Browse files
fix: Revert "feat(taps): Queue parent contexts and sync child streams only when the queue is full (#3058)" (#3378)
This reverts commit 16ea7ad. ## Summary by Sourcery Revert the context queueing mechanism introduced earlier for parent-child streams; child streams now sync immediately per context, and related queue configuration and flush logic have been removed along with the associated test case. Bug Fixes: - Restore immediate syncing of child streams per context by removing batch queue processing Enhancements: - Simplify child sync logic by removing QUEUE_MAX_SIZE, context queue, and flush methods Tests: - Remove test for handling exceptions in child stream sync and its snapshots
1 parent 1adfba7 commit 8ba143f

File tree

13 files changed

+18
-141
lines changed

13 files changed

+18
-141
lines changed

singer_sdk/streams/core.py

Lines changed: 3 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,6 @@ class Stream(metaclass=abc.ABCMeta): # noqa: PLR0904
116116
selected_by_default: bool = True
117117
"""Whether this stream is selected by default in the catalog."""
118118

119-
QUEUE_MAX_SIZE: int = 1000
120-
"""Maximum number of contexts to queue before syncing child streams."""
121-
122119
def __init__(
123120
self,
124121
tap: Tap,
@@ -170,13 +167,6 @@ def __init__(
170167
self._is_state_flushed: bool = True
171168
self._sync_costs: dict[str, int] = {}
172169
self.child_streams: list[Stream] = []
173-
174-
# Single queue for all child contexts
175-
# Use a simple list for the context queue. Thread safety is not needed because
176-
# the queue is only accessed from a single thread during sync. Using a list is
177-
# more efficient and simpler than queue.Queue for this use case.
178-
self._child_context_queue: list[types.Context] = []
179-
180170
if schema:
181171
if isinstance(schema, (PathLike, str)):
182172
if not Path(schema).is_file():
@@ -1348,8 +1338,6 @@ def _sync_records( # noqa: C901
13481338
# Finalize per-partition state only if 1:1 with context
13491339
state = self.get_context_state(current_context)
13501340
self._finalize_state(state)
1351-
# FLUSH any remaining child contexts at the end of parent sync
1352-
self._flush_child_context_queue()
13531341

13541342
if not context:
13551343
# Finalize total stream only if we have the full context.
@@ -1444,35 +1432,9 @@ def _sync_children(self, child_context: types.Context | None) -> None:
14441432
)
14451433
return
14461434

1447-
self._child_context_queue.append(child_context)
1448-
if len(self._child_context_queue) >= self.QUEUE_MAX_SIZE:
1449-
self._flush_child_context_queue()
1450-
1451-
def _flush_child_context_queue(self) -> None:
1452-
"""Sync all child streams for each context, then clear the queue."""
1453-
if not self._child_context_queue or not self.child_streams:
1454-
return
1455-
1456-
self.logger.info(
1457-
"Flushing %d child contexts for stream '%s'",
1458-
len(self._child_context_queue),
1459-
self.name,
1460-
)
1461-
1462-
for context in self._child_context_queue:
1463-
for child_stream in self.child_streams:
1464-
if child_stream.selected or child_stream.has_selected_descendents:
1465-
try:
1466-
child_stream.sync(context=context)
1467-
except Exception: # noqa: BLE001
1468-
self.log(
1469-
"Error syncing child stream '%s'",
1470-
child_stream.name,
1471-
level=logging.ERROR,
1472-
extra={"context": context},
1473-
)
1474-
1475-
self._child_context_queue = []
1435+
for child_stream in self.child_streams:
1436+
if child_stream.selected or child_stream.has_selected_descendents:
1437+
child_stream.sync(context=child_context)
14761438

14771439
# Overridable Methods
14781440

tests/core/snapshots/test_parent_child/test_child_deselected_parent/singer.jsonl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@
1212
{"type":"RECORD","stream":"child","record":{"id":1,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
1313
{"type":"RECORD","stream":"child","record":{"id":2,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
1414
{"type":"RECORD","stream":"child","record":{"id":3,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
15+
{"type":"STATE","value":{"bookmarks":{"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}},{"context":{"pid":3}}]}}}}
1516
{"type":"STATE","value":{"bookmarks":{"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}},{"context":{"pid":3}}]},"parent":{}}}}
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
INFO my-tap.parent Beginning sync of 'parent' in full_table mode
2-
INFO my-tap.parent Flushing 2 child contexts for stream 'parent'
32
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 1}
43
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 2}
5-
INFO my-tap.parent Flushing 1 child contexts for stream 'parent'
64
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 3}

tests/core/snapshots/test_parent_child/test_child_sync_exception/singer.jsonl

Lines changed: 0 additions & 15 deletions
This file was deleted.

tests/core/snapshots/test_parent_child/test_child_sync_exception/stderr.log

Lines changed: 0 additions & 4 deletions
This file was deleted.
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,2 @@
11
INFO my-tap Skipping deselected stream 'child'.
22
INFO my-tap.parent Beginning sync of 'parent' in full_table mode
3-
INFO my-tap.parent Flushing 2 child contexts for stream 'parent'
4-
INFO my-tap.parent Flushing 1 child contexts for stream 'parent'
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
{"type":"SCHEMA","stream":"parent_many","schema":{"properties":{"id":{"type":"integer"},"children":{"items":{"type":"integer"},"type":"array"}},"type":"object"},"key_properties":[]}
2-
{"type":"RECORD","stream":"parent_many","record":{"id":"1","children":[1,2,3]},"time_extracted":"2022-01-01T00:00:00+00:00"}
32
{"type":"SCHEMA","stream":"child_many","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
43
{"type":"RECORD","stream":"child_many","record":{"id":1,"pid":"1"},"time_extracted":"2022-01-01T00:00:00+00:00"}
5-
{"type":"STATE","value":{"bookmarks":{"parent_many":{},"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}}]}}}}
4+
{"type":"STATE","value":{"bookmarks":{"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}}]}}}}
65
{"type":"SCHEMA","stream":"child_many","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
76
{"type":"RECORD","stream":"child_many","record":{"id":2,"pid":"1"},"time_extracted":"2022-01-01T00:00:00+00:00"}
8-
{"type":"STATE","value":{"bookmarks":{"parent_many":{},"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}},{"context":{"child_id":2,"pid":"1"}}]}}}}
7+
{"type":"STATE","value":{"bookmarks":{"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}},{"context":{"child_id":2,"pid":"1"}}]}}}}
98
{"type":"SCHEMA","stream":"child_many","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
109
{"type":"RECORD","stream":"child_many","record":{"id":3,"pid":"1"},"time_extracted":"2022-01-01T00:00:00+00:00"}
11-
{"type":"STATE","value":{"bookmarks":{"parent_many":{},"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}},{"context":{"child_id":2,"pid":"1"}},{"context":{"child_id":3,"pid":"1"}}]}}}}
10+
{"type":"STATE","value":{"bookmarks":{"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}},{"context":{"child_id":2,"pid":"1"}},{"context":{"child_id":3,"pid":"1"}}]}}}}
11+
{"type":"RECORD","stream":"parent_many","record":{"id":"1","children":[1,2,3]},"time_extracted":"2022-01-01T00:00:00+00:00"}
12+
{"type":"STATE","value":{"bookmarks":{"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}},{"context":{"child_id":2,"pid":"1"}},{"context":{"child_id":3,"pid":"1"}}]},"parent_many":{}}}}

tests/core/snapshots/test_parent_child/test_one_parent_many_children/stderr.log

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
INFO my-tap-many Skipping parse of env var settings...
22
INFO my-tap-many Added 'child_many' as child stream to 'parent_many'
33
INFO my-tap-many.parent_many Beginning sync of 'parent_many' in full_table mode
4-
INFO my-tap-many.parent_many Flushing 3 child contexts for stream 'parent_many'
54
INFO my-tap-many.child_many Beginning sync of 'child_many' in full_table mode with context: {'child_id': 1, 'pid': '1'}
65
WARNING my-tap-many.child_many Properties ('composite_id', 'child_id') were present in the 'child_many' stream but not found in catalog schema. Ignoring.
76
INFO my-tap-many.child_many Beginning sync of 'child_many' in full_table mode with context: {'child_id': 2, 'pid': '1'}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
{"type":"SCHEMA","stream":"parent","schema":{"properties":{"id":{"type":"integer"}},"type":"object"},"key_properties":[]}
2-
{"type":"RECORD","stream":"parent","record":{"id":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
32
{"type":"SCHEMA","stream":"child","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
43
{"type":"RECORD","stream":"child","record":{"id":1,"pid":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
54
{"type":"RECORD","stream":"child","record":{"id":2,"pid":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
65
{"type":"RECORD","stream":"child","record":{"id":3,"pid":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
7-
{"type":"STATE","value":{"bookmarks":{"parent":{},"child":{"partitions":[{"context":{"pid":1}}]}}}}
6+
{"type":"STATE","value":{"bookmarks":{"child":{"partitions":[{"context":{"pid":1}}]}}}}
7+
{"type":"RECORD","stream":"parent","record":{"id":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
88
{"type":"SCHEMA","stream":"child","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
99
{"type":"RECORD","stream":"child","record":{"id":1,"pid":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
1010
{"type":"RECORD","stream":"child","record":{"id":2,"pid":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
1111
{"type":"RECORD","stream":"child","record":{"id":3,"pid":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
12-
{"type":"STATE","value":{"bookmarks":{"parent":{},"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}}]}}}}
12+
{"type":"STATE","value":{"bookmarks":{"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}}]},"parent":{}}}}
1313
{"type":"RECORD","stream":"parent","record":{"id":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
14-
{"type":"RECORD","stream":"parent","record":{"id":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
1514
{"type":"SCHEMA","stream":"child","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
1615
{"type":"RECORD","stream":"child","record":{"id":1,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
1716
{"type":"RECORD","stream":"child","record":{"id":2,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
1817
{"type":"RECORD","stream":"child","record":{"id":3,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
19-
{"type":"STATE","value":{"bookmarks":{"parent":{},"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}},{"context":{"pid":3}}]}}}}
18+
{"type":"STATE","value":{"bookmarks":{"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}},{"context":{"pid":3}}]},"parent":{}}}}
19+
{"type":"RECORD","stream":"parent","record":{"id":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
INFO my-tap.parent Beginning sync of 'parent' in full_table mode
2-
INFO my-tap.parent Flushing 2 child contexts for stream 'parent'
32
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 1}
43
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 2}
5-
INFO my-tap.parent Flushing 1 child contexts for stream 'parent'
64
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 3}

0 commit comments

Comments
 (0)