Skip to content

Commit 9c5da1d

Browse files
committed
Rollback not related changes
1 parent 79f3f1a commit 9c5da1d

File tree

6 files changed

+22
-148
lines changed

6 files changed

+22
-148
lines changed

airbyte_cdk/sources/declarative/extractors/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from airbyte_cdk.sources.declarative.extractors.combined_extractor import CombinedExtractor
66
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
77
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
8-
from airbyte_cdk.sources.declarative.extractors.key_value_extractor import KeyValueExtractor
98
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
109
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
1110
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
@@ -20,6 +19,5 @@
2019
"RecordFilter",
2120
"RecordSelector",
2221
"ResponseToFileExtractor",
23-
"KeyValueExtractor",
2422
"CombinedExtractor",
2523
]

airbyte_cdk/sources/declarative/extractors/key_value_extractor.py

Lines changed: 0 additions & 42 deletions
This file was deleted.

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
262262
}
263263
)
264264

265-
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
265+
stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(self._source_config, config)
266266

267267
api_budget_model = self._source_config.get("api_budget")
268268
if api_budget_model:

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@
8585
from airbyte_cdk.sources.declarative.extractors import (
8686
CombinedExtractor,
8787
DpathExtractor,
88-
KeyValueExtractor,
8988
RecordFilter,
9089
RecordSelector,
9190
ResponseToFileExtractor,
@@ -309,9 +308,6 @@
309308
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
310309
KeysToSnakeCase as KeysToSnakeCaseModel,
311310
)
312-
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
313-
KeyValueExtractor as KeyValueExtractorModel,
314-
)
315311
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
316312
LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel,
317313
)
@@ -649,7 +645,6 @@ def _init_mappings(self) -> None:
649645
DefaultErrorHandlerModel: self.create_default_error_handler,
650646
DefaultPaginatorModel: self.create_default_paginator,
651647
DpathExtractorModel: self.create_dpath_extractor,
652-
KeyValueExtractorModel: self.create_key_value_extractor,
653648
CombinedExtractorModel: self.create_combined_extractor,
654649
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
655650
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
@@ -2228,22 +2223,6 @@ def create_dpath_extractor(
22282223
parameters=model.parameters or {},
22292224
)
22302225

2231-
def create_key_value_extractor(
2232-
self,
2233-
model: KeyValueExtractorModel,
2234-
config: Config,
2235-
decoder: Optional[Decoder] = JsonDecoder(parameters={}),
2236-
**kwargs: Any,
2237-
) -> KeyValueExtractor:
2238-
keys_extractor = self._create_component_from_model(
2239-
model=model.keys_extractor, decoder=decoder, config=config
2240-
)
2241-
values_extractor = self._create_component_from_model(
2242-
model=model.values_extractor, decoder=decoder, config=config
2243-
)
2244-
2245-
return KeyValueExtractor(keys_extractor=keys_extractor, values_extractor=values_extractor)
2246-
22472226
def create_combined_extractor(
22482227
self,
22492228
model: CombinedExtractorModel,
@@ -2472,14 +2451,10 @@ def create_dynamic_schema_loader(
24722451
schema_type_identifier = self._create_component_from_model(
24732452
model.schema_type_identifier, config=config, parameters=model.parameters or {}
24742453
)
2475-
schema_filter = self._create_component_from_model(
2476-
model.schema_filter, config=config, parameters=model.parameters or {}
2477-
)
24782454
return DynamicSchemaLoader(
24792455
retriever=retriever,
24802456
config=config,
24812457
schema_transformations=schema_transformations,
2482-
schema_filter=schema_filter,
24832458
schema_type_identifier=schema_type_identifier,
24842459
parameters=model.parameters or {},
24852460
)
@@ -3641,7 +3616,6 @@ def create_components_mapping_definition(
36413616
field_path=field_path, # type: ignore[arg-type] # field_path can be str and InterpolatedString
36423617
value=interpolated_value,
36433618
value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type),
3644-
create_or_update=model.create_or_update,
36453619
parameters=model.parameters or {},
36463620
)
36473621

@@ -3688,24 +3662,16 @@ def create_stream_config(
36883662

36893663
return StreamConfig(
36903664
configs_pointer=model_configs_pointer,
3691-
default_values=model.default_values,
36923665
parameters=model.parameters or {},
36933666
)
36943667

36953668
def create_config_components_resolver(
36963669
self, model: ConfigComponentsResolverModel, config: Config
36973670
) -> Any:
3698-
model_stream_configs = (
3699-
model.stream_config if isinstance(model.stream_config, list) else [model.stream_config]
3671+
stream_config = self._create_component_from_model(
3672+
model.stream_config, config=config, parameters=model.parameters or {}
37003673
)
37013674

3702-
stream_configs = [
3703-
self._create_component_from_model(
3704-
stream_config, config=config, parameters=model.parameters or {}
3705-
)
3706-
for stream_config in model_stream_configs
3707-
]
3708-
37093675
components_mapping = [
37103676
self._create_component_from_model(
37113677
model=components_mapping_definition_model,
@@ -3718,9 +3684,8 @@ def create_config_components_resolver(
37183684
]
37193685

37203686
return ConfigComponentsResolver(
3721-
stream_configs=stream_configs,
3687+
stream_config=stream_config,
37223688
config=config,
3723-
components_mapping=components_mapping,
37243689
parameters=model.parameters or {},
37253690
)
37263691

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py

Lines changed: 14 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44

55
from copy import deepcopy
66
from dataclasses import InitVar, dataclass, field
7-
from itertools import product
8-
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union
7+
from typing import Any, Dict, Iterable, List, Mapping, Union
98

109
import dpath
1110
from typing_extensions import deprecated
@@ -29,7 +28,6 @@ class StreamConfig:
2928

3029
configs_pointer: List[Union[InterpolatedString, str]]
3130
parameters: InitVar[Mapping[str, Any]]
32-
default_values: Optional[List[Any]] = None
3331

3432
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3533
self.configs_pointer = [
@@ -50,7 +48,7 @@ class ConfigComponentsResolver(ComponentsResolver):
5048
parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
5149
"""
5250

53-
stream_configs: List[StreamConfig]
51+
stream_config: StreamConfig
5452
config: Config
5553
components_mapping: List[ComponentMappingDefinition]
5654
parameters: InitVar[Mapping[str, Any]]
@@ -84,7 +82,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
8482
field_path=field_path,
8583
value=interpolated_value,
8684
value_type=component_mapping.value_type,
87-
create_or_update=component_mapping.create_or_update,
8885
parameters=parameters,
8986
)
9087
)
@@ -94,35 +91,17 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
9491
)
9592

9693
@property
97-
def _stream_config(self):
98-
def resolve_path(pointer):
99-
return [
100-
node.eval(self.config) if not isinstance(node, str) else node for node in pointer
101-
]
102-
103-
def normalize_configs(configs):
104-
return configs if isinstance(configs, list) else [configs]
105-
106-
def prepare_streams():
107-
for stream_config in self.stream_configs:
108-
path = resolve_path(stream_config.configs_pointer)
109-
stream_configs = dpath.get(dict(self.config), path, default=[])
110-
stream_configs = normalize_configs(stream_configs)
111-
if stream_config.default_values:
112-
stream_configs += stream_config.default_values
113-
yield [(i, item) for i, item in enumerate(stream_configs)]
114-
115-
def merge_combination(combo):
116-
result = {}
117-
for config_index, (elem_index, elem) in enumerate(combo):
118-
if isinstance(elem, dict):
119-
result.update(elem)
120-
else:
121-
result.setdefault(f"source_config_{config_index}", (elem_index, elem))
122-
return result
123-
124-
all_indexed_streams = list(prepare_streams())
125-
return [merge_combination(combo) for combo in product(*all_indexed_streams)]
94+
def _stream_config(self) -> Iterable[Mapping[str, Any]]:
95+
path = [
96+
node.eval(self.config) if not isinstance(node, str) else node
97+
for node in self.stream_config.configs_pointer
98+
]
99+
stream_config = dpath.get(dict(self.config), path, default=[])
100+
101+
if not isinstance(stream_config, list):
102+
stream_config = [stream_config]
103+
104+
return stream_config
126105

127106
def resolve_components(
128107
self, stream_template_config: Dict[str, Any]
@@ -151,21 +130,7 @@ def resolve_components(
151130
)
152131

153132
path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
154-
parsed_value = self._parse_yaml_if_possible(value)
155-
updated = dpath.set(updated_config, path, parsed_value)
156133

157-
if parsed_value and not updated and resolved_component.create_or_update:
158-
dpath.new(updated_config, path, parsed_value)
134+
dpath.set(updated_config, path, value)
159135

160136
yield updated_config
161-
162-
@staticmethod
163-
def _parse_yaml_if_possible(value: Any) -> Any:
164-
if isinstance(value, str):
165-
try:
166-
import yaml
167-
168-
return yaml.safe_load(value)
169-
except Exception:
170-
return value
171-
return value

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import dpath
1111
from typing_extensions import deprecated
1212

13-
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
1413
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1514
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1615
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
@@ -127,7 +126,6 @@ class DynamicSchemaLoader(SchemaLoader):
127126
parameters: InitVar[Mapping[str, Any]]
128127
schema_type_identifier: SchemaTypeIdentifier
129128
schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
130-
schema_filter: Optional[RecordFilter] = None
131129

132130
def get_json_schema(self) -> Mapping[str, Any]:
133131
"""
@@ -153,18 +151,20 @@ def get_json_schema(self) -> Mapping[str, Any]:
153151
)
154152
properties[key] = value
155153

156-
filtred_transformed_properties = self._transform(self._filter(properties))
154+
transformed_properties = self._transform(properties, {})
157155

158156
return {
159157
"$schema": "https://json-schema.org/draft-07/schema#",
160158
"type": "object",
161159
"additionalProperties": True,
162-
"properties": filtred_transformed_properties,
160+
"properties": transformed_properties,
163161
}
164162

165163
def _transform(
166164
self,
167165
properties: Mapping[str, Any],
166+
stream_state: StreamState,
167+
stream_slice: Optional[StreamSlice] = None,
168168
) -> Mapping[str, Any]:
169169
for transformation in self.schema_transformations:
170170
transformation.transform(
@@ -173,18 +173,6 @@ def _transform(
173173
)
174174
return properties
175175

176-
def _filter(
177-
self,
178-
properties: Mapping[str, Any],
179-
) -> Mapping[str, Any]:
180-
if self.schema_filter:
181-
filtered_properties = {}
182-
for property in self.schema_filter.filter_records(properties.items(), {}):
183-
filtered_properties[property[0]] = property[1]
184-
return filtered_properties
185-
else:
186-
return properties
187-
188176
def _get_key(
189177
self,
190178
raw_schema: MutableMapping[str, Any],

0 commit comments

Comments
 (0)