Skip to content

Commit 8eabfca

Browse files
committed
Merge branch 'lazebnyi/add-schema-filter' of github.com:airbytehq/airbyte-python-cdk into lazebnyi/add-schema-filter
2 parents 11f2bd5 + 3961f6c commit 8eabfca

File tree

2 files changed

+43
-61
lines changed

2 files changed

+43
-61
lines changed

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 38 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -1126,28 +1126,24 @@ class OAuthConfigSpecification(BaseModel):
11261126
class Config:
11271127
extra = Extra.allow
11281128

1129-
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
1130-
Field(
1131-
None,
1132-
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1133-
examples=[
1134-
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1135-
{
1136-
"app_id": {
1137-
"type": "string",
1138-
"path_in_connector_config": ["info", "app_id"],
1139-
}
1140-
},
1141-
],
1142-
title="OAuth user input",
1143-
)
1129+
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
1130+
None,
1131+
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1132+
examples=[
1133+
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1134+
{
1135+
"app_id": {
1136+
"type": "string",
1137+
"path_in_connector_config": ["info", "app_id"],
1138+
}
1139+
},
1140+
],
1141+
title="OAuth user input",
11441142
)
1145-
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
1146-
Field(
1147-
None,
1148-
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1149-
title="DeclarativeOAuth Connector Specification",
1150-
)
1143+
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
1144+
None,
1145+
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1146+
title="DeclarativeOAuth Connector Specification",
11511147
)
11521148
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
11531149
None,
@@ -1165,9 +1161,7 @@ class Config:
11651161
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
11661162
None,
11671163
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
1168-
examples=[
1169-
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
1170-
],
1164+
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
11711165
title="OAuth input specification",
11721166
)
11731167
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1893,9 +1887,7 @@ class RecordSelector(BaseModel):
18931887
description="Responsible for filtering records to be emitted by the Source.",
18941888
title="Record Filter",
18951889
)
1896-
schema_normalization: Optional[
1897-
Union[SchemaNormalization, CustomSchemaNormalization]
1898-
] = Field(
1890+
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
18991891
None,
19001892
description="Responsible for normalization according to the schema.",
19011893
title="Schema Normalization",
@@ -2141,9 +2133,7 @@ class Config:
21412133
extra = Extra.allow
21422134

21432135
type: Literal["DeclarativeStream"]
2144-
name: Optional[str] = Field(
2145-
"", description="The stream name.", example=["Users"], title="Name"
2146-
)
2136+
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
21472137
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
21482138
...,
21492139
description="Component used to coordinate how records are extracted across stream slices and request pages.",
@@ -2321,20 +2311,18 @@ class HttpRequester(BaseModelWithDeprecations):
23212311
description="Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.",
23222312
title="Fetch Properties from Endpoint",
23232313
)
2324-
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = (
2325-
Field(
2326-
None,
2327-
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
2328-
examples=[
2329-
{"unit": "day"},
2330-
{
2331-
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
2332-
},
2333-
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
2334-
{"sort_by[asc]": "updated_at"},
2335-
],
2336-
title="Query Parameters",
2337-
)
2314+
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field(
2315+
None,
2316+
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
2317+
examples=[
2318+
{"unit": "day"},
2319+
{
2320+
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
2321+
},
2322+
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
2323+
{"sort_by[asc]": "updated_at"},
2324+
],
2325+
title="Query Parameters",
23382326
)
23392327
request_headers: Optional[Union[Dict[str, str], str]] = Field(
23402328
None,
@@ -2529,9 +2517,7 @@ class QueryProperties(BaseModel):
25292517

25302518
class StateDelegatingStream(BaseModel):
25312519
type: Literal["StateDelegatingStream"]
2532-
name: str = Field(
2533-
..., description="The stream name.", example=["Users"], title="Name"
2534-
)
2520+
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
25352521
full_refresh_stream: DeclarativeStream = Field(
25362522
...,
25372523
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
@@ -2620,9 +2606,7 @@ class AsyncRetriever(BaseModel):
26202606
)
26212607
download_extractor: Optional[
26222608
Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor]
2623-
] = Field(
2624-
None, description="Responsible for fetching the records from provided urls."
2625-
)
2609+
] = Field(None, description="Responsible for fetching the records from provided urls.")
26262610
creation_requester: Union[HttpRequester, CustomRequester] = Field(
26272611
...,
26282612
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -2762,12 +2746,10 @@ class DynamicDeclarativeStream(BaseModel):
27622746
stream_template: DeclarativeStream = Field(
27632747
..., description="Reference to the stream template.", title="Stream Template"
27642748
)
2765-
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
2766-
Field(
2767-
...,
2768-
description="Component resolve and populates stream templates with components values.",
2769-
title="Components Resolver",
2770-
)
2749+
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
2750+
...,
2751+
description="Component resolve and populates stream templates with components values.",
2752+
title="Components Resolver",
27712753
)
27722754

27732755

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010
import dpath
1111
from typing_extensions import deprecated
1212

13+
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
1314
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1415
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1516
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1617
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
17-
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
1818
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1919
from airbyte_cdk.sources.source import ExperimentalClassWarning
2020
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
@@ -174,16 +174,16 @@ def _transform(
174174
return properties
175175

176176
def _filter(
177-
self,
178-
properties: Mapping[str, Any],
177+
self,
178+
properties: Mapping[str, Any],
179179
) -> Mapping[str, Any]:
180180
if not self.schema_filter:
181181
return properties
182182

183183
filtered_properties: MutableMapping[str, Any] = {}
184184
for item in self.schema_filter.filter_records(
185-
({k: v} for k, v in properties.items()),
186-
{},
185+
({k: v} for k, v in properties.items()),
186+
{},
187187
):
188188
filtered_properties.update(item)
189189
return filtered_properties

0 commit comments

Comments
 (0)