Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Topic Writer updated release candidate

## v0.9.0-rc0
- Topic Writer release candidate
- Fixed: grpc requests go via proxy on Grpc.NET.Client >= 2.44
Expand Down
7 changes: 1 addition & 6 deletions src/Ydb.Sdk/src/Services/Topic/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,15 @@ public class WriterException : Exception
{
public WriterException(string message) : base(message)
{
Status = new Status(StatusCode.Unspecified);
}

public WriterException(string message, Status status) : base(message + ": " + status)
{
Status = status;
}

public WriterException(string message, Driver.TransportException e) : base(message, e)
public WriterException(string message, Exception inner) : base(message, inner)
{
Status = e.Status;
}

public Status Status { get; }
}

public class ReaderException : Exception
Expand Down
11 changes: 3 additions & 8 deletions src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace Ydb.Sdk.Services.Topic;
internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
{
private readonly Func<Task> _initialize;
private readonly Action<WriterException> _resetSessionOnTransportError;

protected readonly IBidirectionalStream<TFromClient, TFromServer> Stream;
protected readonly ILogger Logger;
Expand All @@ -17,17 +16,15 @@ protected TopicSession(
IBidirectionalStream<TFromClient, TFromServer> stream,
ILogger logger,
string sessionId,
Func<Task> initialize,
Action<WriterException> resetSessionOnTransportError)
Func<Task> initialize)
{
Stream = stream;
Logger = logger;
SessionId = sessionId;
_initialize = initialize;
_resetSessionOnTransportError = resetSessionOnTransportError;
}

protected async void ReconnectSession(WriterException exception)
protected async void ReconnectSession()
{
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
{
Expand All @@ -36,9 +33,7 @@ protected async void ReconnectSession(WriterException exception)
return;
}

_resetSessionOnTransportError(exception);

Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
Logger.LogInformation("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);

await _initialize();
}
Expand Down
7 changes: 7 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ namespace Ydb.Sdk.Services.Topic.Writer;

public class WriteResult
{
internal static readonly WriteResult Skipped = new();

private readonly long _offset;

internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
Expand All @@ -23,6 +25,11 @@ internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
}
}

private WriteResult()
{
Status = PersistenceStatus.AlreadyWritten;
}

public PersistenceStatus Status { get; }

public bool TryGetOffset(out long offset)
Expand Down
Loading
Loading