File tree Expand file tree Collapse file tree 1 file changed +6
-2
lines changed
airbyte_cdk/sources/declarative/partition_routers Expand file tree Collapse file tree 1 file changed +6
-2
lines changed Original file line number Diff line number Diff line change @@ -48,11 +48,15 @@ def stream_slices(self) -> Iterable[StreamSlice]:
4848 # Iterate over partitions lazily from the underlying router
4949 for partition in self .underlying_partition_router .stream_slices ():
5050 # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
51- if len (partition .partition .values ()) != 1 :
51+ partition_keys = list (partition .partition .keys ())
52+ # skip parent_slice as it is part of SubstreamPartitionRouter partition
53+ if "parent_slice" in partition_keys :
54+ partition_keys .remove ("parent_slice" )
55+ if len (partition_keys ) != 1 :
5256 raise ValueError (
5357 f"GroupingPartitionRouter expects a single partition key-value pair. Got { partition .partition } "
5458 )
55- key = next ( iter ( partition .partition . values ()), None )
59+ key = partition .partition [ partition_keys [ 0 ]]
5660
5761 # Skip duplicates if deduplication is enabled
5862 if self .deduplicate and key in seen_keys :
You can’t perform that action at this time.
0 commit comments