Skip to content

Commit 20d3c38

Browse files
commit
1 parent bd8762e commit 20d3c38

File tree

1 file changed

+16
-9
lines changed
  • src/Ydb.Sdk/src/Services/Topic/Reader

1 file changed

+16
-9
lines changed

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -288,22 +288,29 @@ public async void RunProcessingTopic()
288288
{
289289
_ = Task.Run(async () =>
290290
{
291-
await foreach (var commitSending in _channelCommitSending.Reader.ReadAllAsync())
291+
try
292292
{
293-
await Stream.Write(new MessageFromClient
293+
await foreach (var commitSending in _channelCommitSending.Reader.ReadAllAsync())
294294
{
295-
CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest
295+
await Stream.Write(new MessageFromClient
296296
{
297-
CommitOffsets =
297+
CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest
298298
{
299-
new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset
299+
CommitOffsets =
300300
{
301-
Offsets = { commitSending.OffsetsRange },
302-
PartitionSessionId = commitSending.PartitionSessionId
301+
new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset
302+
{
303+
Offsets = { commitSending.OffsetsRange },
304+
PartitionSessionId = commitSending.PartitionSessionId
305+
}
303306
}
304307
}
305-
}
306-
});
308+
});
309+
}
310+
}
311+
catch (Driver.TransportException e)
312+
{
313+
ReconnectSession();
307314
}
308315
});
309316

0 commit comments

Comments
 (0)