Skip to content

Commit b524443

Browse files
update Reader
1 parent 821e9b6 commit b524443

File tree

8 files changed

+345
-332
lines changed

8 files changed

+345
-332
lines changed

src/Ydb.Sdk/src/Services/Topic/Exceptions.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,13 @@ public class ReaderException : Exception
1919
{
2020
public ReaderException(string message) : base(message)
2121
{
22-
Status = new Status(StatusCode.Unspecified);
2322
}
2423

2524
public ReaderException(string message, Status status) : base(message + ": " + status)
2625
{
27-
Status = status;
2826
}
2927

30-
public ReaderException(string message, Driver.TransportException e) : base(message, e)
28+
public ReaderException(string message, Exception inner) : base(message, inner)
3129
{
32-
Status = e.Status;
3330
}
34-
35-
public Status Status { get; }
3631
}

src/Ydb.Sdk/src/Services/Topic/IReader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ public interface IReader<TValue> : IDisposable
66
{
77
public ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellationToken = default);
88

9-
public ValueTask<BatchMessage<TValue>> ReadBatchAsync(CancellationToken cancellationToken = default);
9+
public ValueTask<BatchMessages<TValue>> ReadBatchAsync(CancellationToken cancellationToken = default);
1010
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
using System.Collections.Immutable;
2+
using System.Diagnostics.CodeAnalysis;
3+
using Google.Protobuf;
4+
using Google.Protobuf.Collections;
5+
using Google.Protobuf.WellKnownTypes;
6+
using Ydb.Topic;
7+
8+
namespace Ydb.Sdk.Services.Topic.Reader;
9+
10+
internal class InternalBatchMessages
11+
{
12+
public InternalBatchMessages(
13+
ByteString data,
14+
string topic,
15+
long partitionId,
16+
string producerId,
17+
OffsetsRange offsetsRange,
18+
Timestamp createdAt,
19+
RepeatedField<MetadataItem> metadataItems,
20+
long approximatelyBytesSize)
21+
{
22+
Data = data;
23+
Topic = topic;
24+
PartitionId = partitionId;
25+
ProducerId = producerId;
26+
OffsetsRange = offsetsRange;
27+
CreatedAt = createdAt;
28+
MetadataItems = metadataItems;
29+
ApproximatelyBytesSize = approximatelyBytesSize;
30+
}
31+
32+
private ByteString Data { get; }
33+
private string Topic { get; }
34+
private long PartitionId { get; }
35+
private string ProducerId { get; }
36+
private OffsetsRange OffsetsRange { get; }
37+
private Timestamp CreatedAt { get; }
38+
private RepeatedField<MetadataItem> MetadataItems { get; }
39+
private long ApproximatelyBytesSize { get; }
40+
41+
internal Message<TValue> ToPublicMessage<TValue>(IDeserializer<TValue> deserializer,
42+
ReaderSession<TValue> readerSession)
43+
{
44+
readerSession.TryReadRequestBytes(ApproximatelyBytesSize);
45+
46+
return new Message<TValue>(
47+
data: deserializer.Deserialize(Data.ToByteArray()),
48+
topic: Topic,
49+
partitionId: PartitionId,
50+
producerId: ProducerId,
51+
createdAt: CreatedAt.ToDateTime(),
52+
metadata: MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray())).ToImmutableArray(),
53+
offsetsRange: OffsetsRange,
54+
readerSession: readerSession,
55+
approximatelyBytesSize: ApproximatelyBytesSize
56+
);
57+
}
58+
}
59+
60+
internal class InternalBatchMessages<TValue>
61+
{
62+
private readonly StreamReadMessage.Types.ReadResponse.Types.Batch _batch;
63+
private readonly PartitionSession _partitionSession;
64+
private readonly IDeserializer<TValue> _deserializer;
65+
private readonly ReaderSession<TValue> _readerSession;
66+
private readonly long _approximatelyBatchSizeOriginal;
67+
68+
private int _startMessageDataIndex = 0;
69+
private long _approximatelyBatchSize;
70+
71+
private int OriginalMessageCount => _batch.MessageData.Count;
72+
73+
internal bool IsActive => _startMessageDataIndex < OriginalMessageCount && _readerSession.IsActive;
74+
75+
public InternalBatchMessages(
76+
StreamReadMessage.Types.ReadResponse.Types.Batch batch,
77+
PartitionSession partitionsSession,
78+
ReaderSession<TValue> readerSession,
79+
long approximatelyBatchSize,
80+
IDeserializer<TValue> deserializer)
81+
{
82+
_batch = batch;
83+
_partitionSession = partitionsSession;
84+
_readerSession = readerSession;
85+
_deserializer = deserializer;
86+
_approximatelyBatchSizeOriginal = approximatelyBatchSize;
87+
_approximatelyBatchSize = approximatelyBatchSize;
88+
}
89+
90+
internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> message)
91+
{
92+
if (!IsActive)
93+
{
94+
message = default;
95+
return false;
96+
}
97+
98+
var index = _startMessageDataIndex++;
99+
var approximatelyMessageBytesSize = Utils
100+
.CalculateApproximatelyBytesSize(_approximatelyBatchSizeOriginal, OriginalMessageCount, index);
101+
var messageData = _batch.MessageData[index];
102+
103+
TValue value;
104+
try
105+
{
106+
value = _deserializer.Deserialize(messageData.Data.ToByteArray());
107+
}
108+
catch (Exception e)
109+
{
110+
throw new ReaderException("Error when deserializing message data", e);
111+
}
112+
113+
_approximatelyBatchSize -= approximatelyMessageBytesSize;
114+
115+
message = new Message<TValue>(
116+
value,
117+
_partitionSession.TopicPath,
118+
_partitionSession.PartitionId,
119+
_batch.ProducerId,
120+
messageData.CreatedAt.ToDateTime(),
121+
messageData.MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray()))
122+
.ToImmutableArray(),
123+
new OffsetsRange { Start = _partitionSession.PrevEndOffsetMessage, End = messageData.Offset },
124+
_readerSession,
125+
approximatelyMessageBytesSize
126+
);
127+
_partitionSession.PrevEndOffsetMessage = messageData.Offset + 1;
128+
129+
return true;
130+
}
131+
132+
internal bool TryPublicBatch([MaybeNullWhen(false)] out BatchMessages<TValue> batchMessages)
133+
{
134+
if (!IsActive)
135+
{
136+
batchMessages = default;
137+
return false;
138+
}
139+
140+
var offsetsRangeBatch = new OffsetsRange
141+
{ Start = _partitionSession.PrevEndOffsetMessage, End = _batch.MessageData.Last().Offset };
142+
var approximatelyBatchSize = _approximatelyBatchSize;
143+
144+
var messages = new List<Message<TValue>>();
145+
while (TryDequeueMessage(out var message))
146+
{
147+
messages.Add(message);
148+
}
149+
150+
batchMessages = new BatchMessages<TValue>(messages, _readerSession, approximatelyBatchSize, offsetsRangeBatch);
151+
152+
return true;
153+
}
154+
}
155+
156+
internal record CommitSending(
157+
OffsetsRange OffsetsRange,
158+
long PartitionSessionId,
159+
TaskCompletionSource TcsCommit,
160+
long ApproximatelyBytesSize
161+
);

src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs

Lines changed: 0 additions & 91 deletions
This file was deleted.

src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace Ydb.Sdk.Services.Topic.Reader;
66
public class Message<TValue>
77
{
88
private readonly OffsetsRange _offsetsRange;
9-
private readonly ReaderSession _readerSession;
9+
private readonly ReaderSession<TValue> _readerSession;
1010
private readonly long _approximatelyBytesSize;
1111

1212
internal Message(
@@ -17,7 +17,7 @@ internal Message(
1717
DateTime createdAt,
1818
ImmutableArray<Metadata> metadata,
1919
OffsetsRange offsetsRange,
20-
ReaderSession readerSession,
20+
ReaderSession<TValue> readerSession,
2121
long approximatelyBytesSize)
2222
{
2323
Data = data;
@@ -45,56 +45,38 @@ internal Message(
4545

4646
public DateTime CreatedAt { get; }
4747

48-
public ImmutableArray<Metadata> Metadata { get; }
49-
50-
internal long Start => _offsetsRange.Start;
51-
internal long End => _offsetsRange.End;
48+
public IReadOnlyCollection<Metadata> Metadata { get; }
5249

5350
public Task CommitAsync()
5451
{
5552
return _readerSession.CommitOffsetRange(_offsetsRange, PartitionId, _approximatelyBytesSize);
5653
}
5754
}
5855

59-
public class BatchMessage<TValue>
56+
public class BatchMessages<TValue>
6057
{
61-
private readonly ReaderSession _readerSession;
58+
private readonly ReaderSession<TValue> _readerSession;
59+
private readonly OffsetsRange _offsetsRange;
6260
private readonly long _approximatelyBatchSize;
6361

64-
public ImmutableArray<Message<TValue>> Batch { get; }
62+
public IReadOnlyCollection<Message<TValue>> Batch { get; }
6563

66-
internal BatchMessage(
67-
ImmutableArray<Message<TValue>> batch,
68-
ReaderSession readerSession,
69-
long approximatelyBatchSize)
64+
internal BatchMessages(
65+
IReadOnlyCollection<Message<TValue>> batch,
66+
ReaderSession<TValue> readerSession,
67+
long approximatelyBatchSize,
68+
OffsetsRange offsetsRange)
7069
{
7170
Batch = batch;
7271
_readerSession = readerSession;
7372
_approximatelyBatchSize = approximatelyBatchSize;
73+
_offsetsRange = offsetsRange;
7474
}
7575

7676
public Task CommitBatchAsync()
7777
{
78-
if (Batch.Length == 0)
79-
{
80-
return Task.CompletedTask;
81-
}
82-
83-
var offsetsRange = new OffsetsRange { Start = Batch.First().Start, End = Batch.Last().End };
84-
85-
return _readerSession.CommitOffsetRange(offsetsRange, Batch.First().PartitionId, _approximatelyBatchSize);
86-
}
87-
}
88-
89-
public class TopicPartitionOffset
90-
{
91-
public TopicPartitionOffset(long offset, long partitionId)
92-
{
93-
Offset = offset;
94-
PartitionId = partitionId;
78+
return Batch.Count == 0
79+
? Task.CompletedTask
80+
: _readerSession.CommitOffsetRange(_offsetsRange, Batch.First().PartitionId, _approximatelyBatchSize);
9581
}
96-
97-
public long Offset { get; }
98-
99-
public long PartitionId { get; }
10082
}

0 commit comments

Comments
 (0)