Skip to content

Commit ff10aa3

Browse files
committed
add ConfigMigration class and new spec tests
1 parent c251f42 commit ff10aa3

File tree

10 files changed

+399
-80
lines changed

10 files changed

+399
-80
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3810,26 +3810,13 @@ definitions:
38103810
properties:
38113811
config_migrations:
38123812
title: Config Migrations
3813-
description: The config will be migrated according to these transformations and updated within the platform for subsequent syncs.
3814-
type: object
3815-
required:
3816-
- description
3817-
- transformations
3818-
properties:
3819-
description:
3820-
type: string
3821-
description: The description/purpose of the config migration.
3822-
transformations:
3823-
type: array
3824-
items:
3825-
anyOf:
3826-
- "$ref": "#/definitions/ConfigRemapField"
3827-
- "$ref": "#/definitions/ConfigAddFields"
3828-
- "$ref": "#/definitions/ConfigRemoveFields"
3829-
default: []
3813+
description: The discrete migrations that will be applied on the incoming config. Each migration will be applied in the order they are defined.
3814+
type: array
3815+
items:
3816+
"$ref": "#/definitions/ConfigMigration"
38303817
transformations:
38313818
title: Transformations
3832-
description: The list of transformations that will be applied on the incoming config at the start of a sync.
3819+
description: The list of transformations that will be applied on the incoming config at the start of each sync. The transformations will be applied in the order they are defined.
38333820
type: array
38343821
items:
38353822
anyOf:
@@ -3838,12 +3825,32 @@ definitions:
38383825
- "$ref": "#/definitions/ConfigRemoveFields"
38393826
validations:
38403827
title: Validations
3841-
description: The list of validations that will be performed on the incoming config before starting a sync
3828+
description: The list of validations that will be performed on the incoming config at the start of each sync.
38423829
type: array
38433830
items:
38443831
anyOf:
38453832
- "$ref": "#/definitions/DpathValidator"
38463833
- "$ref": "#/definitions/PredicateValidator"
3834+
ConfigMigration:
3835+
title: Config Migration
3836+
description: A config migration that will be applied on the incoming config at the start of a sync.
3837+
type: object
3838+
required:
3839+
- transformations
3840+
properties:
3841+
description:
3842+
type: string
3843+
description: The description/purpose of the config migration.
3844+
transformations:
3845+
title: Transformations
3846+
description: The list of transformations that will attempt to be applied on an incoming unmigrated config. The transformations will be applied in the order they are defined.
3847+
type: array
3848+
items:
3849+
anyOf:
3850+
- "$ref": "#/definitions/ConfigRemapField"
3851+
- "$ref": "#/definitions/ConfigAddFields"
3852+
- "$ref": "#/definitions/ConfigRemoveFields"
3853+
default: []
38473854
SubstreamPartitionRouter:
38483855
title: Substream Partition Router
38493856
description: Partition router that is used to retrieve records that have been partitioned according to records from the specified parent streams. An example of a parent stream is automobile brands and the substream would be the various car models associated with each brand.
@@ -4275,14 +4282,14 @@ definitions:
42754282
type: object
42764283
required:
42774284
- type
4278-
- schema
4285+
- base_schema
42794286
properties:
42804287
type:
42814288
type: string
42824289
enum: [ValidateAdheresToSchema]
4283-
schema:
4284-
title: JSON Schema
4285-
description: The JSON schema used for validation.
4290+
base_schema:
4291+
title: Base JSON Schema
4292+
description: The base JSON schema against which the user-provided schema will be validated.
42864293
type:
42874294
- string
42884295
- object
@@ -4406,9 +4413,10 @@ definitions:
44064413
description: A list of field pointers to be removed from the config.
44074414
type: array
44084415
items:
4409-
type: string
4416+
items:
4417+
type: string
44104418
examples:
4411-
- ["marketplace"]
4419+
- ["tags"]
44124420
- [["content", "html"], ["content", "plain_text"]]
44134421
condition:
44144422
description: Fields will be removed if expression is evaluated to True.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1531,10 +1531,9 @@ class Config:
15311531

15321532
class ValidateAdheresToSchema(BaseModel):
15331533
type: Literal["ValidateAdheresToSchema"]
1534-
schema_: Union[str, Dict[str, Any]] = Field(
1534+
base_schema: Union[str, Dict[str, Any]] = Field(
15351535
...,
1536-
alias="schema",
1537-
description="The JSON schema used for validation.",
1536+
description="The base JSON schema against which the user-provided schema will be validated.",
15381537
examples=[
15391538
"{{ config['report_validation_schema'] }}",
15401539
'\'{\n "$schema": "http://json-schema.org/draft-07/schema#",\n "title": "Person",\n "type": "object",\n "properties": {\n "name": {\n "type": "string",\n "description": "The person\'s name"\n },\n "age": {\n "type": "integer",\n "minimum": 0,\n "description": "The person\'s age"\n }\n },\n "required": ["name", "age"]\n}\'\n',
@@ -1553,7 +1552,7 @@ class ValidateAdheresToSchema(BaseModel):
15531552
"required": ["name", "age"],
15541553
},
15551554
],
1556-
title="JSON Schema",
1555+
title="Base JSON Schema",
15571556
)
15581557

15591558

@@ -1583,10 +1582,10 @@ class ConfigRemapField(BaseModel):
15831582

15841583
class ConfigRemoveFields(BaseModel):
15851584
type: Literal["ConfigRemoveFields"]
1586-
field_pointers: List[str] = Field(
1585+
field_pointers: List[List[str]] = Field(
15871586
...,
15881587
description="A list of field pointers to be removed from the config.",
1589-
examples=[["marketplace"], [["content", "html"], ["content", "plain_text"]]],
1588+
examples=[["tags"], [["content", "html"], ["content", "plain_text"]]],
15901589
title="Field Pointers",
15911590
)
15921591
condition: Optional[str] = Field(
@@ -2104,29 +2103,35 @@ class Config:
21042103
)
21052104

21062105

2107-
class ConfigMigrations(BaseModel):
2108-
description: str = Field(
2109-
..., description="The description/purpose of the config migration."
2106+
class ConfigMigration(BaseModel):
2107+
description: Optional[str] = Field(
2108+
None, description="The description/purpose of the config migration."
2109+
)
2110+
transformations: List[
2111+
Union[ConfigRemapField, ConfigAddFields, ConfigRemoveFields]
2112+
] = Field(
2113+
...,
2114+
description="The list of transformations that will attempt to be applied on an incoming unmigrated config. The transformations will be applied in the order they are defined.",
2115+
title="Transformations",
21102116
)
2111-
transformations: List[Union[ConfigRemapField, ConfigAddFields, ConfigRemoveFields]]
21122117

21132118

21142119
class ConfigNormalizationRules(BaseModel):
2115-
config_migrations: Optional[ConfigMigrations] = Field(
2120+
config_migrations: Optional[List[ConfigMigration]] = Field(
21162121
None,
2117-
description="The config will be migrated according to these transformations and updated within the platform for subsequent syncs.",
2122+
description="The discrete migrations that will be applied on the incoming config. Each migration will be applied in the order they are defined.",
21182123
title="Config Migrations",
21192124
)
21202125
transformations: Optional[
21212126
List[Union[ConfigRemapField, ConfigAddFields, ConfigRemoveFields]]
21222127
] = Field(
21232128
None,
2124-
description="The list of transformations that will be applied on the incoming config at the start of a sync.",
2129+
description="The list of transformations that will be applied on the incoming config at the start of each sync. The transformations will be applied in the order they are defined.",
21252130
title="Transformations",
21262131
)
21272132
validations: Optional[List[Union[DpathValidator, PredicateValidator]]] = Field(
21282133
None,
2129-
description="The list of validations that will be performed on the incoming config before starting a sync",
2134+
description="The list of validations that will be performed on the incoming config at the start of each sync.",
21302135
title="Validations",
21312136
)
21322137

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
Optional,
2020
Type,
2121
Union,
22+
cast,
2223
get_args,
2324
get_origin,
2425
get_type_hints,
@@ -154,9 +155,21 @@
154155
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
155156
ConcurrencyLevel as ConcurrencyLevelModel,
156157
)
158+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
159+
ConfigAddFields as ConfigAddFieldsModel,
160+
)
157161
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
158162
ConfigComponentsResolver as ConfigComponentsResolverModel,
159163
)
164+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
165+
ConfigMigration as ConfigMigrationModel,
166+
)
167+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
168+
ConfigRemapField as ConfigRemapFieldModel,
169+
)
170+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
171+
ConfigRemoveFields as ConfigRemoveFieldsModel,
172+
)
160173
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
161174
ConstantBackoffStrategy as ConstantBackoffStrategyModel,
162175
)
@@ -226,6 +239,9 @@
226239
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
227240
DpathFlattenFields as DpathFlattenFieldsModel,
228241
)
242+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
243+
DpathValidator as DpathValidatorModel,
244+
)
229245
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
230246
DynamicSchemaLoader as DynamicSchemaLoaderModel,
231247
)
@@ -337,6 +353,9 @@
337353
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
338354
ParentStreamConfig as ParentStreamConfigModel,
339355
)
356+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
357+
PredicateValidator as PredicateValidatorModel,
358+
)
340359
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
341360
PropertiesFromEndpoint as PropertiesFromEndpointModel,
342361
)
@@ -401,6 +420,9 @@
401420
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
402421
UnlimitedCallRatePolicy as UnlimitedCallRatePolicyModel,
403422
)
423+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
424+
ValidateAdheresToSchema as ValidateAdheresToSchemaModel,
425+
)
404426
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
405427
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
406428
WaitTimeFromHeader as WaitTimeFromHeaderModel,
@@ -506,14 +528,19 @@
506528
TypesMap,
507529
)
508530
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
509-
from airbyte_cdk.sources.declarative.spec import Spec
531+
from airbyte_cdk.sources.declarative.spec import ConfigMigration, Spec
510532
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
511533
from airbyte_cdk.sources.declarative.transformations import (
512534
AddFields,
513535
RecordTransformation,
514536
RemoveFields,
515537
)
516538
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
539+
from airbyte_cdk.sources.declarative.transformations.config_transformations import (
540+
ConfigAddFields,
541+
ConfigRemapField,
542+
ConfigRemoveFields,
543+
)
517544
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
518545
DpathFlattenFields,
519546
KeyTransformation,
@@ -530,6 +557,11 @@
530557
from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
531558
KeysToSnakeCaseTransformation,
532559
)
560+
from airbyte_cdk.sources.declarative.validators import (
561+
DpathValidator,
562+
PredicateValidator,
563+
ValidateAdheresToSchema,
564+
)
533565
from airbyte_cdk.sources.message import (
534566
InMemoryMessageRepository,
535567
LogAppenderMessageRepositoryDecorator,
@@ -617,6 +649,10 @@ def _init_mappings(self) -> None:
617649
CheckDynamicStreamModel: self.create_check_dynamic_stream,
618650
CompositeErrorHandlerModel: self.create_composite_error_handler,
619651
ConcurrencyLevelModel: self.create_concurrency_level,
652+
ConfigMigrationModel: self.create_config_migration,
653+
ConfigAddFieldsModel: self.create_config_add_fields,
654+
ConfigRemapFieldModel: self.create_config_remap_field,
655+
ConfigRemoveFieldsModel: self.create_config_remove_fields,
620656
ConstantBackoffStrategyModel: self.create_constant_backoff_strategy,
621657
CsvDecoderModel: self.create_csv_decoder,
622658
CursorPaginationModel: self.create_cursor_pagination,
@@ -640,6 +676,7 @@ def _init_mappings(self) -> None:
640676
DefaultErrorHandlerModel: self.create_default_error_handler,
641677
DefaultPaginatorModel: self.create_default_paginator,
642678
DpathExtractorModel: self.create_dpath_extractor,
679+
DpathValidatorModel: self.create_dpath_validator,
643680
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
644681
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
645682
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
@@ -673,6 +710,7 @@ def _init_mappings(self) -> None:
673710
OffsetIncrementModel: self.create_offset_increment,
674711
PageIncrementModel: self.create_page_increment,
675712
ParentStreamConfigModel: self.create_parent_stream_config,
713+
PredicateValidatorModel: self.create_predicate_validator,
676714
PropertiesFromEndpointModel: self.create_properties_from_endpoint,
677715
PropertyChunkingModel: self.create_property_chunking,
678716
QueryPropertiesModel: self.create_query_properties,
@@ -687,6 +725,7 @@ def _init_mappings(self) -> None:
687725
StateDelegatingStreamModel: self.create_state_delegating_stream,
688726
SpecModel: self.create_spec,
689727
SubstreamPartitionRouterModel: self.create_substream_partition_router,
728+
ValidateAdheresToSchemaModel: self.create_validate_adheres_to_schema,
690729
WaitTimeFromHeaderModel: self.create_wait_time_from_header,
691730
WaitUntilTimeFromHeaderModel: self.create_wait_until_time_from_header,
692731
AsyncRetrieverModel: self.create_async_retriever,
@@ -779,6 +818,65 @@ def _collect_model_deprecations(self, model: BaseModelWithDeprecations) -> None:
779818
if log not in self._collected_deprecation_logs:
780819
self._collected_deprecation_logs.append(log)
781820

821+
822+
def create_config_migration(self, model: ConfigMigrationModel, config: Config) -> ConfigMigration:
    """Build a runtime ``ConfigMigration`` from its parsed declarative model.

    Every entry of ``model.transformations`` is handed to
    ``_create_component_from_model`` together with ``config``; the resulting
    components are collected in the order they were declared.

    Args:
        model: Parsed ``ConfigMigration`` model from the manifest.
        config: Connector config forwarded to each transformation factory.

    Returns:
        A ``ConfigMigration`` carrying the model's description and the
        instantiated transformations.
    """
    built_transformations = [
        self._create_component_from_model(item, config) for item in model.transformations
    ]
    return ConfigMigration(
        description=model.description,
        transformations=built_transformations,
    )
831+
832+
def create_config_add_fields(self, model: ConfigAddFieldsModel, config: Config) -> ConfigAddFields:
    """Build a runtime ``ConfigAddFields`` transformation from its model.

    Args:
        model: Parsed ``ConfigAddFields`` model from the manifest.
        config: Connector config forwarded to each field factory.

    Returns:
        A ``ConfigAddFields`` whose fields are the components created from
        ``model.fields`` (in order) and whose condition is the model's
        condition, or an empty string when none was given.
    """
    built_fields = [
        self._create_component_from_model(field_model, config) for field_model in model.fields
    ]
    # A missing (or otherwise falsy) condition is passed on as an empty string.
    condition = model.condition or ""
    return ConfigAddFields(fields=built_fields, condition=condition)
838+
839+
@staticmethod
def create_config_remove_fields(
    model: ConfigRemoveFieldsModel, config: Optional[Config] = None, **kwargs: Any
) -> ConfigRemoveFields:
    """Build a runtime ``ConfigRemoveFields`` transformation from its model.

    Args:
        model: Parsed ``ConfigRemoveFields`` model from the manifest.
        config: Accepted for signature parity with the other factory
            methods; not used when building the component.
        **kwargs: Accepted and ignored, mirroring
            ``create_added_field_definition``.

    Returns:
        A ``ConfigRemoveFields`` with the model's field pointers and its
        condition, or an empty string when no condition was given.
    """
    # NOTE(review): this factory is reached through
    # ``_create_component_from_model(transformation, config)``; presumably the
    # dispatcher forwards ``config`` (and any extra kwargs) to the registered
    # constructor, which a ``model``-only signature would reject with a
    # TypeError. The defaulted ``config`` keeps ``model``-only callers working.
    return ConfigRemoveFields(
        field_pointers=model.field_pointers,
        condition=model.condition or "",
    )
845+
846+
@staticmethod
def create_config_remap_field(model: ConfigRemapFieldModel, config: Config) -> ConfigRemapField:
    """Build a runtime ``ConfigRemapField`` transformation from its model.

    Args:
        model: Parsed ``ConfigRemapField`` model from the manifest.
        config: Connector config handed to the created component.

    Returns:
        A ``ConfigRemapField`` built from the model's ``map`` and
        ``field_path`` plus the given ``config``.
    """
    return ConfigRemapField(
        # ``cast`` only informs the type checker; the value is passed unchanged.
        map=cast(Mapping[str, Any], model.map),
        field_path=model.field_path,
        config=config,
    )
854+
855+
856+
def create_dpath_validator(self, model: DpathValidatorModel, config: Config) -> DpathValidator:
    """Build a runtime ``DpathValidator`` from its model.

    Args:
        model: Parsed ``DpathValidator`` model from the manifest.
        config: Connector config forwarded to the strategy factory.

    Returns:
        A ``DpathValidator`` for ``model.field_path`` using the strategy
        component created from ``model.validation_strategy``.
    """
    validation_strategy = self._create_component_from_model(model.validation_strategy, config)
    return DpathValidator(field_path=model.field_path, strategy=validation_strategy)
864+
865+
def create_predicate_validator(self, model: PredicateValidatorModel, config: Config) -> PredicateValidator:
    """Build a runtime ``PredicateValidator`` from its model.

    Args:
        model: Parsed ``PredicateValidator`` model from the manifest.
        config: Connector config forwarded to the strategy factory.

    Returns:
        A ``PredicateValidator`` for ``model.value`` using the strategy
        component created from ``model.validation_strategy``.
    """
    validation_strategy = self._create_component_from_model(model.validation_strategy, config)
    return PredicateValidator(value=model.value, strategy=validation_strategy)
872+
873+
@staticmethod
def create_validate_adheres_to_schema(
    model: ValidateAdheresToSchemaModel, config: Optional[Config] = None, **kwargs: Any
) -> ValidateAdheresToSchema:
    """Build a runtime ``ValidateAdheresToSchema`` strategy from its model.

    Args:
        model: Parsed ``ValidateAdheresToSchema`` model from the manifest.
        config: Accepted for signature parity with the other factory
            methods; not used when building the component.
        **kwargs: Accepted and ignored, mirroring
            ``create_added_field_definition``.

    Returns:
        A ``ValidateAdheresToSchema`` whose ``schema`` is the model's
        ``base_schema``.
    """
    # NOTE(review): the validator factories reach this through
    # ``_create_component_from_model(model.validation_strategy, config)``;
    # presumably the dispatcher forwards ``config`` (and any extra kwargs) to
    # the registered constructor, which a ``model``-only signature would reject
    # with a TypeError. The defaulted ``config`` keeps ``model``-only callers
    # working.
    #
    # NOTE(review): ``base_schema`` is declared as ``Union[str, Dict[str, Any]]``
    # on the model; ``cast`` does no runtime conversion, so a string schema is
    # forwarded as-is. TODO confirm ``ValidateAdheresToSchema`` handles that.
    base_schema = cast(Mapping[str, Any], model.base_schema)
    return ValidateAdheresToSchema(
        schema=base_schema,
    )
879+
782880
@staticmethod
783881
def create_added_field_definition(
784882
model: AddedFieldDefinitionModel, config: Config, **kwargs: Any

airbyte_cdk/sources/declarative/spec/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5-
from airbyte_cdk.sources.declarative.spec.spec import Spec
5+
from airbyte_cdk.sources.declarative.spec.spec import ConfigMigration, Spec
66

7-
__all__ = ["Spec"]
7+
__all__ = ["Spec", "ConfigMigration"]

0 commit comments

Comments
 (0)