Skip to content

Commit 9820bc4

Browse files
feat: initial commit impl reader
1 parent a03d8f6 commit 9820bc4

File tree

7 files changed

+561
-81
lines changed

7 files changed

+561
-81
lines changed

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public async ValueTask<bool> MoveNextAsync()
213213
}
214214
}
215215

216-
public class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
216+
internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
217217
{
218218
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
219219
private readonly Action<RpcException> _rpcErrorAction;

src/Ydb.Sdk/src/Services/Topic/Exceptions.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,20 @@ public WriterException(string message, Exception inner) : base(message, inner)
1717

1818
public class ReaderException : Exception
1919
{
20-
protected ReaderException(string message) : base(message)
20+
public ReaderException(string message) : base(message)
2121
{
22+
Status = new Status(StatusCode.Unspecified);
2223
}
24+
25+
public ReaderException(string message, Status status) : base(message + ": " + status)
26+
{
27+
Status = status;
28+
}
29+
30+
public ReaderException(string message, Driver.TransportException e) : base(message, e)
31+
{
32+
Status = e.Status;
33+
}
34+
35+
public Status Status { get; }
2336
}

src/Ydb.Sdk/src/Services/Topic/IReader.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
namespace Ydb.Sdk.Services.Topic;
44

5-
public interface IReader<TValue>
5+
public interface IReader<TValue> : IDisposable
66
{
7-
public Task<TValue> ReadAsync();
7+
public ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellationToken = default);
88

9-
public Task<Message<TValue>> ReadMessageAsync();
9+
public ValueTask<IReadOnlyList<Message<TValue>>> ReadBatchAsync(CancellationToken cancellationToken = default);
1010
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,45 @@
1+
using Google.Protobuf;
2+
using Google.Protobuf.Collections;
3+
using Google.Protobuf.WellKnownTypes;
4+
using Ydb.Topic;
5+
16
namespace Ydb.Sdk.Services.Topic.Reader;
27

38
public class Message<TValue>
49
{
10+
internal Message(TValue data, string topic, string producerId)
11+
{
12+
Data = data;
13+
Topic = topic;
14+
ProducerId = producerId;
15+
}
16+
17+
public TValue Data { get; }
18+
19+
/// <summary>
20+
/// The topic associated with the message.
21+
/// </summary>
22+
public string Topic { get; }
23+
24+
public string ProducerId { get; }
25+
26+
public Task Commit()
27+
{
28+
throw new NotImplementedException();
29+
}
30+
}
31+
32+
public class BatchMessage<TValue>
33+
{
34+
public BatchMessage(IReadOnlyCollection<Message<TValue>> batch)
35+
{
36+
Batch = batch;
37+
}
38+
39+
public IReadOnlyCollection<Message<TValue>> Batch { get; }
40+
41+
public Task Commit()
42+
{
43+
throw new NotImplementedException();
44+
}
545
}

0 commit comments

Comments
 (0)