44from unittest import mock
55import ydb
66
7- DEFAULT_TIMEOUT = 0.1
7+ DEFAULT_TIMEOUT = 0.5
88DEFAULT_RETRY_SETTINGS = ydb .RetrySettings (max_retries = 1 )
99
1010
@@ -19,11 +19,16 @@ async def callee(tx: ydb.aio.QueryTxContext):
1919 assert len (batch .messages ) == 1
2020 assert batch .messages [0 ].data .decode () == "123"
2121
22+ batch = await wait_for (reader .receive_batch_with_tx (tx , max_messages = 1 ), DEFAULT_TIMEOUT )
23+ assert len (batch .messages ) == 1
24+ assert batch .messages [0 ].data .decode () == "456"
25+
2226 await pool .retry_tx_async (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
27+ assert len (reader ._reconnector ._tx_to_batches_map ) == 0
2328
2429 async with driver .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
2530 msg = await wait_for (reader .receive_message (), DEFAULT_TIMEOUT )
26- assert msg .data .decode () == "456 "
31+ assert msg .data .decode () == "789 "
2732
2833 async def test_rollback (self , driver : ydb .aio .Driver , topic_with_messages , topic_consumer ):
2934 async with driver .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
@@ -37,6 +42,7 @@ async def callee(tx: ydb.aio.QueryTxContext):
3742 await tx .rollback ()
3843
3944 await pool .retry_tx_async (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
45+ assert len (reader ._reconnector ._tx_to_batches_map ) == 0
4046
4147 msg = await wait_for (reader .receive_message (), DEFAULT_TIMEOUT )
4248 assert msg .data .decode () == "123"
@@ -57,12 +63,211 @@ async def callee(tx: ydb.aio.QueryTxContext):
5763 assert len (batch .messages ) == 1
5864 assert batch .messages [0 ].data .decode () == "123"
5965
60- with pytest .raises (ydb .Error ):
66+ with pytest .raises (ydb .Error , match = "Transaction was failed" ):
6167 await pool .retry_tx_async (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
6268
69+ assert len (reader ._reconnector ._tx_to_batches_map ) == 0
70+
6371 msg = await wait_for (reader .receive_message (), DEFAULT_TIMEOUT )
6472 assert msg .data .decode () == "123"
6573
74+ async def test_error_in_lambda (self , driver : ydb .aio .Driver , topic_with_messages , topic_consumer ):
75+ async with driver .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
76+ async with ydb .aio .QuerySessionPool (driver ) as pool :
77+
78+ async def callee (tx : ydb .aio .QueryTxContext ):
79+ batch = await wait_for (reader .receive_batch_with_tx (tx , max_messages = 1 ), DEFAULT_TIMEOUT )
80+ assert len (batch .messages ) == 1
81+ assert batch .messages [0 ].data .decode () == "123"
82+
83+ raise RuntimeError ("Something went wrong" )
84+
85+ with pytest .raises (RuntimeError ):
86+ await pool .retry_tx_async (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
87+
88+ assert len (reader ._reconnector ._tx_to_batches_map ) == 0
89+
90+ msg = await wait_for (reader .receive_message (), DEFAULT_TIMEOUT )
91+ assert msg .data .decode () == "123"
92+
93+ async def test_error_during_commit (self , driver : ydb .aio .Driver , topic_with_messages , topic_consumer ):
94+ async with driver .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
95+ async with ydb .aio .QuerySessionPool (driver ) as pool :
96+
97+ async def callee (tx : ydb .aio .QueryTxContext ):
98+ with mock .patch .object (
99+ tx ,
100+ "_commit_call" ,
101+ side_effect = ydb .Unavailable ("YDB Unavailable" ),
102+ ):
103+ batch = await wait_for (reader .receive_batch_with_tx (tx , max_messages = 1 ), DEFAULT_TIMEOUT )
104+ assert len (batch .messages ) == 1
105+ assert batch .messages [0 ].data .decode () == "123"
106+
107+ await tx .commit ()
108+
109+ with pytest .raises (ydb .Unavailable ):
110+ await pool .retry_tx_async (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
111+
112+ assert len (reader ._reconnector ._tx_to_batches_map ) == 0
113+
114+ msg = await wait_for (reader .receive_message (), DEFAULT_TIMEOUT )
115+ assert msg .data .decode () == "123"
116+
117+ async def test_error_during_rollback (self , driver : ydb .aio .Driver , topic_with_messages , topic_consumer ):
118+ async with driver .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
119+ async with ydb .aio .QuerySessionPool (driver ) as pool :
120+
121+ async def callee (tx : ydb .aio .QueryTxContext ):
122+ with mock .patch .object (
123+ tx ,
124+ "_rollback_call" ,
125+ side_effect = ydb .Unavailable ("YDB Unavailable" ),
126+ ):
127+ batch = await wait_for (reader .receive_batch_with_tx (tx , max_messages = 1 ), DEFAULT_TIMEOUT )
128+ assert len (batch .messages ) == 1
129+ assert batch .messages [0 ].data .decode () == "123"
130+
131+ await tx .rollback ()
132+
133+ with pytest .raises (ydb .Unavailable ):
134+ await pool .retry_tx_async (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
135+
136+ assert len (reader ._reconnector ._tx_to_batches_map ) == 0
137+
138+ msg = await wait_for (reader .receive_message (), DEFAULT_TIMEOUT )
139+ assert msg .data .decode () == "123"
140+
141+
142+ class TestTopicTransactionalReaderSync :
143+ def test_commit (self , driver_sync : ydb .Driver , topic_with_messages , topic_consumer ):
144+ with ydb .QuerySessionPool (driver_sync ) as pool :
145+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
146+
147+ def callee (tx : ydb .QueryTxContext ):
148+ batch = reader .receive_batch_with_tx (tx , max_messages = 1 , timeout = DEFAULT_TIMEOUT )
149+ assert len (batch .messages ) == 1
150+ assert batch .messages [0 ].data .decode () == "123"
151+
152+ batch = reader .receive_batch_with_tx (tx , max_messages = 1 , timeout = DEFAULT_TIMEOUT )
153+ assert len (batch .messages ) == 1
154+ assert batch .messages [0 ].data .decode () == "456"
155+
156+ pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
157+ assert len (reader ._async_reader ._reconnector ._tx_to_batches_map ) == 0
158+
159+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
160+ msg = reader .receive_message (timeout = DEFAULT_TIMEOUT )
161+ assert msg .data .decode () == "789"
162+
163+ def test_rollback (self , driver_sync : ydb .Driver , topic_with_messages , topic_consumer ):
164+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
165+ with ydb .QuerySessionPool (driver_sync ) as pool :
166+
167+ def callee (tx : ydb .QueryTxContext ):
168+ batch = reader .receive_batch_with_tx (tx , max_messages = 1 , timeout = DEFAULT_TIMEOUT )
169+ assert len (batch .messages ) == 1
170+ assert batch .messages [0 ].data .decode () == "123"
171+
172+ tx .rollback ()
173+
174+ pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
175+ assert len (reader ._async_reader ._reconnector ._tx_to_batches_map ) == 0
176+
177+ msg = reader .receive_message (timeout = DEFAULT_TIMEOUT )
178+ assert msg .data .decode () == "123"
179+
180+ def test_tx_failed_if_update_offsets_call_failed (
181+ self , driver_sync : ydb .Driver , topic_with_messages , topic_consumer
182+ ):
183+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
184+ with ydb .QuerySessionPool (driver_sync ) as pool :
185+ with mock .patch .object (
186+ reader ._async_reader ._reconnector ,
187+ "_do_commit_batches_with_tx_call" ,
188+ side_effect = ydb .Error ("Update offsets in tx failed" ),
189+ ):
190+
191+ def callee (tx : ydb .QueryTxContext ):
192+ batch = reader .receive_batch_with_tx (tx , max_messages = 1 , timeout = DEFAULT_TIMEOUT )
193+ assert len (batch .messages ) == 1
194+ assert batch .messages [0 ].data .decode () == "123"
195+
196+ with pytest .raises (ydb .Error , match = "Transaction was failed" ):
197+ pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
198+
199+ assert len (reader ._async_reader ._reconnector ._tx_to_batches_map ) == 0
200+
201+ msg = reader .receive_message (timeout = DEFAULT_TIMEOUT )
202+ assert msg .data .decode () == "123"
203+
204+ def test_error_in_lambda (self , driver_sync : ydb .Driver , topic_with_messages , topic_consumer ):
205+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
206+ with ydb .QuerySessionPool (driver_sync ) as pool :
207+
208+ def callee (tx : ydb .QueryTxContext ):
209+ batch = reader .receive_batch_with_tx (tx , max_messages = 1 , timeout = DEFAULT_TIMEOUT )
210+ assert len (batch .messages ) == 1
211+ assert batch .messages [0 ].data .decode () == "123"
212+
213+ raise RuntimeError ("Something went wrong" )
214+
215+ with pytest .raises (RuntimeError ):
216+ pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
217+
218+ assert len (reader ._async_reader ._reconnector ._tx_to_batches_map ) == 0
219+
220+ msg = reader .receive_message (timeout = DEFAULT_TIMEOUT )
221+ assert msg .data .decode () == "123"
222+
223+ def test_error_during_commit (self , driver_sync : ydb .Driver , topic_with_messages , topic_consumer ):
224+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
225+ with ydb .QuerySessionPool (driver_sync ) as pool :
226+
227+ def callee (tx : ydb .QueryTxContext ):
228+ with mock .patch .object (
229+ tx ,
230+ "_commit_call" ,
231+ side_effect = ydb .Unavailable ("YDB Unavailable" ),
232+ ):
233+ batch = reader .receive_batch_with_tx (tx , max_messages = 1 , timeout = DEFAULT_TIMEOUT )
234+ assert len (batch .messages ) == 1
235+ assert batch .messages [0 ].data .decode () == "123"
236+
237+ tx .commit ()
238+
239+ with pytest .raises (ydb .Unavailable ):
240+ pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
241+
242+ assert len (reader ._async_reader ._reconnector ._tx_to_batches_map ) == 0
243+
244+ msg = reader .receive_message (timeout = DEFAULT_TIMEOUT )
245+ assert msg .data .decode () == "123"
246+
247+ def test_error_during_rollback (self , driver_sync : ydb .Driver , topic_with_messages , topic_consumer ):
248+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
249+ with ydb .QuerySessionPool (driver_sync ) as pool :
250+
251+ def callee (tx : ydb .QueryTxContext ):
252+ with mock .patch .object (
253+ tx ,
254+ "_rollback_call" ,
255+ side_effect = ydb .Unavailable ("YDB Unavailable" ),
256+ ):
257+ batch = reader .receive_batch_with_tx (tx , max_messages = 1 , timeout = DEFAULT_TIMEOUT )
258+ assert len (batch .messages ) == 1
259+ assert batch .messages [0 ].data .decode () == "123"
260+
261+ tx .rollback ()
262+
263+ with pytest .raises (ydb .Unavailable ):
264+ pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
265+
266+ assert len (reader ._async_reader ._reconnector ._tx_to_batches_map ) == 0
267+
268+ msg = reader .receive_message (timeout = DEFAULT_TIMEOUT )
269+ assert msg .data .decode () == "123"
270+
66271
67272class TestTopicTransactionalWriter :
68273 async def test_commit (self , driver : ydb .aio .Driver , topic_path , topic_reader : ydb .TopicReaderAsyncIO ):
@@ -108,6 +313,27 @@ async def callee(tx: ydb.aio.QueryTxContext):
108313 with pytest .raises (asyncio .TimeoutError ):
109314 await wait_for (topic_reader .receive_message (), 0.1 )
110315
316+ async def test_no_msg_written_in_tx_commit_error (
317+ self , driver : ydb .aio .Driver , topic_path , topic_reader : ydb .TopicReaderAsyncIO
318+ ):
319+ async with ydb .aio .QuerySessionPool (driver ) as pool :
320+
321+ async def callee (tx : ydb .aio .QueryTxContext ):
322+ with mock .patch .object (
323+ tx ,
324+ "_commit_call" ,
325+ side_effect = ydb .Unavailable ("YDB Unavailable" ),
326+ ):
327+ tx_writer = driver .topic_client .tx_writer (tx , topic_path )
328+ await tx_writer .write (ydb .TopicWriterMessage (data = "123" .encode ()))
329+ await tx .commit ()
330+
331+ with pytest .raises (ydb .Unavailable ):
332+ await pool .retry_tx_async (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
333+
334+ with pytest .raises (asyncio .TimeoutError ):
335+ await wait_for (topic_reader .receive_message (), 0.1 )
336+
111337 async def test_msg_written_exactly_once_with_retries (
112338 self , driver : ydb .aio .Driver , topic_path , topic_reader : ydb .TopicReaderAsyncIO
113339 ):
@@ -140,12 +366,12 @@ def callee(tx: ydb.QueryTxContext):
140366 tx_writer = driver_sync .topic_client .tx_writer (tx , topic_path )
141367 tx_writer .write (ydb .TopicWriterMessage (data = "123" .encode ()))
142368
143- pool .retry_tx_sync (callee , retry_settings = ydb . RetrySettings ( max_retries = 1 ) )
369+ pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
144370
145- msg = topic_reader_sync .receive_message (timeout = 0.1 )
371+ msg = topic_reader_sync .receive_message (timeout = DEFAULT_TIMEOUT )
146372 assert msg .data .decode () == "123"
147373
148- def test_rollback (self , driver_sync : ydb .aio . Driver , topic_path , topic_reader_sync : ydb .TopicReader ):
374+ def test_rollback (self , driver_sync : ydb .Driver , topic_path , topic_reader_sync : ydb .TopicReader ):
149375 with ydb .QuerySessionPool (driver_sync ) as pool :
150376
151377 def callee (tx : ydb .QueryTxContext ):
@@ -157,10 +383,10 @@ def callee(tx: ydb.QueryTxContext):
157383 pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
158384
159385 with pytest .raises (TimeoutError ):
160- topic_reader_sync .receive_message (timeout = 0.1 )
386+ topic_reader_sync .receive_message (timeout = DEFAULT_TIMEOUT )
161387
162388 def test_no_msg_written_in_error_case (
163- self , driver_sync : ydb .Driver , topic_path , topic_reader_sync : ydb .TopicReaderAsyncIO
389+ self , driver_sync : ydb .Driver , topic_path , topic_reader_sync : ydb .TopicReader
164390 ):
165391 with ydb .QuerySessionPool (driver_sync ) as pool :
166392
@@ -174,4 +400,48 @@ def callee(tx: ydb.QueryTxContext):
174400 pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
175401
176402 with pytest .raises (TimeoutError ):
177- topic_reader_sync .receive_message (timeout = 0.1 )
403+ topic_reader_sync .receive_message (timeout = DEFAULT_TIMEOUT )
404+
405+ def test_no_msg_written_in_tx_commit_error (
406+ self , driver_sync : ydb .Driver , topic_path , topic_reader_sync : ydb .TopicReader
407+ ):
408+ with ydb .QuerySessionPool (driver_sync ) as pool :
409+
410+ def callee (tx : ydb .QueryTxContext ):
411+ with mock .patch .object (
412+ tx ,
413+ "_commit_call" ,
414+ side_effect = ydb .Unavailable ("YDB Unavailable" ),
415+ ):
416+ tx_writer = driver_sync .topic_client .tx_writer (tx , topic_path )
417+ tx_writer .write (ydb .TopicWriterMessage (data = "123" .encode ()))
418+ tx .commit ()
419+
420+ with pytest .raises (ydb .Unavailable ):
421+ pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
422+
423+ with pytest .raises (TimeoutError ):
424+ topic_reader_sync .receive_message (timeout = DEFAULT_TIMEOUT )
425+
426+ def test_msg_written_exactly_once_with_retries (
427+ self , driver_sync : ydb .Driver , topic_path , topic_reader_sync : ydb .TopicReader
428+ ):
429+ error_raised = False
430+ with ydb .QuerySessionPool (driver_sync ) as pool :
431+
432+ def callee (tx : ydb .QueryTxContext ):
433+ nonlocal error_raised
434+ tx_writer = driver_sync .topic_client .tx_writer (tx , topic_path )
435+ tx_writer .write (ydb .TopicWriterMessage (data = "123" .encode ()))
436+
437+ if not error_raised :
438+ error_raised = True
439+ raise ydb .issues .Unavailable ("some retriable error" )
440+
441+ pool .retry_tx_sync (callee , retry_settings = DEFAULT_RETRY_SETTINGS )
442+
443+ msg = topic_reader_sync .receive_message (timeout = DEFAULT_TIMEOUT )
444+ assert msg .data .decode () == "123"
445+
446+ with pytest .raises (TimeoutError ):
447+ topic_reader_sync .receive_message (timeout = DEFAULT_TIMEOUT )
0 commit comments