Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ def read_stream(
stream_name = configured_catalog.streams[0].stream.name

stream_read = test_read_handler.run_test_read(
source, config, configured_catalog, state, limits.max_records
source,
config,
configured_catalog,
stream_name,
state,
limits.max_records,
)

return AirbyteMessage(
Expand Down
26 changes: 26 additions & 0 deletions airbyte_cdk/connector_builder/test_reader/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,32 @@ def should_close_page_for_slice(at_least_one_page_in_group: bool, message: Airby
return at_least_one_page_in_group and should_process_slice_descriptor(message)


def is_page_http_request_for_different_stream(
    json_message: Optional[Dict[str, Any]], stream_name: str
) -> bool:
    """
    Determines whether a given JSON message represents a page HTTP request for a different stream.

    This function checks if the provided JSON message is a page HTTP request, and if the stream name in the log is
    different from the provided stream name.

    This is needed because dynamic streams result in extra page HTTP requests for the dynamic streams that we want to ignore
    when they do not match the stream that is being read.

    Args:
        json_message (Optional[Dict[str, Any]]): The JSON message to evaluate.
        stream_name (str): The name of the stream to compare against.

    Returns:
        bool: True if the JSON message is a page HTTP request for a different stream, False otherwise.
    """
    # Guard explicitly so the function always returns a bool; a bare
    # `json_message and ...` chain would leak `None`/`{}` to the caller.
    if not json_message:
        return False
    # Stream attribution is expected under airbyte_cdk.stream.name; a missing
    # name defaults to "" and is therefore treated as a different stream.
    message_stream_name = (
        json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", "")
    )
    return is_page_http_request(json_message) and message_stream_name != stream_name


def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool:
"""
Determines whether a given JSON message represents a page HTTP request.
Expand Down
5 changes: 5 additions & 0 deletions airbyte_cdk/connector_builder/test_reader/message_grouper.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
handle_record_message,
is_async_auxiliary_request,
is_config_update_message,
is_page_http_request_for_different_stream,
is_log_message,
is_record_message,
is_state_message,
Expand All @@ -44,6 +45,7 @@ def get_message_groups(
schema_inferrer: SchemaInferrer,
datetime_format_inferrer: DatetimeFormatInferrer,
limit: int,
stream_name: str,
) -> MESSAGE_GROUPS:
"""
Processes an iterator of AirbyteMessage objects to group and yield messages based on their type and sequence.
Expand Down Expand Up @@ -96,6 +98,9 @@ def get_message_groups(
while records_count < limit and (message := next(messages, None)):
json_message = airbyte_message_to_json(message)

if is_page_http_request_for_different_stream(json_message, stream_name):
continue

if should_close_page(at_least_one_page_in_group, message, json_message):
current_page_request, current_page_response = handle_current_page(
current_page_request,
Expand Down
11 changes: 8 additions & 3 deletions airbyte_cdk/connector_builder/test_reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def run_test_read(
source: DeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
stream_name: str,
state: List[AirbyteStateMessage],
record_limit: Optional[int] = None,
) -> StreamRead:
Expand All @@ -112,14 +113,17 @@ def run_test_read(

record_limit = self._check_record_limit(record_limit)
# The connector builder currently only supports reading from a single stream at a time
stream = source.streams(config)[0]
streams = source.streams(config)
stream = next((stream for stream in streams if stream.name == stream_name), None)

# get any deprecation warnings during the component creation
deprecation_warnings: List[LogMessage] = source.deprecation_warnings()

schema_inferrer = SchemaInferrer(
self._pk_to_nested_and_composite_field(stream.primary_key),
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
if stream
else None,
)
datetime_format_inferrer = DatetimeFormatInferrer()

Expand All @@ -128,6 +132,7 @@ def run_test_read(
schema_inferrer,
datetime_format_inferrer,
record_limit,
stream_name,
)

slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ def _dynamic_stream_configs(
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
components_resolver_config,
config,
stream_name=dynamic_definition.get("name"),
)

stream_template_config = dynamic_definition["stream_template"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3805,15 +3805,15 @@ def create_components_mapping_definition(
)

def create_http_components_resolver(
self, model: HttpComponentsResolverModel, config: Config
self, model: HttpComponentsResolverModel, config: Config, stream_name: Optional[str] = None
) -> Any:
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)

retriever = self._create_component_from_model(
model=model.retriever,
config=config,
name="",
name=f"{stream_name if stream_name else '__http_components_resolver'}",
primary_key=None,
stream_slicer=stream_slicer if stream_slicer else combined_slicers,
transformations=[],
Expand Down
Loading