Skip to content

Commit 56b88af

Browse files
committed
Add dynamic stream name to stream model
1 parent 837913f commit 56b88af

File tree

5 files changed

+82
-28
lines changed

5 files changed

+82
-28
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1427,6 +1427,13 @@ definitions:
14271427
- "$ref": "#/definitions/LegacyToPerPartitionStateMigration"
14281428
- "$ref": "#/definitions/CustomStateMigration"
14291429
default: []
1430+
dynamic_stream_name:
1431+
title: Dynamic Stream Name
1432+
description: The name of the dynamic stream definition that generated this stream; None if the stream is static.
1433+
type: string
1434+
default: None
1435+
example:
1436+
- "Tables"
14301437
$parameters:
14311438
type: object
14321439
additional_properties: true
@@ -3766,6 +3773,13 @@ definitions:
37663773
type:
37673774
type: string
37683775
enum: [DynamicDeclarativeStream]
3776+
name:
3777+
title: Name
3778+
description: The dynamic stream name.
3779+
type: string
3780+
default: ""
3781+
example:
3782+
- "Tables"
37693783
stream_template:
37703784
title: Stream Template
37713785
description: Reference to the stream template.

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
343343
# This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
344344
stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
345345
for s in stream_configs:
346+
s["dynamic_stream_name"] = None
346347
if "type" not in s:
347348
s["type"] = "DeclarativeStream"
348349
return stream_configs
@@ -354,7 +355,7 @@ def _dynamic_stream_configs(
354355
dynamic_stream_configs: List[Dict[str, Any]] = []
355356
seen_dynamic_streams: Set[str] = set()
356357

357-
for dynamic_definition in dynamic_stream_definitions:
358+
for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
358359
components_resolver_config = dynamic_definition["components_resolver"]
359360

360361
if not components_resolver_config:
@@ -393,6 +394,8 @@ def _dynamic_stream_configs(
393394
# Ensure that each stream is created with a unique name
394395
name = dynamic_stream.get("name")
395396

397+
dynamic_stream["dynamic_stream_name"] = dynamic_definition.get("name", f"dynamic_stream_{dynamic_definition_index}")
398+
396399
if not isinstance(name, str):
397400
raise ValueError(
398401
f"Expected stream name {name} to be a string, got {type(name)}."

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,9 @@ class OAuthAuthenticator(BaseModel):
609609
scopes: Optional[List[str]] = Field(
610610
None,
611611
description="List of scopes that should be granted to the access token.",
612-
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
612+
examples=[
613+
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
614+
],
613615
title="Scopes",
614616
)
615617
token_expiry_date: Optional[str] = Field(
@@ -1083,24 +1085,28 @@ class OAuthConfigSpecification(BaseModel):
10831085
class Config:
10841086
extra = Extra.allow
10851087

1086-
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
1087-
None,
1088-
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1089-
examples=[
1090-
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1091-
{
1092-
"app_id": {
1093-
"type": "string",
1094-
"path_in_connector_config": ["info", "app_id"],
1095-
}
1096-
},
1097-
],
1098-
title="OAuth user input",
1088+
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
1089+
Field(
1090+
None,
1091+
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1092+
examples=[
1093+
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1094+
{
1095+
"app_id": {
1096+
"type": "string",
1097+
"path_in_connector_config": ["info", "app_id"],
1098+
}
1099+
},
1100+
],
1101+
title="OAuth user input",
1102+
)
10991103
)
1100-
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
1101-
None,
1102-
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1103-
title="DeclarativeOAuth Connector Specification",
1104+
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
1105+
Field(
1106+
None,
1107+
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1108+
title="DeclarativeOAuth Connector Specification",
1109+
)
11041110
)
11051111
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
11061112
None,
@@ -1118,7 +1124,9 @@ class Config:
11181124
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
11191125
None,
11201126
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
1121-
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
1127+
examples=[
1128+
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
1129+
],
11221130
title="OAuth input specification",
11231131
)
11241132
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1781,7 +1789,9 @@ class RecordSelector(BaseModel):
17811789
description="Responsible for filtering records to be emitted by the Source.",
17821790
title="Record Filter",
17831791
)
1784-
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
1792+
schema_normalization: Optional[
1793+
Union[SchemaNormalization, CustomSchemaNormalization]
1794+
] = Field(
17851795
SchemaNormalization.None_,
17861796
description="Responsible for normalization according to the schema.",
17871797
title="Schema Normalization",
@@ -2008,7 +2018,9 @@ class Config:
20082018
description="Component used to fetch data incrementally based on a time field in the data.",
20092019
title="Incremental Sync",
20102020
)
2011-
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
2021+
name: Optional[str] = Field(
2022+
"", description="The stream name.", example=["Users"], title="Name"
2023+
)
20122024
primary_key: Optional[PrimaryKey] = Field(
20132025
"", description="The primary key of the stream.", title="Primary Key"
20142026
)
@@ -2049,6 +2061,12 @@ class Config:
20492061
description="Array of state migrations to be applied on the input state",
20502062
title="State Migrations",
20512063
)
2064+
dynamic_stream_name: Optional[str] = Field(
2065+
"None",
2066+
description="The name of the dynamic stream definition that generated this stream; None if the stream is static.",
2067+
example=["Tables"],
2068+
title="Dynamic Stream Name",
2069+
)
20522070
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20532071

20542072

@@ -2266,7 +2284,9 @@ class ParentStreamConfig(BaseModel):
22662284

22672285
class StateDelegatingStream(BaseModel):
22682286
type: Literal["StateDelegatingStream"]
2269-
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
2287+
name: str = Field(
2288+
..., description="The stream name.", example=["Users"], title="Name"
2289+
)
22702290
full_refresh_stream: DeclarativeStream = Field(
22712291
...,
22722292
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
@@ -2355,7 +2375,9 @@ class AsyncRetriever(BaseModel):
23552375
)
23562376
download_extractor: Optional[
23572377
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
2358-
] = Field(None, description="Responsible for fetching the records from provided urls.")
2378+
] = Field(
2379+
None, description="Responsible for fetching the records from provided urls."
2380+
)
23592381
creation_requester: Union[CustomRequester, HttpRequester] = Field(
23602382
...,
23612383
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -2489,13 +2511,18 @@ class HttpComponentsResolver(BaseModel):
24892511

24902512
class DynamicDeclarativeStream(BaseModel):
24912513
type: Literal["DynamicDeclarativeStream"]
2514+
name: Optional[str] = Field(
2515+
"", description="The dynamic stream name.", example=["Tables"], title="Name"
2516+
)
24922517
stream_template: DeclarativeStream = Field(
24932518
..., description="Reference to the stream template.", title="Stream Template"
24942519
)
2495-
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
2496-
...,
2497-
description="Component resolve and populates stream templates with components values.",
2498-
title="Components Resolver",
2520+
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
2521+
Field(
2522+
...,
2523+
description="Component resolve and populates stream templates with components values.",
2524+
title="Components Resolver",
2525+
)
24992526
)
25002527

25012528

unit_tests/sources/declarative/resolvers/test_http_components_resolver.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def to_configured_catalog(
5757
"dynamic_streams": [
5858
{
5959
"type": "DynamicDeclarativeStream",
60+
"name": "TestDynamicStream",
6061
"stream_template": {
6162
"type": "DeclarativeStream",
6263
"name": "",
@@ -487,6 +488,9 @@ def test_dynamic_streams_read_with_http_components_resolver():
487488
]
488489
configured_catalog = to_configured_catalog(configured_streams)
489490

491+
for stream in source._dynamic_stream_configs(source.resolved_manifest, _CONFIG):
492+
assert stream["dynamic_stream_name"] == "TestDynamicStream"
493+
490494
records = [
491495
message.record
492496
for message in source.read(MagicMock(), _CONFIG, configured_catalog)
@@ -572,6 +576,9 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str
572576
]
573577
configured_catalog = to_configured_catalog(configured_streams)
574578

579+
for stream in source._dynamic_stream_configs(source.resolved_manifest, _CONFIG):
580+
assert stream["dynamic_stream_name"] == "dynamic_stream_0"
581+
575582
records = [
576583
message.record
577584
for message in source.read(MagicMock(), _CONFIG, configured_catalog)

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,9 @@ def test_group_streams():
709709
)
710710
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
711711

712+
for stream in source._stream_configs(source.resolved_manifest):
713+
assert stream["dynamic_stream_name"] is None
714+
712715
# 1 full refresh stream, 3 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental
713716
# 1 async job stream, 1 substream w/ incremental
714717
assert len(concurrent_streams) == 8

0 commit comments

Comments
 (0)