@@ -839,7 +839,7 @@ def test_concurrent_cursor_with_state_in_read_method():
839839 type = AirbyteStateType .STREAM ,
840840 stream = AirbyteStreamState (
841841 stream_descriptor = StreamDescriptor (name = "party_members" , namespace = None ),
842- stream_state = AirbyteStateBlob (updated_at = "2024-09-05 " ),
842+ stream_state = AirbyteStateBlob (updated_at = "2024-09-04 " ),
843843 ),
844844 ),
845845 ]
@@ -858,20 +858,15 @@ def test_concurrent_cursor_with_state_in_read_method():
858858 ]
859859 )
860860
861- def get_source ():
862- return ConcurrentDeclarativeSource (
863- source_config = _MANIFEST , config = _CONFIG , catalog = catalog , state = []
864- )
865-
866- source = get_source ()
861+ source = ConcurrentDeclarativeSource (
862+ source_config = _MANIFEST , config = _CONFIG , catalog = catalog , state = []
863+ )
864+
867865 with HttpMocker () as http_mocker :
868866 _mock_party_members_requests (http_mocker , _NO_STATE_PARTY_MEMBERS_SLICES_AND_RESPONSES )
869- messages_iterator = source .read (
867+ messages = list ( source .read (
870868 logger = source .logger , config = _CONFIG , catalog = catalog , state = state
871- )
872-
873- # Get the first message only to get the stream running so that we can check the cursor
874- first_message = next (messages_iterator )
869+ ))
875870
876871 concurrent_streams , _ = source ._group_streams (config = _CONFIG )
877872 party_members_stream = [s for s in concurrent_streams if s .name == "party_members" ][0 ]
@@ -883,20 +878,24 @@ def get_source():
883878 assert party_members_cursor ._stream_name == "party_members"
884879 assert party_members_cursor ._cursor_field .cursor_field_key == "updated_at"
885880
886- cursor_value = AirbyteDateTime .strptime ("2024-09-05 " , "%Y-%m-%d" )
881+ cursor_value = AirbyteDateTime .strptime ("2024-09-04 " , "%Y-%m-%d" )
887882
888883 assert len (party_members_cursor ._concurrent_state ["slices" ]) == 1
889884 assert (
890885 party_members_cursor ._concurrent_state ["slices" ][0 ]["most_recent_cursor_value" ]
891886 == cursor_value
892887 )
893888
894- messages = list (messages_iterator )
895- party_members_records = get_records_for_stream ("party_members" , messages )
896889 # There is only one record after 2024-09-05
890+ party_members_records = get_records_for_stream ("party_members" , messages )
897891 assert len (party_members_records ) == 1
898892 assert party_members_records [0 ].data ["id" ] == "yoshizawa"
899893
894+ # Emitted state should have the updated_at of the one record read
895+ states = get_states_for_stream ("party_members" , messages )
896+ assert len (states ) == 2
897+ assert states [1 ].stream .stream_state == AirbyteStateBlob (updated_at = party_members_records [0 ].data ["updated_at" ])
898+
900899
901900def test_check ():
902901 """
0 commit comments