Skip to content

Commit 0c27b96

Browse files
updated model_to_component.py, set up RecordFilter with resolve_dependencies method
1 parent b878987 commit 0c27b96

File tree

2 files changed

+64
-20
lines changed

2 files changed

+64
-20
lines changed

airbyte_cdk/sources/declarative/extractors/record_filter.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44
from dataclasses import InitVar, dataclass
5-
from typing import Any, Iterable, Mapping, Optional, Union
5+
from typing import Any, Iterable, Mapping, Optional, Union, Callable
66

77
from airbyte_cdk.sources.declarative.incremental import (
88
DatetimeBasedCursor,
@@ -11,10 +11,14 @@
1111
)
1212
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1313
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
14+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
15+
RecordFilter as RecordFilterModel,
16+
)
17+
from airbyte_cdk.sources.declarative.parsers.component_constructor import ComponentConstructor
1418

1519

1620
@dataclass
17-
class RecordFilter:
21+
class RecordFilter(ComponentConstructor[RecordFilterModel]):
1822
"""
1923
Filter applied on a list of Records
2024
@@ -26,6 +30,21 @@ class RecordFilter:
2630
config: Config
2731
condition: str = ""
2832

33+
@classmethod
34+
def resolve_dependencies(
35+
cls,
36+
model: RecordFilterModel,
37+
config: Config,
38+
dependency_constructor: Callable[..., Any],
39+
additional_flags: Optional[Mapping[str, Any]] = None,
40+
**kwargs: Any,
41+
) -> Mapping[str, Any]:
42+
return {
43+
"condition": model.condition or "",
44+
"config": config,
45+
"parameters": model.parameters or {},
46+
}
47+
2948
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3049
self._filter_interpolator = InterpolatedBoolean(
3150
condition=self.condition, parameters=parameters

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
get_args,
2424
get_origin,
2525
get_type_hints,
26+
TypeVar,
2627
)
2728

2829
from isodate import parse_duration
@@ -615,6 +616,8 @@
615616
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
616617
from airbyte_cdk.sources.types import Config
617618
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
619+
from airbyte_cdk.sources.declarative.parsers.component_constructor import ComponentConstructor
620+
618621

619622
ComponentDefinition = Mapping[str, Any]
620623

@@ -623,8 +626,17 @@
623626
SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization,
624627
}
625628

629+
M = TypeVar("M", bound=BaseModel)
630+
D = TypeVar("D", bound=BaseModel)
631+
626632

627633
class ModelToComponentFactory:
634+
"""
635+
The default Model > Component Factory implementation.
636+
The Custom components are built separately from the default implementations,
637+
to provide the reasonable decoupling from the standard and Custom implementation build technique.
638+
"""
639+
628640
EPOCH_DATETIME_FORMAT = "%s"
629641

630642
def __init__(
@@ -655,8 +667,19 @@ def __init__(
655667
# placeholder for deprecation warnings
656668
self._collected_deprecation_logs: List[ConnectorBuilderLogMessage] = []
657669

670+
# support the dependency constructors with the re-usable parts from this Factory
671+
self._flags = {
672+
"_limit_pages_fetched_per_slice": self._limit_pages_fetched_per_slice,
673+
"_limit_slices_fetched": self._limit_slices_fetched,
674+
"_emit_connector_builder_messages": self._emit_connector_builder_messages,
675+
"_disable_retries": self._disable_retries,
676+
"_message_repository": self._message_repository,
677+
}
678+
658679
def _init_mappings(self) -> None:
659-
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
680+
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Dict[
681+
Type[BaseModel], Union[Type[ComponentConstructor], Callable[..., Any]]
682+
] = {
660683
AddedFieldDefinitionModel: self.create_added_field_definition,
661684
AddFieldsModel: self.create_add_fields,
662685
ApiKeyAuthenticatorModel: self.create_api_key_authenticator,
@@ -734,7 +757,7 @@ def _init_mappings(self) -> None:
734757
PropertiesFromEndpointModel: self.create_properties_from_endpoint,
735758
PropertyChunkingModel: self.create_property_chunking,
736759
QueryPropertiesModel: self.create_query_properties,
737-
RecordFilterModel: self.create_record_filter,
760+
RecordFilterModel: RecordFilter,
738761
RecordSelectorModel: self.create_record_selector,
739762
RemoveFieldsModel: self.create_remove_fields,
740763
RequestPathModel: self.create_request_path,
@@ -803,20 +826,30 @@ def create_component(
803826
model=declarative_component_model, config=config, **kwargs
804827
)
805828

806-
def _create_component_from_model(self, model: BaseModel, config: Config, **kwargs: Any) -> Any:
829+
def _create_component_from_model(
830+
self, model: BaseModel, config: Config, **kwargs: Any
831+
) -> ComponentConstructor[BaseModel]:
807832
if model.__class__ not in self.PYDANTIC_MODEL_TO_CONSTRUCTOR:
808833
raise ValueError(
809834
f"{model.__class__} with attributes {model} is not a valid component type"
810835
)
811-
component_constructor = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__)
812-
if not component_constructor:
813-
raise ValueError(f"Could not find constructor for {model.__class__}")
814836

815-
# collect deprecation warnings for supported models.
816-
if isinstance(model, BaseModelWithDeprecations):
817-
self._collect_model_deprecations(model)
837+
component = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__)
838+
if not component:
839+
raise ValueError(f"Could not find constructor for {model.__class__}")
818840

819-
return component_constructor(model=model, config=config, **kwargs)
841+
if inspect.isclass(component) and issubclass(component, ComponentConstructor):
842+
# Default components flow
843+
component_instance: ComponentConstructor[BaseModel] = component.build(
844+
model=model,
845+
config=config,
846+
dependency_constructor=self._create_component_from_model,
847+
additional_flags=self._flags,
848+
**kwargs,
849+
)
850+
return component_instance
851+
else:
852+
return component(model=model, config=config, **kwargs)
820853

821854
def get_model_deprecations(self) -> List[ConnectorBuilderLogMessage]:
822855
"""
@@ -2999,14 +3032,6 @@ def create_query_properties(
29993032
parameters=model.parameters or {},
30003033
)
30013034

3002-
@staticmethod
3003-
def create_record_filter(
3004-
model: RecordFilterModel, config: Config, **kwargs: Any
3005-
) -> RecordFilter:
3006-
return RecordFilter(
3007-
condition=model.condition or "", config=config, parameters=model.parameters or {}
3008-
)
3009-
30103035
@staticmethod
30113036
def create_request_path(model: RequestPathModel, config: Config, **kwargs: Any) -> RequestPath:
30123037
return RequestPath(parameters={})

0 commit comments

Comments
 (0)