Skip to content

Commit bfa4fb7

Browse files
authored
feat: enable custom validation strategies (#610)
1 parent 8e36382 commit bfa4fb7

File tree

3 files changed

+79
-37
lines changed

3 files changed

+79
-37
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4204,7 +4204,13 @@ definitions:
42044204
items:
42054205
type: object
42064206
examples:
4207-
- [{"name": "test stream", "$parameters": {"entity": "test entity"}, "primary_key": "test key"}]
4207+
- [
4208+
{
4209+
"name": "test stream",
4210+
"$parameters": { "entity": "test entity" },
4211+
"primary_key": "test key",
4212+
},
4213+
]
42084214
ParametrizedComponentsResolver:
42094215
type: object
42104216
title: Parametrized Components Resolver
@@ -4355,6 +4361,7 @@ definitions:
43554361
description: The condition that the specified config value will be evaluated against
43564362
anyOf:
43574363
- "$ref": "#/definitions/ValidateAdheresToSchema"
4364+
- "$ref": "#/definitions/CustomValidationStrategy"
43584365
PredicateValidator:
43594366
title: Predicate Validator
43604367
description: Validator that applies a validation strategy to a specified value.
@@ -4389,6 +4396,7 @@ definitions:
43894396
description: The validation strategy to apply to the value.
43904397
anyOf:
43914398
- "$ref": "#/definitions/ValidateAdheresToSchema"
4399+
- "$ref": "#/definitions/CustomValidationStrategy"
43924400
ValidateAdheresToSchema:
43934401
title: Validate Adheres To Schema
43944402
description: Validates that a user-provided schema adheres to a specified JSON schema.
@@ -4442,6 +4450,25 @@ definitions:
44424450
required:
44434451
- name
44444452
- age
4453+
CustomValidationStrategy:
4454+
title: Custom Validation Strategy
4455+
description: Custom validation strategy that allows for custom validation logic.
4456+
type: object
4457+
additionalProperties: true
4458+
required:
4459+
- type
4460+
- class_name
4461+
properties:
4462+
type:
4463+
type: string
4464+
enum: [CustomValidationStrategy]
4465+
class_name:
4466+
title: Class Name
4467+
description: Fully-qualified name of the class that will be implementing the custom validation strategy. Has to be a sub class of ValidationStrategy. The format is `source_<name>.<package>.<class_name>`.
4468+
type: string
4469+
additionalProperties: true
4470+
examples:
4471+
- "source_declarative_manifest.components.MyCustomValidationStrategy"
44454472
ConfigRemapField:
44464473
title: Remap Field
44474474
description: Transformation that remaps a field's value to another value based on a static map.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2-
31
# generated by datamodel-codegen:
42
# filename: declarative_component_schema.yaml
53

@@ -1586,6 +1584,19 @@ class ValidateAdheresToSchema(BaseModel):
15861584
)
15871585

15881586

1587+
class CustomValidationStrategy(BaseModel):
1588+
class Config:
1589+
extra = Extra.allow
1590+
1591+
type: Literal["CustomValidationStrategy"]
1592+
class_name: str = Field(
1593+
...,
1594+
description="Fully-qualified name of the class that will be implementing the custom validation strategy. Has to be a sub class of ValidationStrategy. The format is `source_<name>.<package>.<class_name>`.",
1595+
examples=["source_declarative_manifest.components.MyCustomValidationStrategy"],
1596+
title="Class Name",
1597+
)
1598+
1599+
15891600
class ConfigRemapField(BaseModel):
15901601
type: Literal["ConfigRemapField"]
15911602
map: Union[Dict[str, Any], str] = Field(
@@ -1767,30 +1778,23 @@ class DatetimeBasedCursor(BaseModel):
17671778
examples=["created_at", "{{ config['record_cursor'] }}"],
17681779
title="Cursor Field",
17691780
)
1770-
datetime_format: str = Field(
1771-
...,
1772-
description="The datetime format used to format the datetime values that are sent in outgoing requests to the API. Use placeholders starting with \"%\" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%s_as_float**: Epoch unix timestamp in seconds as float with microsecond precision - `1686218963.123456`\n * **%ms**: Epoch unix timestamp (milliseconds) - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`\n * **%_ms**: Millisecond (zero-padded to 3 digits) - `000`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (starting Sunday) - `00`, ..., `53`\n * **%W**: Week number of the year (starting Monday) - `00`, ..., `53`\n * **%c**: Date and time - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date standard format - `08/16/1988`\n * **%X**: Time standard format - `21:30:00`\n * **%%**: Literal '%' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n",
1773-
examples=["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%d", "%s", "%ms", "%s_as_float"],
1774-
title="Outgoing Datetime Format",
1781+
cursor_datetime_formats: Optional[List[str]] = Field(
1782+
None,
1783+
description="The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the `datetime_format` will be used.",
1784+
title="Cursor Datetime Formats",
17751785
)
1776-
start_datetime: Union[str, MinMaxDatetime] = Field(
1786+
start_datetime: Union[MinMaxDatetime, str] = Field(
17771787
...,
17781788
description="The datetime that determines the earliest record that should be synced.",
17791789
examples=["2020-01-1T00:00:00Z", "{{ config['start_time'] }}"],
17801790
title="Start Datetime",
17811791
)
1782-
cursor_datetime_formats: Optional[List[str]] = Field(
1783-
None,
1784-
description="The possible formats for the cursor field, in order of preference. The first format that matches the cursor field value will be used to parse it. If not provided, the `datetime_format` will be used.",
1785-
title="Cursor Datetime Formats",
1786-
)
1787-
cursor_granularity: Optional[str] = Field(
1792+
start_time_option: Optional[RequestOption] = Field(
17881793
None,
1789-
description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.",
1790-
examples=["PT1S"],
1791-
title="Cursor Granularity",
1794+
description="Optionally configures how the start datetime will be sent in requests to the source API.",
1795+
title="Inject Start Time Into Outgoing HTTP Request",
17921796
)
1793-
end_datetime: Optional[Union[str, MinMaxDatetime]] = Field(
1797+
end_datetime: Optional[Union[MinMaxDatetime, str]] = Field(
17941798
None,
17951799
description="The datetime that determines the last record that should be synced. If not provided, `{{ now_utc() }}` will be used.",
17961800
examples=["2021-01-1T00:00:00Z", "{{ now_utc() }}", "{{ day_delta(-1) }}"],
@@ -1801,6 +1805,18 @@ class DatetimeBasedCursor(BaseModel):
18011805
description="Optionally configures how the end datetime will be sent in requests to the source API.",
18021806
title="Inject End Time Into Outgoing HTTP Request",
18031807
)
1808+
datetime_format: str = Field(
1809+
...,
1810+
description="The datetime format used to format the datetime values that are sent in outgoing requests to the API. Use placeholders starting with \"%\" to describe the format the API is using. The following placeholders are available:\n * **%s**: Epoch unix timestamp - `1686218963`\n * **%s_as_float**: Epoch unix timestamp in seconds as float with microsecond precision - `1686218963.123456`\n * **%ms**: Epoch unix timestamp (milliseconds) - `1686218963123`\n * **%a**: Weekday (abbreviated) - `Sun`\n * **%A**: Weekday (full) - `Sunday`\n * **%w**: Weekday (decimal) - `0` (Sunday), `6` (Saturday)\n * **%d**: Day of the month (zero-padded) - `01`, `02`, ..., `31`\n * **%b**: Month (abbreviated) - `Jan`\n * **%B**: Month (full) - `January`\n * **%m**: Month (zero-padded) - `01`, `02`, ..., `12`\n * **%y**: Year (without century, zero-padded) - `00`, `01`, ..., `99`\n * **%Y**: Year (with century) - `0001`, `0002`, ..., `9999`\n * **%H**: Hour (24-hour, zero-padded) - `00`, `01`, ..., `23`\n * **%I**: Hour (12-hour, zero-padded) - `01`, `02`, ..., `12`\n * **%p**: AM/PM indicator\n * **%M**: Minute (zero-padded) - `00`, `01`, ..., `59`\n * **%S**: Second (zero-padded) - `00`, `01`, ..., `59`\n * **%f**: Microsecond (zero-padded to 6 digits) - `000000`\n * **%_ms**: Millisecond (zero-padded to 3 digits) - `000`\n * **%z**: UTC offset - `(empty)`, `+0000`, `-04:00`\n * **%Z**: Time zone name - `(empty)`, `UTC`, `GMT`\n * **%j**: Day of the year (zero-padded) - `001`, `002`, ..., `366`\n * **%U**: Week number of the year (starting Sunday) - `00`, ..., `53`\n * **%W**: Week number of the year (starting Monday) - `00`, ..., `53`\n * **%c**: Date and time - `Tue Aug 16 21:30:00 1988`\n * **%x**: Date standard format - `08/16/1988`\n * **%X**: Time standard format - `21:30:00`\n * **%%**: Literal '%' character\n\n Some placeholders depend on the locale of the underlying system - in most cases this locale is configured as en/US. For more information see the [Python documentation](https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes).\n",
1811+
examples=["%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%d", "%s", "%ms", "%s_as_float"],
1812+
title="Outgoing Datetime Format",
1813+
)
1814+
cursor_granularity: Optional[str] = Field(
1815+
None,
1816+
description="Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one, e.g. for %Y-%m-%d the granularity should be P1D, for %Y-%m-%dT%H:%M:%SZ the granularity should be PT1S. Given this field is provided, `step` needs to be provided as well.",
1817+
examples=["PT1S"],
1818+
title="Cursor Granularity",
1819+
)
18041820
is_data_feed: Optional[bool] = Field(
18051821
None,
18061822
description="A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.",
@@ -1839,11 +1855,6 @@ class DatetimeBasedCursor(BaseModel):
18391855
examples=["starting_time"],
18401856
title="Partition Field Start",
18411857
)
1842-
start_time_option: Optional[RequestOption] = Field(
1843-
None,
1844-
description="Optionally configures how the start datetime will be sent in requests to the source API.",
1845-
title="Inject Start Time Into Outgoing HTTP Request",
1846-
)
18471858
step: Optional[str] = Field(
18481859
None,
18491860
description="The size of the time window (ISO8601 duration). Given this field is provided, `cursor_granularity` needs to be provided as well.",
@@ -1908,10 +1919,10 @@ class DefaultErrorHandler(BaseModel):
19081919
List[
19091920
Union[
19101921
ConstantBackoffStrategy,
1911-
CustomBackoffStrategy,
19121922
ExponentialBackoffStrategy,
19131923
WaitTimeFromHeader,
19141924
WaitUntilTimeFromHeader,
1925+
CustomBackoffStrategy,
19151926
]
19161927
]
19171928
] = Field(
@@ -2030,7 +2041,7 @@ class DpathValidator(BaseModel):
20302041
],
20312042
title="Field Path",
20322043
)
2033-
validation_strategy: ValidateAdheresToSchema = Field(
2044+
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
20342045
...,
20352046
description="The condition that the specified config value will be evaluated against",
20362047
title="Validation Strategy",
@@ -2050,7 +2061,7 @@ class PredicateValidator(BaseModel):
20502061
],
20512062
title="Value",
20522063
)
2053-
validation_strategy: ValidateAdheresToSchema = Field(
2064+
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
20542065
...,
20552066
description="The validation strategy to apply to the value.",
20562067
title="Validation Strategy",
@@ -2285,12 +2296,12 @@ class Config:
22852296
ApiKeyAuthenticator,
22862297
BasicHttpAuthenticator,
22872298
BearerAuthenticator,
2288-
CustomAuthenticator,
22892299
OAuthAuthenticator,
22902300
JwtAuthenticator,
22912301
SessionTokenAuthenticator,
2292-
NoAuth,
22932302
LegacySessionTokenAuthenticator,
2303+
CustomAuthenticator,
2304+
NoAuth,
22942305
],
22952306
] = Field(
22962307
...,
@@ -2374,7 +2385,6 @@ class Config:
23742385
InlineSchemaLoader,
23752386
DynamicSchemaLoader,
23762387
JsonFileSchemaLoader,
2377-
CustomSchemaLoader,
23782388
List[
23792389
Union[
23802390
InlineSchemaLoader,
@@ -2383,6 +2393,7 @@ class Config:
23832393
CustomSchemaLoader,
23842394
]
23852395
],
2396+
CustomSchemaLoader,
23862397
]
23872398
] = Field(
23882399
None,
@@ -2393,13 +2404,13 @@ class Config:
23932404
List[
23942405
Union[
23952406
AddFields,
2396-
CustomTransformation,
23972407
RemoveFields,
23982408
KeysToLower,
23992409
KeysToSnakeCase,
24002410
FlattenFields,
24012411
DpathFlattenFields,
24022412
KeysReplace,
2413+
CustomTransformation,
24032414
]
24042415
]
24052416
] = Field(
@@ -2631,7 +2642,7 @@ class HttpRequester(BaseModelWithDeprecations):
26312642

26322643
class DynamicSchemaLoader(BaseModel):
26332644
type: Literal["DynamicSchemaLoader"]
2634-
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
2645+
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
26352646
...,
26362647
description="Component used to coordinate how records are extracted across stream slices and request pages.",
26372648
title="Retriever",
@@ -2645,13 +2656,13 @@ class DynamicSchemaLoader(BaseModel):
26452656
List[
26462657
Union[
26472658
AddFields,
2648-
CustomTransformation,
26492659
RemoveFields,
26502660
KeysToLower,
26512661
KeysToSnakeCase,
26522662
FlattenFields,
26532663
DpathFlattenFields,
26542664
KeysReplace,
2665+
CustomTransformation,
26552666
]
26562667
]
26572668
] = Field(
@@ -2895,7 +2906,7 @@ class AsyncRetriever(BaseModel):
28952906
] = Field(
28962907
None,
28972908
description="Component decoding the response so records can be extracted.",
2898-
title="Decoder",
2909+
title="HTTP Response Format",
28992910
)
29002911
download_decoder: Optional[
29012912
Union[
@@ -2911,7 +2922,7 @@ class AsyncRetriever(BaseModel):
29112922
] = Field(
29122923
None,
29132924
description="Component decoding the download response so records can be extracted.",
2914-
title="Download Decoder",
2925+
title="Download HTTP Response Format",
29152926
)
29162927
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
29172928

@@ -2935,7 +2946,7 @@ class GroupingPartitionRouter(BaseModel):
29352946
title="Group Size",
29362947
)
29372948
underlying_partition_router: Union[
2938-
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
2949+
ListPartitionRouter, SubstreamPartitionRouter, CustomPartitionRouter
29392950
] = Field(
29402951
...,
29412952
description="The partition router whose output will be grouped. This can be any valid partition router component.",
@@ -2951,7 +2962,7 @@ class GroupingPartitionRouter(BaseModel):
29512962

29522963
class HttpComponentsResolver(BaseModel):
29532964
type: Literal["HttpComponentsResolver"]
2954-
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
2965+
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
29552966
...,
29562967
description="Component used to coordinate how records are extracted across stream slices and request pages.",
29572968
title="Retriever",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,9 @@
222222
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
223223
CustomTransformation as CustomTransformationModel,
224224
)
225+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
226+
CustomValidationStrategy as CustomValidationStrategyModel,
227+
)
225228
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
226229
DatetimeBasedCursor as DatetimeBasedCursorModel,
227230
)
@@ -683,6 +686,7 @@ def _init_mappings(self) -> None:
683686
CustomPaginationStrategyModel: self.create_custom_component,
684687
CustomPartitionRouterModel: self.create_custom_component,
685688
CustomTransformationModel: self.create_custom_component,
689+
CustomValidationStrategyModel: self.create_custom_component,
686690
DatetimeBasedCursorModel: self.create_datetime_based_cursor,
687691
DeclarativeStreamModel: self.create_declarative_stream,
688692
DefaultErrorHandlerModel: self.create_default_error_handler,

0 commit comments

Comments
 (0)