Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1427,6 +1427,13 @@ definitions:
- "$ref": "#/definitions/LegacyToPerPartitionStateMigration"
- "$ref": "#/definitions/CustomStateMigration"
default: []
dynamic_stream_name:
title: Dynamic Stream Name
description: The dynamic stream name that create current stream, if None is static stream.
type: string
default: None
example:
- "Tables"
$parameters:
type: object
additional_properties: true
Expand Down Expand Up @@ -3766,6 +3773,13 @@ definitions:
type:
type: string
enum: [DynamicDeclarativeStream]
name:
title: Name
description: The dynamic stream name.
type: string
default: ""
example:
- "Tables"
stream_template:
title: Stream Template
description: Reference to the stream template.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
# This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
for s in stream_configs:
s["dynamic_stream_name"] = None
if "type" not in s:
s["type"] = "DeclarativeStream"
return stream_configs
Expand All @@ -354,7 +355,7 @@ def _dynamic_stream_configs(
dynamic_stream_configs: List[Dict[str, Any]] = []
seen_dynamic_streams: Set[str] = set()

for dynamic_definition in dynamic_stream_definitions:
for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
components_resolver_config = dynamic_definition["components_resolver"]

if not components_resolver_config:
Expand Down Expand Up @@ -393,6 +394,10 @@ def _dynamic_stream_configs(
# Ensure that each stream is created with a unique name
name = dynamic_stream.get("name")

dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
"name", f"dynamic_stream_{dynamic_definition_index}"
)

if not isinstance(name, str):
raise ValueError(
f"Expected stream name {name} to be a string, got {type(name)}."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2049,6 +2049,12 @@ class Config:
description="Array of state migrations to be applied on the input state",
title="State Migrations",
)
dynamic_stream_name: Optional[str] = Field(
"None",
description="The dynamic stream name that create current stream, if None is static stream.",
example=["Tables"],
title="Dynamic Stream Name",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down Expand Up @@ -2489,6 +2495,9 @@ class HttpComponentsResolver(BaseModel):

class DynamicDeclarativeStream(BaseModel):
type: Literal["DynamicDeclarativeStream"]
name: Optional[str] = Field(
"", description="The dynamic stream name.", example=["Tables"], title="Name"
)
stream_template: DeclarativeStream = Field(
..., description="Reference to the stream template.", title="Stream Template"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def to_configured_catalog(
"dynamic_streams": [
{
"type": "DynamicDeclarativeStream",
"name": "TestDynamicStream",
"stream_template": {
"type": "DeclarativeStream",
"name": "",
Expand Down Expand Up @@ -487,6 +488,9 @@ def test_dynamic_streams_read_with_http_components_resolver():
]
configured_catalog = to_configured_catalog(configured_streams)

for stream in source._dynamic_stream_configs(source.resolved_manifest, _CONFIG):
assert stream["dynamic_stream_name"] == "TestDynamicStream"

records = [
message.record
for message in source.read(MagicMock(), _CONFIG, configured_catalog)
Expand Down Expand Up @@ -572,6 +576,9 @@ def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_str
]
configured_catalog = to_configured_catalog(configured_streams)

for stream in source._dynamic_stream_configs(source.resolved_manifest, _CONFIG):
assert stream["dynamic_stream_name"] == "dynamic_stream_0"

records = [
message.record
for message in source.read(MagicMock(), _CONFIG, configured_catalog)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,9 @@ def test_group_streams():
)
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)

for stream in source._stream_configs(source.resolved_manifest):
assert stream["dynamic_stream_name"] is None

# 1 full refresh stream, 3 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental
# 1 async job stream, 1 substream w/ incremental
assert len(concurrent_streams) == 8
Expand Down
Loading