Skip to content

Commit 762f23c

Browse files
committed
Clean up the code and consolidate the concurrent and legacy flows under a single path
1 parent 5d20ab9 commit 762f23c

File tree

4 files changed

+47
-180
lines changed

4 files changed

+47
-180
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ def _group_streams(
202202

203203
# Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
204204
# and this is validated during the initialization of the source.
205-
streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
205+
streams = self._stream_configs(self._source_config, config) + self._dynamic_stream_configs(
206206
self._source_config, config
207207
)
208208

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 46 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
3636
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
3737
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
38+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
3839
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3940
ConditionalStreams as ConditionalStreamsModel,
4041
)
@@ -303,43 +304,27 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
303304
}
304305
)
305306

306-
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
307+
stream_configs = (
308+
self._stream_configs(self._source_config, config=config) + self.dynamic_streams
309+
)
307310

308311
api_budget_model = self._source_config.get("api_budget")
309312
if api_budget_model:
310313
self._constructor.set_api_budget(api_budget_model, config)
311314

312-
source_streams = []
313-
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs)):
314-
match stream_config.get("type"):
315-
case StateDelegatingStreamModel.__name__:
316-
source_streams.append(
317-
self._constructor.create_component(
318-
StateDelegatingStreamModel,
319-
stream_config,
320-
config,
321-
emit_connector_builder_messages=self._emit_connector_builder_messages,
322-
)
323-
)
324-
case ConditionalStreamsModel.__name__:
325-
# ConditionalStreamsModel returns a list of DeclarativeStreams so it must be extended
326-
source_streams.extend(
327-
self._constructor.create_component(
328-
ConditionalStreamsModel,
329-
stream_config,
330-
config,
331-
emit_connector_builder_messages=self._emit_connector_builder_messages,
332-
)
333-
)
334-
case DeclarativeStreamModel.__name__:
335-
source_streams.append(
336-
self._constructor.create_component(
337-
DeclarativeStreamModel,
338-
stream_config,
339-
config,
340-
emit_connector_builder_messages=self._emit_connector_builder_messages,
341-
)
342-
)
315+
source_streams = [
316+
self._constructor.create_component(
317+
(
318+
StateDelegatingStreamModel
319+
if stream_config.get("type") == StateDelegatingStreamModel.__name__
320+
else DeclarativeStreamModel
321+
),
322+
stream_config,
323+
config,
324+
emit_connector_builder_messages=self._emit_connector_builder_messages,
325+
)
326+
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
327+
]
343328
return source_streams
344329

345330
@staticmethod
@@ -383,26 +368,16 @@ def update_with_cache_parent_configs(
383368
update_with_cache_parent_configs(router["parent_stream_configs"])
384369

385370
for stream_config in stream_configs:
386-
current_stream_configs = []
387-
if stream_config.get("type") == ConditionalStreamsModel.__name__:
388-
current_stream_configs = [
389-
conditional_stream_config
390-
for conditional_stream_config in stream_config.get("streams") or []
391-
if conditional_stream_config["name"] in parent_streams
392-
]
393-
elif stream_config["name"] in parent_streams:
394-
current_stream_configs = [stream_config]
395-
396-
for current_stream_config in current_stream_configs:
397-
if current_stream_config["type"] == "StateDelegatingStream":
398-
current_stream_config["full_refresh_stream"]["retriever"]["requester"][
399-
"use_cache"
400-
] = True
401-
current_stream_config["incremental_stream"]["retriever"]["requester"][
402-
"use_cache"
403-
] = True
371+
if stream_config["name"] in parent_streams:
372+
if stream_config["type"] == "StateDelegatingStream":
373+
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
374+
True
375+
)
376+
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
377+
True
378+
)
404379
else:
405-
current_stream_config["retriever"]["requester"]["use_cache"] = True
380+
stream_config["retriever"]["requester"]["use_cache"] = True
406381
return stream_configs
407382

408383
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
@@ -506,12 +481,27 @@ def _parse_version(
506481
# No exception
507482
return parsed_version
508483

509-
def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
484+
def _stream_configs(
485+
self, manifest: Mapping[str, Any], config: Mapping[str, Any]
486+
) -> List[Dict[str, Any]]:
510487
# This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
511-
stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
512-
for s in stream_configs:
513-
if "type" not in s:
514-
s["type"] = "DeclarativeStream"
488+
stream_configs = []
489+
for current_stream_config in manifest.get("streams", []):
490+
if (
491+
"type" in current_stream_config
492+
and current_stream_config["type"] == "ConditionalStreams"
493+
):
494+
interpolated_boolean = InterpolatedBoolean(
495+
condition=current_stream_config.get("condition"),
496+
parameters={},
497+
)
498+
499+
if interpolated_boolean.eval(config=config):
500+
stream_configs.extend(current_stream_config.get("streams", []))
501+
else:
502+
if "type" not in current_stream_config:
503+
current_stream_config["type"] = "DeclarativeStream"
504+
stream_configs.append(current_stream_config)
515505
return stream_configs
516506

517507
def _dynamic_stream_configs(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,6 @@ def _init_mappings(self) -> None:
659659
CheckDynamicStreamModel: self.create_check_dynamic_stream,
660660
CompositeErrorHandlerModel: self.create_composite_error_handler,
661661
ConcurrencyLevelModel: self.create_concurrency_level,
662-
ConditionalStreamsModel: self.create_conditional_streams,
663662
ConfigMigrationModel: self.create_config_migration,
664663
ConfigAddFieldsModel: self.create_config_add_fields,
665664
ConfigRemapFieldModel: self.create_config_remap_field,
@@ -1623,22 +1622,6 @@ def create_concurrent_cursor_from_perpartition_cursor(
16231622
use_global_cursor=use_global_cursor,
16241623
)
16251624

1626-
def create_conditional_streams(
1627-
self, model: ConditionalStreamsModel, config: Config, **kwargs: Any
1628-
) -> List[DeclarativeStream]:
1629-
condition = InterpolatedBoolean(
1630-
condition=model.condition, parameters=model.parameters or {}
1631-
)
1632-
should_include_streams = condition.eval(config=config)
1633-
return (
1634-
[
1635-
self._create_component_from_model(stream, config=config, **kwargs)
1636-
for stream in model.streams
1637-
]
1638-
if should_include_streams
1639-
else []
1640-
)
1641-
16421625
@staticmethod
16431626
def create_constant_backoff_strategy(
16441627
model: ConstantBackoffStrategyModel, config: Config, **kwargs: Any

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 0 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -4802,109 +4802,3 @@ def test_create_stream_with_multiple_schema_loaders():
48024802
assert len(schema_loader.schema_loaders) == 2
48034803
assert isinstance(schema_loader.schema_loaders[0], InlineSchemaLoader)
48044804
assert isinstance(schema_loader.schema_loaders[1], InlineSchemaLoader)
4805-
4806-
4807-
def test_conditional_streams():
4808-
content = """
4809-
lists_stream:
4810-
type: DeclarativeStream
4811-
name: lists
4812-
primary_key: id
4813-
retriever:
4814-
paginator:
4815-
type: "DefaultPaginator"
4816-
page_size_option:
4817-
type: RequestOption
4818-
inject_into: request_parameter
4819-
field_name: page_size
4820-
page_token_option:
4821-
type: RequestPath
4822-
pagination_strategy:
4823-
type: "CursorPagination"
4824-
cursor_value: "{{ response._metadata.next }}"
4825-
page_size: 10
4826-
requester:
4827-
url_base: "https://testing.com"
4828-
path: "/api/v1/lists"
4829-
authenticator:
4830-
type: "BearerAuthenticator"
4831-
api_token: "{{ config.apikey }}"
4832-
request_parameters:
4833-
page_size: 10
4834-
record_selector:
4835-
extractor:
4836-
field_path: ["result"]
4837-
schema_loader:
4838-
type: JsonFileSchemaLoader
4839-
file_path: "./source_test/schemas/lists.yaml"
4840-
conditions_stream:
4841-
type: DeclarativeStream
4842-
name: sometimes
4843-
primary_key: id
4844-
retriever:
4845-
paginator:
4846-
type: "DefaultPaginator"
4847-
page_size_option:
4848-
type: RequestOption
4849-
inject_into: request_parameter
4850-
field_name: page_size
4851-
page_token_option:
4852-
type: RequestPath
4853-
pagination_strategy:
4854-
type: "CursorPagination"
4855-
cursor_value: "{{ response._metadata.next }}"
4856-
page_size: 10
4857-
requester:
4858-
type: HttpRequester
4859-
url_base: "https://testing.com"
4860-
path: "/api/v1/sometimes"
4861-
authenticator:
4862-
type: "BearerAuthenticator"
4863-
api_token: "{{ config.apikey }}"
4864-
request_parameters:
4865-
page_size: 10
4866-
record_selector:
4867-
type: RecordSelector
4868-
extractor:
4869-
type: DpathExtractor
4870-
field_path: ["result"]
4871-
schema_loader:
4872-
type: JsonFileSchemaLoader
4873-
file_path: "./source_test/schemas/sometimes.yaml"
4874-
streams:
4875-
- "#/lists_stream"
4876-
- type: ConditionalStreams
4877-
condition: "{{ config['is_sandbox'] }}"
4878-
streams:
4879-
- "#/conditions_stream"
4880-
"""
4881-
parsed_manifest = YamlDeclarativeSource._parse(content)
4882-
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
4883-
resolved_manifest["type"] = "DeclarativeSource"
4884-
stream_manifest = transformer.propagate_types_and_parameters(
4885-
"", resolved_manifest["conditions_stream"], {}
4886-
)
4887-
4888-
conditional_streams_manifest = resolved_manifest["streams"][1]
4889-
conditional_streams_manifest["streams"] = [stream_manifest]
4890-
4891-
sandbox_config = {
4892-
**input_config,
4893-
"is_sandbox": True,
4894-
}
4895-
4896-
streams = factory.create_component(
4897-
model_type=ConditionalStreamsModel,
4898-
component_definition=conditional_streams_manifest,
4899-
config=sandbox_config,
4900-
)
4901-
4902-
assert len(streams) == 1
4903-
conditional_stream = streams[0]
4904-
assert isinstance(conditional_stream, DeclarativeStream)
4905-
assert conditional_stream.name == "sometimes"
4906-
assert conditional_stream.primary_key == "id"
4907-
4908-
assert isinstance(conditional_stream.retriever, SimpleRetriever)
4909-
assert conditional_stream.retriever.name == "sometimes"
4910-
assert conditional_stream.retriever.primary_key == "id"

0 commit comments

Comments
 (0)