Skip to content

Commit c035138

Browse files
feat: Add RecordExpander to declarative component schema
- Add RecordExpander definition to declarative_component_schema.yaml - Add record_expander property to DpathExtractor schema - Update create_dpath_extractor in model_to_component_factory.py to handle record_expander - Auto-generate models from schema using poetry run poe build - All 24 tests passing This completes the schema registration for RecordExpander component, enabling YAML manifests to properly instantiate RecordExpander when used with DpathExtractor. Co-Authored-By: unknown <>
1 parent 91690f4 commit c035138

File tree

3 files changed

+80
-15
lines changed

3 files changed

+80
-15
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1912,6 +1912,10 @@ definitions:
19121912
- ["data", "records"]
19131913
- ["data", "{{ parameters.name }}"]
19141914
- ["data", "*", "record"]
1915+
record_expander:
1916+
title: Record Expander
1917+
description: Optional component to expand records by extracting items from nested array fields.
1918+
"$ref": "#/definitions/RecordExpander"
19151919
$parameters:
19161920
type: object
19171921
additionalProperties: true
@@ -1928,6 +1932,37 @@ definitions:
19281932
$parameters:
19291933
type: object
19301934
additionalProperties: true
1935+
RecordExpander:
1936+
title: Record Expander
1937+
description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation.
1938+
type: object
1939+
required:
1940+
- type
1941+
- expand_records_from_field
1942+
properties:
1943+
type:
1944+
type: string
1945+
enum: [RecordExpander]
1946+
expand_records_from_field:
1947+
title: Expand Records From Field
1948+
description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records.
1949+
type: array
1950+
items:
1951+
type: string
1952+
interpolation_context:
1953+
- config
1954+
examples:
1955+
- ["lines", "data"]
1956+
- ["items"]
1957+
- ["nested", "array"]
1958+
remain_original_record:
1959+
title: Remain Original Record
1960+
description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.
1961+
type: boolean
1962+
default: false
1963+
$parameters:
1964+
type: object
1965+
additionalProperties: true
19311966
ExponentialBackoffStrategy:
19321967
title: Exponential Backoff
19331968
description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -482,24 +482,24 @@ class Config:
482482
)
483483

484484

485-
class DpathExtractor(BaseModel):
486-
type: Literal["DpathExtractor"]
487-
field_path: List[str] = Field(
488-
...,
489-
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
490-
examples=[
491-
["data"],
492-
["data", "records"],
493-
["data", "{{ parameters.name }}"],
494-
["data", "*", "record"],
495-
],
496-
title="Field Path",
497-
)
485+
class ResponseToFileExtractor(BaseModel):
486+
type: Literal["ResponseToFileExtractor"]
498487
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
499488

500489

501-
class ResponseToFileExtractor(BaseModel):
502-
type: Literal["ResponseToFileExtractor"]
490+
class RecordExpander(BaseModel):
491+
type: Literal["RecordExpander"]
492+
expand_records_from_field: List[str] = Field(
493+
...,
494+
description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records.",
495+
examples=[["lines", "data"], ["items"], ["nested", "array"]],
496+
title="Expand Records From Field",
497+
)
498+
remain_original_record: Optional[bool] = Field(
499+
False,
500+
description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.',
501+
title="Remain Original Record",
502+
)
503503
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
504504

505505

@@ -2034,6 +2034,27 @@ class DefaultPaginator(BaseModel):
20342034
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20352035

20362036

2037+
class DpathExtractor(BaseModel):
2038+
type: Literal["DpathExtractor"]
2039+
field_path: List[str] = Field(
2040+
...,
2041+
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
2042+
examples=[
2043+
["data"],
2044+
["data", "records"],
2045+
["data", "{{ parameters.name }}"],
2046+
["data", "*", "record"],
2047+
],
2048+
title="Field Path",
2049+
)
2050+
record_expander: Optional[RecordExpander] = Field(
2051+
None,
2052+
description="Optional component to expand records by extracting items from nested array fields.",
2053+
title="Record Expander",
2054+
)
2055+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2056+
2057+
20372058
class SessionTokenRequestApiKeyAuthenticator(BaseModel):
20382059
type: Literal["ApiKey"]
20392060
inject_into: RequestOption = Field(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2314,11 +2314,20 @@ def create_dpath_extractor(
23142314
else:
23152315
decoder_to_use = JsonDecoder(parameters={})
23162316
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
2317+
2318+
record_expander = None
2319+
if hasattr(model, "record_expander") and model.record_expander:
2320+
record_expander = self._create_component_from_model(
2321+
model=model.record_expander,
2322+
config=config,
2323+
)
2324+
23172325
return DpathExtractor(
23182326
decoder=decoder_to_use,
23192327
field_path=model_field_path,
23202328
config=config,
23212329
parameters=model.parameters or {},
2330+
record_expander=record_expander,
23222331
)
23232332

23242333
@staticmethod

0 commit comments

Comments
 (0)