Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3130,12 +3130,14 @@ definitions:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
- type: array
items:
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
Expand Down Expand Up @@ -3290,12 +3292,14 @@ definitions:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
- type: array
items:
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
Expand Down Expand Up @@ -3412,6 +3416,45 @@ definitions:
$parameters:
type: object
additionalProperties: true
# Schema for the GroupingPartitionRouter component: a wrapper around another partition
# router that batches the wrapped router's partitions into groups of `group_size`.
GroupingPartitionRouter:
title: Grouping Partition Router
description: >
A decorator on top of a partition router that groups partitions into batches of a specified size.
This is useful for APIs that support filtering by multiple partition keys in a single request.
Note that per-partition incremental syncs may not work as expected because the grouping
of partitions might change between syncs, potentially leading to inconsistent state tracking.
type: object
# `deduplicate` and `$parameters` are optional; only the three keys below are mandatory.
required:
- type
- group_size
- underlying_partition_router
properties:
type:
type: string
enum: [GroupingPartitionRouter]
group_size:
title: Group Size
description: The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.
type: integer
minimum: 1
examples:
- 10
- 50
underlying_partition_router:
title: Underlying Partition Router
description: The partition router whose output will be grouped. This can be any valid partition router component.
# Only these three router kinds are accepted; GroupingPartitionRouter itself is not listed,
# so a grouping router cannot directly wrap another grouping router.
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
deduplicate:
title: Deduplicate Partitions
description: If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.
type: boolean
default: true
$parameters:
type: object
additionalProperties: true
WaitUntilTimeFromHeader:
title: Wait Until Time Defined In Response Header
description: Extract time at which we can retry the request from response header and wait for the difference between now and that time.
Expand Down
123 changes: 90 additions & 33 deletions airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Union

from pydantic.v1 import BaseModel, Extra, Field
from pydantic.v1 import BaseModel, Extra, Field, conint


class AuthFlowType(Enum):
Expand Down Expand Up @@ -609,7 +609,9 @@ class OAuthAuthenticator(BaseModel):
scopes: Optional[List[str]] = Field(
None,
description="List of scopes that should be granted to the access token.",
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
examples=[
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
],
title="Scopes",
)
token_expiry_date: Optional[str] = Field(
Expand Down Expand Up @@ -1078,24 +1080,28 @@ class OAuthConfigSpecification(BaseModel):
class Config:
extra = Extra.allow

oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
)
)
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
)
)
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
None,
Expand All @@ -1113,7 +1119,9 @@ class Config:
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
examples=[
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
],
title="OAuth input specification",
)
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -1744,7 +1752,9 @@ class RecordSelector(BaseModel):
description="Responsible for filtering records to be emitted by the Source.",
title="Record Filter",
)
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
schema_normalization: Optional[
Union[SchemaNormalization, CustomSchemaNormalization]
] = Field(
SchemaNormalization.None_,
description="Responsible for normalization according to the schema.",
title="Schema Normalization",
Expand Down Expand Up @@ -1948,12 +1958,16 @@ class Config:
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = (
Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
)
)
name: Optional[str] = Field(
"", description="The stream name.", example=["Users"], title="Name"
)
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
primary_key: Optional[PrimaryKey] = Field(
"", description="The primary key of the stream.", title="Primary Key"
)
Expand Down Expand Up @@ -2225,7 +2239,15 @@ class SimpleRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
GroupingPartitionRouter,
List[
Union[
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
GroupingPartitionRouter,
]
],
]
] = Field(
[],
Expand Down Expand Up @@ -2269,7 +2291,9 @@ class AsyncRetriever(BaseModel):
)
download_extractor: Optional[
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
] = Field(None, description="Responsible for fetching the records from provided urls.")
] = Field(
None, description="Responsible for fetching the records from provided urls."
)
creation_requester: Union[CustomRequester, HttpRequester] = Field(
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
Expand Down Expand Up @@ -2303,7 +2327,15 @@ class AsyncRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
GroupingPartitionRouter,
List[
Union[
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
GroupingPartitionRouter,
]
],
]
] = Field(
[],
Expand Down Expand Up @@ -2355,6 +2387,29 @@ class SubstreamPartitionRouter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class GroupingPartitionRouter(BaseModel):
    # Declarative model for a partition router that wraps another router and batches
    # its partitions. Mirrors the `GroupingPartitionRouter` definition in
    # declarative_component_schema.yaml; a `#` comment is used instead of a docstring
    # so the generated JSON schema is unaffected.

    # Discriminator used to select this component type in a manifest.
    type: Literal["GroupingPartitionRouter"]

    # Required; constrained to integers >= 1 via conint.
    group_size: conint(ge=1) = Field(
        ...,
        title="Group Size",
        description="The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.",
        examples=[10, 50],
    )

    # Required; the wrapped router. A GroupingPartitionRouter is not accepted here.
    underlying_partition_router: Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter] = Field(
        ...,
        title="Underlying Partition Router",
        description="The partition router whose output will be grouped. This can be any valid partition router component.",
    )

    # Optional; defaults to True when omitted.
    deduplicate: Optional[bool] = Field(
        True,
        title="Deduplicate Partitions",
        description="If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.",
    )

    # Populated from the manifest's "$parameters" key.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class HttpComponentsResolver(BaseModel):
type: Literal["HttpComponentsResolver"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
Expand All @@ -2371,10 +2426,12 @@ class DynamicDeclarativeStream(BaseModel):
stream_template: DeclarativeStream = Field(
..., description="Reference to the stream template.", title="Stream Template"
)
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
...,
description="Component resolve and populates stream templates with components values.",
title="Components Resolver",
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
Field(
...,
description="Component resolve and populates stream templates with components values.",
title="Components Resolver",
)
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
FlattenFields as FlattenFieldsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GroupingPartitionRouter as GroupingPartitionRouterModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipDecoder as GzipDecoderModel,
)
Expand Down Expand Up @@ -379,6 +382,7 @@
)
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
GroupingPartitionRouter,
ListPartitionRouter,
PartitionRouter,
SinglePartitionRouter,
Expand Down Expand Up @@ -3044,3 +3048,23 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
self._api_budget = self.create_component(
model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config
)

def create_grouping_partition_router(
    self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
) -> GroupingPartitionRouter:
    """Build a GroupingPartitionRouter component from its declarative model.

    The wrapped router is first instantiated from
    ``model.underlying_partition_router`` and must be a ``PartitionRouter``.

    Args:
        model: Parsed GroupingPartitionRouter model.
        config: Connector configuration, passed to both the wrapped router's
            construction and the resulting component.
        **kwargs: Accepted for signature compatibility; not used here.

    Returns:
        The configured GroupingPartitionRouter.

    Raises:
        ValueError: If the component built from the underlying model is not a
            PartitionRouter instance.
    """
    underlying_router = self._create_component_from_model(
        model=model.underlying_partition_router, config=config
    )

    if isinstance(underlying_router, PartitionRouter):
        # An explicit None on the model falls back to the default of True.
        dedupe_flag = True if model.deduplicate is None else model.deduplicate
        return GroupingPartitionRouter(
            group_size=model.group_size,
            underlying_partition_router=underlying_router,
            deduplicate=dedupe_flag,
            parameters=model.parameters or {},
            config=config,
        )

    raise ValueError(
        f"Underlying partition router must be a PartitionRouter subclass, got {type(underlying_router)}"
    )
Loading
Loading