Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Ydb.Sdk/src/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ protected override (string, GrpcChannel) GetChannel(long nodeId)
protected override void OnRpcError(string endpoint, RpcException e)
{
Logger.LogWarning("gRPC error [{Status}] on channel {Endpoint}", e.Status, endpoint);

if (e.StatusCode == Grpc.Core.StatusCode.Cancelled)
{
return;
}

if (!_endpointPool.PessimizeEndpoint(endpoint))
{
return;
Expand Down
1 change: 1 addition & 0 deletions src/Ydb.Sdk/src/GrpcRequestSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class GrpcRequestSettings
public string TraceId { get; set; } = string.Empty;
public TimeSpan TransportTimeout { get; set; } = TimeSpan.Zero;
public ImmutableArray<string> CustomClientHeaders { get; } = new();
public CancellationToken CancellationToken = default;

internal long NodeId { get; set; }
internal Action<Grpc.Core.Metadata> TrailersHandler { get; set; } = _ => { };
Expand Down
5 changes: 3 additions & 2 deletions src/Ydb.Sdk/src/IDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ protected CallOptions GetCallOptions(GrpcRequestSettings settings)
}

var options = new CallOptions(
headers: meta
headers: meta,
cancellationToken: settings.CancellationToken
);

if (settings.TransportTimeout != TimeSpan.Zero)
Expand Down Expand Up @@ -213,7 +214,7 @@ public async ValueTask<bool> MoveNextAsync()
}
}

public class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
{
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
private readonly Action<RpcException> _rpcErrorAction;
Expand Down
3 changes: 3 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Metadata.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Ydb.Sdk.Services.Topic;

public record Metadata(string Key, byte[] Value);
2 changes: 0 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,3 @@ public Message(TValue data)

public List<Metadata> Metadata { get; } = new();
}

public record Metadata(string Key, byte[] Value);
49 changes: 18 additions & 31 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class Writer<TValue> : IWriter<TValue>
private readonly WriterConfig _config;
private readonly ILogger<Writer<TValue>> _logger;
private readonly ISerializer<TValue> _serializer;
private readonly GrpcRequestSettings _writerGrpcRequestSettings;
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
private readonly CancellationTokenSource _disposeCts = new();
Expand All @@ -37,6 +38,7 @@ internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> seriali
_logger = driver.LoggerFactory.CreateLogger<Writer<TValue>>();
_serializer = serializer;
_limitBufferMaxSize = config.BufferMaxSize;
_writerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token };

StartWriteWorker();
}
Expand All @@ -49,11 +51,15 @@ public Task<WriteResult> WriteAsync(TValue data, CancellationToken cancellationT
public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationToken cancellationToken)
{
TaskCompletionSource<WriteResult> tcs = new();
cancellationToken.Register(
await using var registrationUserCancellationTokenRegistration = cancellationToken.Register(
() => tcs.TrySetException(
new WriterException("The write operation was canceled before it could be completed")
), useSynchronizationContext: false
);
await using var writerDisposedCancellationTokenRegistration = _disposeCts.Token.Register(
() => tcs.TrySetException(new WriterException($"Writer[{_config}] is disposed")),
useSynchronizationContext: false
);

byte[] data;
try
Expand Down Expand Up @@ -184,10 +190,7 @@ private async Task Initialize()

_logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config);

var stream = _driver.BidirectionalStreamCall(
TopicService.StreamWriteMethod,
GrpcRequestSettings.DefaultInstance
);
var stream = _driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, _writerGrpcRequestSettings);

var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
if (_config.ProducerId != null)
Expand Down Expand Up @@ -304,22 +307,15 @@ private async Task Initialize()

public void Dispose()
{
try
{
_disposeCts.Cancel();
_disposeCts.Cancel();

_session.Dispose();
}
finally
{
_disposeCts.Dispose();
}
_session = new NotStartedWriterSession("Writer is disposed");
}
}

internal record MessageSending(MessageData MessageData, TaskCompletionSource<WriteResult> Tcs);

internal interface IWriteSession : IDisposable
internal interface IWriteSession
{
Task Write(ConcurrentQueue<MessageSending> toSendBuffer);
}
Expand Down Expand Up @@ -347,11 +343,6 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)

return Task.CompletedTask;
}

public void Dispose()
{
// Do nothing
}
}

internal class DummyWriterSession : IWriteSession
Expand All @@ -362,10 +353,6 @@ private DummyWriterSession()
{
}

public void Dispose()
{
}

public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
{
return Task.CompletedTask;
Expand Down Expand Up @@ -505,14 +492,14 @@ Completing task on exception...
catch (Driver.TransportException e)
{
Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId);

}
catch (ObjectDisposedException)
{
Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);
}
finally
{
ReconnectSession();

return;
}

Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);

ReconnectSession();
}
}
17 changes: 17 additions & 0 deletions src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,23 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
}

[Fact]
public async Task WriteAsync_WhenWriterIsDisposed_ThrowWriterException()
{
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
.Returns(Task.CompletedTask);
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
.ReturnsAsync(true);
SetupReadOneWriteAckMessage();

var writer = new WriterBuilder<string>(_mockIDriver.Object, "/topic")
{ ProducerId = "producerId" }.Build();
writer.Dispose();

Assert.Equal("Writer[TopicPath: /topic, ProducerId: producerId, Codec: Raw] is disposed",
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync("abacaba"))).Message);
}

private void SetupReadOneWriteAckMessage()
{
_mockStream.SetupSequence(stream => stream.Current)
Expand Down
Loading