diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 79867514b..c6a123bad 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -529,7 +529,11 @@ private void InitializeSerializers( } // setup value serializer. - if (valueSerializer == null && asyncValueSerializer == null) + if (typeof(TValue) == typeof(ArraySegment)) + { + // No serializer needed for native buffers + } + else if (valueSerializer == null && asyncValueSerializer == null) { if (!defaultSerializers.TryGetValue(typeof(TValue), out object serializer)) { @@ -770,11 +774,23 @@ public async Task> ProduceAsync( } byte[] valBytes; + int valOffset = 0, valLength = 0; try { - valBytes = (valueSerializer != null) - ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) - : await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false); + if (message.Value is ArraySegment arraySegment) + { + valBytes = arraySegment.Array; + valOffset = arraySegment.Offset; + valLength = arraySegment.Count; + } + else + { + valBytes = (valueSerializer != null) + ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) + : await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false); + valOffset = 0; + valLength = valBytes == null ? 0 : valBytes.Length; + } } catch (Exception ex) { @@ -805,7 +821,7 @@ public async Task> ProduceAsync( ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, + valBytes, valOffset, valLength, keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, message.Timestamp, topicPartition.Partition, headers.BackingList, handler); @@ -816,7 +832,7 @@ public async Task> ProduceAsync( { ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, + valBytes, valOffset, valLength, keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, message.Timestamp, topicPartition.Partition, headers.BackingList, null); @@ -893,11 +909,23 @@ public void Produce( } byte[] valBytes; + int valOffset = 0, valLength = 0; try { - valBytes = (valueSerializer != null) - ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) - : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); + if (message.Value is ArraySegment arraySegment) + { + valBytes = arraySegment.Array; + valOffset = arraySegment.Offset; + valLength = arraySegment.Count; + } + else + { + valBytes = (valueSerializer != null) + ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)) + : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required."); + valOffset = 0; + valLength = valBytes == null ? 0 : valBytes.Length; + } } catch (Exception ex) { @@ -915,7 +943,7 @@ public void Produce( { ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, + valBytes, valOffset, valLength, keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, message.Timestamp, topicPartition.Partition, headers.BackingList,