Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
- Do not pessimize the node on Grpc.Core.StatusCode.Cancelled and Grpc.Core.StatusCode.DeadlineExceeded.
- Dispose of WriterSession using dispose CancellationToken.
- BidirectionalStream is internal class.
- Move Metadata class to Ydb.Sdk.Services.Topic.
- Fixed memory leak CancellationTokenRegistration.
- Cancel writing tasks after disposing of Writer.

## v0.9.3
- Fixed bug in Topic Writer: worker is stopped by disposeCts
- Fixed bug in sql parser ADO.NET: deduplication declare param in YQL query
Expand Down
6 changes: 6 additions & 0 deletions src/Ydb.Sdk/src/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ protected override (string, GrpcChannel) GetChannel(long nodeId)
protected override void OnRpcError(string endpoint, RpcException e)
{
Logger.LogWarning("gRPC error [{Status}] on channel {Endpoint}", e.Status, endpoint);

if (e.StatusCode is Grpc.Core.StatusCode.Cancelled or Grpc.Core.StatusCode.DeadlineExceeded)
{
return;
}

if (!_endpointPool.PessimizeEndpoint(endpoint))
{
return;
Expand Down
1 change: 1 addition & 0 deletions src/Ydb.Sdk/src/GrpcRequestSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class GrpcRequestSettings
public string TraceId { get; set; } = string.Empty;
public TimeSpan TransportTimeout { get; set; } = TimeSpan.Zero;
public ImmutableArray<string> CustomClientHeaders { get; } = new();
public CancellationToken CancellationToken = default;

internal long NodeId { get; set; }
internal Action<Grpc.Core.Metadata> TrailersHandler { get; set; } = _ => { };
Expand Down
5 changes: 3 additions & 2 deletions src/Ydb.Sdk/src/IDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ protected CallOptions GetCallOptions(GrpcRequestSettings settings)
}

var options = new CallOptions(
headers: meta
headers: meta,
cancellationToken: settings.CancellationToken
);

if (settings.TransportTimeout != TimeSpan.Zero)
Expand Down Expand Up @@ -213,7 +214,7 @@ public async ValueTask<bool> MoveNextAsync()
}
}

public class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
{
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
private readonly Action<RpcException> _rpcErrorAction;
Expand Down
3 changes: 3 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Metadata.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Ydb.Sdk.Services.Topic;

public record Metadata(string Key, byte[] Value);
2 changes: 0 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,3 @@ public Message(TValue data)

public List<Metadata> Metadata { get; } = new();
}

public record Metadata(string Key, byte[] Value);
47 changes: 16 additions & 31 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class Writer<TValue> : IWriter<TValue>
private readonly WriterConfig _config;
private readonly ILogger<Writer<TValue>> _logger;
private readonly ISerializer<TValue> _serializer;
private readonly GrpcRequestSettings _writerGrpcRequestSettings;
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
private readonly CancellationTokenSource _disposeCts = new();
Expand All @@ -37,6 +38,7 @@ internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> seriali
_logger = driver.LoggerFactory.CreateLogger<Writer<TValue>>();
_serializer = serializer;
_limitBufferMaxSize = config.BufferMaxSize;
_writerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token };

StartWriteWorker();
}
Expand All @@ -49,11 +51,15 @@ public Task<WriteResult> WriteAsync(TValue data, CancellationToken cancellationT
public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationToken cancellationToken)
{
TaskCompletionSource<WriteResult> tcs = new();
cancellationToken.Register(
await using var registrationUserCancellationTokenRegistration = cancellationToken.Register(
() => tcs.TrySetException(
new WriterException("The write operation was canceled before it could be completed")
), useSynchronizationContext: false
);
await using var writerDisposedCancellationTokenRegistration = _disposeCts.Token.Register(
() => tcs.TrySetException(new WriterException($"Writer[{_config}] is disposed")),
useSynchronizationContext: false
);

byte[] data;
try
Expand Down Expand Up @@ -184,10 +190,7 @@ private async Task Initialize()

_logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config);

var stream = _driver.BidirectionalStreamCall(
TopicService.StreamWriteMethod,
GrpcRequestSettings.DefaultInstance
);
var stream = _driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, _writerGrpcRequestSettings);

var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
if (_config.ProducerId != null)
Expand Down Expand Up @@ -304,22 +307,15 @@ private async Task Initialize()

public void Dispose()
{
try
{
_disposeCts.Cancel();
_disposeCts.Cancel();

_session.Dispose();
}
finally
{
_disposeCts.Dispose();
}
_session = new NotStartedWriterSession("Writer is disposed");
}
}

internal record MessageSending(MessageData MessageData, TaskCompletionSource<WriteResult> Tcs);

internal interface IWriteSession : IDisposable
internal interface IWriteSession
{
Task Write(ConcurrentQueue<MessageSending> toSendBuffer);
}
Expand Down Expand Up @@ -347,11 +343,6 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)

return Task.CompletedTask;
}

public void Dispose()
{
// Do nothing
}
}

internal class DummyWriterSession : IWriteSession
Expand All @@ -362,10 +353,6 @@ private DummyWriterSession()
{
}

public void Dispose()
{
}

public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
{
return Task.CompletedTask;
Expand Down Expand Up @@ -501,18 +488,16 @@ Completing task on exception...
_inFlightMessages.TryDequeue(out _); // Dequeue
}
}

Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);
}
catch (Driver.TransportException e)
{
Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId);

}
finally
{
ReconnectSession();

return;
}

Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);

ReconnectSession();
}
}
17 changes: 17 additions & 0 deletions src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,23 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
}

[Fact]
public async Task WriteAsync_WhenWriterIsDisposed_ThrowWriterException()
{
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
.Returns(Task.CompletedTask);
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
.ReturnsAsync(true);
SetupReadOneWriteAckMessage();

var writer = new WriterBuilder<string>(_mockIDriver.Object, "/topic")
{ ProducerId = "producerId" }.Build();
writer.Dispose();

Assert.Equal("Writer[TopicPath: /topic, ProducerId: producerId, Codec: Raw] is disposed",
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync("abacaba"))).Message);
}

private void SetupReadOneWriteAckMessage()
{
_mockStream.SetupSequence(stream => stream.Current)
Expand Down
Loading