Skip to content

Commit 0c784b3

Browse files
committed
check if page http request matches stream name
1 parent c92b9f8 commit 0c784b3

File tree

5 files changed

+28
-2
lines changed

5 files changed

+28
-2
lines changed

airbyte_cdk/connector_builder/test_reader/helpers.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,25 @@ def should_close_page_for_slice(at_least_one_page_in_group: bool, message: Airby
268268
"""
269269
return at_least_one_page_in_group and should_process_slice_descriptor(message)
270270

271+
def is_page_http_request_for_different_stream(
    json_message: Optional[Dict[str, Any]], stream_name: str
) -> bool:
    """
    Determines whether a given JSON message represents a page HTTP request for a different stream.

    This function checks if the provided JSON message is a page HTTP request, and if the stream
    name recorded in the log (under ``airbyte_cdk.stream.name``) is different from the provided
    stream name.

    This is needed because dynamic streams result in extra page HTTP requests for the dynamic
    streams, which we want to ignore when they do not match the stream that is being read.

    Args:
        json_message (Optional[Dict[str, Any]]): The JSON message to evaluate. May be None or empty.
        stream_name (str): The name of the stream to compare against.

    Returns:
        bool: True if the JSON message is a page HTTP request that carries a stream name different
        from ``stream_name``. False if the message is missing, is not a page HTTP request, carries
        no stream name, or belongs to ``stream_name``.
    """
    # Guard explicitly: json_message is Optional, so never call .get() on it unchecked.
    if not json_message or not is_page_http_request(json_message):
        return False

    message_stream_name = json_message.get("airbyte_cdk", {}).get("stream", {}).get("name")
    if message_stream_name is None:
        # Without a recorded stream name the request cannot be attributed to another stream,
        # so it must not be filtered out.
        return False

    return bool(message_stream_name != stream_name)
289+
271290

272291
def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool:
273292
"""

airbyte_cdk/connector_builder/test_reader/message_grouper.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
handle_record_message,
2828
is_async_auxiliary_request,
2929
is_config_update_message,
30+
is_page_http_request_for_different_stream,
3031
is_log_message,
3132
is_record_message,
3233
is_state_message,
@@ -44,6 +45,7 @@ def get_message_groups(
4445
schema_inferrer: SchemaInferrer,
4546
datetime_format_inferrer: DatetimeFormatInferrer,
4647
limit: int,
48+
stream_name: str,
4749
) -> MESSAGE_GROUPS:
4850
"""
4951
Processes an iterator of AirbyteMessage objects to group and yield messages based on their type and sequence.
@@ -96,6 +98,9 @@ def get_message_groups(
9698
while records_count < limit and (message := next(messages, None)):
9799
json_message = airbyte_message_to_json(message)
98100

101+
if is_page_http_request_for_different_stream(json_message, stream_name):
102+
continue
103+
99104
if should_close_page(at_least_one_page_in_group, message, json_message):
100105
current_page_request, current_page_response = handle_current_page(
101106
current_page_request,

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ def run_test_read(
130130
schema_inferrer,
131131
datetime_format_inferrer,
132132
record_limit,
133+
stream_name,
133134
)
134135

135136
slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ def _dynamic_stream_configs(
546546
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
547547
components_resolver_config,
548548
config,
549+
stream_name=dynamic_definition.get("name"),
549550
)
550551

551552
stream_template_config = dynamic_definition["stream_template"]

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3805,15 +3805,15 @@ def create_components_mapping_definition(
38053805
)
38063806

38073807
def create_http_components_resolver(
3808-
self, model: HttpComponentsResolverModel, config: Config
3808+
self, model: HttpComponentsResolverModel, config: Config, stream_name: Optional[str] = None
38093809
) -> Any:
38103810
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
38113811
combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)
38123812

38133813
retriever = self._create_component_from_model(
38143814
model=model.retriever,
38153815
config=config,
3816-
name="",
3816+
name=f"{stream_name if stream_name else '__http_components_resolver'}",
38173817
primary_key=None,
38183818
stream_slicer=stream_slicer if stream_slicer else combined_slicers,
38193819
transformations=[],

0 commit comments

Comments
 (0)