Skip to content

Commit 7963144

Browse files
committed
Add strems limit to full resolve namifest command
1 parent b9f84d5 commit 7963144

File tree

3 files changed

+40
-90
lines changed

3 files changed

+40
-90
lines changed

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55

66
from dataclasses import asdict, dataclass, field
7-
from typing import Any, List, Mapping
7+
from typing import Any, List, Mapping, Dict
88

99
from airbyte_cdk.connector_builder.test_reader import TestReader
1010
from airbyte_cdk.models import (
@@ -27,30 +27,34 @@
2727
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
2828
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
2929
DEFAULT_MAXIMUM_RECORDS = 100
30+
DEFAULT_MAXIMUM_STREAMS = 100
3031

3132
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
3233
MAX_SLICES_KEY = "max_slices"
3334
MAX_RECORDS_KEY = "max_records"
35+
MAX_STREAMS_KEY = "max_streams"
3436

3537

3638
@dataclass
37-
class TestReadLimits:
39+
class TestLimits:
3840
max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS)
3941
max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
4042
max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
43+
max_streams: int = field(default=DEFAULT_MAXIMUM_STREAMS)
4144

4245

43-
def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
46+
def get_limits(config: Mapping[str, Any]) -> TestLimits:
4447
command_config = config.get("__test_read_config", {})
4548
max_pages_per_slice = (
4649
command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
4750
)
4851
max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
4952
max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS
50-
return TestReadLimits(max_records, max_pages_per_slice, max_slices)
53+
max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_RECORDS
54+
return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams)
5155

5256

53-
def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource:
57+
def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
5458
manifest = config["__injected_declarative_manifest"]
5559
return ManifestDeclarativeSource(
5660
config=config,
@@ -71,7 +75,7 @@ def read_stream(
7175
config: Mapping[str, Any],
7276
configured_catalog: ConfiguredAirbyteCatalog,
7377
state: List[AirbyteStateMessage],
74-
limits: TestReadLimits,
78+
limits: TestLimits,
7579
) -> AirbyteMessage:
7680
try:
7781
test_read_handler = TestReader(
@@ -117,13 +121,24 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
117121
return error.as_airbyte_message()
118122

119123

120-
def full_resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
124+
def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
121125
try:
126+
122127
manifest = {**source.resolved_manifest}
123128
streams = manifest.get("streams", [])
124129
for stream in streams:
125130
stream["dynamic_stream_name"] = None
126-
streams.extend(source.dynamic_streams)
131+
132+
mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
133+
for stream in source.dynamic_streams:
134+
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])
135+
136+
if len(generated_streams) < limits.max_streams:
137+
generated_streams += [stream]
138+
139+
for generated_streams_list in mapped_streams.values():
140+
streams.extend(generated_streams_list)
141+
127142
manifest["streams"] = streams
128143
return AirbyteMessage(
129144
type=Type.RECORD,

airbyte_cdk/connector_builder/main.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from airbyte_cdk.connector import BaseConnector
1212
from airbyte_cdk.connector_builder.connector_builder_handler import (
13-
TestReadLimits,
13+
TestLimits,
1414
create_source,
1515
full_resolve_manifest,
1616
get_limits,
@@ -73,7 +73,7 @@ def handle_connector_builder_request(
7373
config: Mapping[str, Any],
7474
catalog: Optional[ConfiguredAirbyteCatalog],
7575
state: List[AirbyteStateMessage],
76-
limits: TestReadLimits,
76+
limits: TestLimits,
7777
) -> AirbyteMessage:
7878
if command == "resolve_manifest":
7979
return resolve_manifest(source)
@@ -83,7 +83,7 @@ def handle_connector_builder_request(
8383
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
8484
return read_stream(source, config, catalog, state, limits)
8585
elif command == "full_resolve_manifest":
86-
return full_resolve_manifest(source)
86+
return full_resolve_manifest(source, limits)
8787
else:
8888
raise ValueError(f"Unrecognized command {command}.")
8989

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 14 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE,
2121
DEFAULT_MAXIMUM_NUMBER_OF_SLICES,
2222
DEFAULT_MAXIMUM_RECORDS,
23-
TestReadLimits,
23+
TestLimits,
2424
create_source,
2525
get_limits,
2626
resolve_manifest,
@@ -384,6 +384,7 @@
384384
RESOLVE_DYNAMIC_STREAM_MANIFEST_CONFIG = {
385385
"__injected_declarative_manifest": DYNAMIC_STREAM_MANIFEST,
386386
"__command": "full_resolve_manifest",
387+
"__test_read_config": {"max_streams": 2},
387388
}
388389

389390
TEST_READ_CONFIG = {
@@ -524,7 +525,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
524525
command = "resolve_manifest"
525526
config["__command"] = command
526527
source = ManifestDeclarativeSource(source_config=MANIFEST)
527-
limits = TestReadLimits()
528+
limits = TestLimits()
528529
resolved_manifest = handle_connector_builder_request(
529530
source, command, config, create_configured_catalog("dummy_stream"), _A_STATE, limits
530531
)
@@ -728,7 +729,7 @@ def test_read():
728729
emitted_at=1,
729730
),
730731
)
731-
limits = TestReadLimits()
732+
limits = TestLimits()
732733
with patch(
733734
"airbyte_cdk.connector_builder.test_reader.TestReader.run_test_read",
734735
return_value=stream_read,
@@ -789,7 +790,7 @@ def test_config_update() -> None:
789790
config,
790791
ConfiguredAirbyteCatalogSerializer.load(CONFIGURED_CATALOG),
791792
_A_PER_PARTITION_STATE,
792-
TestReadLimits(),
793+
TestLimits(),
793794
)
794795
assert output.record.data["latest_config_update"]
795796

@@ -825,7 +826,7 @@ def check_config_against_spec(self) -> Literal[False]:
825826
mock_from_exception.return_value = stack_trace
826827

827828
source = MockManifestDeclarativeSource()
828-
limits = TestReadLimits()
829+
limits = TestLimits()
829830
response = read_stream(
830831
source,
831832
TEST_READ_CONFIG,
@@ -865,7 +866,7 @@ def test_handle_429_response():
865866
] = {"backoff_strategies": [{"type": "ConstantBackoffStrategy", "backoff_time_in_seconds": 5}]}
866867

867868
config = TEST_READ_CONFIG
868-
limits = TestReadLimits()
869+
limits = TestLimits()
869870
source = create_source(config, limits)
870871

871872
with patch("requests.Session.send", return_value=response) as mock_send:
@@ -982,7 +983,7 @@ def test_create_source():
982983
max_records = 3
983984
max_pages_per_slice = 2
984985
max_slices = 1
985-
limits = TestReadLimits(max_records, max_pages_per_slice, max_slices)
986+
limits = TestLimits(max_records, max_pages_per_slice, max_slices)
986987

987988
config = {"__injected_declarative_manifest": MANIFEST}
988989

@@ -1064,7 +1065,7 @@ def test_read_source(mock_http_stream):
10641065
max_records = 100
10651066
max_pages_per_slice = 2
10661067
max_slices = 3
1067-
limits = TestReadLimits(max_records, max_pages_per_slice, max_slices)
1068+
limits = TestLimits(max_records, max_pages_per_slice, max_slices)
10681069

10691070
catalog = ConfiguredAirbyteCatalog(
10701071
streams=[
@@ -1111,7 +1112,7 @@ def test_read_source_single_page_single_slice(mock_http_stream):
11111112
max_records = 100
11121113
max_pages_per_slice = 1
11131114
max_slices = 1
1114-
limits = TestReadLimits(max_records, max_pages_per_slice, max_slices)
1115+
limits = TestLimits(max_records, max_pages_per_slice, max_slices)
11151116

11161117
catalog = ConfiguredAirbyteCatalog(
11171118
streams=[
@@ -1195,7 +1196,7 @@ def test_handle_read_external_requests(deployment_mode, url_base, expected_error
11951196
endpoints when running on Cloud or OSS deployments
11961197
"""
11971198

1198-
limits = TestReadLimits(max_records=100, max_pages_per_slice=1, max_slices=1)
1199+
limits = TestLimits(max_records=100, max_pages_per_slice=1, max_slices=1)
11991200

12001201
catalog = ConfiguredAirbyteCatalog(
12011202
streams=[
@@ -1281,7 +1282,7 @@ def test_handle_read_external_oauth_request(deployment_mode, token_url, expected
12811282
endpoints when running on Cloud or OSS deployments
12821283
"""
12831284

1284-
limits = TestReadLimits(max_records=100, max_pages_per_slice=1, max_slices=1)
1285+
limits = TestLimits(max_records=100, max_pages_per_slice=1, max_slices=1)
12851286

12861287
catalog = ConfiguredAirbyteCatalog(
12871288
streams=[
@@ -1339,7 +1340,7 @@ def test_read_stream_exception_with_secrets():
13391340
]
13401341
)
13411342
state = []
1342-
limits = TestReadLimits()
1343+
limits = TestLimits()
13431344

13441345
# Add the secret to be filtered
13451346
update_secrets([config["api_key"]])
@@ -1367,7 +1368,7 @@ def test_full_resolve_manifest(valid_resolve_manifest_config_file):
13671368
config = copy.deepcopy(RESOLVE_DYNAMIC_STREAM_MANIFEST_CONFIG)
13681369
command = config["__command"]
13691370
source = ManifestDeclarativeSource(source_config=DYNAMIC_STREAM_MANIFEST)
1370-
limits = TestReadLimits()
1371+
limits = TestLimits(max_streams=2)
13711372
with HttpMocker() as http_mocker:
13721373
http_mocker.get(
13731374
HttpRequest(url="https://api.test.com/parents"),
@@ -1625,72 +1626,6 @@ def test_full_resolve_manifest(valid_resolve_manifest_config_file):
16251626
},
16261627
"dynamic_stream_name": "TestDynamicStream",
16271628
},
1628-
{
1629-
"type": "DeclarativeStream",
1630-
"name": "parent_2_item_1",
1631-
"primary_key": [],
1632-
"schema_loader": {
1633-
"type": "InlineSchemaLoader",
1634-
"schema": {
1635-
"$schema": "http://json-schema.org/schema#",
1636-
"properties": {"ABC": {"type": "number"}, "AED": {"type": "number"}},
1637-
"type": "object",
1638-
},
1639-
},
1640-
"retriever": {
1641-
"type": "SimpleRetriever",
1642-
"requester": {
1643-
"type": "HttpRequester",
1644-
"url_base": "https://api.test.com",
1645-
"path": "2/1",
1646-
"http_method": "GET",
1647-
"authenticator": {
1648-
"type": "ApiKeyAuthenticator",
1649-
"header": "apikey",
1650-
"api_token": "{{ config['api_key'] }}",
1651-
},
1652-
},
1653-
"record_selector": {
1654-
"type": "RecordSelector",
1655-
"extractor": {"type": "DpathExtractor", "field_path": []},
1656-
},
1657-
"paginator": {"type": "NoPagination"},
1658-
},
1659-
"dynamic_stream_name": "TestDynamicStream",
1660-
},
1661-
{
1662-
"type": "DeclarativeStream",
1663-
"name": "parent_2_item_2",
1664-
"primary_key": [],
1665-
"schema_loader": {
1666-
"type": "InlineSchemaLoader",
1667-
"schema": {
1668-
"$schema": "http://json-schema.org/schema#",
1669-
"properties": {"ABC": {"type": "number"}, "AED": {"type": "number"}},
1670-
"type": "object",
1671-
},
1672-
},
1673-
"retriever": {
1674-
"type": "SimpleRetriever",
1675-
"requester": {
1676-
"type": "HttpRequester",
1677-
"url_base": "https://api.test.com",
1678-
"path": "2/2",
1679-
"http_method": "GET",
1680-
"authenticator": {
1681-
"type": "ApiKeyAuthenticator",
1682-
"header": "apikey",
1683-
"api_token": "{{ config['api_key'] }}",
1684-
},
1685-
},
1686-
"record_selector": {
1687-
"type": "RecordSelector",
1688-
"extractor": {"type": "DpathExtractor", "field_path": []},
1689-
},
1690-
"paginator": {"type": "NoPagination"},
1691-
},
1692-
"dynamic_stream_name": "TestDynamicStream",
1693-
},
16941629
],
16951630
"check": {"type": "CheckStream", "stream_names": ["lists"]},
16961631
"spec": {

0 commit comments

Comments
 (0)