Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/IWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,31 @@ namespace Ydb.Sdk.Services.Topic;

public interface IWriter<TValue> : IDisposable
{
/// <summary>
/// Asynchronously send a data to a YDB Topic.
/// </summary>
/// <param name="data">The data to produce.</param>
/// <param name="cancellationToken">A cancellation token to observe whilst waiting the returned task to complete.</param>
/// <returns>
/// A Task which will complete with a delivery report corresponding to the produce request,
/// or an exception if an error occured.
/// </returns>
/// <exception cref="T:Ydb.Sdk.Services.Topic.WriterException">
/// Thrown in response to any write request that was unsuccessful for any reason.
/// </exception>
public Task<WriteResult> WriteAsync(TValue data, CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously send a single message to a YDB Topic.
/// </summary>
/// <param name="message">The message to produce</param>
/// <param name="cancellationToken">A cancellation token to observe whilst waiting the returned task to complete.</param>
/// <returns>
/// A Task which will complete with a delivery report corresponding to the produce request,
/// or an exception if an error occured.
/// </returns>
/// <exception cref="T:Ydb.Sdk.Services.Topic.WriterException">
/// Thrown in response to any write request that was unsuccessful for any reason.
/// </exception>
public Task<WriteResult> WriteAsync(Message<TValue> message, CancellationToken cancellationToken = default);
}
23 changes: 13 additions & 10 deletions src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@ public class WriteResult
{
internal static readonly WriteResult Skipped = new();

private readonly long _offset;

internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
{
switch (ack.MessageWriteStatusCase)
{
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Written:
Status = PersistenceStatus.Written;
_offset = ack.Written.Offset;
break;
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Skipped:
Status = PersistenceStatus.AlreadyWritten;
Expand All @@ -30,18 +27,24 @@ private WriteResult()
Status = PersistenceStatus.AlreadyWritten;
}

/// <summary>
/// The persistence status of the message
/// </summary>
public PersistenceStatus Status { get; }

public bool TryGetOffset(out long offset)
{
offset = _offset;

return Status == PersistenceStatus.Written;
}
}

/// <summary>
/// Enumeration of possible message persistence states.
/// </summary>
public enum PersistenceStatus
{
/// <summary>
/// The message is recorded
/// </summary>
Written,

/// <summary>
/// The message was recorded in the last call session
/// </summary>
AlreadyWritten
}
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public WriterBuilder(IDriver driver, string topicPath)
/// This limit is applied after the first message has been added to the batch,
/// regardless of the first message's size, this is to ensure that messages that exceed buffer size are produced.
/// </summary>
public int BufferMaxSize { get; set; } = 1024 * 1024; // 1 Mb
public int BufferMaxSize { get; set; } = 20 * 1024 * 1024; // 20 Mb

/// <summary>
/// Explicit partition id to write to.
Expand Down
Loading