Skip to content

Commit 5a964cd

Browse files
feat: Init MVP Writer implementation (#206)
1 parent d0d22de commit 5a964cd

File tree

13 files changed

+462
-185
lines changed

13 files changed

+462
-185
lines changed

src/Ydb.Sdk/src/Driver.cs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,7 @@ public async ValueTask<bool> MoveNextAsync()
367367
}
368368
}
369369

370-
internal sealed class BidirectionalStream<TRequest, TResponse> : IAsyncEnumerator<TResponse>,
371-
IAsyncEnumerable<TResponse>
370+
internal sealed class BidirectionalStream<TRequest, TResponse> : IDisposable
372371
{
373372
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _bidirectionalStream;
374373
private readonly Action _rpcErrorAction;
@@ -394,13 +393,6 @@ public async Task Write(TRequest request)
394393
}
395394
}
396395

397-
public ValueTask DisposeAsync()
398-
{
399-
_bidirectionalStream.Dispose();
400-
401-
return default;
402-
}
403-
404396
public async ValueTask<bool> MoveNextAsync()
405397
{
406398
try
@@ -417,9 +409,9 @@ public async ValueTask<bool> MoveNextAsync()
417409

418410
public TResponse Current => _bidirectionalStream.ResponseStream.Current;
419411

420-
public IAsyncEnumerator<TResponse> GetAsyncEnumerator(CancellationToken cancellationToken = new())
412+
public void Dispose()
421413
{
422-
return this;
414+
_bidirectionalStream.Dispose();
423415
}
424416
}
425417

src/Ydb.Sdk/src/Services/Topic/Exceptions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ protected YdbTopicException(string message) : base(message)
77
}
88
}
99

10-
public class YdbProducerException : YdbTopicException
10+
public class YdbWriterException : YdbTopicException
1111
{
12-
public YdbProducerException(string message) : base(message)
12+
public YdbWriterException(string message) : base(message)
1313
{
1414
}
1515
}

src/Ydb.Sdk/src/Services/Topic/IProducer.cs

Lines changed: 0 additions & 8 deletions
This file was deleted.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
public interface IWriter<TValue>
4+
{
5+
public Task<WriteResult> WriteAsync(TValue data);
6+
7+
public Task<WriteResult> WriteAsync(Message<TValue> message);
8+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
public class Message<TValue>
4+
{
5+
public Message(TValue data)
6+
{
7+
Data = data;
8+
}
9+
10+
public DateTime Timestamp { get; set; } = DateTime.Now;
11+
12+
public TValue Data { get; }
13+
14+
public List<Metadata> Metadata { get; } = new();
15+
}
16+
17+
public record Metadata(string Key, byte[] Value);

src/Ydb.Sdk/src/Services/Topic/Producer.cs

Lines changed: 0 additions & 88 deletions
This file was deleted.

src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs

Lines changed: 0 additions & 60 deletions
This file was deleted.

src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs

Lines changed: 0 additions & 16 deletions
This file was deleted.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
using Microsoft.Extensions.Logging;
2+
3+
namespace Ydb.Sdk.Services.Topic;
4+
5+
internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
6+
{
7+
private readonly Func<Task> _initialize;
8+
9+
protected readonly Driver.BidirectionalStream<TFromClient, TFromServer> Stream;
10+
protected readonly ILogger Logger;
11+
protected readonly string SessionId;
12+
13+
private int _isActive = 1;
14+
private bool _disposed;
15+
16+
protected TopicSession(Driver.BidirectionalStream<TFromClient, TFromServer> stream, ILogger logger,
17+
string sessionId, Func<Task> initialize)
18+
{
19+
Stream = stream;
20+
Logger = logger;
21+
SessionId = sessionId;
22+
_initialize = initialize;
23+
}
24+
25+
protected async void ReconnectSession()
26+
{
27+
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
28+
{
29+
Logger.LogWarning("Skipping reconnect. A reconnect session has already been initiated");
30+
31+
return;
32+
}
33+
34+
Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
35+
36+
while (!_disposed)
37+
{
38+
try
39+
{
40+
await _initialize();
41+
break;
42+
}
43+
catch (Exception e)
44+
{
45+
Logger.LogError(e, "Unable to reconnect the session due to the following error");
46+
}
47+
}
48+
}
49+
50+
public void Dispose()
51+
{
52+
lock (this)
53+
{
54+
_disposed = true;
55+
}
56+
57+
Stream.Dispose();
58+
}
59+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using Ydb.Topic;
2+
3+
namespace Ydb.Sdk.Services.Topic;
4+
5+
public class WriteResult
6+
{
7+
private readonly long _offset;
8+
9+
internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
10+
{
11+
switch (ack.MessageWriteStatusCase)
12+
{
13+
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Written:
14+
Status = PersistenceStatus.Written;
15+
_offset = ack.Written.Offset;
16+
break;
17+
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Skipped:
18+
Status = PersistenceStatus.AlreadyWritten;
19+
break;
20+
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None:
21+
default:
22+
throw new YdbWriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}");
23+
}
24+
}
25+
26+
public PersistenceStatus Status { get; }
27+
28+
public bool TryGetOffset(out long offset)
29+
{
30+
offset = _offset;
31+
32+
return Status == PersistenceStatus.Written;
33+
}
34+
}
35+
36+
public enum PersistenceStatus
37+
{
38+
Written,
39+
AlreadyWritten
40+
}

0 commit comments

Comments
 (0)