diff --git a/src/Confluent.Kafka/DependentProducerBuilder.cs b/src/Confluent.Kafka/DependentProducerBuilder.cs index d152ca652..f591b28d6 100644 --- a/src/Confluent.Kafka/DependentProducerBuilder.cs +++ b/src/Confluent.Kafka/DependentProducerBuilder.cs @@ -33,7 +33,7 @@ public class DependentProducerBuilder /// The configured client handle. /// public Handle Handle { get; set; } - + /// /// The configured key serializer. /// @@ -54,9 +54,14 @@ public class DependentProducerBuilder /// public IAsyncSerializer AsyncValueSerializer { get; set; } + /// + /// The configured async value serializer. + /// + public ISegmentSerializer SegmentValueSerializer { get; set; } + /// - /// An underlying librdkafka client handle that the Producer will use to + /// An underlying librdkafka client handle that the Producer will use to /// make broker requests. The handle must be from another Producer /// instance (not Consumer or AdminClient). /// @@ -101,6 +106,15 @@ public DependentProducerBuilder SetValueSerializer(IAsyncSerialize return this; } + /// + /// The segment serializer to use to serialize values. + /// + public DependentProducerBuilder SetValueSerializer(ISegmentSerializer serializer) + { + this.SegmentValueSerializer = serializer; + return this; + } + /// /// Build a new IProducer implementation instance. /// diff --git a/src/Confluent.Kafka/ISegmentSerializer.cs b/src/Confluent.Kafka/ISegmentSerializer.cs new file mode 100644 index 000000000..1fbd2b18d --- /dev/null +++ b/src/Confluent.Kafka/ISegmentSerializer.cs @@ -0,0 +1,25 @@ +using System; + +namespace Confluent.Kafka +{ + /// + /// Defines a serializer for use with . + /// + public interface ISegmentSerializer + { + /// + /// Serialize the key or value of a + /// instance. + /// + /// + /// The value to serialize. + /// + /// + /// Context relevant to the serialize operation. + /// + /// + /// A containing the serialized value. + /// + ArraySegment Serialize(T data, SerializationContext context); + } +} diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 79867514b..4d9f4dae7 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -46,6 +46,7 @@ internal class Config private ISerializer valueSerializer; private IAsyncSerializer asyncKeySerializer; private IAsyncSerializer asyncValueSerializer; + private ISegmentSerializer segmentValueSerializer; private static readonly Dictionary defaultSerializers = new Dictionary { @@ -74,7 +75,7 @@ internal class Config private Handle borrowedHandle; private SafeKafkaHandle KafkaHandle - => ownedKafkaHandle != null + => ownedKafkaHandle != null ? ownedKafkaHandle : borrowedHandle.LibrdkafkaHandle; @@ -101,7 +102,7 @@ private Task StartPollTask(CancellationToken ct) this.handlerException = null; } - // note: lock {} is equivalent to Monitor.Enter then Monitor.Exit + // note: lock {} is equivalent to Monitor.Enter then Monitor.Exit if (eventsServedCount_ > 0) { lock (pollSyncObj) @@ -220,7 +221,7 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq gch.Free(); Headers headers = null; - if (this.enableDeliveryReportHeaders) + if (this.enableDeliveryReportHeaders) { headers = new Headers(); Librdkafka.message_headers(rkmessage, out IntPtr hdrsPtr); @@ -263,8 +264,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq { // Topic is not set here in order to avoid the marshalling cost. // Instead, the delivery handler is expected to cache the topic string. - Partition = msg.partition, - Offset = msg.offset, + Partition = msg.partition, + Offset = msg.offset, Error = KafkaHandle.CreatePossiblyFatalError(msg.err, null), Status = messageStatus, Message = new Message { Timestamp = new Timestamp(timestamp, (TimestampType)timestampType), Headers = headers } @@ -399,9 +400,9 @@ public void Flush(CancellationToken cancellationToken) throw new OperationCanceledException(); } } - } - - + } + + /// public void Dispose() { @@ -423,7 +424,7 @@ protected virtual void Dispose(bool disposing) { // Calling Dispose a second or subsequent time should be a no-op. lock (disposeHasBeenCalledLockObj) - { + { if (disposeHasBeenCalled) { return; } disposeHasBeenCalled = true; } @@ -486,7 +487,7 @@ public void SetSaslCredentials(string username, string password) /// - public Handle Handle + public Handle Handle { get { @@ -503,7 +504,8 @@ private void InitializeSerializers( ISerializer keySerializer, ISerializer valueSerializer, IAsyncSerializer asyncKeySerializer, - IAsyncSerializer asyncValueSerializer) + IAsyncSerializer asyncValueSerializer, + ISegmentSerializer segmentValueSerializer) { // setup key serializer. if (keySerializer == null && asyncKeySerializer == null) @@ -529,7 +531,7 @@ private void InitializeSerializers( } // setup value serializer. - if (valueSerializer == null && asyncValueSerializer == null) + if (valueSerializer == null && asyncValueSerializer == null && segmentValueSerializer == null) { if (!defaultSerializers.TryGetValue(typeof(TValue), out object serializer)) { @@ -538,17 +540,21 @@ private void InitializeSerializers( } this.valueSerializer = (ISerializer)serializer; } - else if (valueSerializer == null && asyncValueSerializer != null) + else if (valueSerializer == null && asyncValueSerializer != null && segmentValueSerializer == null) { this.asyncValueSerializer = asyncValueSerializer; } - else if (valueSerializer != null && asyncValueSerializer == null) + else if (valueSerializer != null && asyncValueSerializer == null && segmentValueSerializer == null) { this.valueSerializer = valueSerializer; } + else if (valueSerializer == null && asyncValueSerializer == null && segmentValueSerializer != null) + { + this.segmentValueSerializer = segmentValueSerializer; + } else { - throw new InvalidOperationException("FATAL: Both async and sync value serializers were set."); + throw new InvalidOperationException("FATAL: More than one of (async, sync, segment) value serializers were set."); } } @@ -563,7 +569,8 @@ internal Producer(DependentProducerBuilder builder) InitializeSerializers( builder.KeySerializer, builder.ValueSerializer, - builder.AsyncKeySerializer, builder.AsyncValueSerializer); + builder.AsyncKeySerializer, builder.AsyncValueSerializer, + builder.SegmentValueSerializer); } internal Producer(ProducerBuilder builder) @@ -573,7 +580,7 @@ internal Producer(ProducerBuilder builder) var defaultPartitioner = baseConfig.defaultPartitioner; // TODO: Make Tasks auto complete when EnableDeliveryReportsPropertyName is set to false. - // TODO: Hijack the "delivery.report.only.error" configuration parameter and add functionality to enforce that Tasks + // TODO: Hijack the "delivery.report.only.error" configuration parameter and add functionality to enforce that Tasks // that never complete are never created when this is set to true. this.statisticsHandler = baseConfig.statisticsHandler; @@ -598,8 +605,8 @@ internal Producer(ProducerBuilder builder) if (modifiedConfig.Where(obj => obj.Key == "delivery.report.only.error").Count() > 0) { // A managed object is kept alive over the duration of the produce request. If there is no - // delivery report generated, there will be a memory leak. We could possibly support this - // property by keeping track of delivery reports in managed code, but this seems like + // delivery report generated, there will be a memory leak. We could possibly support this + // property by keeping track of delivery reports in managed code, but this seems like // more trouble than it's worth. throw new ArgumentException("The 'delivery.report.only.error' property is not supported by this client"); } @@ -738,7 +745,8 @@ internal Producer(ProducerBuilder builder) InitializeSerializers( builder.KeySerializer, builder.ValueSerializer, - builder.AsyncKeySerializer, builder.AsyncValueSerializer); + builder.AsyncKeySerializer, builder.AsyncValueSerializer, + builder.SegmentValueSerializer); } @@ -815,10 +823,10 @@ public async Task> ProduceAsync( else { ProduceImpl( - topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, - message.Timestamp, topicPartition.Partition, headers.BackingList, + topicPartition.Topic, + valBytes, 0, valBytes == null ? 0 : valBytes.Length, + keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, + message.Timestamp, topicPartition.Partition, headers.BackingList, null); var result = new DeliveryResult @@ -893,11 +901,29 @@ public void Produce( } byte[] valBytes; + int valOffset; + int valLength; + try { - valBytes = (valueSerializer != null) - ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) - : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); + var serializationContext = new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers); + if (segmentValueSerializer != null) + { + var arraySegment = segmentValueSerializer.Serialize(message.Value, serializationContext); + valBytes = arraySegment.Array; + valOffset = arraySegment.Offset; + valLength = arraySegment.Count; + } + else if (valueSerializer != null) + { + valBytes = valueSerializer.Serialize(message.Value, serializationContext); + valOffset = 0; + valLength = valBytes == null ? 0 : valBytes.Length; + } + else + { + throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer or ISegmentSerializer is required."); + } } catch (Exception ex) { @@ -915,7 +941,7 @@ public void Produce( { ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, + valBytes, valOffset, valLength, keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, message.Timestamp, topicPartition.Partition, headers.BackingList, @@ -931,7 +957,7 @@ public void Produce( { throw new ProduceException( ex.Error, - new DeliveryReport + new DeliveryReport { Message = message, TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) @@ -979,7 +1005,7 @@ public void HandleDeliveryReport(DeliveryReport deliveryReport) Headers = deliveryReport.Message.Headers } }; - // topic is cached in this object, not set in the deliveryReport to avoid the + // topic is cached in this object, not set in the deliveryReport to avoid the // cost of marshalling it. dr.Topic = Topic; @@ -1023,17 +1049,17 @@ public void HandleDeliveryReport(DeliveryReport deliveryReport) { TopicPartitionOffsetError = deliveryReport.TopicPartitionOffsetError, Status = deliveryReport.Status, - Message = new Message + Message = new Message { Key = Key, Value = Value, - Timestamp = deliveryReport.Message == null - ? new Timestamp(0, TimestampType.NotAvailable) + Timestamp = deliveryReport.Message == null + ? new Timestamp(0, TimestampType.NotAvailable) : deliveryReport.Message.Timestamp, Headers = deliveryReport.Message?.Headers } }; - // topic is cached in this object, not set in the deliveryReport to avoid the + // topic is cached in this object, not set in the deliveryReport to avoid the // cost of marshalling it. dr.Topic = Topic; @@ -1054,8 +1080,8 @@ public void BeginTransaction() /// public void CommitTransaction(TimeSpan timeout) - => KafkaHandle.CommitTransaction(timeout.TotalMillisecondsAsInt()); - + => KafkaHandle.CommitTransaction(timeout.TotalMillisecondsAsInt()); + /// public void CommitTransaction() => KafkaHandle.CommitTransaction(-1); diff --git a/src/Confluent.Kafka/ProducerBuilder.cs b/src/Confluent.Kafka/ProducerBuilder.cs index bb24db3a2..bbef05a25 100644 --- a/src/Confluent.Kafka/ProducerBuilder.cs +++ b/src/Confluent.Kafka/ProducerBuilder.cs @@ -84,7 +84,7 @@ public class ProducerBuilder /// internal protected Action, string> OAuthBearerTokenRefreshHandler { get; set; } - /// + /// /// The per-topic custom partitioners. /// internal protected Dictionary Partitioners { get; set; } = new Dictionary(); @@ -104,6 +104,11 @@ public class ProducerBuilder /// internal protected ISerializer ValueSerializer { get; set; } + /// + /// The configured segment value serializer. + /// + internal protected ISegmentSerializer SegmentValueSerializer { get; set; } + /// /// The configured async key serializer. /// @@ -137,9 +142,9 @@ internal Producer.Config ConstructBaseConfig(Producer } /// - /// A collection of librdkafka configuration parameters + /// A collection of librdkafka configuration parameters /// (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) - /// and parameters specific to this client (refer to: + /// and parameters specific to this client (refer to: /// ). /// At a minimum, 'bootstrap.servers' must be specified. /// @@ -321,7 +326,7 @@ public ProducerBuilder SetKeySerializer(ISerializer serializ /// public ProducerBuilder SetValueSerializer(ISerializer serializer) { - if (this.ValueSerializer != null || this.AsyncValueSerializer != null) + if (this.ValueSerializer != null || this.AsyncValueSerializer != null || this.SegmentValueSerializer != null) { throw new InvalidOperationException("Value serializer may not be specified more than once."); } @@ -359,7 +364,7 @@ public ProducerBuilder SetKeySerializer(IAsyncSerializer ser /// public ProducerBuilder SetValueSerializer(IAsyncSerializer serializer) { - if (this.ValueSerializer != null || this.AsyncValueSerializer != null) + if (this.ValueSerializer != null || this.AsyncValueSerializer != null || this.SegmentValueSerializer != null) { throw new InvalidOperationException("Value serializer may not be specified more than once."); } @@ -367,6 +372,25 @@ public ProducerBuilder SetValueSerializer(IAsyncSerializer return this; } + /// + /// The serializer to use to serialize values. + /// + /// + /// If your value serializer throws an exception, this will be + /// wrapped in a ProduceException with ErrorCode + /// Local_ValueSerialization and thrown by the initiating call to + /// Produce or ProduceAsync. + /// + public ProducerBuilder SetValueSerializer(ISegmentSerializer serializer) + { + if (this.ValueSerializer != null || this.AsyncValueSerializer != null || this.SegmentValueSerializer != null) + { + throw new InvalidOperationException("Value serializer may not be specified more than once."); + } + this.SegmentValueSerializer = serializer; + return this; + } + /// /// Build a new IProducer implementation instance. ///