Skip to content

Commit 263e662

Browse files
committed
CDK: add test for concurrent sate cursor
Signed-off-by: Artem Inzhyyants <[email protected]>
1 parent 0cddc3a commit 263e662

File tree

1 file changed

+65
-0
lines changed

1 file changed

+65
-0
lines changed

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import copy
66
import json
7+
import math
78
from datetime import datetime, timedelta, timezone
89
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
910
from unittest.mock import patch
@@ -43,6 +44,9 @@
4344
from airbyte_cdk.sources.streams.checkpoint import Cursor
4445
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
4546
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
47+
from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import (
48+
IncrementingCountStreamStateConverter,
49+
)
4650
from airbyte_cdk.sources.streams.core import StreamData
4751
from airbyte_cdk.sources.types import Record, StreamSlice
4852
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
@@ -230,6 +234,16 @@
230234
"inject_into": "request_parameter",
231235
},
232236
},
237+
"incremental_counting_cursor": {
238+
"type": "IncrementingCountCursor",
239+
"cursor_field": "id",
240+
"start_value": 0,
241+
"start_time_option": {
242+
"type": "RequestOption",
243+
"field_name": "since_id",
244+
"inject_into": "request_parameter",
245+
},
246+
},
233247
"base_stream": {"retriever": {"$ref": "#/definitions/retriever"}},
234248
"base_incremental_stream": {
235249
"retriever": {
@@ -238,6 +252,13 @@
238252
},
239253
"incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
240254
},
255+
"base_incremental_counting_stream": {
256+
"retriever": {
257+
"$ref": "#/definitions/retriever",
258+
"requester": {"$ref": "#/definitions/requester"},
259+
},
260+
"incremental_sync": {"$ref": "#/definitions/incremental_counting_cursor"},
261+
},
241262
"party_members_stream": {
242263
"$ref": "#/definitions/base_incremental_stream",
243264
"retriever": {
@@ -527,6 +548,35 @@
527548
},
528549
},
529550
},
551+
"incremental_counting_stream": {
552+
"$ref": "#/definitions/base_incremental_counting_stream",
553+
"retriever": {
554+
"$ref": "#/definitions/base_incremental_counting_stream/retriever",
555+
"record_selector": {"$ref": "#/definitions/selector"},
556+
},
557+
"$parameters": {
558+
"name": "incremental_counting_stream",
559+
"primary_key": "id",
560+
"path": "/party_members",
561+
},
562+
"schema_loader": {
563+
"type": "InlineSchemaLoader",
564+
"schema": {
565+
"$schema": "https://json-schema.org/draft-07/schema#",
566+
"type": "object",
567+
"properties": {
568+
"id": {
569+
"description": "The identifier",
570+
"type": ["null", "string"],
571+
},
572+
"name": {
573+
"description": "The name of the party member",
574+
"type": ["null", "string"],
575+
},
576+
},
577+
},
578+
},
579+
},
530580
},
531581
"streams": [
532582
"#/definitions/party_members_stream",
@@ -536,6 +586,7 @@
536586
"#/definitions/arcana_personas_stream",
537587
"#/definitions/palace_enemies_stream",
538588
"#/definitions/async_job_stream",
589+
"#/definitions/incremental_counting_stream",
539590
],
540591
"check": {"stream_names": ["party_members", "locations"]},
541592
"concurrency_level": {
@@ -756,6 +807,20 @@ def test_create_concurrent_cursor():
756807
"state_type": "date-range",
757808
}
758809

810+
incremental_counting_stream = concurrent_streams[7]
811+
assert isinstance(incremental_counting_stream, DefaultStream)
812+
incremental_counting_cursor = incremental_counting_stream.cursor
813+
814+
assert isinstance(incremental_counting_cursor, ConcurrentCursor)
815+
assert isinstance(
816+
incremental_counting_cursor._connector_state_converter,
817+
IncrementingCountStreamStateConverter,
818+
)
819+
assert incremental_counting_cursor._stream_name == "incremental_counting_stream"
820+
assert incremental_counting_cursor._cursor_field.cursor_field_key == "id"
821+
assert incremental_counting_cursor._start == 0
822+
assert incremental_counting_cursor._end_provider() == math.inf
823+
759824

760825
def test_check():
761826
"""

0 commit comments

Comments
 (0)