Skip to content

Commit ba89ddd

Browse files
dev: unit reader tests on fails (#269)
* PartitionSession.Stop uses committedOffset to complete commit tasks when stopPartitionSessionRequest.Graceful is true. * Changed batch type: IReadOnlyCollection<Message<TValue>> -> IReadOnlyList<Message<TValue>>. * Invoking TryReadRequestBytes before deserializing message. * Updated Ydb.Protos 1.0.6 -> 1.1.0: Updated version of the Grpc.Net.Client library to 2.67.0.
1 parent ab8fb03 commit ba89ddd

File tree

10 files changed

+1451
-77
lines changed

10 files changed

+1451
-77
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
* PartitionSession.Stop uses committedOffset to complete commit tasks when stopPartitionSessionRequest.Graceful is true.
2+
* Changed batch type: IReadOnlyCollection<Message<TValue>> -> IReadOnlyList<Message<TValue>>.
3+
* Invoking TryReadRequestBytes before deserializing message.
4+
* Updated Ydb.Protos 1.0.6 -> 1.1.0: Updated version of the Grpc.Net.Client library to 2.67.0.
15
* Fixed: YdbDataReader.GetDataTypeName for optional values.
26
* Added support for "Columns" collectionName in YdbConnection.GetSchema(Async).
37

src/Ydb.Sdk/src/Driver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ private async Task<Status> DiscoverEndpoints()
130130
};
131131

132132
var options = GetCallOptions(requestSettings);
133-
options.Headers.Add(Metadata.RpcSdkInfoHeader, _sdkInfo);
133+
options.Headers?.Add(Metadata.RpcSdkInfoHeader, _sdkInfo);
134134

135135
var response = await client.ListEndpointsAsync(
136136
request: request,

src/Ydb.Sdk/src/Services/Table/Transaction.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ internal Transaction(string txId)
2828
if (!string.IsNullOrEmpty(proto.Id))
2929
{
3030
tx.TxNum = IncTxCounter();
31-
logger.LogTrace($"Received tx #{tx.TxNum}");
31+
logger?.LogTrace($"Received tx #{tx.TxNum}");
3232
}
3333

3434
return tx;

src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> messa
4040
}
4141

4242
var index = _startMessageDataIndex++;
43-
var approximatelyMessageBytesSize = Utils
44-
.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index);
4543
var messageData = _batch.MessageData[index];
44+
_readerSession.TryReadRequestBytes(Utils
45+
.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index));
4646

4747
TValue value;
4848
try
@@ -54,7 +54,6 @@ internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> messa
5454
throw new ReaderException("Error when deserializing message data", e);
5555
}
5656

57-
_readerSession.TryReadRequestBytes(approximatelyMessageBytesSize);
5857
var nextCommitedOffset = messageData.Offset + 1;
5958

6059
message = new Message<TValue>(

src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@ public class BatchMessages<TValue>
5959
private readonly OffsetsRange _offsetsRange;
6060
private readonly long _partitionSessionId;
6161

62-
public IReadOnlyCollection<Message<TValue>> Batch { get; }
62+
public IReadOnlyList<Message<TValue>> Batch { get; }
6363

6464
internal BatchMessages(
65-
IReadOnlyCollection<Message<TValue>> batch,
65+
IReadOnlyList<Message<TValue>> batch,
6666
ReaderSession<TValue> readerSession,
6767
OffsetsRange offsetsRange,
6868
long partitionSessionId)

src/Ydb.Sdk/src/Services/Topic/Reader/PartitionSession.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public PartitionSession(
3737
internal long PrevEndOffsetMessage { get; set; }
3838

3939
// Each offset up to and including (committed_offset - 1) was fully processed.
40-
internal long CommitedOffset { get; private set; }
40+
private long CommitedOffset { get; set; }
4141

4242
internal void RegisterCommitRequest(CommitSending commitSending)
4343
{
@@ -78,12 +78,19 @@ internal void HandleCommitedOffset(long commitedOffset)
7878
}
7979
}
8080

81-
internal void Stop()
81+
internal void Stop(long commitedOffset)
8282
{
8383
_isStopped = true;
8484
while (_waitCommitMessages.TryDequeue(out var commitSending))
8585
{
86-
Utils.SetPartitionClosedException(commitSending, PartitionSessionId);
86+
if (commitSending.OffsetsRange.End <= commitedOffset)
87+
{
88+
commitSending.TcsCommit.SetResult();
89+
}
90+
else
91+
{
92+
Utils.SetPartitionClosedException(commitSending, PartitionSessionId);
93+
}
8794
}
8895
}
8996
}

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
using System.Threading.Channels;
33
using Google.Protobuf.WellKnownTypes;
44
using Microsoft.Extensions.Logging;
5-
using Ydb.Sdk.Ado;
65
using Ydb.Topic;
76
using Ydb.Topic.V1;
87
using static Ydb.Topic.StreamReadMessage.Types.FromServer;
@@ -203,6 +202,8 @@ public void Dispose()
203202
{
204203
try
205204
{
205+
_receivedMessagesChannel.Writer.TryComplete();
206+
206207
_disposeCts.Cancel();
207208
}
208209
finally
@@ -246,7 +247,6 @@ internal class ReaderSession<TValue> : TopicSession<MessageFromClient, MessageFr
246247
Channel.CreateUnbounded<MessageFromClient>(
247248
new UnboundedChannelOptions
248249
{
249-
SingleWriter = true,
250250
SingleReader = true,
251251
AllowSynchronousContinuations = false
252252
}
@@ -420,14 +420,19 @@ private async Task StopPartitionSessionRequest(
420420
{
421421
if (stopPartitionSessionRequest.Graceful)
422422
{
423+
partitionSession.Stop(stopPartitionSessionRequest.CommittedOffset);
424+
423425
await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient
424426
{
425427
StopPartitionSessionResponse = new StreamReadMessage.Types.StopPartitionSessionResponse
426428
{ PartitionSessionId = partitionSession.PartitionSessionId }
427429
});
428430
}
429-
430-
partitionSession.Stop();
431+
else
432+
{
433+
// Maybe a race condition with the server dropping all waiters before they can commit.
434+
partitionSession.Stop(-1);
435+
}
431436
}
432437
else
433438
{
@@ -441,7 +446,7 @@ public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionSes
441446
var tcsCommit = new TaskCompletionSource();
442447

443448
await using var register = _lifecycleReaderSessionCts.Token.Register(
444-
() => tcsCommit.TrySetException(new YdbException($"ReaderSession[{SessionId}] was deactivated"))
449+
() => tcsCommit.TrySetException(new ReaderException($"ReaderSession[{SessionId}] was deactivated"))
445450
);
446451

447452
var commitSending = new CommitSending(offsetsRange, tcsCommit);

src/Ydb.Sdk/src/Ydb.Sdk.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
</PropertyGroup>
2727

2828
<ItemGroup>
29-
<PackageReference Include="Ydb.Protos" Version="1.0.6" />
29+
<PackageReference Include="Ydb.Protos" Version="1.1.0" />
3030
<PackageReference Include="Portable.BouncyCastle" Version="1.9.0" />
3131
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="7.0.0" />
3232
</ItemGroup>

0 commit comments

Comments
 (0)