Skip to content

Commit cf8b084

Browse files
author
maxime.c
committed
coderabbitai code review
1 parent f9eb050 commit cf8b084

File tree

1 file changed

+13
-10
lines changed

1 file changed

+13
-10
lines changed

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -557,16 +557,19 @@ def limit_reached(self) -> bool:
557557
def get_parent_state(
558558
stream_state: Optional[StreamState], parent_stream_name: str
559559
) -> Optional[AirbyteStateMessage]:
560-
return (
561-
AirbyteStateMessage(
562-
type=AirbyteStateType.STREAM,
563-
stream=AirbyteStreamState(
564-
stream_descriptor=StreamDescriptor(parent_stream_name, None),
565-
stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
566-
),
567-
)
568-
if stream_state and "parent_state" in stream_state
569-
else None
560+
if "parent_state" not in stream_state:
561+
logger.warning(f"Trying to get_parent_state for stream `{parent_stream_name}` when there are not parent state in the state")
562+
return None
563+
elif parent_stream_name not in stream_state["parent_state"]:
564+
logger.info(f"Could not find parent state for stream `{parent_stream_name}`. On parents available are {list(stream_state['parent_state'].keys())}")
565+
return None
566+
567+
return AirbyteStateMessage(
568+
type=AirbyteStateType.STREAM,
569+
stream=AirbyteStreamState(
570+
stream_descriptor=StreamDescriptor(parent_stream_name, None),
571+
stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
572+
),
570573
)
571574

572575
@staticmethod

0 commit comments

Comments
 (0)