Skip to content
Merged
23 changes: 23 additions & 0 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,28 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
return error.as_airbyte_message()


def full_resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
try:
manifest = {**source.resolved_manifest}
streams = manifest.get("streams", [])
for stream in streams:
stream["dynamic_stream_name"] = None
streams.extend(source.dynamic_streams)
manifest["streams"] = streams
return AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(
data={"manifest": manifest},
emitted_at=_emitted_at(),
stream="full_resolve_manifest",
),
)
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error full resolving manifest: {str(exc)}"
)
return error.as_airbyte_message()


def _emitted_at() -> int:
return ab_datetime_now().to_epoch_millis()
3 changes: 3 additions & 0 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from airbyte_cdk.connector_builder.connector_builder_handler import (
TestReadLimits,
create_source,
full_resolve_manifest,
get_limits,
read_stream,
resolve_manifest,
Expand Down Expand Up @@ -81,6 +82,8 @@ def handle_connector_builder_request(
catalog is not None
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
return read_stream(source, config, catalog, state, limits)
elif command == "full_resolve_manifest":
return full_resolve_manifest(source)
else:
raise ValueError(f"Unrecognized command {command}.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3766,6 +3766,13 @@ definitions:
type:
type: string
enum: [DynamicDeclarativeStream]
name:
title: Name
description: The dynamic stream name.
type: string
default: ""
example:
- "Tables"
stream_template:
title: Stream Template
description: Reference to the stream template.
Expand Down
14 changes: 12 additions & 2 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def __init__(
AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
)

self._config = config or {}
self._validate_source()

@property
Expand All @@ -116,6 +117,10 @@ def resolved_manifest(self) -> Mapping[str, Any]:
def message_repository(self) -> MessageRepository:
return self._message_repository

@property
def dynamic_streams(self) -> List[Dict[str, Any]]:
return self._dynamic_stream_configs(manifest=self._source_config, config=self._config, with_dynamic_stream_name=True)

@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
Expand Down Expand Up @@ -348,13 +353,13 @@ def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
return stream_configs

def _dynamic_stream_configs(
self, manifest: Mapping[str, Any], config: Mapping[str, Any]
self, manifest: Mapping[str, Any], config: Mapping[str, Any], with_dynamic_stream_name: Optional[bool] = None
) -> List[Dict[str, Any]]:
dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
dynamic_stream_configs: List[Dict[str, Any]] = []
seen_dynamic_streams: Set[str] = set()

for dynamic_definition in dynamic_stream_definitions:
for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
components_resolver_config = dynamic_definition["components_resolver"]

if not components_resolver_config:
Expand Down Expand Up @@ -393,6 +398,11 @@ def _dynamic_stream_configs(
# Ensure that each stream is created with a unique name
name = dynamic_stream.get("name")

if with_dynamic_stream_name:
dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
"name", f"dynamic_stream_{dynamic_definition_index}"
)

if not isinstance(name, str):
raise ValueError(
f"Expected stream name {name} to be a string, got {type(name)}."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2489,6 +2489,9 @@ class HttpComponentsResolver(BaseModel):

class DynamicDeclarativeStream(BaseModel):
type: Literal["DynamicDeclarativeStream"]
name: Optional[str] = Field(
"", description="The dynamic stream name.", example=["Tables"], title="Name"
)
stream_template: DeclarativeStream = Field(
..., description="Reference to the stream template.", title="Stream Template"
)
Expand Down
Loading
Loading