Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/Ydb.Sdk/src/Services/Topic/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,13 @@ public class ReaderException : Exception
{
public ReaderException(string message) : base(message)
{
Status = new Status(StatusCode.Unspecified);
}

public ReaderException(string message, Status status) : base(message + ": " + status)
{
Status = status;
}

public ReaderException(string message, Driver.TransportException e) : base(message, e)
public ReaderException(string message, Exception inner) : base(message, inner)
{
Status = e.Status;
}

public Status Status { get; }
}
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Services/Topic/IReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ public interface IReader<TValue> : IDisposable
{
public ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellationToken = default);

public ValueTask<BatchMessage<TValue>> ReadBatchAsync(CancellationToken cancellationToken = default);
public ValueTask<BatchMessages<TValue>> ReadBatchAsync(CancellationToken cancellationToken = default);
}
111 changes: 111 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using Ydb.Topic;

namespace Ydb.Sdk.Services.Topic.Reader;

internal class InternalBatchMessages<TValue>
{
private readonly StreamReadMessage.Types.ReadResponse.Types.Batch _batch;
private readonly PartitionSession _partitionSession;
private readonly IDeserializer<TValue> _deserializer;
private readonly ReaderSession<TValue> _readerSession;
private readonly long _approximatelyBatchSize;

private int _startMessageDataIndex;

private int OriginalMessageCount => _batch.MessageData.Count;
private bool IsActive => _startMessageDataIndex < OriginalMessageCount && _readerSession.IsActive;

public InternalBatchMessages(
StreamReadMessage.Types.ReadResponse.Types.Batch batch,
PartitionSession partitionsSession,
ReaderSession<TValue> readerSession,
long approximatelyBatchSize,
IDeserializer<TValue> deserializer)
{
_batch = batch;
_partitionSession = partitionsSession;
_readerSession = readerSession;
_deserializer = deserializer;
_approximatelyBatchSize = approximatelyBatchSize;
}

internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> message)
{
if (!IsActive)
{
message = default;
return false;
}

var index = _startMessageDataIndex++;
var approximatelyMessageBytesSize = Utils
.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index);
var messageData = _batch.MessageData[index];

TValue value;
try
{
value = _deserializer.Deserialize(messageData.Data.ToByteArray());
}
catch (Exception e)
{
throw new ReaderException("Error when deserializing message data", e);
}

_readerSession.TryReadRequestBytes(approximatelyMessageBytesSize);
var nextCommitedOffset = messageData.Offset + 1;

message = new Message<TValue>(
data: value,
topic: _partitionSession.TopicPath,
partitionId: _partitionSession.PartitionId,
partitionSessionId: _partitionSession.PartitionSessionId,
producerId: _batch.ProducerId,
createdAt: messageData.CreatedAt.ToDateTime(),
metadata: messageData.MetadataItems.Select(item => new Metadata(item.Key, item.Value.ToByteArray()))
.ToImmutableArray(),
offsetsRange: new OffsetsRange
{ Start = _partitionSession.PrevEndOffsetMessage, End = nextCommitedOffset },
readerSession: _readerSession
);
_partitionSession.PrevEndOffsetMessage = nextCommitedOffset;

return true;
}

internal bool TryPublicBatch([MaybeNullWhen(false)] out BatchMessages<TValue> batchMessages)
{
if (!IsActive)
{
batchMessages = default;
return false;
}

var nextCommitedOffset = _batch.MessageData.Last().Offset + 1;
var offsetsRangeBatch = new OffsetsRange
{ Start = _partitionSession.PrevEndOffsetMessage, End = nextCommitedOffset };
_partitionSession.PrevEndOffsetMessage = nextCommitedOffset;

var messages = new List<Message<TValue>>();
while (TryDequeueMessage(out var message))
{
messages.Add(message);
}

batchMessages = new BatchMessages<TValue>(
batch: messages,
readerSession: _readerSession,
offsetsRange: offsetsRangeBatch,
partitionSessionId: _partitionSession.PartitionSessionId
);

return true;
}
}

internal record CommitSending(
OffsetsRange OffsetsRange,
TaskCompletionSource TcsCommit
);
91 changes: 0 additions & 91 deletions src/Ydb.Sdk/src/Services/Topic/Reader/InternalMessage.cs

This file was deleted.

62 changes: 22 additions & 40 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ namespace Ydb.Sdk.Services.Topic.Reader;

public class Message<TValue>
{
private readonly long _partitionSessionId;
private readonly OffsetsRange _offsetsRange;
private readonly ReaderSession _readerSession;
private readonly long _approximatelyBytesSize;
private readonly ReaderSession<TValue> _readerSession;

internal Message(
TValue data,
string topic,
long partitionId,
long partitionSessionId,
string producerId,
DateTime createdAt,
ImmutableArray<Metadata> metadata,
OffsetsRange offsetsRange,
ReaderSession readerSession,
long approximatelyBytesSize)
ReaderSession<TValue> readerSession)
{
Data = data;
Topic = topic;
Expand All @@ -27,9 +27,9 @@ internal Message(
CreatedAt = createdAt;
Metadata = metadata;

_partitionSessionId = partitionSessionId;
_offsetsRange = offsetsRange;
_readerSession = readerSession;
_approximatelyBytesSize = approximatelyBytesSize;
}

public TValue Data { get; }
Expand All @@ -45,56 +45,38 @@ internal Message(

public DateTime CreatedAt { get; }

public ImmutableArray<Metadata> Metadata { get; }

internal long Start => _offsetsRange.Start;
internal long End => _offsetsRange.End;
public IReadOnlyCollection<Metadata> Metadata { get; }

public Task CommitAsync()
{
return _readerSession.CommitOffsetRange(_offsetsRange, PartitionId, _approximatelyBytesSize);
return _readerSession.CommitOffsetRange(_offsetsRange, _partitionSessionId);
}
}

public class BatchMessage<TValue>
public class BatchMessages<TValue>
{
private readonly ReaderSession _readerSession;
private readonly long _approximatelyBatchSize;
private readonly ReaderSession<TValue> _readerSession;
private readonly OffsetsRange _offsetsRange;
private readonly long _partitionSessionId;

public ImmutableArray<Message<TValue>> Batch { get; }
public IReadOnlyCollection<Message<TValue>> Batch { get; }

internal BatchMessage(
ImmutableArray<Message<TValue>> batch,
ReaderSession readerSession,
long approximatelyBatchSize)
internal BatchMessages(
IReadOnlyCollection<Message<TValue>> batch,
ReaderSession<TValue> readerSession,
OffsetsRange offsetsRange,
long partitionSessionId)
{
Batch = batch;
_readerSession = readerSession;
_approximatelyBatchSize = approximatelyBatchSize;
_offsetsRange = offsetsRange;
_partitionSessionId = partitionSessionId;
}

public Task CommitBatchAsync()
{
if (Batch.Length == 0)
{
return Task.CompletedTask;
}

var offsetsRange = new OffsetsRange { Start = Batch.First().Start, End = Batch.Last().End };

return _readerSession.CommitOffsetRange(offsetsRange, Batch.First().PartitionId, _approximatelyBatchSize);
return Batch.Count == 0
? Task.CompletedTask
: _readerSession.CommitOffsetRange(_offsetsRange, _partitionSessionId);
}
}

public class TopicPartitionOffset
{
public TopicPartitionOffset(long offset, long partitionId)
{
Offset = offset;
PartitionId = partitionId;
}

public long Offset { get; }

public long PartitionId { get; }
}
Loading
Loading