Skip to content

Commit 9fe5205

Browse files
committed
Add a few more tests, apply formatting, and restore tests that were accidentally commented out
1 parent bd75d85 commit 9fe5205

File tree

3 files changed

+109
-86
lines changed

3 files changed

+109
-86
lines changed

unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py

Lines changed: 68 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -31,71 +31,71 @@ def large_event_response_fixture():
3131
os.remove(file_path)
3232

3333

34-
# @pytest.mark.slow
35-
# @pytest.mark.limit_memory("20 MB")
36-
# @pytest.mark.parametrize(
37-
# "decoder_yaml_definition",
38-
# [
39-
# "type: JsonlDecoder",
40-
# ],
41-
# )
42-
# def test_jsonl_decoder_memory_usage(
43-
# requests_mock, large_events_response, decoder_yaml_definition: str
44-
# ):
45-
# #
46-
# lines_in_response, file_path = large_events_response
47-
# content = f"""
48-
# name: users
49-
# type: DeclarativeStream
50-
# retriever:
51-
# type: SimpleRetriever
52-
# decoder:
53-
# {decoder_yaml_definition}
54-
# paginator:
55-
# type: "NoPagination"
56-
# requester:
57-
# path: "users/{{{{ stream_slice.slice }}}}"
58-
# type: HttpRequester
59-
# url_base: "https://for-all-mankind.nasa.com/api/v1"
60-
# http_method: GET
61-
# authenticator:
62-
# type: NoAuth
63-
# request_headers: {{}}
64-
# request_body_json: {{}}
65-
# record_selector:
66-
# type: RecordSelector
67-
# extractor:
68-
# type: DpathExtractor
69-
# field_path: []
70-
# partition_router:
71-
# type: ListPartitionRouter
72-
# cursor_field: "slice"
73-
# values:
74-
# - users1
75-
# - users2
76-
# - users3
77-
# - users4
78-
# primary_key: []
79-
# """
80-
#
81-
# factory = ModelToComponentFactory()
82-
# stream_manifest = YamlDeclarativeSource._parse(content)
83-
# stream = factory.create_component(
84-
# model_type=DeclarativeStreamModel, component_definition=stream_manifest, config={}
85-
# )
86-
#
87-
# def get_body():
88-
# return open(file_path, "rb", buffering=30)
89-
#
90-
# counter = 0
91-
# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users1", body=get_body())
92-
# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users2", body=get_body())
93-
# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body())
94-
# requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body())
95-
#
96-
# stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh))
97-
# for stream_slice in stream_slices:
98-
# for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice):
99-
# counter += 1
100-
#
101-
# assert counter == lines_in_response * len(stream_slices)
34+
@pytest.mark.slow
35+
@pytest.mark.limit_memory("20 MB")
36+
@pytest.mark.parametrize(
37+
"decoder_yaml_definition",
38+
[
39+
"type: JsonlDecoder",
40+
],
41+
)
42+
def test_jsonl_decoder_memory_usage(
43+
requests_mock, large_events_response, decoder_yaml_definition: str
44+
):
45+
#
46+
lines_in_response, file_path = large_events_response
47+
content = f"""
48+
name: users
49+
type: DeclarativeStream
50+
retriever:
51+
type: SimpleRetriever
52+
decoder:
53+
{decoder_yaml_definition}
54+
paginator:
55+
type: "NoPagination"
56+
requester:
57+
path: "users/{{{{ stream_slice.slice }}}}"
58+
type: HttpRequester
59+
url_base: "https://for-all-mankind.nasa.com/api/v1"
60+
http_method: GET
61+
authenticator:
62+
type: NoAuth
63+
request_headers: {{}}
64+
request_body_json: {{}}
65+
record_selector:
66+
type: RecordSelector
67+
extractor:
68+
type: DpathExtractor
69+
field_path: []
70+
partition_router:
71+
type: ListPartitionRouter
72+
cursor_field: "slice"
73+
values:
74+
- users1
75+
- users2
76+
- users3
77+
- users4
78+
primary_key: []
79+
"""
80+
81+
factory = ModelToComponentFactory()
82+
stream_manifest = YamlDeclarativeSource._parse(content)
83+
stream = factory.create_component(
84+
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config={}
85+
)
86+
87+
def get_body():
88+
return open(file_path, "rb", buffering=30)
89+
90+
counter = 0
91+
requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users1", body=get_body())
92+
requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users2", body=get_body())
93+
requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users3", body=get_body())
94+
requests_mock.get("https://for-all-mankind.nasa.com/api/v1/users/users4", body=get_body())
95+
96+
stream_slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh))
97+
for stream_slice in stream_slices:
98+
for _ in stream.retriever.read_records(records_schema={}, stream_slice=stream_slice):
99+
counter += 1
100+
101+
assert counter == lines_in_response * len(stream_slices)

unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,17 @@ def large_event_response_fixture():
7272
os.remove(file_path)
7373

7474

75-
# @pytest.mark.slow
76-
# @pytest.mark.limit_memory("20 MB")
77-
# def test_response_to_file_extractor_memory_usage(requests_mock, large_events_response):
78-
# lines_in_response, file_path = large_events_response
79-
# extractor = ResponseToFileExtractor({})
80-
#
81-
# url = "https://for-all-mankind.nasa.com/api/v1/users/users1"
82-
# requests_mock.get(url, body=open(file_path, "rb"))
83-
#
84-
# counter = 0
85-
# for _ in extractor.extract_records(requests.get(url, stream=True)):
86-
# counter += 1
87-
#
88-
# assert counter == lines_in_response
75+
@pytest.mark.slow
76+
@pytest.mark.limit_memory("20 MB")
77+
def test_response_to_file_extractor_memory_usage(requests_mock, large_events_response):
78+
lines_in_response, file_path = large_events_response
79+
extractor = ResponseToFileExtractor({})
80+
81+
url = "https://for-all-mankind.nasa.com/api/v1/users/users1"
82+
requests_mock.get(url, body=open(file_path, "rb"))
83+
84+
counter = 0
85+
for _ in extractor.extract_records(requests.get(url, stream=True)):
86+
counter += 1
87+
88+
assert counter == lines_in_response

unit_tests/sources/streams/concurrent/test_partition_reader.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
#
2-
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3-
#
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
43
import unittest
54
from queue import Queue
65
from typing import Callable, Iterable, List
@@ -57,7 +56,7 @@ def test_given_read_partition_successful_when_process_partition_then_queue_recor
5756
cursor.observe.assert_called()
5857
cursor.close_partition.assert_called_once()
5958

60-
def test_given_exception_when_process_partition_then_queue_records_and_exception_and_sentinel(
59+
def test_given_exception_from_read_when_process_partition_then_queue_records_and_exception_and_sentinel(
6160
self,
6261
):
6362
partition = Mock()
@@ -73,6 +72,23 @@ def test_given_exception_when_process_partition_then_queue_records_and_exception
7372
PartitionCompleteSentinel(partition),
7473
]
7574

75+
def test_given_exception_from_close_slice_when_process_partition_then_queue_records_and_exception_and_sentinel(
76+
self,
77+
):
78+
partition = self._a_partition(_RECORDS)
79+
cursor = Mock()
80+
exception = ValueError()
81+
cursor.close_partition.side_effect = self._close_partition_with_exception(exception)
82+
self._partition_reader.process_partition(partition, cursor)
83+
84+
queue_content = self._consume_queue()
85+
86+
# 4 total messages in the queue: 2 records, 1 thread exception, and 1 partition-complete sentinel
87+
assert len(queue_content) == 4
88+
assert queue_content[:2] == _RECORDS
89+
assert isinstance(queue_content[2], StreamThreadException)
90+
assert queue_content[3] == PartitionCompleteSentinel(partition)
91+
7692
def _a_partition(self, records: List[Record]) -> Partition:
7793
partition = Mock(spec=Partition)
7894
partition.read.return_value = iter(records)
@@ -88,6 +104,13 @@ def mocked_function() -> Iterable[Record]:
88104

89105
return mocked_function
90106

107+
@staticmethod
108+
def _close_partition_with_exception(exception: Exception) -> Callable[[Partition], None]:
109+
def mocked_function(partition: Partition) -> None:
110+
raise exception
111+
112+
return mocked_function
113+
91114
def _consume_queue(self):
92115
queue_content = []
93116
while queue_item := self._queue.get():

0 commit comments

Comments
 (0)