Skip to content

Commit 304235c

Browse files
author
Oleksandr Bazarnov
committed
add schema extraction + unit test
1 parent 7d71f4b commit 304235c

File tree

4 files changed

+504
-16
lines changed

4 files changed

+504
-16
lines changed

airbyte_cdk/sources/declarative/parsers/manifest_normalizer.py

Lines changed: 31 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44

55
import copy
@@ -114,6 +114,7 @@ def normalize(self) -> ManifestType:
114114
return self._normalized_manifest
115115
except ManifestNormalizationException:
116116
# if any error occurs, we just return the original manifest.
117+
# TODO: enable debug logging
117118
return self._resolved_manifest
118119

119120
def _get_manifest_streams(self) -> Iterable[Dict[str, Any]]:
@@ -162,22 +163,41 @@ def _prepare_definitions(self) -> None:
162163
if key != LINKED_TAG:
163164
self._normalized_manifest[DEF_TAG].pop(key, None)
164165

166+
def _extract_stream_schema(self, stream: Dict[str, Any]) -> None:
167+
"""
168+
Extract the schema from the stream and add it to the `schemas` tag.
169+
"""
170+
171+
stream_name = stream["name"]
172+
# copy the value of the SCHEMA_TAG to the SCHEMAS_TAG with the stream name as key
173+
schema = stream.get(SCHEMA_LOADER_TAG, {}).get(SCHEMA_TAG)
174+
if not SCHEMAS_TAG in self._normalized_manifest.keys():
175+
self._normalized_manifest[SCHEMAS_TAG] = {}
176+
# add stream schema to the SCHEMAS_TAG
177+
if not stream_name in self._normalized_manifest[SCHEMAS_TAG].keys():
178+
# add the schema to the SCHEMAS_TAG with the stream name as key
179+
self._normalized_manifest[SCHEMAS_TAG][stream_name] = schema
180+
165181
def _reference_schemas(self) -> None:
166182
"""
167-
Process the definitions in the manifest to move streams from definitions to the main stream list.
183+
For every stream in the manifest, extract its schema into the `schemas` tag and replace the stream's schema with a reference to it.
168184
This function modifies the manifest in place.
169185
"""
170186

171187
# point each stream's schema at the place where it is stored
172188
if SCHEMAS_TAG in self._normalized_manifest.keys():
173189
for stream in self._get_manifest_streams():
174-
stream_name = stream["name"]
175-
176-
if stream_name not in self._normalized_manifest[SCHEMAS_TAG].keys():
177-
raise ManifestNormalizationException(
178-
f"Stream {stream_name} not found in `schemas`. Please check the manifest."
179-
)
190+
self._extract_stream_schema(stream)
191+
self._set_stream_schema_ref(stream)
180192

193+
def _set_stream_schema_ref(self, stream: Dict[str, Any]) -> None:
194+
"""
195+
Set the schema reference for the given stream in the manifest.
196+
This function modifies the manifest in place.
197+
"""
198+
stream_name = stream["name"]
199+
if SCHEMAS_TAG in self._normalized_manifest.keys():
200+
if stream_name in self._normalized_manifest[SCHEMAS_TAG]:
181201
stream[SCHEMA_LOADER_TAG][SCHEMA_TAG] = self._create_schema_ref(stream_name)
182202

183203
def _replace_duplicates_with_refs(self, duplicates: DuplicatesType) -> None:
@@ -266,9 +286,6 @@ def _collect_duplicates(self) -> DuplicatesType:
266286
"""
267287
Traverse the JSON object and collect all potential duplicate values and objects.
268288
269-
Args:
270-
node: The JSON object to analyze.
271-
272289
Returns:
273290
duplicates: A dictionary of duplicate objects.
274291
"""
@@ -431,15 +448,15 @@ def _create_linked_definition_ref(self, type_key: str, key: str) -> Dict[str, st
431448

432449
return {"$ref": f"#/{DEF_TAG}/{LINKED_TAG}/{type_key}/{key}"}
433450

434-
def _create_schema_ref(self, ref_key: str) -> Dict[str, str]:
451+
def _create_schema_ref(self, key: str) -> Dict[str, str]:
435452
"""
436453
Create a reference object for stream schema using the specified key.
437454
438455
Args:
439-
ref_key: The reference key to use
456+
key: The reference key to use
440457
441458
Returns:
442459
A reference object in the proper format
443460
"""
444461

445-
return {"$ref": f"#/{SCHEMAS_TAG}/{ref_key}"}
462+
return {"$ref": f"#/{SCHEMAS_TAG}/{key}"}

unit_tests/sources/declarative/parsers/conftest.py

Lines changed: 237 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
from typing import Any, Dict
66

77
import pytest
8+
import yaml
89

910

1011
@pytest.fixture
@@ -640,3 +641,239 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str,
640641
},
641642
},
642643
}
644+
645+
646+
@pytest.fixture
647+
def manifest_with_linked_definitions_url_base_authenticator_abnormal_schemas() -> Dict[str, Any]:
648+
with open(
649+
"unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml",
650+
"r",
651+
) as file:
652+
return dict(yaml.safe_load(file))
653+
654+
655+
@pytest.fixture
656+
def expected_manifest_with_linked_definitions_url_base_authenticator_normalized() -> Dict[str, Any]:
657+
return {
658+
"version": "6.44.0",
659+
"type": "DeclarativeSource",
660+
"check": {"type": "CheckStream", "stream_names": ["pokemon"]},
661+
"definitions": {
662+
"linked": {
663+
"HttpRequester": {
664+
"url_base": "https://pokeapi.co/api/v1/",
665+
"authenticator": {
666+
"type": "ApiKeyAuthenticator",
667+
"api_token": '{{ config["api_key"] }}',
668+
"inject_into": {
669+
"type": "RequestOption",
670+
"field_name": "API_KEY",
671+
"inject_into": "header",
672+
},
673+
},
674+
}
675+
}
676+
},
677+
"streams": [
678+
{
679+
"type": "DeclarativeStream",
680+
"name": "pokemon",
681+
"retriever": {
682+
"type": "SimpleRetriever",
683+
"decoder": {"type": "JsonDecoder"},
684+
"requester": {
685+
"type": "HttpRequester",
686+
"path": "pokemon",
687+
"url_base": {
688+
"$ref": "#/definitions/linked/HttpRequester/url_base",
689+
},
690+
"http_method": "GET",
691+
"authenticator": {
692+
"$ref": "#/definitions/linked/HttpRequester/authenticator",
693+
},
694+
},
695+
"record_selector": {
696+
"type": "RecordSelector",
697+
"extractor": {"type": "DpathExtractor", "field_path": []},
698+
},
699+
},
700+
"schema_loader": {
701+
"type": "InlineSchemaLoader",
702+
"schema": {"$ref": "#/schemas/pokemon"},
703+
},
704+
},
705+
{
706+
"type": "DeclarativeStream",
707+
"name": "trainers",
708+
"retriever": {
709+
"type": "SimpleRetriever",
710+
"decoder": {"type": "JsonDecoder"},
711+
"requester": {
712+
"type": "HttpRequester",
713+
"path": "pokemon",
714+
"url_base": {
715+
"$ref": "#/definitions/linked/HttpRequester/url_base",
716+
},
717+
"http_method": "GET",
718+
"authenticator": {
719+
"$ref": "#/definitions/linked/HttpRequester/authenticator",
720+
},
721+
},
722+
"record_selector": {
723+
"type": "RecordSelector",
724+
"extractor": {"type": "DpathExtractor", "field_path": []},
725+
},
726+
},
727+
"schema_loader": {
728+
"type": "InlineSchemaLoader",
729+
"schema": {"$ref": "#/schemas/trainers"},
730+
},
731+
},
732+
{
733+
"type": "DeclarativeStream",
734+
"name": "items",
735+
"retriever": {
736+
"type": "SimpleRetriever",
737+
"decoder": {"type": "JsonDecoder"},
738+
"requester": {
739+
"type": "HttpRequester",
740+
"path": "pokemon",
741+
"url_base": "https://pokeapi.co/api/v2/",
742+
"http_method": "GET",
743+
"authenticator": {
744+
"$ref": "#/definitions/linked/HttpRequester/authenticator"
745+
},
746+
},
747+
"record_selector": {
748+
"type": "RecordSelector",
749+
"extractor": {"type": "DpathExtractor", "field_path": []},
750+
},
751+
},
752+
"schema_loader": {
753+
"type": "InlineSchemaLoader",
754+
"schema": {"$ref": "#/schemas/items"},
755+
},
756+
},
757+
{
758+
"type": "DeclarativeStream",
759+
"name": "location",
760+
"retriever": {
761+
"type": "SimpleRetriever",
762+
"decoder": {"type": "JsonDecoder"},
763+
"requester": {
764+
"type": "HttpRequester",
765+
"path": "location",
766+
"url_base": "https://pokeapi.co/api/v2/",
767+
"http_method": "GET",
768+
"authenticator": {
769+
"$ref": "#/definitions/linked/HttpRequester/authenticator"
770+
},
771+
},
772+
"record_selector": {
773+
"type": "RecordSelector",
774+
"extractor": {"type": "DpathExtractor", "field_path": []},
775+
},
776+
},
777+
"schema_loader": {
778+
"type": "InlineSchemaLoader",
779+
"schema": {"$ref": "#/schemas/location"},
780+
},
781+
},
782+
{
783+
"type": "DeclarativeStream",
784+
"name": "berries",
785+
"retriever": {
786+
"type": "SimpleRetriever",
787+
"decoder": {"type": "JsonDecoder"},
788+
"requester": {
789+
"type": "HttpRequester",
790+
"path": "berries",
791+
"url_base": "https://pokeapi.co/api/v2/",
792+
"http_method": "GET",
793+
"authenticator": {
794+
"$ref": "#/definitions/linked/HttpRequester/authenticator"
795+
},
796+
},
797+
"record_selector": {
798+
"type": "RecordSelector",
799+
"extractor": {"type": "DpathExtractor", "field_path": []},
800+
},
801+
},
802+
"schema_loader": {
803+
"type": "InlineSchemaLoader",
804+
"schema": {"$ref": "#/schemas/berries"},
805+
},
806+
},
807+
],
808+
"spec": {
809+
"type": "Spec",
810+
"connection_specification": {
811+
"type": "object",
812+
"$schema": "http://json-schema.org/draft-07/schema#",
813+
"required": ["api_key"],
814+
"properties": {
815+
"api_key": {
816+
"type": "string",
817+
"order": 0,
818+
"title": "API Key",
819+
"airbyte_secret": True,
820+
}
821+
},
822+
"additionalProperties": True,
823+
},
824+
},
825+
"metadata": {
826+
"assist": {},
827+
"testedStreams": {
828+
"berries": {"streamHash": None},
829+
"pokemon": {"streamHash": None},
830+
"location": {"streamHash": None},
831+
"trainers": {"streamHash": "ca4ee51a2aaa2a53b9c0b91881a84ad621da575f"},
832+
"items": {"streamHash": "12e624ecf47c6357c74c27d6a65c72e437b1534a"},
833+
},
834+
"autoImportSchema": {"berries": True, "pokemon": True, "location": True},
835+
},
836+
"schemas": {
837+
"berries": {
838+
"type": "object",
839+
"$schema": "http://json-schema.org/draft-07/schema#",
840+
"properties": {
841+
"name": {"type": "string"},
842+
"berry_type": {"type": "integer"},
843+
},
844+
"additionalProperties": True,
845+
},
846+
"pokemon": {
847+
"type": "object",
848+
"$schema": "http://json-schema.org/draft-07/schema#",
849+
"properties": {
850+
"name": {"type": "string"},
851+
"pokemon_type": {"type": "integer"},
852+
},
853+
},
854+
"location": {
855+
"type": "object",
856+
"$schema": "http://json-schema.org/draft-07/schema#",
857+
"properties": {
858+
"name": {"type": "string"},
859+
"location_type": {"type": "string"},
860+
},
861+
},
862+
"trainers": {
863+
"type": "object",
864+
"$schema": "http://json-schema.org/draft-07/schema#",
865+
"properties": {
866+
"name": {"type": "string"},
867+
"pokemon_type": {"type": "integer"},
868+
},
869+
},
870+
"items": {
871+
"type": "object",
872+
"$schema": "http://json-schema.org/draft-07/schema#",
873+
"properties": {
874+
"name": {"type": "string"},
875+
"pokemon_type": {"type": "integer"},
876+
},
877+
},
878+
},
879+
}

0 commit comments

Comments
 (0)