Skip to content

Commit 6ec240a

Browse files
author
Oleksandr Bazarnov
committed
updated normalizer
1 parent 9f7d498 commit 6ec240a

File tree

6 files changed

+66
-31
lines changed

6 files changed

+66
-31
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1604,7 +1604,6 @@ definitions:
16041604
title: URL Base
16051605
description: The base URL (scheme and host, e.g. "https://api.example.com") to match.
16061606
type: string
1607-
sharable: True
16081607
url_path_pattern:
16091608
title: URL Path Pattern
16101609
description: A regular expression pattern to match the URL path.
@@ -1846,8 +1845,9 @@ definitions:
18461845
type: string
18471846
enum: [HttpRequester]
18481847
url_base:
1848+
sharable: true
18491849
title: API Base URL
1850-
description: Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
1850+
description: The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
18511851
type: string
18521852
interpolation_context:
18531853
- config
@@ -1863,10 +1863,9 @@ definitions:
18631863
- "{{ config['base_url'] or 'https://app.posthog.com'}}/api"
18641864
- "https://connect.squareup.com/v2/quotes/{{ stream_partition['id'] }}/quote_line_groups"
18651865
- "https://example.com/api/v1/resource/{{ next_page_token['id'] }}"
1866-
sharable: True
18671866
path:
18681867
title: URL Path
1869-
description: Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
1868+
description: The path to the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
18701869
type: string
18711870
interpolation_context:
18721871
- config
@@ -1882,6 +1881,7 @@ definitions:
18821881
- "/quotes/{{ stream_partition['id'] }}/quote_line_groups"
18831882
- "/trades/{{ config['symbol_id'] }}/history"
18841883
authenticator:
1884+
sharable: true
18851885
title: Authenticator
18861886
description: Authentication method to use for requests sent to the API.
18871887
anyOf:
@@ -1895,7 +1895,6 @@ definitions:
18951895
- "$ref": "#/definitions/SessionTokenAuthenticator"
18961896
- "$ref": "#/definitions/LegacySessionTokenAuthenticator"
18971897
- "$ref": "#/definitions/SelectiveAuthenticator"
1898-
sharable: True
18991898
error_handler:
19001899
title: Error Handler
19011900
description: Error handler component that defines how to handle errors.

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3030
DeclarativeStream as DeclarativeStreamModel,
3131
)
32-
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
32+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
33+
Spec as SpecModel,
34+
)
3335
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3436
StateDelegatingStream as StateDelegatingStreamModel,
3537
)
@@ -89,7 +91,7 @@ def __init__(
8991
debug: bool = False,
9092
emit_connector_builder_messages: bool = False,
9193
component_factory: Optional[ModelToComponentFactory] = None,
92-
):
94+
) -> None:
9395
"""
9496
Args:
9597
config: The provided config dict.
@@ -109,18 +111,24 @@ def __init__(
109111
# If custom components are needed, locate and/or register them.
110112
self.components_module: ModuleType | None = get_registered_components_module(config=config)
111113

114+
# resolve all `$ref` references in the manifest
112115
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
116+
# propagate types and parameters to all components in the manifest
117+
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
118+
"", resolved_source_config, {}
119+
)
113120

114121
if emit_connector_builder_messages:
115-
# reduce commonalities in the manifest after the references have been resolved,
116-
# used mostly for Connector Builder use cases.
117-
resolved_source_config = ManifestNormalizer(
118-
resolved_source_config, self._declarative_component_schema
122+
# Connector Builder UI rendering requires the manifest to be in a specific format.
123+
# 1) references have been resolved
124+
# 2) deprecated fields have been migrated
125+
# 3) the commonly used definitions are extracted to the `definitions.shared.*`
126+
# 4) ! the normalized manifest could be validated after the additional UI post-processing.
127+
propagated_source_config = ManifestNormalizer(
128+
propagated_source_config,
129+
self._declarative_component_schema,
119130
).normalize()
120131

121-
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
122-
"", resolved_source_config, {}
123-
)
124132
self._source_config = propagated_source_config
125133
self._debug = debug
126134
self._emit_connector_builder_messages = emit_connector_builder_messages
@@ -151,7 +159,9 @@ def message_repository(self) -> MessageRepository:
151159
@property
152160
def dynamic_streams(self) -> List[Dict[str, Any]]:
153161
return self._dynamic_stream_configs(
154-
manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
162+
manifest=self._source_config,
163+
config=self._config,
164+
with_dynamic_stream_name=True,
155165
)
156166

157167
@property
@@ -174,7 +184,10 @@ def connection_checker(self) -> ConnectionChecker:
174184

175185
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
176186
self._emit_manifest_debug_message(
177-
extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
187+
extra_args={
188+
"source_name": self.name,
189+
"parsed_config": json.dumps(self._source_config),
190+
}
178191
)
179192

180193
stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
@@ -187,9 +200,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
187200

188201
source_streams = [
189202
self._constructor.create_component(
190-
StateDelegatingStreamModel
191-
if stream_config.get("type") == StateDelegatingStreamModel.__name__
192-
else DeclarativeStreamModel,
203+
(
204+
StateDelegatingStreamModel
205+
if stream_config.get("type") == StateDelegatingStreamModel.__name__
206+
else DeclarativeStreamModel
207+
),
193208
stream_config,
194209
config,
195210
emit_connector_builder_messages=self._emit_connector_builder_messages,
@@ -205,7 +220,9 @@ def _initialize_cache_for_parent_streams(
205220
) -> List[Dict[str, Any]]:
206221
parent_streams = set()
207222

208-
def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
223+
def update_with_cache_parent_configs(
224+
parent_configs: list[dict[str, Any]],
225+
) -> None:
209226
for parent_config in parent_configs:
210227
parent_streams.add(parent_config["stream"]["name"])
211228
if parent_config["stream"]["type"] == "StateDelegatingStream":
@@ -260,7 +277,10 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
260277
"""
261278
self._configure_logger_level(logger)
262279
self._emit_manifest_debug_message(
263-
extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
280+
extra_args={
281+
"source_name": self.name,
282+
"parsed_config": json.dumps(self._source_config),
283+
}
264284
)
265285

266286
spec = self._source_config.get("spec")
@@ -404,7 +424,9 @@ def _dynamic_stream_configs(
404424

405425
# Create a resolver for dynamic components based on type
406426
components_resolver = self._constructor.create_component(
407-
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
427+
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
428+
components_resolver_config,
429+
config,
408430
)
409431

410432
stream_template_config = dynamic_definition["stream_template"]

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class DynamicStreamCheckConfig(BaseModel):
4949
)
5050
stream_count: Optional[int] = Field(
5151
0,
52-
description="Numbers of the streams to try reading from when running a check operation.",
52+
description="The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.",
5353
title="Stream Count",
5454
)
5555

@@ -2113,7 +2113,7 @@ class HttpRequester(BaseModel):
21132113
type: Literal["HttpRequester"]
21142114
url_base: str = Field(
21152115
...,
2116-
description="Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
2116+
description="The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
21172117
examples=[
21182118
"https://connect.squareup.com/v2",
21192119
"{{ config['base_url'] or 'https://app.posthog.com'}}/api",
@@ -2124,7 +2124,7 @@ class HttpRequester(BaseModel):
21242124
)
21252125
path: Optional[str] = Field(
21262126
None,
2127-
description="Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
2127+
description="The path to the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
21282128
examples=[
21292129
"/products",
21302130
"/quotes/{{ stream_partition['id'] }}/quote_line_groups",

airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,12 @@ def __init__(
8484
) -> None:
8585
self._resolved_manifest = resolved_manifest
8686
self._declarative_schema = declarative_schema
87+
self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest)
8788
# get the tags marked as `sharable` in the component schema
8889
self._sharable_tags = _get_sharable_schema_tags(self._declarative_schema)
89-
self._normalized_manifest: ManifestType = copy.deepcopy(self._resolved_manifest)
90+
91+
def to_json_str(self) -> str:
92+
return json.dumps(self._normalized_manifest, indent=2)
9093

9194
def normalize(self) -> ManifestType:
9295
"""
@@ -193,7 +196,7 @@ def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None:
193196
self._add_to_shared_definitions(type_key, key, value)
194197

195198
# Replace occurrences with references
196-
for path, parent_obj, value in occurrences:
199+
for _, parent_obj, value in occurrences:
197200
if is_shared_def:
198201
if value == self._get_shared_definition_value(type_key, key):
199202
parent_obj[key] = self._create_shared_definition_ref(type_key, key)
@@ -235,7 +238,7 @@ def _add_duplicate(
235238
"""
236239

237240
# create hash for each duplicate observed
238-
value_to_hash = value if key is None else {key: value}
241+
value_to_hash = {key: value} if key is not None else value
239242
duplicates[self._hash_object(value_to_hash)].append((current_path, obj, value))
240243

241244
def _add_to_shared_definitions(

unit_tests/sources/declarative/interpolation/test_macros.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ def test_macros_export(test_name, fn_name, found_in_macros):
3030
assert fn_name not in macros
3131

3232

33-
@freeze_time("2022-01-01")
3433
@pytest.mark.parametrize(
3534
"input_value, format, input_format, expected_output",
3635
[

unit_tests/sources/declarative/parsers/test_manifest_normalizer.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,29 @@ def test_deduplicate_manifest_when_multiple_url_base_are_resolved_and_most_frequ
2020
manifest_with_multiple_url_base,
2121
expected_manifest_with_multiple_url_base_normalized,
2222
) -> None:
23-
resolved_manifest = resolver.preprocess_manifest(manifest_with_multiple_url_base)
23+
"""
24+
This test checks that the manifest is normalized when multiple `url_base` values are resolved
25+
and the most frequent one is shared.
26+
"""
27+
2428
schema = _get_declarative_component_schema()
29+
resolved_manifest = resolver.preprocess_manifest(manifest_with_multiple_url_base)
2530
normalized_manifest = ManifestNormalizer(resolved_manifest, schema).normalize()
31+
2632
assert normalized_manifest == expected_manifest_with_multiple_url_base_normalized
2733

2834

2935
def test_deduplicate_manifest_with_shared_definitions_url_base_are_present(
3036
manifest_with_url_base_shared_definition,
3137
expected_manifest_with_url_base_shared_definition_normalized,
3238
) -> None:
33-
resolved_manifest = resolver.preprocess_manifest(manifest_with_url_base_shared_definition)
39+
"""
40+
This test is to check that the manifest is normalized when the `url_base` is shared
41+
between the definitions and the `url_base` is present in the manifest.
42+
"""
43+
3444
schema = _get_declarative_component_schema()
45+
resolved_manifest = resolver.preprocess_manifest(manifest_with_url_base_shared_definition)
3546
normalized_manifest = ManifestNormalizer(resolved_manifest, schema).normalize()
47+
3648
assert normalized_manifest == expected_manifest_with_url_base_shared_definition_normalized

0 commit comments

Comments
 (0)