Skip to content

Commit 34f89fa

Browse files
committed
enable custom validation strategies
1 parent 8e36382 commit 34f89fa

File tree

3 files changed

+77
-34
lines changed

3 files changed

+77
-34
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4204,7 +4204,13 @@ definitions:
42044204
items:
42054205
type: object
42064206
examples:
4207-
- [{"name": "test stream", "$parameters": {"entity": "test entity"}, "primary_key": "test key"}]
4207+
- [
4208+
{
4209+
"name": "test stream",
4210+
"$parameters": { "entity": "test entity" },
4211+
"primary_key": "test key",
4212+
},
4213+
]
42084214
ParametrizedComponentsResolver:
42094215
type: object
42104216
title: Parametrized Components Resolver
@@ -4355,6 +4361,7 @@ definitions:
43554361
description: The condition that the specified config value will be evaluated against
43564362
anyOf:
43574363
- "$ref": "#/definitions/ValidateAdheresToSchema"
4364+
- "$ref": "#/definitions/CustomValidationStrategy"
43584365
PredicateValidator:
43594366
title: Predicate Validator
43604367
description: Validator that applies a validation strategy to a specified value.
@@ -4442,6 +4449,25 @@ definitions:
44424449
required:
44434450
- name
44444451
- age
4452+
CustomValidationStrategy:
4453+
title: Custom Validation Strategy
4454+
description: Custom validation strategy that allows for custom validation logic.
4455+
type: object
4456+
additionalProperties: true
4457+
required:
4458+
- type
4459+
- class_name
4460+
properties:
4461+
type:
4462+
type: string
4463+
enum: [CustomValidationStrategy]
4464+
class_name:
4465+
title: Class Name
4466+
description: Fully-qualified name of the class that will be implementing the custom validation strategy. Has to be a sub class of ValidationStrategy. The format is `source_<name>.<package>.<class_name>`.
4467+
type: string
4468+
additionalProperties: true
4469+
examples:
4470+
- "source_declarative_manifest.components.MyCustomValidationStrategy"
44454471
ConfigRemapField:
44464472
title: Remap Field
44474473
description: Transformation that remaps a field's value to another value based on a static map.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1586,6 +1586,19 @@ class ValidateAdheresToSchema(BaseModel):
15861586
)
15871587

15881588

1589+
class CustomValidationStrategy(BaseModel):
1590+
class Config:
1591+
extra = Extra.allow
1592+
1593+
type: Literal["CustomValidationStrategy"]
1594+
class_name: str = Field(
1595+
...,
1596+
description="Fully-qualified name of the class that will be implementing the custom validation strategy. Has to be a sub class of ValidationStrategy. The format is `source_<name>.<package>.<class_name>`.",
1597+
examples=["source_declarative_manifest.components.MyCustomValidationStrategy"],
1598+
title="Class Name",
1599+
)
1600+
1601+
15891602
class ConfigRemapField(BaseModel):
15901603
type: Literal["ConfigRemapField"]
15911604
map: Union[Dict[str, Any], str] = Field(
@@ -1767,30 +1780,23 @@ class DatetimeBasedCursor(BaseModel):
17671780
examples=["created_at", "{{ config['record_cursor'] }}"],
17681781
title="Cursor Field",
17691782
)
1770-
datetime_format: str = Field(
1771-
...,
1772-
description="The datetime format used to format the datetime values that are sent in outgoing requests to the API. Use placeholders starting with \"%\" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%s_as_float**: Epoch unix timestamp in seconds as float with microsecond precision - `1686218963.123456`\n * **%ms**: Epoch unix timestamp (milliseconds) - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`\n * **%_ms**: Millisecond (zero-padded to 3 digits) - `000`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (starting Sunday) - `00`, ..., `53`\n * **%W**: Week number of the year (starting Monday) - `00`, ..., `53`\n * **%c**: Date and time - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date standard format - `08/16/1988`\n * **%X**: Time standard format - `21:30:00`\n * **%%**: Literal '%' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. 
For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n",
1773-
examples=["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%d", "%s", "%ms", "%s_as_float"],
1774-
title="Outgoing Datetime Format",
1783+
cursor_datetime_formats: Optional[List[str]] = Field(
1784+
None,
1785+
description="The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the `datetime_format` will be used.",
1786+
title="Cursor Datetime Formats",
17751787
)
1776-
start_datetime: Union[str, MinMaxDatetime] = Field(
1788+
start_datetime: Union[MinMaxDatetime, str] = Field(
17771789
...,
17781790
description="The datetime that determines the earliest record that should be synced.",
17791791
examples=["2020-01-1T00:00:00Z", "{{ config['start_time'] }}"],
17801792
title="Start Datetime",
17811793
)
1782-
cursor_datetime_formats: Optional[List[str]] = Field(
1783-
None,
1784-
description="The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the `datetime_format` will be used.",
1785-
title="Cursor Datetime Formats",
1786-
)
1787-
cursor_granularity: Optional[str] = Field(
1794+
start_time_option: Optional[RequestOption] = Field(
17881795
None,
1789-
description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.",
1790-
examples=["PT1S"],
1791-
title="Cursor Granularity",
1796+
description="Optionally configures how the start datetime will be sent in requests to the source API.",
1797+
title="Inject Start Time Into Outgoing HTTP Request",
17921798
)
1793-
end_datetime: Optional[Union[str, MinMaxDatetime]] = Field(
1799+
end_datetime: Optional[Union[MinMaxDatetime, str]] = Field(
17941800
None,
17951801
description="The datetime that determines the last record that should be synced. If not provided, `{{ now_utc() }}` will be used.",
17961802
examples=["2021-01-1T00:00:00Z", "{{ now_utc() }}", "{{ day_delta(-1) }}"],
@@ -1801,6 +1807,18 @@ class DatetimeBasedCursor(BaseModel):
18011807
description="Optionally configures how the end datetime will be sent in requests to the source API.",
18021808
title="Inject End Time Into Outgoing HTTP Request",
18031809
)
1810+
datetime_format: str = Field(
1811+
...,
1812+
description="The datetime format used to format the datetime values that are sent in outgoing requests to the API. Use placeholders starting with \"%\" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%s_as_float**: Epoch unix timestamp in seconds as float with microsecond precision - `1686218963.123456`\n * **%ms**: Epoch unix timestamp (milliseconds) - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`\n * **%_ms**: Millisecond (zero-padded to 3 digits) - `000`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (starting Sunday) - `00`, ..., `53`\n * **%W**: Week number of the year (starting Monday) - `00`, ..., `53`\n * **%c**: Date and time - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date standard format - `08/16/1988`\n * **%X**: Time standard format - `21:30:00`\n * **%%**: Literal '%' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. 
For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n",
1813+
examples=["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%d", "%s", "%ms", "%s_as_float"],
1814+
title="Outgoing Datetime Format",
1815+
)
1816+
cursor_granularity: Optional[str] = Field(
1817+
None,
1818+
description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.",
1819+
examples=["PT1S"],
1820+
title="Cursor Granularity",
1821+
)
18041822
is_data_feed: Optional[bool] = Field(
18051823
None,
18061824
description="A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.",
@@ -1839,11 +1857,6 @@ class DatetimeBasedCursor(BaseModel):
18391857
examples=["starting_time"],
18401858
title="Partition Field Start",
18411859
)
1842-
start_time_option: Optional[RequestOption] = Field(
1843-
None,
1844-
description="Optionally configures how the start datetime will be sent in requests to the source API.",
1845-
title="Inject Start Time Into Outgoing HTTP Request",
1846-
)
18471860
step: Optional[str] = Field(
18481861
None,
18491862
description="The size of the time window (ISO8601 duration). Given this field is provided, `cursor_granularity` needs to be provided as well.",
@@ -1908,10 +1921,10 @@ class DefaultErrorHandler(BaseModel):
19081921
List[
19091922
Union[
19101923
ConstantBackoffStrategy,
1911-
CustomBackoffStrategy,
19121924
ExponentialBackoffStrategy,
19131925
WaitTimeFromHeader,
19141926
WaitUntilTimeFromHeader,
1927+
CustomBackoffStrategy,
19151928
]
19161929
]
19171930
] = Field(
@@ -2030,7 +2043,7 @@ class DpathValidator(BaseModel):
20302043
],
20312044
title="Field Path",
20322045
)
2033-
validation_strategy: ValidateAdheresToSchema = Field(
2046+
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
20342047
...,
20352048
description="The condition that the specified config value will be evaluated against",
20362049
title="Validation Strategy",
@@ -2285,12 +2298,12 @@ class Config:
22852298
ApiKeyAuthenticator,
22862299
BasicHttpAuthenticator,
22872300
BearerAuthenticator,
2288-
CustomAuthenticator,
22892301
OAuthAuthenticator,
22902302
JwtAuthenticator,
22912303
SessionTokenAuthenticator,
2292-
NoAuth,
22932304
LegacySessionTokenAuthenticator,
2305+
CustomAuthenticator,
2306+
NoAuth,
22942307
],
22952308
] = Field(
22962309
...,
@@ -2374,7 +2387,6 @@ class Config:
23742387
InlineSchemaLoader,
23752388
DynamicSchemaLoader,
23762389
JsonFileSchemaLoader,
2377-
CustomSchemaLoader,
23782390
List[
23792391
Union[
23802392
InlineSchemaLoader,
@@ -2383,6 +2395,7 @@ class Config:
23832395
CustomSchemaLoader,
23842396
]
23852397
],
2398+
CustomSchemaLoader,
23862399
]
23872400
] = Field(
23882401
None,
@@ -2393,13 +2406,13 @@ class Config:
23932406
List[
23942407
Union[
23952408
AddFields,
2396-
CustomTransformation,
23972409
RemoveFields,
23982410
KeysToLower,
23992411
KeysToSnakeCase,
24002412
FlattenFields,
24012413
DpathFlattenFields,
24022414
KeysReplace,
2415+
CustomTransformation,
24032416
]
24042417
]
24052418
] = Field(
@@ -2631,7 +2644,7 @@ class HttpRequester(BaseModelWithDeprecations):
26312644

26322645
class DynamicSchemaLoader(BaseModel):
26332646
type: Literal["DynamicSchemaLoader"]
2634-
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
2647+
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
26352648
...,
26362649
description="Component used to coordinate how records are extracted across stream slices and request pages.",
26372650
title="Retriever",
@@ -2645,13 +2658,13 @@ class DynamicSchemaLoader(BaseModel):
26452658
List[
26462659
Union[
26472660
AddFields,
2648-
CustomTransformation,
26492661
RemoveFields,
26502662
KeysToLower,
26512663
KeysToSnakeCase,
26522664
FlattenFields,
26532665
DpathFlattenFields,
26542666
KeysReplace,
2667+
CustomTransformation,
26552668
]
26562669
]
26572670
] = Field(
@@ -2895,7 +2908,7 @@ class AsyncRetriever(BaseModel):
28952908
] = Field(
28962909
None,
28972910
description="Component decoding the response so records can be extracted.",
2898-
title="Decoder",
2911+
title="HTTP Response Format",
28992912
)
29002913
download_decoder: Optional[
29012914
Union[
@@ -2911,7 +2924,7 @@ class AsyncRetriever(BaseModel):
29112924
] = Field(
29122925
None,
29132926
description="Component decoding the download response so records can be extracted.",
2914-
title="Download Decoder",
2927+
title="Download HTTP Response Format",
29152928
)
29162929
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
29172930

@@ -2935,7 +2948,7 @@ class GroupingPartitionRouter(BaseModel):
29352948
title="Group Size",
29362949
)
29372950
underlying_partition_router: Union[
2938-
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
2951+
ListPartitionRouter, SubstreamPartitionRouter, CustomPartitionRouter
29392952
] = Field(
29402953
...,
29412954
description="The partition router whose output will be grouped. This can be any valid partition router component.",
@@ -2951,7 +2964,7 @@ class GroupingPartitionRouter(BaseModel):
29512964

29522965
class HttpComponentsResolver(BaseModel):
29532966
type: Literal["HttpComponentsResolver"]
2954-
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
2967+
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
29552968
...,
29562969
description="Component used to coordinate how records are extracted across stream slices and request pages.",
29572970
title="Retriever",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,9 @@
222222
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
223223
CustomTransformation as CustomTransformationModel,
224224
)
225+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
226+
CustomValidationStrategy as CustomValidationStrategyModel,
227+
)
225228
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
226229
DatetimeBasedCursor as DatetimeBasedCursorModel,
227230
)
@@ -683,6 +686,7 @@ def _init_mappings(self) -> None:
683686
CustomPaginationStrategyModel: self.create_custom_component,
684687
CustomPartitionRouterModel: self.create_custom_component,
685688
CustomTransformationModel: self.create_custom_component,
689+
CustomValidationStrategyModel: self.create_custom_component,
686690
DatetimeBasedCursorModel: self.create_datetime_based_cursor,
687691
DeclarativeStreamModel: self.create_declarative_stream,
688692
DefaultErrorHandlerModel: self.create_default_error_handler,

0 commit comments

Comments
 (0)