Skip to content

Commit 949872b

Browse files
committed
Add Combined Extractor
1 parent fa8d54d commit 949872b

File tree

10 files changed

+373
-70
lines changed

10 files changed

+373
-70
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1741,6 +1741,54 @@ definitions:
17411741
$parameters:
17421742
type: object
17431743
additionalProperties: true
1744+
KeyValueExtractor:
1745+
title: Key Value Extractor
1746+
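description: Record extractor that builds each record by pairing the keys returned by `keys_extractor` with the values returned by `values_extractor`.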
1747+
type: object
1748+
required:
1749+
- type
1750+
- keys_extractor
1751+
- values_extractor
1752+
properties:
1753+
type:
1754+
type: string
1755+
enum: [ KeyValueExtractor ]
1756+
keys_extractor:
1757+
description: Extractor whose output supplies the keys of each resulting record.
1758+
anyOf:
1759+
- "$ref": "#/definitions/DpathExtractor"
1760+
- "$ref": "#/definitions/CustomRecordExtractor"
1761+
values_extractor:
1762+
description: Extractor whose output supplies the values, consumed in groups as large as the number of extracted keys.
1763+
anyOf:
1764+
- "$ref": "#/definitions/DpathExtractor"
1765+
- "$ref": "#/definitions/CustomRecordExtractor"
1766+
$parameters:
1767+
type: object
1768+
additionalProperties: true
1769+
CombinedExtractor:
1770+
title: Combined Extractor
1771+
description: Record extractor that merges the records returned by several sub-extractors, position by position, into a single record.
1772+
type: object
1773+
required:
1774+
- type
1775+
- extractors
1776+
properties:
1777+
type:
1778+
type: string
1779+
enum: [ CombinedExtractor ]
1780+
extractors:
1781+
description: Extractors whose records are zipped together and merged into one record per position.
1782+
type: array
1783+
items:
1784+
anyOf:
1785+
- "$ref": "#/definitions/DpathExtractor"
1786+
- "$ref": "#/definitions/CombinedExtractor"
1787+
- "$ref": "#/definitions/KeyValueExtractor"
1788+
- "$ref": "#/definitions/CustomRecordExtractor"
1789+
$parameters:
1790+
type: object
1791+
additionalProperties: true
17441792
DpathExtractor:
17451793
title: Dpath Extractor
17461794
description: Record extractor that searches a decoded response over a path defined as an array of fields.
@@ -2318,6 +2366,12 @@ definitions:
23182366
- "$ref": "#/definitions/AsyncRetriever"
23192367
- "$ref": "#/definitions/CustomRetriever"
23202368
- "$ref": "#/definitions/SimpleRetriever"
2369+
schema_filter:
2370+
title: Schema Filter
2371+
description: placeholder
2372+
anyOf:
2373+
- "$ref": "#/definitions/RecordFilter"
2374+
- "$ref": "#/definitions/CustomRecordFilter"
23212375
schema_transformations:
23222376
title: Schema Transformations
23232377
description: A list of transformations to be applied to the schema.
@@ -3315,6 +3369,8 @@ definitions:
33153369
extractor:
33163370
anyOf:
33173371
- "$ref": "#/definitions/DpathExtractor"
3372+
- "$ref": "#/definitions/CombinedExtractor"
3373+
- "$ref": "#/definitions/KeyValueExtractor"
33183374
- "$ref": "#/definitions/CustomRecordExtractor"
33193375
record_filter:
33203376
title: Record Filter
@@ -3994,6 +4050,9 @@ definitions:
39944050
title: Value Type
39954051
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
39964052
"$ref": "#/definitions/ValueType"
4053+
create_or_update:
4054+
type: boolean
4055+
default: false
39974056
$parameters:
39984057
type: object
39994058
additionalProperties: true
@@ -4045,6 +4104,10 @@ definitions:
40454104
- ["data"]
40464105
- ["data", "streams"]
40474106
- ["data", "{{ parameters.name }}"]
4107+
default_values:
4108+
title: Default Values
4109+
description: placeholder
4110+
type: array
40484111
$parameters:
40494112
type: object
40504113
additionalProperties: true
@@ -4056,7 +4119,11 @@ definitions:
40564119
type: string
40574120
enum: [ConfigComponentsResolver]
40584121
stream_config:
4059-
"$ref": "#/definitions/StreamConfig"
4122+
anyOf:
4123+
- type: array
4124+
items:
4125+
"$ref": "#/definitions/StreamConfig"
4126+
- "$ref": "#/definitions/StreamConfig"
40604127
components_mapping:
40614128
type: array
40624129
items:

airbyte_cdk/sources/declarative/extractors/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#
44

55
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
6+
from airbyte_cdk.sources.declarative.extractors.key_value_extractor import KeyValueExtractor
7+
from airbyte_cdk.sources.declarative.extractors.combined_extractor import CombinedExtractor
68
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
79
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
810
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
@@ -18,4 +20,6 @@
1820
"RecordFilter",
1921
"RecordSelector",
2022
"ResponseToFileExtractor",
23+
"KeyValueExtractor",
24+
"CombinedExtractor",
2125
]
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any, Iterable, List, MutableMapping
7+
8+
import requests
9+
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
10+
11+
12+
@dataclass
class CombinedExtractor(RecordExtractor):
    """
    Record extractor that fuses the records of several sub-extractors.

    Every extractor listed in `extractors` is run against the same response. Their
    outputs are walked in lockstep: the first record of each extractor forms one group,
    the second record of each forms the next group, and so on. Each group is collapsed
    into a single dictionary with `dict.update()`, applied in extractor order, so when
    two records share a key the value from the later extractor wins.

    Because the outputs are combined with `zip`, iteration stops as soon as the shortest
    sub-extractor output is exhausted; surplus records from longer outputs are dropped.

    Example:
        keys_extractor -> yields: [{"name": "Alice", "age": 30}]
        extra_data_extractor -> yields: [{"country": "US"}]
        CombinedExtractor(extractors=[keys_extractor, extra_data_extractor]) ->
            yields: [{"name": "Alice", "age": 30, "country": "US"}]
    """

    extractors: List[RecordExtractor]

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        # Ask every sub-extractor for its records up front, one iterable per extractor.
        record_streams = [sub.extract_records(response) for sub in self.extractors]

        # Each `group` holds the records found at the same position in every stream.
        for group in zip(*record_streams):
            combined = {}
            for part in group:
                # Later extractors overwrite keys set by earlier ones.
                combined.update(part)
            yield combined
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any, Iterable, MutableMapping
7+
8+
import requests
9+
from itertools import islice
10+
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
11+
12+
13+
@dataclass
class KeyValueExtractor(RecordExtractor):
    """
    Extractor that combines keys and values from two separate extractors.

    The `keys_extractor` and `values_extractor` extract records independently
    from the response. All keys are collected first; the values are then consumed
    in consecutive chunks of `len(keys)` items, and each chunk is zipped with the
    keys to build one dictionary.

    If the final chunk holds fewer values than there are keys, the resulting
    dictionary only contains the keys that received a value. If no keys are
    extracted, nothing is yielded.

    Example:
      keys_extractor -> yields: ["name", "age"]
      values_extractor -> yields: ["Alice", 30]
      result: { "name": "Alice", "age": 30 }
    """

    keys_extractor: RecordExtractor
    values_extractor: RecordExtractor

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        """
        Yield one dictionary per chunk of extracted values.

        :param response: the response handed to both sub-extractors
        :return: an iterable of dictionaries mapping the extracted keys to values
        """
        keys = list(self.keys_extractor.extract_records(response))
        # `extract_records` may return any iterable, including a list. Wrapping it in
        # `iter()` gives a single stateful iterator, so each `islice` call below resumes
        # where the previous one stopped instead of restarting from the first value
        # (which would never produce an empty chunk and would loop forever).
        values = iter(self.values_extractor.extract_records(response))

        while True:
            chunk = list(islice(values, len(keys)))
            if not chunk:
                # Values are exhausted (or there are no keys to pair them with).
                break
            yield dict(zip(keys, chunk))

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
262262
}
263263
)
264264

265-
stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
266-
self._source_config, config
267-
)
265+
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
268266

269267
api_budget_model = self._source_config.get("api_budget")
270268
if api_budget_model:

0 commit comments

Comments
 (0)