Skip to content

Add a new type of Serializer for supporting zero allocation scenarios #2177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/Confluent.Kafka/DependentProducerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class DependentProducerBuilder<TKey, TValue>
/// The configured client handle.
/// </summary>
public Handle Handle { get; set; }

/// <summary>
/// The configured key serializer.
/// </summary>
Expand All @@ -54,9 +54,14 @@ public class DependentProducerBuilder<TKey, TValue>
/// </summary>
public IAsyncSerializer<TValue> AsyncValueSerializer { get; set; }

/// <summary>
/// The configured async value serializer.
/// </summary>
public ISegmentSerializer<TValue> SegmentValueSerializer { get; set; }


/// <summary>
/// An underlying librdkafka client handle that the Producer will use to
/// An underlying librdkafka client handle that the Producer will use to
/// make broker requests. The handle must be from another Producer
/// instance (not Consumer or AdminClient).
/// </summary>
Expand Down Expand Up @@ -101,6 +106,15 @@ public DependentProducerBuilder<TKey, TValue> SetValueSerializer(IAsyncSerialize
return this;
}

/// <summary>
/// The segment serializer to use to serialize values.
/// </summary>
public DependentProducerBuilder<TKey, TValue> SetValueSerializer(ISegmentSerializer<TValue> serializer)
{
this.SegmentValueSerializer = serializer;
return this;
}

/// <summary>
/// Build a new IProducer implementation instance.
/// </summary>
Expand Down
25 changes: 25 additions & 0 deletions src/Confluent.Kafka/ISegmentSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;

namespace Confluent.Kafka
{
/// <summary>
/// Defines a serializer for use with <see cref="Confluent.Kafka.Producer{TKey,TValue}" />.
/// </summary>
public interface ISegmentSerializer<T>
{
/// <summary>
/// Serialize the key or value of a <see cref="Message{TKey,TValue}" />
/// instance.
/// </summary>
/// <param name="data">
/// The value to serialize.
/// </param>
/// <param name="context">
/// Context relevant to the serialize operation.
/// </param>
/// <returns>
/// A <see cref="ArraySegment{T}"/> containing the serialized value.
/// </returns>
ArraySegment<byte> Serialize(T data, SerializationContext context);
}
}
98 changes: 62 additions & 36 deletions src/Confluent.Kafka/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ internal class Config
private ISerializer<TValue> valueSerializer;
private IAsyncSerializer<TKey> asyncKeySerializer;
private IAsyncSerializer<TValue> asyncValueSerializer;
private ISegmentSerializer<TValue> segmentValueSerializer;

private static readonly Dictionary<Type, object> defaultSerializers = new Dictionary<Type, object>
{
Expand Down Expand Up @@ -74,7 +75,7 @@ internal class Config
private Handle borrowedHandle;

private SafeKafkaHandle KafkaHandle
=> ownedKafkaHandle != null
=> ownedKafkaHandle != null
? ownedKafkaHandle
: borrowedHandle.LibrdkafkaHandle;

Expand All @@ -101,7 +102,7 @@ private Task StartPollTask(CancellationToken ct)
this.handlerException = null;
}

// note: lock {} is equivalent to Monitor.Enter then Monitor.Exit
// note: lock {} is equivalent to Monitor.Enter then Monitor.Exit
if (eventsServedCount_ > 0)
{
lock (pollSyncObj)
Expand Down Expand Up @@ -220,7 +221,7 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq
gch.Free();

Headers headers = null;
if (this.enableDeliveryReportHeaders)
if (this.enableDeliveryReportHeaders)
{
headers = new Headers();
Librdkafka.message_headers(rkmessage, out IntPtr hdrsPtr);
Expand Down Expand Up @@ -263,8 +264,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq
{
// Topic is not set here in order to avoid the marshalling cost.
// Instead, the delivery handler is expected to cache the topic string.
Partition = msg.partition,
Offset = msg.offset,
Partition = msg.partition,
Offset = msg.offset,
Error = KafkaHandle.CreatePossiblyFatalError(msg.err, null),
Status = messageStatus,
Message = new Message<Null, Null> { Timestamp = new Timestamp(timestamp, (TimestampType)timestampType), Headers = headers }
Expand Down Expand Up @@ -399,9 +400,9 @@ public void Flush(CancellationToken cancellationToken)
throw new OperationCanceledException();
}
}
}
}


/// <inheritdoc/>
public void Dispose()
{
Expand All @@ -423,7 +424,7 @@ protected virtual void Dispose(bool disposing)
{
// Calling Dispose a second or subsequent time should be a no-op.
lock (disposeHasBeenCalledLockObj)
{
{
if (disposeHasBeenCalled) { return; }
disposeHasBeenCalled = true;
}
Expand Down Expand Up @@ -486,7 +487,7 @@ public void SetSaslCredentials(string username, string password)


/// <inheritdoc/>
public Handle Handle
public Handle Handle
{
get
{
Expand All @@ -503,7 +504,8 @@ private void InitializeSerializers(
ISerializer<TKey> keySerializer,
ISerializer<TValue> valueSerializer,
IAsyncSerializer<TKey> asyncKeySerializer,
IAsyncSerializer<TValue> asyncValueSerializer)
IAsyncSerializer<TValue> asyncValueSerializer,
ISegmentSerializer<TValue> segmentValueSerializer)
{
// setup key serializer.
if (keySerializer == null && asyncKeySerializer == null)
Expand All @@ -529,7 +531,7 @@ private void InitializeSerializers(
}

// setup value serializer.
if (valueSerializer == null && asyncValueSerializer == null)
if (valueSerializer == null && asyncValueSerializer == null && segmentValueSerializer == null)
{
if (!defaultSerializers.TryGetValue(typeof(TValue), out object serializer))
{
Expand All @@ -538,17 +540,21 @@ private void InitializeSerializers(
}
this.valueSerializer = (ISerializer<TValue>)serializer;
}
else if (valueSerializer == null && asyncValueSerializer != null)
else if (valueSerializer == null && asyncValueSerializer != null && segmentValueSerializer == null)
{
this.asyncValueSerializer = asyncValueSerializer;
}
else if (valueSerializer != null && asyncValueSerializer == null)
else if (valueSerializer != null && asyncValueSerializer == null && segmentValueSerializer == null)
{
this.valueSerializer = valueSerializer;
}
else if (valueSerializer == null && asyncValueSerializer == null && segmentValueSerializer != null)
{
this.segmentValueSerializer = segmentValueSerializer;
}
else
{
throw new InvalidOperationException("FATAL: Both async and sync value serializers were set.");
throw new InvalidOperationException("FATAL: More than one of (async, sync, segment) value serializers were set.");
}
}

Expand All @@ -563,7 +569,8 @@ internal Producer(DependentProducerBuilder<TKey, TValue> builder)

InitializeSerializers(
builder.KeySerializer, builder.ValueSerializer,
builder.AsyncKeySerializer, builder.AsyncValueSerializer);
builder.AsyncKeySerializer, builder.AsyncValueSerializer,
builder.SegmentValueSerializer);
}

internal Producer(ProducerBuilder<TKey, TValue> builder)
Expand All @@ -573,7 +580,7 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
var defaultPartitioner = baseConfig.defaultPartitioner;

// TODO: Make Tasks auto complete when EnableDeliveryReportsPropertyName is set to false.
// TODO: Hijack the "delivery.report.only.error" configuration parameter and add functionality to enforce that Tasks
// TODO: Hijack the "delivery.report.only.error" configuration parameter and add functionality to enforce that Tasks
// that never complete are never created when this is set to true.

this.statisticsHandler = baseConfig.statisticsHandler;
Expand All @@ -598,8 +605,8 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
if (modifiedConfig.Where(obj => obj.Key == "delivery.report.only.error").Count() > 0)
{
// A managed object is kept alive over the duration of the produce request. If there is no
// delivery report generated, there will be a memory leak. We could possibly support this
// property by keeping track of delivery reports in managed code, but this seems like
// delivery report generated, there will be a memory leak. We could possibly support this
// property by keeping track of delivery reports in managed code, but this seems like
// more trouble than it's worth.
throw new ArgumentException("The 'delivery.report.only.error' property is not supported by this client");
}
Expand Down Expand Up @@ -738,7 +745,8 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)

InitializeSerializers(
builder.KeySerializer, builder.ValueSerializer,
builder.AsyncKeySerializer, builder.AsyncValueSerializer);
builder.AsyncKeySerializer, builder.AsyncValueSerializer,
builder.SegmentValueSerializer);
}


Expand Down Expand Up @@ -815,10 +823,10 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
else
{
ProduceImpl(
topicPartition.Topic,
valBytes, 0, valBytes == null ? 0 : valBytes.Length,
keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
message.Timestamp, topicPartition.Partition, headers.BackingList,
topicPartition.Topic,
valBytes, 0, valBytes == null ? 0 : valBytes.Length,
keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
message.Timestamp, topicPartition.Partition, headers.BackingList,
null);

var result = new DeliveryResult<TKey, TValue>
Expand Down Expand Up @@ -893,11 +901,29 @@ public void Produce(
}

byte[] valBytes;
int valOffset;
int valLength;

try
{
valBytes = (valueSerializer != null)
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
: throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
var serializationContext = new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers);
if (segmentValueSerializer != null)
{
var arraySegment = segmentValueSerializer.Serialize(message.Value, serializationContext);
valBytes = arraySegment.Array;
valOffset = arraySegment.Offset;
valLength = arraySegment.Count;
}
else if (valueSerializer != null)
{
valBytes = valueSerializer.Serialize(message.Value, serializationContext);
valOffset = 0;
valLength = valBytes == null ? 0 : valBytes.Length;
}
else
{
throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer or ISegmentSerializer is required.");
}
}
catch (Exception ex)
{
Expand All @@ -915,7 +941,7 @@ public void Produce(
{
ProduceImpl(
topicPartition.Topic,
valBytes, 0, valBytes == null ? 0 : valBytes.Length,
valBytes, valOffset, valLength,
keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
message.Timestamp, topicPartition.Partition,
headers.BackingList,
Expand All @@ -931,7 +957,7 @@ public void Produce(
{
throw new ProduceException<TKey, TValue>(
ex.Error,
new DeliveryReport<TKey, TValue>
new DeliveryReport<TKey, TValue>
{
Message = message,
TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
Expand Down Expand Up @@ -979,7 +1005,7 @@ public void HandleDeliveryReport(DeliveryReport<Null, Null> deliveryReport)
Headers = deliveryReport.Message.Headers
}
};
// topic is cached in this object, not set in the deliveryReport to avoid the
// topic is cached in this object, not set in the deliveryReport to avoid the
// cost of marshalling it.
dr.Topic = Topic;

Expand Down Expand Up @@ -1023,17 +1049,17 @@ public void HandleDeliveryReport(DeliveryReport<Null, Null> deliveryReport)
{
TopicPartitionOffsetError = deliveryReport.TopicPartitionOffsetError,
Status = deliveryReport.Status,
Message = new Message<TKey, TValue>
Message = new Message<TKey, TValue>
{
Key = Key,
Value = Value,
Timestamp = deliveryReport.Message == null
? new Timestamp(0, TimestampType.NotAvailable)
Timestamp = deliveryReport.Message == null
? new Timestamp(0, TimestampType.NotAvailable)
: deliveryReport.Message.Timestamp,
Headers = deliveryReport.Message?.Headers
}
};
// topic is cached in this object, not set in the deliveryReport to avoid the
// topic is cached in this object, not set in the deliveryReport to avoid the
// cost of marshalling it.
dr.Topic = Topic;

Expand All @@ -1054,8 +1080,8 @@ public void BeginTransaction()

/// <inheritdoc/>
public void CommitTransaction(TimeSpan timeout)
=> KafkaHandle.CommitTransaction(timeout.TotalMillisecondsAsInt());
=> KafkaHandle.CommitTransaction(timeout.TotalMillisecondsAsInt());

/// <inheritdoc/>
public void CommitTransaction()
=> KafkaHandle.CommitTransaction(-1);
Expand Down
Loading