Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
* PartitionSession.Stop uses committedOffset to complete commit tasks when stopPartitionSessionRequest.Graceful is true.
* Changed batch type: IReadOnlyCollection<Message<TValue>> -> IReadOnlyList<Message<TValue>>.
* Invoking TryReadRequestBytes before deserializing message.
* Updated Ydb.Protos 1.0.6 -> 1.1.0: Updated version of the Grpc.Net.Client library to 2.67.0.
* Fixed: YdbDataReader.GetDataTypeName for optional values.
* Added support for "Columns" collectionName in YdbConnection.GetSchema(Async).

Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private async Task<Status> DiscoverEndpoints()
};

var options = GetCallOptions(requestSettings);
options.Headers.Add(Metadata.RpcSdkInfoHeader, _sdkInfo);
options.Headers?.Add(Metadata.RpcSdkInfoHeader, _sdkInfo);

var response = await client.ListEndpointsAsync(
request: request,
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Services/Table/Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal Transaction(string txId)
if (!string.IsNullOrEmpty(proto.Id))
{
tx.TxNum = IncTxCounter();
logger.LogTrace($"Received tx #{tx.TxNum}");
logger?.LogTrace($"Received tx #{tx.TxNum}");
}

return tx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> messa
}

var index = _startMessageDataIndex++;
var approximatelyMessageBytesSize = Utils
.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index);
var messageData = _batch.MessageData[index];
_readerSession.TryReadRequestBytes(Utils
.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index));

TValue value;
try
Expand All @@ -54,7 +54,6 @@ internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> messa
throw new ReaderException("Error when deserializing message data", e);
}

_readerSession.TryReadRequestBytes(approximatelyMessageBytesSize);
var nextCommitedOffset = messageData.Offset + 1;

message = new Message<TValue>(
Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public class BatchMessages<TValue>
private readonly OffsetsRange _offsetsRange;
private readonly long _partitionSessionId;

public IReadOnlyCollection<Message<TValue>> Batch { get; }
public IReadOnlyList<Message<TValue>> Batch { get; }

internal BatchMessages(
IReadOnlyCollection<Message<TValue>> batch,
IReadOnlyList<Message<TValue>> batch,
ReaderSession<TValue> readerSession,
OffsetsRange offsetsRange,
long partitionSessionId)
Expand Down
13 changes: 10 additions & 3 deletions src/Ydb.Sdk/src/Services/Topic/Reader/PartitionSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public PartitionSession(
internal long PrevEndOffsetMessage { get; set; }

// Each offset up to and including (committed_offset - 1) was fully processed.
internal long CommitedOffset { get; private set; }
private long CommitedOffset { get; set; }

internal void RegisterCommitRequest(CommitSending commitSending)
{
Expand Down Expand Up @@ -78,12 +78,19 @@ internal void HandleCommitedOffset(long commitedOffset)
}
}

internal void Stop()
internal void Stop(long commitedOffset)
{
_isStopped = true;
while (_waitCommitMessages.TryDequeue(out var commitSending))
{
Utils.SetPartitionClosedException(commitSending, PartitionSessionId);
if (commitSending.OffsetsRange.End <= commitedOffset)
{
commitSending.TcsCommit.SetResult();
}
else
{
Utils.SetPartitionClosedException(commitSending, PartitionSessionId);
}
}
}
}
15 changes: 10 additions & 5 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Ydb.Sdk.Ado;
using Ydb.Topic;
using Ydb.Topic.V1;
using static Ydb.Topic.StreamReadMessage.Types.FromServer;
Expand Down Expand Up @@ -203,6 +202,8 @@ public void Dispose()
{
try
{
_receivedMessagesChannel.Writer.TryComplete();

_disposeCts.Cancel();
}
finally
Expand Down Expand Up @@ -246,7 +247,6 @@ internal class ReaderSession<TValue> : TopicSession<MessageFromClient, MessageFr
Channel.CreateUnbounded<MessageFromClient>(
new UnboundedChannelOptions
{
SingleWriter = true,
SingleReader = true,
AllowSynchronousContinuations = false
}
Expand Down Expand Up @@ -420,14 +420,19 @@ private async Task StopPartitionSessionRequest(
{
if (stopPartitionSessionRequest.Graceful)
{
partitionSession.Stop(stopPartitionSessionRequest.CommittedOffset);

await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient
{
StopPartitionSessionResponse = new StreamReadMessage.Types.StopPartitionSessionResponse
{ PartitionSessionId = partitionSession.PartitionSessionId }
});
}

partitionSession.Stop();
else
{
// Maybe a race condition with the server dropping all waiters before they can commit.
partitionSession.Stop(-1);
}
}
else
{
Expand All @@ -441,7 +446,7 @@ public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionSes
var tcsCommit = new TaskCompletionSource();

await using var register = _lifecycleReaderSessionCts.Token.Register(
() => tcsCommit.TrySetException(new YdbException($"ReaderSession[{SessionId}] was deactivated"))
() => tcsCommit.TrySetException(new ReaderException($"ReaderSession[{SessionId}] was deactivated"))
);

var commitSending = new CommitSending(offsetsRange, tcsCommit);
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ydb.Sdk.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Ydb.Protos" Version="1.0.6" />
<PackageReference Include="Ydb.Protos" Version="1.1.0" />
<PackageReference Include="Portable.BouncyCastle" Version="1.9.0" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="7.0.0" />
</ItemGroup>
Expand Down
Loading
Loading