Skip to content

Commit a07b04a

Browse files
lazebnyioctavia-squidington-iii
andauthored
feat(low-code cdk): add component resolver and http component resolver (#88)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 3e671b8 commit a07b04a

File tree

13 files changed

+864
-38
lines changed

13 files changed

+864
-38
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,13 @@ def _group_streams(
192192

193193
state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
194194

195-
name_to_stream_mapping = {
196-
stream["name"]: stream for stream in self.resolved_manifest["streams"]
197-
}
195+
# Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
196+
# and this is validated during the initialization of the source.
197+
streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
198+
self._source_config, config
199+
)
200+
201+
name_to_stream_mapping = {stream["name"]: stream for stream in streams}
198202

199203
for declarative_stream in self.streams(config=config):
200204
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@ version: 1.0.0
77
required:
88
- type
99
- check
10-
- streams
1110
- version
11+
anyOf:
12+
- required:
13+
- streams
14+
- required:
15+
- dynamic_streams
1216
properties:
1317
type:
1418
type: string
@@ -19,6 +23,10 @@ properties:
1923
type: array
2024
items:
2125
"$ref": "#/definitions/DeclarativeStream"
26+
dynamic_streams:
27+
type: array
28+
items:
29+
"$ref": "#/definitions/DynamicDeclarativeStream"
2230
version:
2331
type: string
2432
description: The version of the Airbyte CDK used to build and test the source.
@@ -1321,7 +1329,7 @@ definitions:
13211329
type: array
13221330
items:
13231331
- type: string
1324-
interpolation_content:
1332+
interpolation_context:
13251333
- config
13261334
examples:
13271335
- ["data"]
@@ -2895,6 +2903,96 @@ definitions:
28952903
$parameters:
28962904
type: object
28972905
additionalProperties: true
2906+
ComponentMappingDefinition:
2907+
title: Component Mapping Definition
2908+
description: (This component is experimental. Use at your own risk.) Specifies a mapping definition to update or add fields in a record or configuration. This allows dynamic mapping of data by interpolating values into the template based on provided contexts.
2909+
type: object
2910+
required:
2911+
- type
2912+
- field_path
2913+
- value
2914+
properties:
2915+
type:
2916+
type: string
2917+
enum: [ComponentMappingDefinition]
2918+
field_path:
2919+
title: Field Path
2920+
description: A list of potentially nested fields indicating the full path where value will be added or updated.
2921+
type: array
2922+
items:
2923+
- type: string
2924+
interpolation_context:
2925+
- config
2926+
- components_values
2927+
- stream_template_config
2928+
examples:
2929+
- ["data"]
2930+
- ["data", "records"]
2931+
- ["data", "{{ parameters.name }}"]
2932+
- ["data", "*", "record"]
2933+
value:
2934+
title: Value
2935+
description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.
2936+
type: string
2937+
interpolation_context:
2938+
- config
2939+
- stream_template_config
2940+
- components_values
2941+
examples:
2942+
- "{{ components_values['updates'] }}"
2943+
- "{{ components_values['MetaData']['LastUpdatedTime'] }}"
2944+
- "{{ config['segment_id'] }}"
2945+
value_type:
2946+
title: Value Type
2947+
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
2948+
"$ref": "#/definitions/ValueType"
2949+
$parameters:
2950+
type: object
2951+
additionalProperties: true
2952+
HttpComponentsResolver:
2953+
type: object
2954+
description: (This component is experimental. Use at your own risk.) Component resolve and populates stream templates with components fetched via an HTTP retriever.
2955+
properties:
2956+
type:
2957+
type: string
2958+
enum: [HttpComponentsResolver]
2959+
retriever:
2960+
title: Retriever
2961+
description: Component used to coordinate how records are extracted across stream slices and request pages.
2962+
anyOf:
2963+
- "$ref": "#/definitions/AsyncRetriever"
2964+
- "$ref": "#/definitions/CustomRetriever"
2965+
- "$ref": "#/definitions/SimpleRetriever"
2966+
components_mapping:
2967+
type: array
2968+
items:
2969+
"$ref": "#/definitions/ComponentMappingDefinition"
2970+
$parameters:
2971+
type: object
2972+
additionalProperties: true
2973+
required:
2974+
- type
2975+
- retriever
2976+
- components_mapping
2977+
DynamicDeclarativeStream:
2978+
type: object
2979+
description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template.
2980+
properties:
2981+
type:
2982+
type: string
2983+
enum: [DynamicDeclarativeStream]
2984+
stream_template:
2985+
title: Stream Template
2986+
description: Reference to the stream template.
2987+
"$ref": "#/definitions/DeclarativeStream"
2988+
components_resolver:
2989+
title: Components Resolver
2990+
description: Component resolve and populates stream templates with components values.
2991+
"$ref": "#/definitions/HttpComponentsResolver"
2992+
required:
2993+
- type
2994+
- stream_template
2995+
- components_resolver
28982996
interpolation:
28992997
variables:
29002998
- title: config

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
4040
ModelToComponentFactory,
4141
)
42+
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
4243
from airbyte_cdk.sources.message import MessageRepository
4344
from airbyte_cdk.sources.streams.core import Stream
4445
from airbyte_cdk.sources.types import ConnectionDefinition
@@ -120,7 +121,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
120121
self._emit_manifest_debug_message(
121122
extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
122123
)
123-
stream_configs = self._stream_configs(self._source_config)
124+
125+
stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
126+
self._source_config, config
127+
)
124128

125129
source_streams = [
126130
self._constructor.create_component(
@@ -234,7 +238,8 @@ def _validate_source(self) -> None:
234238
)
235239

236240
streams = self._source_config.get("streams")
237-
if not streams:
241+
dynamic_streams = self._source_config.get("dynamic_streams")
242+
if not (streams or dynamic_streams):
238243
raise ValidationError(
239244
f"A valid manifest should have at least one stream defined. Got {streams}"
240245
)
@@ -303,5 +308,51 @@ def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
303308
s["type"] = "DeclarativeStream"
304309
return stream_configs
305310

311+
def _dynamic_stream_configs(
312+
self, manifest: Mapping[str, Any], config: Mapping[str, Any]
313+
) -> List[Dict[str, Any]]:
314+
dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
315+
dynamic_stream_configs: List[Dict[str, Any]] = []
316+
317+
for dynamic_definition in dynamic_stream_definitions:
318+
components_resolver_config = dynamic_definition["components_resolver"]
319+
320+
if not components_resolver_config:
321+
raise ValueError(
322+
f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
323+
)
324+
325+
resolver_type = components_resolver_config.get("type")
326+
if not resolver_type:
327+
raise ValueError(
328+
f"Missing 'type' in components resolver configuration: {components_resolver_config}"
329+
)
330+
331+
if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
332+
raise ValueError(
333+
f"Invalid components resolver type '{resolver_type}'. "
334+
f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
335+
)
336+
337+
if "retriever" in components_resolver_config:
338+
components_resolver_config["retriever"]["requester"]["use_cache"] = True
339+
340+
# Create a resolver for dynamic components based on type
341+
components_resolver = self._constructor.create_component(
342+
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
343+
)
344+
345+
stream_template_config = dynamic_definition["stream_template"]
346+
347+
for dynamic_stream in components_resolver.resolve_components(
348+
stream_template_config=stream_template_config
349+
):
350+
if "type" not in dynamic_stream:
351+
dynamic_stream["type"] = "DeclarativeStream"
352+
353+
dynamic_stream_configs.append(dynamic_stream)
354+
355+
return dynamic_stream_configs
356+
306357
def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
307358
self.logger.debug("declarative source created from manifest", extra=extra_args)

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,6 +1158,37 @@ class WaitUntilTimeFromHeader(BaseModel):
11581158
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
11591159

11601160

1161+
class ComponentMappingDefinition(BaseModel):
1162+
type: Literal["ComponentMappingDefinition"]
1163+
field_path: List[str] = Field(
1164+
...,
1165+
description="A list of potentially nested fields indicating the full path where value will be added or updated.",
1166+
examples=[
1167+
["data"],
1168+
["data", "records"],
1169+
["data", "{{ parameters.name }}"],
1170+
["data", "*", "record"],
1171+
],
1172+
title="Field Path",
1173+
)
1174+
value: str = Field(
1175+
...,
1176+
description="The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.",
1177+
examples=[
1178+
"{{ components_values['updates'] }}",
1179+
"{{ components_values['MetaData']['LastUpdatedTime'] }}",
1180+
"{{ config['segment_id'] }}",
1181+
],
1182+
title="Value",
1183+
)
1184+
value_type: Optional[ValueType] = Field(
1185+
None,
1186+
description="The expected data type of the value. If omitted, the type will be inferred from the value provided.",
1187+
title="Value Type",
1188+
)
1189+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
1190+
1191+
11611192
class AddedFieldDefinition(BaseModel):
11621193
type: Literal["AddedFieldDefinition"]
11631194
path: List[str] = Field(
@@ -1455,13 +1486,40 @@ class CompositeErrorHandler(BaseModel):
14551486
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
14561487

14571488

1458-
class DeclarativeSource(BaseModel):
1489+
class DeclarativeSource1(BaseModel):
14591490
class Config:
14601491
extra = Extra.forbid
14611492

14621493
type: Literal["DeclarativeSource"]
14631494
check: CheckStream
14641495
streams: List[DeclarativeStream]
1496+
dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None
1497+
version: str = Field(
1498+
...,
1499+
description="The version of the Airbyte CDK used to build and test the source.",
1500+
)
1501+
schemas: Optional[Schemas] = None
1502+
definitions: Optional[Dict[str, Any]] = None
1503+
spec: Optional[Spec] = None
1504+
concurrency_level: Optional[ConcurrencyLevel] = None
1505+
metadata: Optional[Dict[str, Any]] = Field(
1506+
None,
1507+
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
1508+
)
1509+
description: Optional[str] = Field(
1510+
None,
1511+
description="A description of the connector. It will be presented on the Source documentation page.",
1512+
)
1513+
1514+
1515+
class DeclarativeSource2(BaseModel):
1516+
class Config:
1517+
extra = Extra.forbid
1518+
1519+
type: Literal["DeclarativeSource"]
1520+
check: CheckStream
1521+
streams: Optional[List[DeclarativeStream]] = None
1522+
dynamic_streams: List[DynamicDeclarativeStream]
14651523
version: str = Field(
14661524
...,
14671525
description="The version of the Airbyte CDK used to build and test the source.",
@@ -1480,6 +1538,17 @@ class Config:
14801538
)
14811539

14821540

1541+
class DeclarativeSource(BaseModel):
1542+
class Config:
1543+
extra = Extra.forbid
1544+
1545+
__root__: Union[DeclarativeSource1, DeclarativeSource2] = Field(
1546+
...,
1547+
description="An API source that extracts data according to its declarative components.",
1548+
title="DeclarativeSource",
1549+
)
1550+
1551+
14831552
class SelectiveAuthenticator(BaseModel):
14841553
class Config:
14851554
extra = Extra.allow
@@ -1883,8 +1952,32 @@ class SubstreamPartitionRouter(BaseModel):
18831952
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
18841953

18851954

1955+
class HttpComponentsResolver(BaseModel):
1956+
type: Literal["HttpComponentsResolver"]
1957+
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
1958+
...,
1959+
description="Component used to coordinate how records are extracted across stream slices and request pages.",
1960+
title="Retriever",
1961+
)
1962+
components_mapping: List[ComponentMappingDefinition]
1963+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
1964+
1965+
1966+
class DynamicDeclarativeStream(BaseModel):
1967+
type: Literal["DynamicDeclarativeStream"]
1968+
stream_template: DeclarativeStream = Field(
1969+
..., description="Reference to the stream template.", title="Stream Template"
1970+
)
1971+
components_resolver: HttpComponentsResolver = Field(
1972+
...,
1973+
description="Component resolve and populates stream templates with components values.",
1974+
title="Components Resolver",
1975+
)
1976+
1977+
18861978
CompositeErrorHandler.update_forward_refs()
1887-
DeclarativeSource.update_forward_refs()
1979+
DeclarativeSource1.update_forward_refs()
1980+
DeclarativeSource2.update_forward_refs()
18881981
SelectiveAuthenticator.update_forward_refs()
18891982
DeclarativeStream.update_forward_refs()
18901983
SessionTokenAuthenticator.update_forward_refs()

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@
3131
# DeclarativeStream
3232
"DeclarativeStream.retriever": "SimpleRetriever",
3333
"DeclarativeStream.schema_loader": "JsonFileSchemaLoader",
34+
# DynamicDeclarativeStream
35+
"DynamicDeclarativeStream.stream_template": "DeclarativeStream",
36+
"DynamicDeclarativeStream.components_resolver": "HttpComponentsResolver",
37+
# HttpComponentsResolver
38+
"HttpComponentsResolver.retriever": "SimpleRetriever",
39+
"HttpComponentsResolver.components_mapping": "ComponentMappingDefinition",
3440
# DefaultErrorHandler
3541
"DefaultErrorHandler.response_filters": "HttpResponseFilter",
3642
# DefaultPaginator

0 commit comments

Comments
 (0)