Skip to content
Merged
23 changes: 23 additions & 0 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,28 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
return error.as_airbyte_message()


def full_resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
try:
manifest = {**source.resolved_manifest}
streams = manifest.get("streams", [])
for stream in streams:
stream["dynamic_stream_name"] = None
streams.extend(source.dynamic_streams)
manifest["streams"] = streams
return AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(
data={"manifest": manifest},
emitted_at=_emitted_at(),
stream="full_resolve_manifest",
),
)
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error full resolving manifest: {str(exc)}"
)
return error.as_airbyte_message()


def _emitted_at() -> int:
return ab_datetime_now().to_epoch_millis()
3 changes: 3 additions & 0 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from airbyte_cdk.connector_builder.connector_builder_handler import (
TestReadLimits,
create_source,
full_resolve_manifest,
get_limits,
read_stream,
resolve_manifest,
Expand Down Expand Up @@ -81,6 +82,8 @@ def handle_connector_builder_request(
catalog is not None
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
return read_stream(source, config, catalog, state, limits)
elif command == "full_resolve_manifest":
return full_resolve_manifest(source)
else:
raise ValueError(f"Unrecognized command {command}.")

Expand Down
14 changes: 14 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1427,6 +1427,13 @@ definitions:
- "$ref": "#/definitions/LegacyToPerPartitionStateMigration"
- "$ref": "#/definitions/CustomStateMigration"
default: []
dynamic_stream_name:
title: Dynamic Stream Name
description: The dynamic stream name that create current stream, if None is static stream.
type: string
default: None
example:
- "Tables"
$parameters:
type: object
additional_properties: true
Expand Down Expand Up @@ -3766,6 +3773,13 @@ definitions:
type:
type: string
enum: [DynamicDeclarativeStream]
name:
title: Name
description: The dynamic stream name.
type: string
default: ""
example:
- "Tables"
stream_template:
title: Stream Template
description: Reference to the stream template.
Expand Down
11 changes: 10 additions & 1 deletion airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def __init__(
AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
)

self._config = config or {}
self._validate_source()

@property
Expand All @@ -116,6 +117,10 @@ def resolved_manifest(self) -> Mapping[str, Any]:
def message_repository(self) -> MessageRepository:
return self._message_repository

@property
def dynamic_streams(self) -> List[Dict[str, Any]]:
return self._dynamic_stream_configs(manifest=self._source_config, config=self._config)

@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
Expand Down Expand Up @@ -354,7 +359,7 @@ def _dynamic_stream_configs(
dynamic_stream_configs: List[Dict[str, Any]] = []
seen_dynamic_streams: Set[str] = set()

for dynamic_definition in dynamic_stream_definitions:
for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
components_resolver_config = dynamic_definition["components_resolver"]

if not components_resolver_config:
Expand Down Expand Up @@ -393,6 +398,10 @@ def _dynamic_stream_configs(
# Ensure that each stream is created with a unique name
name = dynamic_stream.get("name")

dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
"name", f"dynamic_stream_{dynamic_definition_index}"
)

if not isinstance(name, str):
raise ValueError(
f"Expected stream name {name} to be a string, got {type(name)}."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2049,6 +2049,12 @@ class Config:
description="Array of state migrations to be applied on the input state",
title="State Migrations",
)
dynamic_stream_name: Optional[str] = Field(
"None",
description="The dynamic stream name that create current stream, if None is static stream.",
example=["Tables"],
title="Dynamic Stream Name",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down Expand Up @@ -2489,6 +2495,9 @@ class HttpComponentsResolver(BaseModel):

class DynamicDeclarativeStream(BaseModel):
type: Literal["DynamicDeclarativeStream"]
name: Optional[str] = Field(
"", description="The dynamic stream name.", example=["Tables"], title="Name"
)
stream_template: DeclarativeStream = Field(
..., description="Reference to the stream template.", title="Stream Template"
)
Expand Down
Loading
Loading