Skip to content

Commit 8bf79ad

Browse files
fix
1 parent ca941e9 commit 8bf79ad

File tree

3 files changed

+17
-25
lines changed

3 files changed

+17
-25
lines changed

src/Ydb.Sdk/src/Driver.cs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,7 @@ public async ValueTask<bool> MoveNextAsync()
367367
}
368368
}
369369

370-
internal sealed class BidirectionalStream<TRequest, TResponse> : IAsyncEnumerator<TResponse>,
371-
IAsyncEnumerable<TResponse>
370+
internal sealed class BidirectionalStream<TRequest, TResponse> : IDisposable
372371
{
373372
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _bidirectionalStream;
374373
private readonly Action _rpcErrorAction;
@@ -394,13 +393,6 @@ public async Task Write(TRequest request)
394393
}
395394
}
396395

397-
public ValueTask DisposeAsync()
398-
{
399-
_bidirectionalStream.Dispose();
400-
401-
return default;
402-
}
403-
404396
public async ValueTask<bool> MoveNextAsync()
405397
{
406398
try
@@ -417,9 +409,9 @@ public async ValueTask<bool> MoveNextAsync()
417409

418410
public TResponse Current => _bidirectionalStream.ResponseStream.Current;
419411

420-
public IAsyncEnumerator<TResponse> GetAsyncEnumerator(CancellationToken cancellationToken = new())
412+
public void Dispose()
421413
{
422-
return this;
414+
_bidirectionalStream.Dispose();
423415
}
424416
}
425417

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,21 @@
22

33
namespace Ydb.Sdk.Services.Topic;
44

5-
internal abstract class TopicSession : IDisposable
5+
internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
66
{
77
private readonly Func<Task> _initialize;
88

9+
protected readonly Driver.BidirectionalStream<TFromClient, TFromServer> Stream;
910
protected readonly ILogger Logger;
1011
protected readonly string SessionId;
1112

1213
private int _isActive = 1;
1314
private bool _disposed;
1415

15-
protected TopicSession(ILogger logger, string sessionId, Func<Task> initialize)
16+
protected TopicSession(Driver.BidirectionalStream<TFromClient, TFromServer> stream, ILogger logger,
17+
string sessionId, Func<Task> initialize)
1618
{
19+
Stream = stream;
1720
Logger = logger;
1821
SessionId = sessionId;
1922
_initialize = initialize;
@@ -50,5 +53,7 @@ public void Dispose()
5053
{
5154
_disposed = true;
5255
}
56+
57+
Stream.Dispose();
5358
}
5459
}

src/Ydb.Sdk/src/Services/Topic/Writer.cs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace Ydb.Sdk.Services.Topic;
1111
using InitResponse = StreamWriteMessage.Types.InitResponse;
1212
using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData;
1313
using MessageFromClient = StreamWriteMessage.Types.FromClient;
14+
using MessageFromServer = StreamWriteMessage.Types.FromServer;
1415
using WriterStream = Driver.BidirectionalStream<
1516
StreamWriteMessage.Types.FromClient,
1617
StreamWriteMessage.Types.FromServer
@@ -143,10 +144,9 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message)
143144
}
144145

145146
// No thread safe
146-
internal class WriterSession : TopicSession
147+
internal class WriterSession : TopicSession<MessageFromClient, MessageFromServer>
147148
{
148149
private readonly WriterConfig _config;
149-
private readonly WriterStream _stream;
150150

151151
private long _seqNum;
152152

@@ -155,11 +155,10 @@ public WriterSession(
155155
WriterStream stream,
156156
InitResponse initResponse,
157157
Func<Task> initialize,
158-
ILogger logger) : base(logger, initResponse.SessionId, initialize)
158+
ILogger logger) : base(stream, logger, initResponse.SessionId, initialize)
159159
{
160160
_config = config;
161-
_stream = stream;
162-
_seqNum = initResponse.LastSeqNo;
161+
Volatile.Write(ref _seqNum, initResponse.LastSeqNo); // happens-before for Volatile.Read
163162
}
164163

165164
internal async Task RunProcessingWriteAck(ConcurrentQueue<MessageSending> inFlightMessages)
@@ -168,8 +167,9 @@ internal async Task RunProcessingWriteAck(ConcurrentQueue<MessageSending> inFlig
168167
{
169168
Logger.LogInformation("WriterSession[{SessionId}] is running processing writeAck", SessionId);
170169

171-
await foreach (var messageFromServer in _stream)
170+
while (await Stream.MoveNextAsync())
172171
{
172+
var messageFromServer = Stream.Current;
173173
var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues);
174174

175175
if (status.IsNotSuccess)
@@ -253,7 +253,7 @@ internal async Task Write(ConcurrentQueue<MessageSending> toSendBuffer,
253253
}
254254

255255
Volatile.Write(ref _seqNum, currentSeqNum);
256-
await _stream.Write(new MessageFromClient { WriteRequest = writeMessage });
256+
await Stream.Write(new MessageFromClient { WriteRequest = writeMessage });
257257
}
258258
catch (TransactionException e)
259259
{
@@ -263,11 +263,6 @@ internal async Task Write(ConcurrentQueue<MessageSending> toSendBuffer,
263263
ReconnectSession();
264264
}
265265
}
266-
267-
public ValueTask DisposeAsync()
268-
{
269-
return _stream.DisposeAsync();
270-
}
271266
}
272267

273268
internal record MessageSending(MessageData MessageData, TaskCompletionSource<WriteResult> TaskCompletionSource);

0 commit comments

Comments
 (0)