Skip to content

Commit d384348

Browse files
updates
1 parent b524443 commit d384348

File tree

8 files changed

+117
-136
lines changed

8 files changed

+117
-136
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed TopicSession.ReconnectSession(): CompareExchange returns the original value in location1.
12
* Fixed: YdbDataReader.GetDataTypeName for optional values.
23
* Added support for "Columns" collectionName in YdbConnection.GetSchema(Async).
34

Lines changed: 27 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,21 @@
11
using System.Collections.Immutable;
22
using System.Diagnostics.CodeAnalysis;
3-
using Google.Protobuf;
4-
using Google.Protobuf.Collections;
5-
using Google.Protobuf.WellKnownTypes;
63
using Ydb.Topic;
74

85
namespace Ydb.Sdk.Services.Topic.Reader;
96

10-
internal class InternalBatchMessages
11-
{
12-
public InternalBatchMessages(
13-
ByteString data,
14-
string topic,
15-
long partitionId,
16-
string producerId,
17-
OffsetsRange offsetsRange,
18-
Timestamp createdAt,
19-
RepeatedField<MetadataItem> metadataItems,
20-
long approximatelyBytesSize)
21-
{
22-
Data = data;
23-
Topic = topic;
24-
PartitionId = partitionId;
25-
ProducerId = producerId;
26-
OffsetsRange = offsetsRange;
27-
CreatedAt = createdAt;
28-
MetadataItems = metadataItems;
29-
ApproximatelyBytesSize = approximatelyBytesSize;
30-
}
31-
32-
private ByteString Data { get; }
33-
private string Topic { get; }
34-
private long PartitionId { get; }
35-
private string ProducerId { get; }
36-
private OffsetsRange OffsetsRange { get; }
37-
private Timestamp CreatedAt { get; }
38-
private RepeatedField<MetadataItem> MetadataItems { get; }
39-
private long ApproximatelyBytesSize { get; }
40-
41-
internal Message<TValue> ToPublicMessage<TValue>(IDeserializer<TValue> deserializer,
42-
ReaderSession<TValue> readerSession)
43-
{
44-
readerSession.TryReadRequestBytes(ApproximatelyBytesSize);
45-
46-
return new Message<TValue>(
47-
data: deserializer.Deserialize(Data.ToByteArray()),
48-
topic: Topic,
49-
partitionId: PartitionId,
50-
producerId: ProducerId,
51-
createdAt: CreatedAt.ToDateTime(),
52-
metadata: MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray())).ToImmutableArray(),
53-
offsetsRange: OffsetsRange,
54-
readerSession: readerSession,
55-
approximatelyBytesSize: ApproximatelyBytesSize
56-
);
57-
}
58-
}
59-
607
internal class InternalBatchMessages<TValue>
618
{
629
private readonly StreamReadMessage.Types.ReadResponse.Types.Batch _batch;
6310
private readonly PartitionSession _partitionSession;
6411
private readonly IDeserializer<TValue> _deserializer;
6512
private readonly ReaderSession<TValue> _readerSession;
66-
private readonly long _approximatelyBatchSizeOriginal;
13+
private readonly long _approximatelyBatchSize;
6714

68-
private int _startMessageDataIndex = 0;
69-
private long _approximatelyBatchSize;
15+
private int _startMessageDataIndex;
7016

7117
private int OriginalMessageCount => _batch.MessageData.Count;
72-
73-
internal bool IsActive => _startMessageDataIndex < OriginalMessageCount && _readerSession.IsActive;
18+
private bool IsActive => _startMessageDataIndex < OriginalMessageCount && _readerSession.IsActive;
7419

7520
public InternalBatchMessages(
7621
StreamReadMessage.Types.ReadResponse.Types.Batch batch,
@@ -83,7 +28,6 @@ public InternalBatchMessages(
8328
_partitionSession = partitionsSession;
8429
_readerSession = readerSession;
8530
_deserializer = deserializer;
86-
_approximatelyBatchSizeOriginal = approximatelyBatchSize;
8731
_approximatelyBatchSize = approximatelyBatchSize;
8832
}
8933

@@ -97,7 +41,7 @@ internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> messa
9741

9842
var index = _startMessageDataIndex++;
9943
var approximatelyMessageBytesSize = Utils
100-
.CalculateApproximatelyBytesSize(_approximatelyBatchSizeOriginal, OriginalMessageCount, index);
44+
.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index);
10145
var messageData = _batch.MessageData[index];
10246

10347
TValue value;
@@ -110,21 +54,23 @@ internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> messa
11054
throw new ReaderException("Error when deserializing message data", e);
11155
}
11256

113-
_approximatelyBatchSize -= approximatelyMessageBytesSize;
57+
_readerSession.TryReadRequestBytes(approximatelyMessageBytesSize);
58+
var nextCommitedOffset = messageData.Offset + 1;
11459

11560
message = new Message<TValue>(
116-
value,
117-
_partitionSession.TopicPath,
118-
_partitionSession.PartitionId,
119-
_batch.ProducerId,
120-
messageData.CreatedAt.ToDateTime(),
121-
messageData.MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray()))
61+
data: value,
62+
topic: _partitionSession.TopicPath,
63+
partitionId: _partitionSession.PartitionId,
64+
partitionSessionId: _partitionSession.PartitionSessionId,
65+
producerId: _batch.ProducerId,
66+
createdAt: messageData.CreatedAt.ToDateTime(),
67+
metadata: messageData.MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray()))
12268
.ToImmutableArray(),
123-
new OffsetsRange { Start = _partitionSession.PrevEndOffsetMessage, End = messageData.Offset },
124-
_readerSession,
125-
approximatelyMessageBytesSize
69+
offsetsRange: new OffsetsRange
70+
{ Start = _partitionSession.PrevEndOffsetMessage, End = nextCommitedOffset },
71+
readerSession: _readerSession
12672
);
127-
_partitionSession.PrevEndOffsetMessage = messageData.Offset + 1;
73+
_partitionSession.PrevEndOffsetMessage = nextCommitedOffset;
12874

12975
return true;
13076
}
@@ -137,25 +83,29 @@ internal bool TryPublicBatch([MaybeNullWhen(false)] out BatchMessages<TValue> ba
13783
return false;
13884
}
13985

86+
var nextCommitedOffset = _batch.MessageData.Last().Offset + 1;
14087
var offsetsRangeBatch = new OffsetsRange
141-
{ Start = _partitionSession.PrevEndOffsetMessage, End = _batch.MessageData.Last().Offset };
142-
var approximatelyBatchSize = _approximatelyBatchSize;
88+
{ Start = _partitionSession.PrevEndOffsetMessage, End = nextCommitedOffset };
89+
_partitionSession.PrevEndOffsetMessage = nextCommitedOffset;
14390

14491
var messages = new List<Message<TValue>>();
14592
while (TryDequeueMessage(out var message))
14693
{
14794
messages.Add(message);
14895
}
14996

150-
batchMessages = new BatchMessages<TValue>(messages, _readerSession, approximatelyBatchSize, offsetsRangeBatch);
97+
batchMessages = new BatchMessages<TValue>(
98+
batch: messages,
99+
readerSession: _readerSession,
100+
offsetsRange: offsetsRangeBatch,
101+
partitionSessionId: _partitionSession.PartitionSessionId
102+
);
151103

152104
return true;
153105
}
154106
}
155107

156108
internal record CommitSending(
157109
OffsetsRange OffsetsRange,
158-
long PartitionSessionId,
159-
TaskCompletionSource TcsCommit,
160-
long ApproximatelyBytesSize
110+
TaskCompletionSource TcsCommit
161111
);

src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,20 @@ namespace Ydb.Sdk.Services.Topic.Reader;
55

66
public class Message<TValue>
77
{
8+
private readonly long _partitionSessionId;
89
private readonly OffsetsRange _offsetsRange;
910
private readonly ReaderSession<TValue> _readerSession;
10-
private readonly long _approximatelyBytesSize;
1111

1212
internal Message(
1313
TValue data,
1414
string topic,
1515
long partitionId,
16+
long partitionSessionId,
1617
string producerId,
1718
DateTime createdAt,
1819
ImmutableArray<Metadata> metadata,
1920
OffsetsRange offsetsRange,
20-
ReaderSession<TValue> readerSession,
21-
long approximatelyBytesSize)
21+
ReaderSession<TValue> readerSession)
2222
{
2323
Data = data;
2424
Topic = topic;
@@ -27,9 +27,9 @@ internal Message(
2727
CreatedAt = createdAt;
2828
Metadata = metadata;
2929

30+
_partitionSessionId = partitionSessionId;
3031
_offsetsRange = offsetsRange;
3132
_readerSession = readerSession;
32-
_approximatelyBytesSize = approximatelyBytesSize;
3333
}
3434

3535
public TValue Data { get; }
@@ -49,34 +49,34 @@ internal Message(
4949

5050
public Task CommitAsync()
5151
{
52-
return _readerSession.CommitOffsetRange(_offsetsRange, PartitionId, _approximatelyBytesSize);
52+
return _readerSession.CommitOffsetRange(_offsetsRange, _partitionSessionId);
5353
}
5454
}
5555

5656
public class BatchMessages<TValue>
5757
{
5858
private readonly ReaderSession<TValue> _readerSession;
5959
private readonly OffsetsRange _offsetsRange;
60-
private readonly long _approximatelyBatchSize;
60+
private readonly long _partitionSessionId;
6161

6262
public IReadOnlyCollection<Message<TValue>> Batch { get; }
6363

6464
internal BatchMessages(
6565
IReadOnlyCollection<Message<TValue>> batch,
6666
ReaderSession<TValue> readerSession,
67-
long approximatelyBatchSize,
68-
OffsetsRange offsetsRange)
67+
OffsetsRange offsetsRange,
68+
long partitionSessionId)
6969
{
7070
Batch = batch;
7171
_readerSession = readerSession;
72-
_approximatelyBatchSize = approximatelyBatchSize;
7372
_offsetsRange = offsetsRange;
73+
_partitionSessionId = partitionSessionId;
7474
}
7575

7676
public Task CommitBatchAsync()
7777
{
7878
return Batch.Count == 0
7979
? Task.CompletedTask
80-
: _readerSession.CommitOffsetRange(_offsetsRange, Batch.First().PartitionId, _approximatelyBatchSize);
80+
: _readerSession.CommitOffsetRange(_offsetsRange, _partitionSessionId);
8181
}
8282
}

src/Ydb.Sdk/src/Services/Topic/Reader/PartitionSession.cs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ internal void RegisterCommitRequest(CommitSending commitSending)
4343
{
4444
var endOffset = commitSending.OffsetsRange.End;
4545

46-
if (endOffset <= CommitedOffset)
46+
if (endOffset < CommitedOffset)
4747
{
4848
commitSending.TcsCommit.SetResult();
4949
}
@@ -53,7 +53,7 @@ internal void RegisterCommitRequest(CommitSending commitSending)
5353

5454
if (_isStopped)
5555
{
56-
SetPartitionClosedException(commitSending);
56+
Utils.SetPartitionClosedException(commitSending, PartitionSessionId);
5757
}
5858
}
5959
}
@@ -78,23 +78,12 @@ internal void HandleCommitedOffset(long commitedOffset)
7878
}
7979
}
8080

81-
internal long Stop()
81+
internal void Stop()
8282
{
8383
_isStopped = true;
84-
long releaseCommitedBytes = 0;
8584
while (_waitCommitMessages.TryDequeue(out var commitSending))
8685
{
87-
SetPartitionClosedException(commitSending);
88-
89-
releaseCommitedBytes += commitSending.ApproximatelyBytesSize;
86+
Utils.SetPartitionClosedException(commitSending, PartitionSessionId);
9087
}
91-
92-
return releaseCommitedBytes;
93-
}
94-
95-
private void SetPartitionClosedException(CommitSending commitSending)
96-
{
97-
commitSending.TcsCommit.TrySetException(
98-
new ReaderException($"PartitionSession[{PartitionSessionId}] was closed by server."));
9988
}
10089
}

0 commit comments

Comments
 (0)