@@ -39,7 +39,7 @@ class TopicReaderUnexpectedCodec(YdbError):
3939 pass
4040
4141
42- class TopicReaderCommitToExpiredPartition (TopicReaderError ):
42+ class PublicTopicReaderPartitionExpiredError (TopicReaderError ):
4343 """
4444 Commit message when partition read session are dropped.
4545 It is ok - the message/batch will not commit to server and will receive in other read session
@@ -114,15 +114,22 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
114114 Write commit message to a buffer.
115115
116116 For the method no way check the commit result
117- (for example if lost connection - commits will not re-send and committed messages will receive again)
117+ (for example if lost connection - commits will not re-send and committed messages will receive again).
118118 """
119- self ._reconnector .commit (batch )
119+ try :
120+ self ._reconnector .commit (batch )
121+ except PublicTopicReaderPartitionExpiredError :
122+ pass
120123
121124 async def commit_with_ack (self , batch : typing .Union [datatypes .PublicMessage , datatypes .PublicBatch ]):
122125 """
123126 write commit message to a buffer and wait ack from the server.
124127
125128 use asyncio.wait_for for wait with timeout.
129+
130+ may raise ydb.TopicReaderPartitionExpiredError, the error mean reader partition closed from server
131+ before receive commit ack. Message may be acked or not (if not - it will send in other read session,
132+ to this or other reader).
126133 """
127134 waiter = self ._reconnector .commit (batch )
128135 await waiter .future
@@ -174,6 +181,14 @@ async def _connection_loop(self):
174181 await asyncio .sleep (retry_info .sleep_timeout_seconds )
175182
176183 attempt += 1
184+ finally :
185+ if self ._stream_reader is not None :
186+ # noinspection PyBroadException
187+ try :
188+ await self ._stream_reader .close ()
189+ except BaseException :
190+ # supress any error on close stream reader
191+ pass
177192
178193 async def wait_message (self ):
179194 while True :
@@ -369,10 +384,10 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
369384 raise TopicReaderError ("reader can commit only self-produced messages" )
370385
371386 if partition_session .reader_stream_id != self ._id :
372- raise TopicReaderCommitToExpiredPartition ("commit messages after reconnect to server" )
387+ raise PublicTopicReaderPartitionExpiredError ("commit messages after reconnect to server" )
373388
374389 if partition_session .id not in self ._partition_sessions :
375- raise TopicReaderCommitToExpiredPartition ("commit messages after server stop the partition read session" )
390+ raise PublicTopicReaderPartitionExpiredError ("commit messages after server stop the partition read session" )
376391
377392 commit_range = batch ._commit_get_offsets_range ()
378393 waiter = partition_session .add_waiter (commit_range .end )
@@ -620,6 +635,7 @@ async def flush(self):
620635 async def close (self ):
621636 if self ._closed :
622637 return
638+
623639 self ._closed = True
624640
625641 self ._set_first_error (TopicReaderStreamClosedError ())
@@ -628,6 +644,7 @@ async def close(self):
628644
629645 for session in self ._partition_sessions .values ():
630646 session .close ()
647+ self ._partition_sessions .clear ()
631648
632649 for task in self ._background_tasks :
633650 task .cancel ()
0 commit comments