Skip to content

Commit 739c429

Browse files
committed
Add unit tests
1 parent a2ce98b commit 739c429

File tree

5 files changed

+46
-46
lines changed

5 files changed

+46
-46
lines changed

airbyte_cdk/sources/declarative/checks/check_stream.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from typing import Any, List, Mapping, Tuple
99

1010
from airbyte_cdk import AbstractSource
11-
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
1211
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
1312
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
1413

@@ -74,15 +73,14 @@ def check_connection(
7473
)
7574
return False, f"Unable to connect to stream {stream_name} - {error}"
7675

77-
if isinstance(source, ManifestDeclarativeSource) and self.dynamic_streams_check_configs:
78-
dynamic_stream_name_to_dynamic_stream = {dynamic_stream["name"]: dynamic_stream for dynamic_stream in
79-
source.resolved_manifest.get("dynamic_streams", [])}
76+
if hasattr(source, "resolved_manifest") and hasattr(source, "dynamic_streams") and self.dynamic_streams_check_configs:
77+
dynamic_stream_name_to_dynamic_stream = {dynamic_stream.get("name", f"dynamic_stream_{i}"): dynamic_stream for i, dynamic_stream in enumerate(source.resolved_manifest.get("dynamic_streams", []))}
8078

8179
dynamic_stream_name_to_generated_streams = {}
8280
for stream in source.dynamic_streams:
8381
dynamic_stream_name_to_generated_streams[
8482
stream["dynamic_stream_name"]] = dynamic_stream_name_to_generated_streams.setdefault(
85-
stream["dynamic_stream_name"], []).append(stream)
83+
stream["dynamic_stream_name"], []) + [stream]
8684

8785
for dynamic_streams_check_config in self.dynamic_streams_check_configs:
8886
dynamic_stream = dynamic_stream_name_to_dynamic_stream.get(dynamic_streams_check_config.dynamic_stream_name)
@@ -95,7 +93,8 @@ def check_connection(
9593
generated_streams = dynamic_stream_name_to_generated_streams.get(dynamic_streams_check_config.dynamic_stream_name)
9694
availability_strategy = HttpAvailabilityStrategy()
9795

98-
for stream in generated_streams[: min(dynamic_streams_check_config.stream_count, len(generated_streams))]:
96+
for declarative_stream in generated_streams[: min(dynamic_streams_check_config.stream_count, len(generated_streams))]:
97+
stream = stream_name_to_stream.get(declarative_stream["name"])
9998
try:
10099
stream_is_available, reason = availability_strategy.check_availability(
101100
stream, logger

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,11 +316,6 @@ definitions:
316316
type: object
317317
required:
318318
- type
319-
anyOf:
320-
- required:
321-
- stream_names
322-
- required:
323-
- dynamic_streams_check_configs
324319
properties:
325320
type:
326321
type: string

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1533,34 +1533,15 @@ class AuthFlow(BaseModel):
15331533
oauth_config_specification: Optional[OAuthConfigSpecification] = None
15341534

15351535

1536-
class CheckStream1(BaseModel):
1537-
type: Literal["CheckStream"]
1538-
stream_names: List[str] = Field(
1539-
...,
1540-
description="Names of the streams to try reading from when running a check operation.",
1541-
examples=[["users"], ["users", "contacts"]],
1542-
title="Stream Names",
1543-
)
1544-
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
1545-
1546-
1547-
class CheckStream2(BaseModel):
1536+
class CheckStream(BaseModel):
15481537
type: Literal["CheckStream"]
15491538
stream_names: Optional[List[str]] = Field(
15501539
None,
15511540
description="Names of the streams to try reading from when running a check operation.",
15521541
examples=[["users"], ["users", "contacts"]],
15531542
title="Stream Names",
15541543
)
1555-
dynamic_streams_check_configs: List[DynamicStreamCheckConfig]
1556-
1557-
1558-
class CheckStream(BaseModel):
1559-
__root__: Union[CheckStream1, CheckStream2] = Field(
1560-
...,
1561-
description="Defines the streams to try reading when running a check operation.",
1562-
title="Streams to Check",
1563-
)
1544+
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
15641545

15651546

15661547
class IncrementingCountCursor(BaseModel):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -957,9 +957,9 @@ def create_check_stream(self, model: CheckStreamModel, config: Config, **kwargs:
957957
dynamic_streams_check_configs = [
958958
self._create_component_from_model(model=dynamic_stream_check_config, config=config)
959959
for dynamic_stream_check_config in model.dynamic_streams_check_configs
960-
]
960+
] if model.dynamic_streams_check_configs else []
961961

962-
return CheckStream(stream_names=model.stream_names or [], dynamic_streams_check_configs=dynamic_streams_check_configs or [], parameters={})
962+
return CheckStream(stream_names=model.stream_names or [], dynamic_streams_check_configs=dynamic_streams_check_configs, parameters={})
963963

964964
@staticmethod
965965
def create_check_dynamic_stream(

unit_tests/sources/declarative/checks/test_check_stream.py

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
166166

167167

168168
_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z", "custom_streams": [
169-
{"id": 1, "name": "item_1"},
170-
{"id": 2, "name": "item_2"},
169+
{"id": 3, "name": "item_3"},
170+
{"id": 4, "name": "item_4"},
171171
]}
172172

173173
_MANIFEST_WITHOUT_CHECK_COMPONENT = {
@@ -342,6 +342,17 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
342342
},
343343
"name": "static_stream",
344344
"primary_key": "id",
345+
"schema_loader": {
346+
"type": "InlineSchemaLoader",
347+
"schema": {
348+
"$schema": "http://json-schema.org/schema#",
349+
"properties": {
350+
"id": {"type": "integer"},
351+
"name": {"type": "string"},
352+
},
353+
"type": "object",
354+
},
355+
}
345356
}
346357
]
347358
}
@@ -355,18 +366,19 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
355366
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"],
356367
"dynamic_streams_check_configs": [
357368
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream",
358-
"stream_count": 2}]}}, id="test_check_static_streams_and_http_dynamic_stream"),
369+
"stream_count": 1}]}}, id="test_check_static_streams_and_http_dynamic_stream"),
359370
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"],
360371
"dynamic_streams_check_configs": [
361-
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream",
362-
"stream_count": 2}]}}, id="test_check_static_streams_and_config_dynamic_stream"),
363-
pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [
364-
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", "stream_count": 2}]}},
372+
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1",
373+
"stream_count": 1}]}}, id="test_check_static_streams_and_config_dynamic_stream"),
374+
pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1",
375+
"stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}},
365376
id="test_check_http_dynamic_stream_and_config_dynamic_stream"),
366377
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"],
367378
"dynamic_streams_check_configs": [
368-
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream",
369-
"stream_count": 2}]}},
379+
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1",
380+
"stream_count": 1},
381+
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}},
370382
id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream"),
371383
],
372384
)
@@ -388,7 +400,10 @@ def test_check_stream(check_component):
388400

389401
item_request = HttpRequest(url="https://api.test.com/items/1")
390402
item_response = HttpResponse(body=json.dumps([]), status_code=200)
391-
item_request_count = 1
403+
http_mocker.get(item_request, item_response)
404+
405+
item_request = HttpRequest(url="https://api.test.com/items/3")
406+
item_response = HttpResponse(body=json.dumps([]), status_code=200)
392407
http_mocker.get(item_request, item_response)
393408

394409
source = ConcurrentDeclarativeSource(
@@ -400,6 +415,16 @@ def test_check_stream(check_component):
400415

401416
stream_is_available, reason = source.check_connection(logger, _CONFIG)
402417

403-
http_mocker.assert_number_of_calls(item_request, item_request_count)
404-
405418
assert stream_is_available
419+
420+
421+
def test_check_stream_only_type_provided():
422+
manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **{"check": {"type": "CheckStream"}}}
423+
source = ConcurrentDeclarativeSource(
424+
source_config=manifest,
425+
config=_CONFIG,
426+
catalog=None,
427+
state=None,
428+
)
429+
with pytest.raises(ValueError):
430+
source.check_connection(logger, _CONFIG)

0 commit comments

Comments
 (0)