Skip to content

Commit 91869c1

Browse files
fix: memory leak CancellationTokenRegistration (#242)
1) Do not pessimize the node on Grpc.Core.StatusCode.Cancelled and Grpc.Core.StatusCode.DeadlineExceeded. 2) Dispose of WriterSession using dispose CancellationToken. 3) BidirectionalStream is internal class. 4) Move Metadata class to Ydb.Sdk.Services.Topic. 5) Fixed memory leak CancellationTokenRegistration. 6) Cancel writing tasks after disposing of Writer.
1 parent a03d8f6 commit 91869c1

File tree

8 files changed

+53
-35
lines changed

8 files changed

+53
-35
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
- Do not pessimize the node on Grpc.Core.StatusCode.Cancelled and Grpc.Core.StatusCode.DeadlineExceeded.
2+
- Dispose of WriterSession using dispose CancellationToken.
3+
- BidirectionalStream is internal class.
4+
- Move Metadata class to Ydb.Sdk.Services.Topic.
5+
- Fixed memory leak CancellationTokenRegistration.
6+
- Cancel writing tasks after disposing of Writer.
7+
18
## v0.9.3
29
- Fixed bug in Topic Writer: worker is stopped by disposeCts
310
- Fixed bug in sql parser ADO.NET: deduplication declare param in YQL query

src/Ydb.Sdk/src/Driver.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ protected override (string, GrpcChannel) GetChannel(long nodeId)
9797
protected override void OnRpcError(string endpoint, RpcException e)
9898
{
9999
Logger.LogWarning("gRPC error [{Status}] on channel {Endpoint}", e.Status, endpoint);
100+
101+
if (e.StatusCode is Grpc.Core.StatusCode.Cancelled or Grpc.Core.StatusCode.DeadlineExceeded)
102+
{
103+
return;
104+
}
105+
100106
if (!_endpointPool.PessimizeEndpoint(endpoint))
101107
{
102108
return;

src/Ydb.Sdk/src/GrpcRequestSettings.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public class GrpcRequestSettings
1111
public string TraceId { get; set; } = string.Empty;
1212
public TimeSpan TransportTimeout { get; set; } = TimeSpan.Zero;
1313
public ImmutableArray<string> CustomClientHeaders { get; } = new();
14+
public CancellationToken CancellationToken = default;
1415

1516
internal long NodeId { get; set; }
1617
internal Action<Grpc.Core.Metadata> TrailersHandler { get; set; } = _ => { };

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ protected CallOptions GetCallOptions(GrpcRequestSettings settings)
144144
}
145145

146146
var options = new CallOptions(
147-
headers: meta
147+
headers: meta,
148+
cancellationToken: settings.CancellationToken
148149
);
149150

150151
if (settings.TransportTimeout != TimeSpan.Zero)
@@ -213,7 +214,7 @@ public async ValueTask<bool> MoveNextAsync()
213214
}
214215
}
215216

216-
public class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
217+
internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
217218
{
218219
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
219220
private readonly Action<RpcException> _rpcErrorAction;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
public record Metadata(string Key, byte[] Value);

src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,3 @@ public Message(TValue data)
1313

1414
public List<Metadata> Metadata { get; } = new();
1515
}
16-
17-
public record Metadata(string Key, byte[] Value);

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ internal class Writer<TValue> : IWriter<TValue>
2121
private readonly WriterConfig _config;
2222
private readonly ILogger<Writer<TValue>> _logger;
2323
private readonly ISerializer<TValue> _serializer;
24+
private readonly GrpcRequestSettings _writerGrpcRequestSettings;
2425
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
2526
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
2627
private readonly CancellationTokenSource _disposeCts = new();
@@ -37,6 +38,7 @@ internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> seriali
3738
_logger = driver.LoggerFactory.CreateLogger<Writer<TValue>>();
3839
_serializer = serializer;
3940
_limitBufferMaxSize = config.BufferMaxSize;
41+
_writerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token };
4042

4143
StartWriteWorker();
4244
}
@@ -49,11 +51,15 @@ public Task<WriteResult> WriteAsync(TValue data, CancellationToken cancellationT
4951
public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationToken cancellationToken)
5052
{
5153
TaskCompletionSource<WriteResult> tcs = new();
52-
cancellationToken.Register(
54+
await using var registrationUserCancellationTokenRegistration = cancellationToken.Register(
5355
() => tcs.TrySetException(
5456
new WriterException("The write operation was canceled before it could be completed")
5557
), useSynchronizationContext: false
5658
);
59+
await using var writerDisposedCancellationTokenRegistration = _disposeCts.Token.Register(
60+
() => tcs.TrySetException(new WriterException($"Writer[{_config}] is disposed")),
61+
useSynchronizationContext: false
62+
);
5763

5864
byte[] data;
5965
try
@@ -184,10 +190,7 @@ private async Task Initialize()
184190

185191
_logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config);
186192

187-
var stream = _driver.BidirectionalStreamCall(
188-
TopicService.StreamWriteMethod,
189-
GrpcRequestSettings.DefaultInstance
190-
);
193+
var stream = _driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, _writerGrpcRequestSettings);
191194

192195
var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
193196
if (_config.ProducerId != null)
@@ -304,22 +307,15 @@ private async Task Initialize()
304307

305308
public void Dispose()
306309
{
307-
try
308-
{
309-
_disposeCts.Cancel();
310+
_disposeCts.Cancel();
310311

311-
_session.Dispose();
312-
}
313-
finally
314-
{
315-
_disposeCts.Dispose();
316-
}
312+
_session = new NotStartedWriterSession("Writer is disposed");
317313
}
318314
}
319315

320316
internal record MessageSending(MessageData MessageData, TaskCompletionSource<WriteResult> Tcs);
321317

322-
internal interface IWriteSession : IDisposable
318+
internal interface IWriteSession
323319
{
324320
Task Write(ConcurrentQueue<MessageSending> toSendBuffer);
325321
}
@@ -347,11 +343,6 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
347343

348344
return Task.CompletedTask;
349345
}
350-
351-
public void Dispose()
352-
{
353-
// Do nothing
354-
}
355346
}
356347

357348
internal class DummyWriterSession : IWriteSession
@@ -362,10 +353,6 @@ private DummyWriterSession()
362353
{
363354
}
364355

365-
public void Dispose()
366-
{
367-
}
368-
369356
public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
370357
{
371358
return Task.CompletedTask;
@@ -501,18 +488,16 @@ Completing task on exception...
501488
_inFlightMessages.TryDequeue(out _); // Dequeue
502489
}
503490
}
491+
492+
Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);
504493
}
505494
catch (Driver.TransportException e)
506495
{
507496
Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId);
508-
497+
}
498+
finally
499+
{
509500
ReconnectSession();
510-
511-
return;
512501
}
513-
514-
Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);
515-
516-
ReconnectSession();
517502
}
518503
}

src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,23 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
704704
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
705705
}
706706

707+
[Fact]
708+
public async Task WriteAsync_WhenWriterIsDisposed_ThrowWriterException()
709+
{
710+
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
711+
.Returns(Task.CompletedTask);
712+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
713+
.ReturnsAsync(true);
714+
SetupReadOneWriteAckMessage();
715+
716+
var writer = new WriterBuilder<string>(_mockIDriver.Object, "/topic")
717+
{ ProducerId = "producerId" }.Build();
718+
writer.Dispose();
719+
720+
Assert.Equal("Writer[TopicPath: /topic, ProducerId: producerId, Codec: Raw] is disposed",
721+
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync("abacaba"))).Message);
722+
}
723+
707724
private void SetupReadOneWriteAckMessage()
708725
{
709726
_mockStream.SetupSequence(stream => stream.Current)

0 commit comments

Comments
 (0)