Skip to content

Commit 3c503b8

Browse files
next iteration impl reader
1 parent 087c2b2 commit 3c503b8

File tree

4 files changed

+198
-120
lines changed

4 files changed

+198
-120
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using System.Collections.Immutable;
2+
using Google.Protobuf;
3+
using Google.Protobuf.Collections;
4+
using Google.Protobuf.WellKnownTypes;
5+
using Ydb.Topic;
6+
7+
namespace Ydb.Sdk.Services.Topic.Reader;
8+
9+
internal class InternalMessage
10+
{
11+
public InternalMessage(
12+
ByteString data,
13+
string topic,
14+
long partitionId,
15+
string producerId,
16+
OffsetsRange offsetsRange,
17+
Timestamp createdAt,
18+
RepeatedField<MetadataItem> metadataItems,
19+
int dataSize)
20+
{
21+
Data = data;
22+
Topic = topic;
23+
PartitionId = partitionId;
24+
ProducerId = producerId;
25+
OffsetsRange = offsetsRange;
26+
CreatedAt = createdAt;
27+
MetadataItems = metadataItems;
28+
DataSize = dataSize;
29+
}
30+
31+
private ByteString Data { get; }
32+
33+
private string Topic { get; }
34+
35+
private long PartitionId { get; }
36+
37+
private string ProducerId { get; }
38+
39+
private OffsetsRange OffsetsRange { get; }
40+
41+
private Timestamp CreatedAt { get; }
42+
43+
private RepeatedField<MetadataItem> MetadataItems { get; }
44+
45+
private int DataSize { get; }
46+
47+
internal Message<TValue> ToPublicMessage<TValue>(IDeserializer<TValue> deserializer, ReaderSession readerSession)
48+
{
49+
return new Message<TValue>(
50+
data: deserializer.Deserialize(Data.ToByteArray()),
51+
topic: Topic,
52+
partitionId: PartitionId,
53+
producerId: ProducerId,
54+
createdAt: CreatedAt.ToDateTime(),
55+
metadata: MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray())).ToImmutableArray(),
56+
offsetsRange: OffsetsRange,
57+
readerSession: readerSession
58+
);
59+
}
60+
}
61+
62+
internal class InternalBatchMessage
63+
{
64+
public InternalBatchMessage(
65+
OffsetsRange batchOffsetsRange,
66+
Queue<InternalMessage> internalMessages,
67+
ReaderSession readerSession)
68+
{
69+
BatchOffsetsRange = batchOffsetsRange;
70+
InternalMessages = internalMessages;
71+
ReaderSession = readerSession;
72+
}
73+
74+
internal OffsetsRange BatchOffsetsRange { get; }
75+
76+
internal Queue<InternalMessage> InternalMessages { get; }
77+
78+
internal ReaderSession ReaderSession { get; }
79+
}
80+
81+
internal record CommitSending(
82+
OffsetsRange OffsetsRange,
83+
long PartitionSessionId,
84+
TaskCompletionSource<TopicPartitionOffset> TcsTopicPartitionOffset
85+
);

src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ internal Message(
4949

5050
public Task CommitAsync()
5151
{
52-
return _readerSession.CommitOffsetRange(_offsetsRange);
52+
return _readerSession.CommitOffsetRange(_offsetsRange, PartitionId);
5353
}
5454
}
5555

@@ -76,6 +76,19 @@ public Task CommitBatchAsync()
7676

7777
var offsetsRange = new OffsetsRange { Start = Batch.First().Start, End = Batch.Last().End };
7878

79-
return _readerSession.CommitOffsetRange(offsetsRange);
79+
return _readerSession.CommitOffsetRange(offsetsRange, Batch.First().PartitionId);
8080
}
8181
}
82+
83+
public class TopicPartitionOffset
84+
{
85+
public TopicPartitionOffset(long offset, long partitionId)
86+
{
87+
Offset = offset;
88+
PartitionId = partitionId;
89+
}
90+
91+
public long Offset { get; }
92+
93+
public long PartitionId { get; }
94+
}

0 commit comments

Comments
 (0)