@@ -39,7 +39,7 @@ class TopicReaderUnexpectedCodec(YdbError):
3939 pass
4040
4141
42- class TopicReaderCommitToExpiredPartition (TopicReaderError ):
42+ class PublicTopicReaderPartitionExpiredError (TopicReaderError ):
4343 """
4444 Commit message when partition read session are dropped.
4545 It is ok - the message/batch will not commit to server and will receive in other read session
@@ -114,15 +114,22 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
114114 Write commit message to a buffer.
115115
116116 For the method no way check the commit result
117- (for example if lost connection - commits will not re-send and committed messages will receive again)
117+ (for example if lost connection - commits will not re-send and committed messages will receive again).
118118 """
119- self ._reconnector .commit (batch )
119+ try :
120+ self ._reconnector .commit (batch )
121+ except PublicTopicReaderPartitionExpiredError :
122+ pass
120123
121124 async def commit_with_ack (self , batch : typing .Union [datatypes .PublicMessage , datatypes .PublicBatch ]):
122125 """
123126 write commit message to a buffer and wait ack from the server.
124127
125128 use asyncio.wait_for for wait with timeout.
129+
130+ may raise ydb.TopicReaderPartitionExpiredError, the error mean reader partition closed from server
131+ before receive commit ack. Message may be acked or not (if not - it will send in other read session,
132+ to this or other reader).
126133 """
127134 waiter = self ._reconnector .commit (batch )
128135 await waiter .future
@@ -174,6 +181,14 @@ async def _connection_loop(self):
174181 await asyncio .sleep (retry_info .sleep_timeout_seconds )
175182
176183 attempt += 1
184+ finally :
185+ if self ._stream_reader is not None :
186+ # noinspection PyBroadException
187+ try :
188+ await self ._stream_reader .close ()
189+ except BaseException :
190+ # supress any error on close stream reader
191+ pass
177192
178193 async def wait_message (self ):
179194 while True :
@@ -366,10 +381,10 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
366381 raise TopicReaderError ("reader can commit only self-produced messages" )
367382
368383 if partition_session .reader_stream_id != self ._id :
369- raise TopicReaderCommitToExpiredPartition ("commit messages after reconnect to server" )
384+ raise PublicTopicReaderPartitionExpiredError ("commit messages after reconnect to server" )
370385
371386 if partition_session .id not in self ._partition_sessions :
372- raise TopicReaderCommitToExpiredPartition ("commit messages after server stop the partition read session" )
387+ raise PublicTopicReaderPartitionExpiredError ("commit messages after server stop the partition read session" )
373388
374389 commit_range = batch ._commit_get_offsets_range ()
375390 waiter = partition_session .add_waiter (commit_range .end )
@@ -617,6 +632,7 @@ async def flush(self):
617632 async def close (self ):
618633 if self ._closed :
619634 return
635+
620636 self ._closed = True
621637
622638 self ._set_first_error (TopicReaderStreamClosedError ())
@@ -625,6 +641,7 @@ async def close(self):
625641
626642 for session in self ._partition_sessions .values ():
627643 session .close ()
644+ self ._partition_sessions .clear ()
628645
629646 for task in self ._background_tasks :
630647 task .cancel ()
0 commit comments