33#
44
55import logging
6+ from typing import Iterator , List , Tuple
67from unittest .mock import MagicMock , patch
78
89import orjson
10+ import requests_mock
911
1012from airbyte_cdk .models import (
13+ AirbyteMessage ,
14+ AirbyteRecordMessage ,
1115 AirbyteStateBlob ,
1216 AirbyteStateMessage ,
1317 AirbyteStateType ,
1822 DestinationSyncMode ,
1923 StreamDescriptor ,
2024 SyncMode ,
25+ Type ,
2126)
2227from airbyte_cdk .sources .declarative .concurrent_declarative_source import (
2328 ConcurrentDeclarativeSource ,
2429)
2530from airbyte_cdk .sources .declarative .incremental import ConcurrentPerPartitionCursor
2631from airbyte_cdk .sources .declarative .retrievers .simple_retriever import SimpleRetriever
2732from airbyte_cdk .sources .types import Record , StreamSlice
33+ from airbyte_cdk .test .catalog_builder import CatalogBuilder , ConfiguredAirbyteStreamBuilder
2834
2935CURSOR_FIELD = "cursor_field"
3036SYNC_MODE = SyncMode .incremental
@@ -35,6 +41,7 @@ def __init__(self):
3541 self ._incremental_sync = {}
3642 self ._partition_router = {}
3743 self ._substream_partition_router = {}
44+ self ._concurrency_default = None
3845
3946 def with_list_partition_router (self , stream_name , cursor_field , partitions ):
4047 self ._partition_router [stream_name ] = {
@@ -44,7 +51,7 @@ def with_list_partition_router(self, stream_name, cursor_field, partitions):
4451 }
4552 return self
4653
47- def with_substream_partition_router (self , stream_name ):
54+ def with_substream_partition_router (self , stream_name , incremental_dependency = False ):
4855 self ._substream_partition_router [stream_name ] = {
4956 "type" : "SubstreamPartitionRouter" ,
5057 "parent_stream_configs" : [
@@ -53,6 +60,7 @@ def with_substream_partition_router(self, stream_name):
5360 "stream" : "#/definitions/Rates" ,
5461 "parent_key" : "id" ,
5562 "partition_field" : "parent_id" ,
63+ "incremental_dependency" : incremental_dependency ,
5664 }
5765 ],
5866 }
@@ -76,9 +84,23 @@ def with_incremental_sync(
7684 "cursor_field" : cursor_field ,
7785 "step" : step ,
7886 "cursor_granularity" : cursor_granularity ,
87+ "start_time_option" : {
88+ "type" : "RequestOption" ,
89+ "field_name" : "from" ,
90+ "inject_into" : "request_parameter" ,
91+ },
92+ "end_time_option" : {
93+ "type" : "RequestOption" ,
94+ "field_name" : "to" ,
95+ "inject_into" : "request_parameter" ,
96+ },
7997 }
8098 return self
8199
100+ def with_concurrency (self , default : int ) -> "ManifestBuilder" :
101+ self ._concurrency_default = default
102+ return self
103+
82104 def build (self ):
83105 manifest = {
84106 "version" : "0.34.2" ,
@@ -102,7 +124,7 @@ def build(self):
102124 "requester" : {
103125 "type" : "HttpRequester" ,
104126 "url_base" : "https://api.apilayer.com" ,
105- "path" : "/exchangerates_data/latest" ,
127+ "path" : "/exchangerates_data/parent/{{ stream_partition['parent_id'] }}/child/ latest" ,
106128 "http_method" : "GET" ,
107129 },
108130 "record_selector" : {
@@ -128,7 +150,7 @@ def build(self):
128150 "requester" : {
129151 "type" : "HttpRequester" ,
130152 "url_base" : "https://api.apilayer.com" ,
131- "path" : "/exchangerates_data/latest" ,
153+ "path" : "/exchangerates_data/parent/ latest" ,
132154 "http_method" : "GET" ,
133155 },
134156 "record_selector" : {
@@ -161,6 +183,12 @@ def build(self):
161183 manifest ["definitions" ][stream_name ]["retriever" ]["partition_router" ] = (
162184 partition_router_definition
163185 )
186+
187+ if self ._concurrency_default :
188+ manifest ["concurrency_level" ] = {
189+ "type" : "ConcurrencyLevel" ,
190+ "default_concurrency" : self ._concurrency_default ,
191+ }
164192 return manifest
165193
166194
@@ -872,3 +900,82 @@ def test_per_partition_cursor_within_limit(caplog):
872900 },
873901 ],
874902 }
903+
904+
905+ def test_parent_stream_is_updated_after_parent_record_fully_consumed ():
906+ source = ConcurrentDeclarativeSource (
907+ source_config = ManifestBuilder ()
908+ .with_substream_partition_router ("AnotherStream" , incremental_dependency = True )
909+ .with_incremental_sync (
910+ "AnotherStream" ,
911+ start_datetime = "2022-01-01" ,
912+ end_datetime = "2022-02-28" ,
913+ datetime_format = "%Y-%m-%d" ,
914+ cursor_field = CURSOR_FIELD ,
915+ step = "P1M" ,
916+ cursor_granularity = "P1D" ,
917+ )
918+ .with_incremental_sync (
919+ "Rates" ,
920+ start_datetime = "2022-01-01" ,
921+ end_datetime = "2022-02-28" ,
922+ datetime_format = "%Y-%m-%d" ,
923+ cursor_field = CURSOR_FIELD ,
924+ step = "P1Y" ,
925+ cursor_granularity = "P1D" ,
926+ )
927+ .with_concurrency (1 ) # so that we know partition 1 gets processed before 2
928+ .build (),
929+ config = {},
930+ catalog = None ,
931+ state = None ,
932+ )
933+
934+ with requests_mock .Mocker () as m :
935+ # Request for parent stream
936+ m .get (
937+ "https://api.apilayer.com/exchangerates_data/parent/latest?from=2022-01-01&to=2022-02-28" ,
938+ json = [{"id" : "1" }],
939+ )
940+
941+ # Requests for child stream
942+ record_from_first_cursor_interval = {"id" : "child_1.1" }
943+ m .get (
944+ "https://api.apilayer.com/exchangerates_data/parent/1/child/latest?from=2022-01-01&to=2022-01-31" ,
945+ json = [record_from_first_cursor_interval ],
946+ )
947+ record_from_second_cursor_interval = {"id" : "child_1.2" }
948+ m .get (
949+ "https://api.apilayer.com/exchangerates_data/parent/1/child/latest?from=2022-02-01&to=2022-02-28" ,
950+ json = [record_from_second_cursor_interval ],
951+ )
952+
953+ message_iterator = source .read (
954+ MagicMock (),
955+ {},
956+ CatalogBuilder ()
957+ .with_stream (ConfiguredAirbyteStreamBuilder ().with_name ("AnotherStream" ))
958+ .build (),
959+ None ,
960+ )
961+
962+ records , state = get_records_until_state_message (message_iterator )
963+
964+ assert len (records ) == 1 and records [0 ].data == record_from_first_cursor_interval
965+ assert "parent_state" not in state .stream .stream_state .__dict__
966+
967+ records , state = get_records_until_state_message (message_iterator )
968+ assert "parent_state" in state .stream .stream_state .__dict__
969+
970+
971+ def get_records_until_state_message (
972+ message_iterator : Iterator [AirbyteMessage ],
973+ ) -> Tuple [List [AirbyteRecordMessage ], AirbyteStateMessage ]:
974+ records = []
975+ for message in message_iterator :
976+ if message .type == Type .RECORD :
977+ records .append (message .record )
978+ elif message .type == Type .STATE :
979+ return records , message .state
980+
981+ raise ValueError ("No state message encountered" )
0 commit comments