Skip to content

Commit bc4a36f

Browse files
updated ModelToComponent
1 parent 57706a7 commit bc4a36f

File tree

1 file changed

+13
-279
lines changed

1 file changed

+13
-279
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 13 additions & 279 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,10 @@
447447
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
448448
ZipfileDecoder as ZipfileDecoderModel,
449449
)
450-
from airbyte_cdk.sources.declarative.parsers.component_constructor import ComponentConstructor
450+
from airbyte_cdk.sources.declarative.parsers.component_constructor import (
451+
ComponentConstructor,
452+
AdditionalFlags,
453+
)
451454
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
452455
COMPONENTS_MODULE_NAME,
453456
SDM_COMPONENTS_MODULE_NAME,
@@ -667,13 +670,14 @@ def __init__(
667670
self._collected_deprecation_logs: List[ConnectorBuilderLogMessage] = []
668671

669672
# support the dependency constructors with the re-usable parts from this Factory
670-
self._flags = {
671-
"_limit_pages_fetched_per_slice": self._limit_pages_fetched_per_slice,
672-
"_limit_slices_fetched": self._limit_slices_fetched,
673-
"_emit_connector_builder_messages": self._emit_connector_builder_messages,
674-
"_disable_retries": self._disable_retries,
675-
"_message_repository": self._message_repository,
676-
}
673+
self._flags = AdditionalFlags(
674+
emit_connector_builder_messages=self._emit_connector_builder_messages,
675+
disable_retries=self._disable_retries,
676+
message_repository=self._message_repository,
677+
connector_state_manager=self._connector_state_manager,
678+
limit_pages_fetched_per_slice=self._limit_pages_fetched_per_slice,
679+
limit_slices_fetched=self._limit_slices_fetched,
680+
)
677681

678682
def _init_mappings(self) -> None:
679683
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Dict[
@@ -755,7 +759,7 @@ def _init_mappings(self) -> None:
755759
PredicateValidatorModel: self.create_predicate_validator,
756760
PropertiesFromEndpointModel: self.create_properties_from_endpoint,
757761
PropertyChunkingModel: self.create_property_chunking,
758-
QueryPropertiesModel: self.create_query_properties,
762+
QueryPropertiesModel: QueryProperties,
759763
RecordFilterModel: RecordFilter,
760764
RecordSelectorModel: self.create_record_selector,
761765
RemoveFieldsModel: self.create_remove_fields,
@@ -3012,32 +3016,6 @@ def create_property_chunking(
30123016
parameters=model.parameters or {},
30133017
)
30143018

3015-
def create_query_properties(
3016-
self, model: QueryPropertiesModel, config: Config, **kwargs: Any
3017-
) -> QueryProperties:
3018-
if isinstance(model.property_list, list):
3019-
property_list = model.property_list
3020-
else:
3021-
property_list = self._create_component_from_model(
3022-
model=model.property_list, config=config, **kwargs
3023-
)
3024-
3025-
property_chunking = (
3026-
self._create_component_from_model(
3027-
model=model.property_chunking, config=config, **kwargs
3028-
)
3029-
if model.property_chunking
3030-
else None
3031-
)
3032-
3033-
return QueryProperties(
3034-
property_list=property_list,
3035-
always_include_properties=model.always_include_properties,
3036-
property_chunking=property_chunking,
3037-
config=config,
3038-
parameters=model.parameters or {},
3039-
)
3040-
30413019
@staticmethod
30423020
def create_request_path(model: RequestPathModel, config: Config, **kwargs: Any) -> RequestPath:
30433021
return RequestPath(parameters={})
@@ -3168,257 +3146,13 @@ def create_legacy_session_token_authenticator(
31683146
parameters=model.parameters or {},
31693147
)
31703148

3171-
def create_simple_retriever(
3172-
self,
3173-
model: SimpleRetrieverModel,
3174-
config: Config,
3175-
*,
3176-
name: str,
3177-
primary_key: Optional[Union[str, List[str], List[List[str]]]],
3178-
stream_slicer: Optional[StreamSlicer],
3179-
request_options_provider: Optional[RequestOptionsProvider] = None,
3180-
stop_condition_on_cursor: bool = False,
3181-
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
3182-
transformations: List[RecordTransformation],
3183-
file_uploader: Optional[DefaultFileUploader] = None,
3184-
incremental_sync: Optional[
3185-
Union[
3186-
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
3187-
]
3188-
] = None,
3189-
use_cache: Optional[bool] = None,
3190-
log_formatter: Optional[Callable[[Response], Any]] = None,
3191-
**kwargs: Any,
3192-
) -> SimpleRetriever:
3193-
def _get_url() -> str:
3194-
"""
3195-
Closure to get the URL from the requester. This is used to get the URL in the case of a lazy retriever.
3196-
This is needed because the URL is not set until the requester is created.
3197-
"""
3198-
3199-
_url: str = (
3200-
model.requester.url
3201-
if hasattr(model.requester, "url") and model.requester.url is not None
3202-
else requester.get_url()
3203-
)
3204-
_url_base: str = (
3205-
model.requester.url_base
3206-
if hasattr(model.requester, "url_base") and model.requester.url_base is not None
3207-
else requester.get_url_base()
3208-
)
3209-
3210-
return _url or _url_base
3211-
3212-
decoder = (
3213-
self._create_component_from_model(model=model.decoder, config=config)
3214-
if model.decoder
3215-
else JsonDecoder(parameters={})
3216-
)
3217-
record_selector = self._create_component_from_model(
3218-
model=model.record_selector,
3219-
name=name,
3220-
config=config,
3221-
decoder=decoder,
3222-
transformations=transformations,
3223-
client_side_incremental_sync=client_side_incremental_sync,
3224-
file_uploader=file_uploader,
3225-
)
3226-
3227-
query_properties: Optional[QueryProperties] = None
3228-
query_properties_key: Optional[str] = None
3229-
if self._query_properties_in_request_parameters(model.requester):
3230-
# It is better to be explicit about an error if PropertiesFromEndpoint is defined in multiple
3231-
# places instead of default to request_parameters which isn't clearly documented
3232-
if (
3233-
hasattr(model.requester, "fetch_properties_from_endpoint")
3234-
and model.requester.fetch_properties_from_endpoint
3235-
):
3236-
raise ValueError(
3237-
f"PropertiesFromEndpoint should only be specified once per stream, but found in {model.requester.type}.fetch_properties_from_endpoint and {model.requester.type}.request_parameters"
3238-
)
3239-
3240-
query_properties_definitions = []
3241-
for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using _query_properties_in_request_parameters()
3242-
if isinstance(request_parameter, QueryPropertiesModel):
3243-
query_properties_key = key
3244-
query_properties_definitions.append(request_parameter)
3245-
3246-
if len(query_properties_definitions) > 1:
3247-
raise ValueError(
3248-
f"request_parameters only supports defining one QueryProperties field, but found {len(query_properties_definitions)} usages"
3249-
)
3250-
3251-
if len(query_properties_definitions) == 1:
3252-
query_properties = self._create_component_from_model(
3253-
model=query_properties_definitions[0], config=config
3254-
)
3255-
elif (
3256-
hasattr(model.requester, "fetch_properties_from_endpoint")
3257-
and model.requester.fetch_properties_from_endpoint
3258-
):
3259-
# todo: Deprecate this condition once dependent connectors migrate to query_properties
3260-
query_properties_definition = QueryPropertiesModel(
3261-
type="QueryProperties",
3262-
property_list=model.requester.fetch_properties_from_endpoint,
3263-
always_include_properties=None,
3264-
property_chunking=None,
3265-
) # type: ignore # $parameters has a default value
3266-
3267-
query_properties = self.create_query_properties(
3268-
model=query_properties_definition,
3269-
config=config,
3270-
)
3271-
elif hasattr(model.requester, "query_properties") and model.requester.query_properties:
3272-
query_properties = self.create_query_properties(
3273-
model=model.requester.query_properties,
3274-
config=config,
3275-
)
3276-
3277-
requester = self._create_component_from_model(
3278-
model=model.requester,
3279-
decoder=decoder,
3280-
name=name,
3281-
query_properties_key=query_properties_key,
3282-
use_cache=use_cache,
3283-
config=config,
3284-
)
3285-
3286-
# Define cursor only if per partition or common incremental support is needed
3287-
cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None
3288-
3289-
if (
3290-
not isinstance(stream_slicer, DatetimeBasedCursor)
3291-
or type(stream_slicer) is not DatetimeBasedCursor
3292-
):
3293-
# Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods).
3294-
# Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement
3295-
# their own RequestOptionsProvider. However, right now the existing StreamSlicer/Cursor still can act as the SimpleRetriever's
3296-
# request_options_provider
3297-
request_options_provider = stream_slicer or DefaultRequestOptionsProvider(parameters={})
3298-
elif not request_options_provider:
3299-
request_options_provider = DefaultRequestOptionsProvider(parameters={})
3300-
3301-
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
3302-
if self._should_limit_slices_fetched():
3303-
stream_slicer = cast(
3304-
StreamSlicer,
3305-
StreamSlicerTestReadDecorator(
3306-
wrapped_slicer=stream_slicer,
3307-
maximum_number_of_slices=self._limit_slices_fetched or 5,
3308-
),
3309-
)
3310-
3311-
cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
3312-
paginator = (
3313-
self._create_component_from_model(
3314-
model=model.paginator,
3315-
config=config,
3316-
url_base=_get_url(),
3317-
extractor_model=model.record_selector.extractor,
3318-
decoder=decoder,
3319-
cursor_used_for_stop_condition=cursor_used_for_stop_condition,
3320-
)
3321-
if model.paginator
3322-
else NoPagination(parameters={})
3323-
)
3324-
3325-
ignore_stream_slicer_parameters_on_paginated_requests = (
3326-
model.ignore_stream_slicer_parameters_on_paginated_requests or False
3327-
)
3328-
3329-
if (
3330-
model.partition_router
3331-
and isinstance(model.partition_router, SubstreamPartitionRouterModel)
3332-
and not bool(self._connector_state_manager.get_stream_state(name, None))
3333-
and any(
3334-
parent_stream_config.lazy_read_pointer
3335-
for parent_stream_config in model.partition_router.parent_stream_configs
3336-
)
3337-
):
3338-
if incremental_sync:
3339-
if incremental_sync.type != "DatetimeBasedCursor":
3340-
raise ValueError(
3341-
f"LazySimpleRetriever only supports DatetimeBasedCursor. Found: {incremental_sync.type}."
3342-
)
3343-
3344-
elif incremental_sync.step or incremental_sync.cursor_granularity:
3345-
raise ValueError(
3346-
f"Found more that one slice per parent. LazySimpleRetriever only supports single slice read for stream - {name}."
3347-
)
3348-
3349-
if model.decoder and model.decoder.type != "JsonDecoder":
3350-
raise ValueError(
3351-
f"LazySimpleRetriever only supports JsonDecoder. Found: {model.decoder.type}."
3352-
)
3353-
3354-
return LazySimpleRetriever(
3355-
name=name,
3356-
paginator=paginator,
3357-
primary_key=primary_key,
3358-
requester=requester,
3359-
record_selector=record_selector,
3360-
stream_slicer=stream_slicer,
3361-
request_option_provider=request_options_provider,
3362-
cursor=cursor,
3363-
config=config,
3364-
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
3365-
parameters=model.parameters or {},
3366-
)
3367-
3368-
return SimpleRetriever(
3369-
name=name,
3370-
paginator=paginator,
3371-
primary_key=primary_key,
3372-
requester=requester,
3373-
record_selector=record_selector,
3374-
stream_slicer=stream_slicer,
3375-
request_option_provider=request_options_provider,
3376-
cursor=cursor,
3377-
config=config,
3378-
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
3379-
additional_query_properties=query_properties,
3380-
log_formatter=self._get_log_formatter(log_formatter, name),
3381-
parameters=model.parameters or {},
3382-
)
3383-
3384-
def _get_log_formatter(
3385-
self, log_formatter: Callable[[Response], Any] | None, name: str
3386-
) -> Callable[[Response], Any] | None:
3387-
if self._should_limit_slices_fetched():
3388-
return (
3389-
(
3390-
lambda response: format_http_message(
3391-
response,
3392-
f"Stream '{name}' request",
3393-
f"Request performed in order to extract records for stream '{name}'",
3394-
name,
3395-
)
3396-
)
3397-
if not log_formatter
3398-
else log_formatter
3399-
)
3400-
return None
3401-
34023149
def _should_limit_slices_fetched(self) -> bool:
34033150
"""
34043151
Returns True if the number of slices fetched should be limited, False otherwise.
34053152
This is used to limit the number of slices fetched during tests.
34063153
"""
34073154
return bool(self._limit_slices_fetched or self._emit_connector_builder_messages)
34083155

3409-
@staticmethod
3410-
def _query_properties_in_request_parameters(
3411-
requester: Union[HttpRequesterModel, CustomRequesterModel],
3412-
) -> bool:
3413-
if not hasattr(requester, "request_parameters"):
3414-
return False
3415-
request_parameters = requester.request_parameters
3416-
if request_parameters and isinstance(request_parameters, Mapping):
3417-
for request_parameter in request_parameters.values():
3418-
if isinstance(request_parameter, QueryPropertiesModel):
3419-
return True
3420-
return False
3421-
34223156
@staticmethod
34233157
def _remove_query_properties(
34243158
request_parameters: Mapping[str, Union[str, QueryPropertiesModel]],

0 commit comments

Comments
 (0)