Skip to content

Commit 3036dfc

Browse files
committed
add tests for discover catalog output and creating DefaultStream in model to component factory
1 parent 241a88d commit 3036dfc

File tree

3 files changed

+222
-2
lines changed

3 files changed

+222
-2
lines changed

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5030,6 +5030,190 @@ def test_create_stream_with_multiple_schema_loaders():
50305030
assert isinstance(schema_loader.schema_loaders[1], InlineSchemaLoader)
50315031

50325032

5033+
@pytest.mark.parametrize(
5034+
"allow_catalog_defined_cursor_field,catalog_cursor_field,expected_cursor_field",
5035+
[
5036+
pytest.param(
5037+
True,
5038+
"custom_cursor_field",
5039+
"custom_cursor_field",
5040+
id="test_catalog_defined_cursor_field",
5041+
),
5042+
pytest.param(
5043+
True,
5044+
None,
5045+
"updated_at",
5046+
id="test_no_catalog_cursor_field_defaults_to_stream_defined_cursor_field",
5047+
),
5048+
pytest.param(
5049+
False,
5050+
"custom_cursor_field",
5051+
"updated_at",
5052+
id="test_allow_catalog_defined_cursor_field_false_defaults_to_stream_defined_cursor_field",
5053+
),
5054+
],
5055+
)
5056+
def test_catalog_defined_cursor_field(
5057+
allow_catalog_defined_cursor_field, catalog_cursor_field, expected_cursor_field
5058+
):
5059+
content = """
5060+
selector:
5061+
type: RecordSelector
5062+
extractor:
5063+
type: DpathExtractor
5064+
field_path: ["extractor_path"]
5065+
requester:
5066+
type: HttpRequester
5067+
name: "{{ parameters['name'] }}"
5068+
url_base: "https://api.sendgrid.com/v3/"
5069+
http_method: "GET"
5070+
list_stream:
5071+
type: DeclarativeStream
5072+
incremental_sync:
5073+
type: DatetimeBasedCursor
5074+
$parameters:
5075+
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
5076+
start_datetime:
5077+
type: MinMaxDatetime
5078+
datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}"
5079+
datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
5080+
cursor_field: "updated_at"
5081+
allow_catalog_defined_cursor_field: true
5082+
retriever:
5083+
type: SimpleRetriever
5084+
name: "{{ parameters['name'] }}"
5085+
paginator:
5086+
type: DefaultPaginator
5087+
pagination_strategy:
5088+
type: "CursorPagination"
5089+
cursor_value: "{{ response._metadata.next }}"
5090+
page_size: 10
5091+
requester:
5092+
$ref: "#/requester"
5093+
path: "/"
5094+
record_selector:
5095+
$ref: "#/selector"
5096+
$parameters:
5097+
name: "lists"
5098+
"""
5099+
5100+
configured_catalog = ConfiguredAirbyteCatalog(
5101+
streams=[
5102+
ConfiguredAirbyteStream(
5103+
stream=AirbyteStream(
5104+
name="lists",
5105+
json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
5106+
supported_sync_modes=[SyncMode.incremental, SyncMode.full_refresh],
5107+
),
5108+
sync_mode=SyncMode.incremental,
5109+
destination_sync_mode=DestinationSyncMode.overwrite,
5110+
cursor_field=[catalog_cursor_field] if catalog_cursor_field else None,
5111+
)
5112+
]
5113+
)
5114+
5115+
model_to_component_factory = ModelToComponentFactory(
5116+
configured_catalog=configured_catalog,
5117+
)
5118+
5119+
parsed_manifest = YamlDeclarativeSource._parse(content)
5120+
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
5121+
stream_manifest = transformer.propagate_types_and_parameters(
5122+
"", resolved_manifest["list_stream"], {}
5123+
)
5124+
5125+
stream_manifest["incremental_sync"]["allow_catalog_defined_cursor_field"] = (
5126+
allow_catalog_defined_cursor_field
5127+
)
5128+
5129+
stream: DefaultStream = model_to_component_factory.create_component(
5130+
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config
5131+
)
5132+
5133+
assert stream.cursor_field == expected_cursor_field
5134+
assert stream._cursor_field.cursor_field_key == expected_cursor_field
5135+
assert (
5136+
stream._cursor_field.supports_catalog_defined_cursor_field
5137+
== allow_catalog_defined_cursor_field
5138+
)
5139+
5140+
5141+
def test_catalog_defined_cursor_field_stream_missing():
5142+
content = """
5143+
selector:
5144+
type: RecordSelector
5145+
extractor:
5146+
type: DpathExtractor
5147+
field_path: ["extractor_path"]
5148+
requester:
5149+
type: HttpRequester
5150+
name: "{{ parameters['name'] }}"
5151+
url_base: "https://api.sendgrid.com/v3/"
5152+
http_method: "GET"
5153+
list_stream:
5154+
type: DeclarativeStream
5155+
incremental_sync:
5156+
type: DatetimeBasedCursor
5157+
$parameters:
5158+
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
5159+
start_datetime:
5160+
type: MinMaxDatetime
5161+
datetime: "{{ config.get('start_date', '1970-01-01T00:00:00.0Z') }}"
5162+
datetime_format: "%Y-%m-%dT%H:%M:%S.%fZ"
5163+
cursor_field: "updated_at"
5164+
allow_catalog_defined_cursor_field: true
5165+
retriever:
5166+
type: SimpleRetriever
5167+
name: "{{ parameters['name'] }}"
5168+
paginator:
5169+
type: DefaultPaginator
5170+
pagination_strategy:
5171+
type: "CursorPagination"
5172+
cursor_value: "{{ response._metadata.next }}"
5173+
page_size: 10
5174+
requester:
5175+
$ref: "#/requester"
5176+
path: "/"
5177+
record_selector:
5178+
$ref: "#/selector"
5179+
$parameters:
5180+
name: "lists"
5181+
"""
5182+
5183+
configured_catalog = ConfiguredAirbyteCatalog(
5184+
streams=[
5185+
ConfiguredAirbyteStream(
5186+
stream=AirbyteStream(
5187+
name="other_stream",
5188+
json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
5189+
supported_sync_modes=[SyncMode.incremental, SyncMode.full_refresh],
5190+
),
5191+
sync_mode=SyncMode.incremental,
5192+
destination_sync_mode=DestinationSyncMode.overwrite,
5193+
cursor_field=["custom_cursor_field"],
5194+
)
5195+
]
5196+
)
5197+
5198+
model_to_component_factory = ModelToComponentFactory(
5199+
configured_catalog=configured_catalog,
5200+
)
5201+
5202+
parsed_manifest = YamlDeclarativeSource._parse(content)
5203+
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
5204+
stream_manifest = transformer.propagate_types_and_parameters(
5205+
"", resolved_manifest["list_stream"], {}
5206+
)
5207+
5208+
stream: DefaultStream = model_to_component_factory.create_component(
5209+
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config
5210+
)
5211+
5212+
assert stream.cursor_field == "updated_at"
5213+
assert stream._cursor_field.cursor_field_key == "updated_at"
5214+
assert stream._cursor_field.supports_catalog_defined_cursor_field == True
5215+
5216+
50335217
def get_schema_loader(stream: DefaultStream):
50345218
assert isinstance(
50355219
stream._stream_partition_generator._partition_factory._schema_loader,

unit_tests/sources/streams/concurrent/test_adapters.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
StreamPartition,
2323
StreamPartitionGenerator,
2424
)
25-
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
25+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
2626
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
2727
from airbyte_cdk.sources.streams.core import Stream
2828
from airbyte_cdk.sources.types import Record
@@ -328,7 +328,6 @@ def test_create_from_stream_stream(self):
328328

329329
assert facade.name == "stream"
330330
assert facade.cursor_field == "cursor"
331-
# assert facade.cursor_field == CursorField(cursor_field_key="cursor")
332331
assert facade._abstract_stream._primary_key == ["id"]
333332

334333
def test_create_from_stream_stream_with_none_primary_key(self):

unit_tests/sources/streams/concurrent/test_default_stream.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,43 @@ def test_as_airbyte_stream_with_file_transfer_support(self):
272272

273273
assert actual_airbyte_stream == expected_airbyte_stream
274274

275+
def test_as_airbyte_stream_with_a_catalog_defined_cursor(self):
276+
json_schema = {
277+
"type": "object",
278+
"properties": {
279+
"id": {"type": ["null", "string"]},
280+
"date": {"type": ["null", "string"]},
281+
},
282+
}
283+
stream = DefaultStream(
284+
self._partition_generator,
285+
self._name,
286+
json_schema,
287+
self._primary_key,
288+
CursorField(cursor_field_key="date", supports_catalog_defined_cursor_field=True),
289+
self._logger,
290+
FinalStateCursor(
291+
stream_name=self._name,
292+
stream_namespace=None,
293+
message_repository=self._message_repository,
294+
),
295+
)
296+
297+
expected_airbyte_stream = AirbyteStream(
298+
name=self._name,
299+
json_schema=json_schema,
300+
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
301+
source_defined_cursor=False,
302+
default_cursor_field=["date"],
303+
source_defined_primary_key=None,
304+
namespace=None,
305+
is_resumable=True,
306+
is_file_based=False,
307+
)
308+
309+
airbyte_stream = stream.as_airbyte_stream()
310+
assert airbyte_stream == expected_airbyte_stream
311+
275312
def test_given_no_partitions_when_get_availability_then_unavailable(self) -> None:
276313
self._partition_generator.generate.return_value = []
277314

0 commit comments

Comments
 (0)