diff --git a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs
index 60d0e34ba..ce9b98bec 100644
--- a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs
+++ b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs
@@ -18,6 +18,7 @@
 // Refer to LICENSE for more information.
 
 using System;
+using System.Buffers;
 using System.Collections.Generic;
 using System.Linq;
 using System.Runtime.InteropServices;
@@ -357,45 +358,35 @@ private IntPtr marshalHeaders(IReadOnlyList<IHeader> headers)
             return headersPtr;
         }
 
-        internal ErrorCode Produce(
+        internal unsafe ErrorCode Produce(
             string topic,
-            byte[] val, int valOffset, int valLength,
-            byte[] key, int keyOffset, int keyLength,
+            ReadOnlyMemory<byte>? val,
+            ReadOnlyMemory<byte>? key,
             int partition,
             long timestamp,
             IReadOnlyList<IHeader> headers,
             IntPtr opaque)
         {
-            var pValue = IntPtr.Zero;
-            var pKey = IntPtr.Zero;
+            MemoryHandle? valueHandle = null;
+            IntPtr valuePtr = IntPtr.Zero;
+            UIntPtr valueLength = UIntPtr.Zero;
 
-            var gchValue = default(GCHandle);
-            var gchKey = default(GCHandle);
+            MemoryHandle? keyHandle = null;
+            IntPtr keyPtr = IntPtr.Zero;
+            UIntPtr keyLength = UIntPtr.Zero;
 
-            if (val == null)
+            if (val != null)
             {
-                if (valOffset != 0 || valLength != 0)
-                {
-                    throw new ArgumentException("valOffset and valLength parameters must be 0 when producing null values.");
-                }
-            }
-            else
-            {
-                gchValue = GCHandle.Alloc(val, GCHandleType.Pinned);
-                pValue = Marshal.UnsafeAddrOfPinnedArrayElement(val, valOffset);
+                valueHandle = val.Value.Pin();
+                valuePtr = (IntPtr)valueHandle.Value.Pointer;
+                valueLength = (UIntPtr)val.Value.Length;
             }
 
-            if (key == null)
+            if (key != null)
             {
-                if (keyOffset != 0 || keyLength != 0)
-                {
-                    throw new ArgumentException("keyOffset and keyLength parameters must be 0 when producing null key values.");
-                }
-            }
-            else
-            {
-                gchKey = GCHandle.Alloc(key, GCHandleType.Pinned);
-                pKey = Marshal.UnsafeAddrOfPinnedArrayElement(key, keyOffset);
+                keyHandle = key.Value.Pin();
+                keyPtr = (IntPtr)keyHandle.Value.Pointer;
+                keyLength = (UIntPtr)key.Value.Length;
             }
 
             IntPtr headersPtr = marshalHeaders(headers);
@@ -407,8 +398,8 @@ internal ErrorCode Produce(
                     topic,
                     partition,
                     (IntPtr)MsgFlags.MSG_F_COPY,
-                    pValue, (UIntPtr)valLength,
-                    pKey, (UIntPtr)keyLength,
+                    valuePtr, valueLength,
+                    keyPtr, keyLength,
                     timestamp,
                     headersPtr,
                     opaque);
@@ -433,15 +424,8 @@ internal ErrorCode Produce(
             }
             finally
             {
-                if (val != null)
-                {
-                    gchValue.Free();
-                }
-
-                if (key != null)
-                {
-                    gchKey.Free();
-                }
+                valueHandle?.Dispose();
+                keyHandle?.Dispose();
             }
         }
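The move from `GCHandle` to `MemoryHandle` is what makes the new signature work for any `ReadOnlyMemory<byte>`, whether it is backed by a managed array, a `MemoryManager<T>`, or native memory. A minimal standalone sketch of the pin/use/unpin pattern the rewritten `Produce` relies on (the `WithPinned` helper is illustrative, not part of this change):

```csharp
using System;
using System.Buffers;

static class PinningSketch
{
    // Pins a ReadOnlyMemory<byte> for the duration of a native call, mirroring
    // the valueHandle/keyHandle logic above. A null buffer maps to NULL/0,
    // which is how librdkafka distinguishes a null key/value from an empty one.
    public static unsafe void WithPinned(ReadOnlyMemory<byte>? buffer, Action<IntPtr, UIntPtr> useNative)
    {
        if (buffer == null)
        {
            useNative(IntPtr.Zero, UIntPtr.Zero);
            return;
        }

        // Memory<T>.Pin() works whether the memory is backed by a managed
        // array, a MemoryManager<T>, or unmanaged memory - unlike GCHandle,
        // which can only pin a managed array.
        using (MemoryHandle handle = buffer.Value.Pin())
        {
            useNative((IntPtr)handle.Pointer, (UIntPtr)buffer.Value.Length);
        } // Dispose() unpins, matching the finally block in the diff.
    }
}
```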
diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs
index 79867514b..a8c507e66 100644
--- a/src/Confluent.Kafka/Producer.cs
+++ b/src/Confluent.Kafka/Producer.cs
@@ -42,8 +42,8 @@ internal class Config
             public PartitionerDelegate defaultPartitioner;
         }
 
-        private ISerializer<TKey> keySerializer;
-        private ISerializer<TValue> valueSerializer;
+        private Func<TKey, SerializationContext, ReadOnlyMemory<byte>?> serializeKey;
+        private Func<TValue, SerializationContext, ReadOnlyMemory<byte>?> serializeValue;
         private IAsyncSerializer<TKey> asyncKeySerializer;
         private IAsyncSerializer<TValue> asyncValueSerializer;
 
@@ -58,6 +58,14 @@ internal class Config
             { typeof(byte[]), Serializers.ByteArray }
         };
 
+        private static readonly Dictionary<Type, object> memorySerializeFuncs = new Dictionary<Type, object>
+        {
+            [typeof(Memory<byte>)] = (Memory<byte> x, SerializationContext _) => (ReadOnlyMemory<byte>?)x,
+            [typeof(Memory<byte>?)] = (Memory<byte>? x, SerializationContext _) => (ReadOnlyMemory<byte>?)x,
+            [typeof(ReadOnlyMemory<byte>)] = (ReadOnlyMemory<byte> x, SerializationContext _) => (ReadOnlyMemory<byte>?)x,
+            [typeof(ReadOnlyMemory<byte>?)] = (ReadOnlyMemory<byte>? x, SerializationContext _) => x,
+        };
+
         private int cancellationDelayMaxMs;
         private bool disposeHasBeenCalled = false;
         private object disposeHasBeenCalledLockObj = new object();
@@ -279,8 +287,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq
 
         private void ProduceImpl(
             string topic,
-            byte[] val, int valOffset, int valLength,
-            byte[] key, int keyOffset, int keyLength,
+            ReadOnlyMemory<byte>? val,
+            ReadOnlyMemory<byte>? key,
             Timestamp timestamp,
             Partition partition,
             IReadOnlyList<IHeader> headers,
@@ -308,8 +316,8 @@ private void ProduceImpl(
                 err = KafkaHandle.Produce(
                     topic,
-                    val, valOffset, valLength,
-                    key, keyOffset, keyLength,
+                    val,
+                    key,
                     partition.Value,
                     timestamp.UnixTimestampMs,
                     headers,
@@ -325,8 +333,8 @@ private void ProduceImpl(
             {
                 err = KafkaHandle.Produce(
                     topic,
-                    val, valOffset, valLength,
-                    key, keyOffset, keyLength,
+                    val,
+                    key,
                     partition.Value,
                     timestamp.UnixTimestampMs,
                     headers,
@@ -508,12 +516,20 @@ private void InitializeSerializers(
             // setup key serializer.
             if (keySerializer == null && asyncKeySerializer == null)
             {
-                if (!defaultSerializers.TryGetValue(typeof(TKey), out object serializer))
+                if (defaultSerializers.TryGetValue(typeof(TKey), out object serializer))
+                {
+                    keySerializer = (ISerializer<TKey>)serializer;
+                    this.serializeKey = (k, ctx) => keySerializer.Serialize(k, ctx)?.AsMemory();
+                }
+                else if (memorySerializeFuncs.TryGetValue(typeof(TKey), out object serialize))
+                {
+                    this.serializeKey = (Func<TKey, SerializationContext, ReadOnlyMemory<byte>?>)serialize;
+                }
+                else
                 {
                     throw new ArgumentNullException(
                         $"Key serializer not specified and there is no default serializer defined for type {typeof(TKey).Name}.");
                 }
-                this.keySerializer = (ISerializer<TKey>)serializer;
             }
             else if (keySerializer == null && asyncKeySerializer != null)
             {
@@ -521,7 +537,7 @@ private void InitializeSerializers(
             }
             else if (keySerializer != null && asyncKeySerializer == null)
             {
-                this.keySerializer = keySerializer;
+                this.serializeKey = (k, ctx) => keySerializer.Serialize(k, ctx)?.AsMemory();
             }
             else
             {
@@ -531,12 +547,20 @@ private void InitializeSerializers(
             // setup value serializer.
             if (valueSerializer == null && asyncValueSerializer == null)
            {
-                if (!defaultSerializers.TryGetValue(typeof(TValue), out object serializer))
+                if (defaultSerializers.TryGetValue(typeof(TValue), out object serializer))
+                {
+                    valueSerializer = (ISerializer<TValue>)serializer;
+                    this.serializeValue = (k, ctx) => valueSerializer.Serialize(k, ctx)?.AsMemory();
+                }
+                else if (memorySerializeFuncs.TryGetValue(typeof(TValue), out object serialize))
+                {
+                    this.serializeValue = (Func<TValue, SerializationContext, ReadOnlyMemory<byte>?>)serialize;
+                }
+                else
                 {
                     throw new ArgumentNullException(
                         $"Value serializer not specified and there is no default serializer defined for type {typeof(TValue).Name}.");
                 }
-                this.valueSerializer = (ISerializer<TValue>)serializer;
             }
             else if (valueSerializer == null && asyncValueSerializer != null)
             {
@@ -544,7 +568,7 @@ private void InitializeSerializers(
             }
             else if (valueSerializer != null && asyncValueSerializer == null)
             {
-                this.valueSerializer = valueSerializer;
+                this.serializeValue = (k, ctx) => valueSerializer.Serialize(k, ctx)?.AsMemory();
             }
             else
             {
@@ -750,11 +774,11 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
             {
                 Headers headers = message.Headers ?? new Headers();
 
-                byte[] keyBytes;
+                ReadOnlyMemory<byte>? keyBytes;
                 try
                 {
-                    keyBytes = (keySerializer != null)
-                        ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
+                    keyBytes = (serializeKey != null)
+                        ? serializeKey(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
                         : await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false);
                 }
                 catch (Exception ex)
@@ -769,11 +793,11 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
                         ex);
                 }
 
-                byte[] valBytes;
+                ReadOnlyMemory<byte>? valBytes;
                 try
                 {
-                    valBytes = (valueSerializer != null)
-                        ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
+                    valBytes = (serializeValue != null)
+                        ? serializeValue(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
                         : await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false);
                 }
                 catch (Exception ex)
@@ -805,8 +829,8 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
 
                     ProduceImpl(
                         topicPartition.Topic,
-                        valBytes, 0, valBytes == null ? 0 : valBytes.Length,
-                        keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
+                        valBytes,
+                        keyBytes,
                         message.Timestamp, topicPartition.Partition,
                         headers.BackingList, handler);
@@ -816,8 +840,8 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
                 {
                     ProduceImpl(
                         topicPartition.Topic,
-                        valBytes, 0, valBytes == null ? 0 : valBytes.Length,
-                        keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
+                        valBytes,
+                        keyBytes,
                         message.Timestamp, topicPartition.Partition,
                         headers.BackingList, null);
@@ -873,11 +897,11 @@ public void Produce(
 
             Headers headers = message.Headers ?? new Headers();
 
-            byte[] keyBytes;
+            ReadOnlyMemory<byte>? keyBytes;
             try
             {
-                keyBytes = (keySerializer != null)
-                    ? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
+                keyBytes = (serializeKey != null)
+                    ? serializeKey(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
                    : throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required.");
             }
             catch (Exception ex)
@@ -892,11 +916,11 @@ public void Produce(
                     ex);
             }
 
-            byte[] valBytes;
+            ReadOnlyMemory<byte>? valBytes;
             try
             {
-                valBytes = (valueSerializer != null)
-                    ? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
+                valBytes = (serializeValue != null)
+                    ? serializeValue(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
                     : throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
             }
             catch (Exception ex)
@@ -915,8 +939,8 @@ public void Produce(
             {
                 ProduceImpl(
                     topicPartition.Topic,
-                    valBytes, 0, valBytes == null ? 0 : valBytes.Length,
-                    keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
+                    valBytes,
+                    keyBytes,
                     message.Timestamp, topicPartition.Partition,
                     headers.BackingList,
                     deliveryHandler == null
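With the wiring above, the four memory types in `memorySerializeFuncs` act as identity "serializers", so a producer can be built for them without registering anything and no intermediate `byte[]` is allocated per message. A sketch of the intended usage under that assumption (broker address and topic name are placeholders):

```csharp
using System;
using System.Text;
using Confluent.Kafka;

class MemoryProduceSketch
{
    static void Main()
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // No value serializer configured: memorySerializeFuncs supplies the
        // identity pass-through for ReadOnlyMemory<byte>.
        using var producer = new ProducerBuilder<Null, ReadOnlyMemory<byte>>(config).Build();

        ReadOnlyMemory<byte> payload = Encoding.UTF8.GetBytes("hello world");
        producer.Produce("my-topic", new Message<Null, ReadOnlyMemory<byte>> { Value = payload });
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
```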
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs
index e21ceffa8..2361b572b 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce.cs
@@ -107,6 +107,71 @@ public void Producer_Produce(string bootstrapServers)
 
             Assert.Equal(2, count);
 
+            // Memory<byte> case.
+
+            count = 0;
+            Action<DeliveryReport<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>> dh3 = dr =>
+            {
+                Assert.Equal(ErrorCode.NoError, dr.Error.Code);
+                Assert.Equal(PersistenceStatus.Persisted, dr.Status);
+                Assert.Equal((Partition)0, dr.Partition);
+                Assert.Equal(singlePartitionTopic, dr.Topic);
+                Assert.True(dr.Offset >= 0);
+                Assert.Equal($"test key {count + 42}", Encoding.UTF8.GetString(dr.Message.Key.Span));
+                Assert.Equal($"test val {count + 42}", Encoding.UTF8.GetString(dr.Message.Value.Span));
+                Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
+                Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
+                count += 1;
+            };
+
+            using (var producer = new TestProducerBuilder<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>(producerConfig).Build())
+            {
+                producer.Produce(
+                    new TopicPartition(singlePartitionTopic, 0),
+                    new Message<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>> { Key = Encoding.UTF8.GetBytes("test key 42"), Value = Encoding.UTF8.GetBytes("test val 42") }, dh3);
+
+                producer.Produce(
+                    singlePartitionTopic,
+                    new Message<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>> { Key = Encoding.UTF8.GetBytes("test key 43"), Value = Encoding.UTF8.GetBytes("test val 43") }, dh3);
+
+                producer.Flush(TimeSpan.FromSeconds(10));
+            }
+
+            Assert.Equal(2, count);
+
+            // Memory<byte>? case.
+
+            count = 0;
+            Action<DeliveryReport<Memory<byte>?, Memory<byte>?>> dh4 = dr =>
+            {
+                Assert.Equal(ErrorCode.NoError, dr.Error.Code);
+                Assert.Equal(PersistenceStatus.Persisted, dr.Status);
+                Assert.Equal((Partition)0, dr.Partition);
+                Assert.Equal(singlePartitionTopic, dr.Topic);
+                Assert.True(dr.Offset >= 0);
+                Assert.True(dr.Message.Key.HasValue);
+                Assert.Equal($"test key {count + 42}", Encoding.UTF8.GetString(dr.Message.Key.Value.Span));
+                Assert.True(dr.Message.Value.HasValue);
+                Assert.Equal($"test val {count + 42}", Encoding.UTF8.GetString(dr.Message.Value.Value.Span));
+                Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
+                Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
+                count += 1;
+            };
+
+            using (var producer = new TestProducerBuilder<Memory<byte>?, Memory<byte>?>(producerConfig).Build())
+            {
+                producer.Produce(
+                    new TopicPartition(singlePartitionTopic, 0),
+                    new Message<Memory<byte>?, Memory<byte>?> { Key = Encoding.UTF8.GetBytes("test key 42"), Value = Encoding.UTF8.GetBytes("test val 42") }, dh4);
+
+                producer.Produce(
+                    singlePartitionTopic,
+                    new Message<Memory<byte>?, Memory<byte>?> { Key = Encoding.UTF8.GetBytes("test key 43"), Value = Encoding.UTF8.GetBytes("test val 43") }, dh4);
+
+                producer.Flush(TimeSpan.FromSeconds(10));
+            }
+
+            Assert.Equal(2, count);
+
             Assert.Equal(0, Library.HandleCount);
             LogToFile("end Producer_Produce");
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs
index 8bc1df92b..40f398b67 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Error.cs
@@ -106,6 +106,40 @@ public void Producer_ProduceAsync_Error(string bootstrapServers)
                 Assert.Equal(TimestampType.NotAvailable, dr.Message.Timestamp.Type);
             }
 
+            // Memory<byte> case
+
+            Task<DeliveryResult<Memory<byte>, Memory<byte>>> drt3;
+            using (var producer = new TestProducerBuilder<Memory<byte>, Memory<byte>>(producerConfig).Build())
+            {
+                drt3 = producer.ProduceAsync(
+                    new TopicPartition(partitionedTopic, 42),
+                    new Message<Memory<byte>, Memory<byte>> { Key = new byte[] { 100 }, Value = new byte[] { 101 } });
+                Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
+            }
+
+            Assert.Throws<AggregateException>(() => { drt3.Wait(); });
+
+            try
+            {
+                _ = drt3.Result;
+            }
+            catch (AggregateException e)
+            {
+                var inner = e.InnerException;
+                Assert.IsType<ProduceException<Memory<byte>, Memory<byte>>>(inner);
+                var dr = ((ProduceException<Memory<byte>, Memory<byte>>)inner).DeliveryResult;
+                var err = ((ProduceException<Memory<byte>, Memory<byte>>)inner).Error;
+
+                Assert.True(err.IsError);
+                Assert.False(err.IsFatal);
+                Assert.Equal(partitionedTopic, dr.Topic);
+                Assert.Equal(Offset.Unset, dr.Offset);
+                Assert.True(dr.Partition == 42);
+                Assert.Equal(new byte[] { 100 }, dr.Message.Key.ToArray());
+                Assert.Equal(new byte[] { 101 }, dr.Message.Value.ToArray());
+                Assert.Equal(TimestampType.NotAvailable, dr.Message.Timestamp.Type);
+            }
+
             Assert.Equal(0, Library.HandleCount);
             LogToFile("end Producer_ProduceAsync_Error");
         }
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Null_Task.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Null_Task.cs
index b5463cc37..973f49ad2 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Null_Task.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Null_Task.cs
@@ -90,6 +90,29 @@ public void Producer_ProduceAsync_Null_Task(string bootstrapServers)
 
             Assert.Equal((Partition)1, drs2[0].Result.Partition);
 
+            // Memory<byte>? case
+
+            var drs3 = new List<Task<DeliveryResult<Memory<byte>?, Memory<byte>?>>>();
+            using (var producer = new TestProducerBuilder<Memory<byte>?, Memory<byte>?>(producerConfig).Build())
+            {
+                drs3.Add(producer.ProduceAsync(new TopicPartition(partitionedTopic, 1), new Message<Memory<byte>?, Memory<byte>?>()));
+                drs3.Add(producer.ProduceAsync(partitionedTopic, new Message<Memory<byte>?, Memory<byte>?>()));
+                Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
+            }
+
+            for (int i = 0; i < 2; ++i)
+            {
+                var dr = drs3[i].Result;
+                Assert.True(dr.Partition == 0 || dr.Partition == 1);
+                Assert.Equal(partitionedTopic, dr.Topic);
+                Assert.True(dr.Offset >= 0);
+                Assert.Null(dr.Message.Key);
+                Assert.Null(dr.Message.Value);
+                Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
+                Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
+            }
+
+            Assert.Equal((Partition)1, drs3[0].Result.Partition);
+
             Assert.Equal(0, Library.HandleCount);
             LogToFile("end Producer_ProduceAsync_Null_Task");
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Task.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Task.cs
index 98edbcb09..4e6b095b6 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Task.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_ProduceAsync_Task.cs
@@ -98,6 +98,64 @@ public void Producer_ProduceAsync_Task(string bootstrapServers)
 
             Assert.Equal((Partition)1, drs2[0].Result.Partition);
 
+            // Memory<byte> case
+
+            var drs3 = new List<Task<DeliveryResult<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>>>();
+            using (var producer = new TestProducerBuilder<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>(producerConfig).Build())
+            {
+                drs3.Add(producer.ProduceAsync(
+                    new TopicPartition(partitionedTopic, 1),
+                    new Message<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>> { Key = Encoding.UTF8.GetBytes("test key 2"), Value = Encoding.UTF8.GetBytes("test val 2") }));
+                drs3.Add(producer.ProduceAsync(
+                    partitionedTopic,
+                    new Message<ReadOnlyMemory<byte>, ReadOnlyMemory<byte>> { Key = Encoding.UTF8.GetBytes("test key 3"), Value = Encoding.UTF8.GetBytes("test val 3") }));
+                Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
+            }
+
+            for (int i = 0; i < 2; ++i)
+            {
+                var dr = drs3[i].Result;
+                Assert.Equal(partitionedTopic, dr.Topic);
+                Assert.True(dr.Offset >= 0);
+                Assert.True(dr.Partition == 0 || dr.Partition == 1);
+                Assert.Equal($"test key {i+2}", Encoding.UTF8.GetString(dr.Message.Key.Span));
+                Assert.Equal($"test val {i+2}", Encoding.UTF8.GetString(dr.Message.Value.Span));
+                Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
+                Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
+            }
+
+            Assert.Equal((Partition)1, drs3[0].Result.Partition);
+
+            // Memory<byte>? case
+
+            var drs4 = new List<Task<DeliveryResult<Memory<byte>?, Memory<byte>?>>>();
+            using (var producer = new TestProducerBuilder<Memory<byte>?, Memory<byte>?>(producerConfig).Build())
+            {
+                drs4.Add(producer.ProduceAsync(
+                    new TopicPartition(partitionedTopic, 1),
+                    new Message<Memory<byte>?, Memory<byte>?> { Key = Encoding.UTF8.GetBytes("test key 2"), Value = Encoding.UTF8.GetBytes("test val 2") }));
+                drs4.Add(producer.ProduceAsync(
+                    partitionedTopic,
+                    new Message<Memory<byte>?, Memory<byte>?> { Key = Encoding.UTF8.GetBytes("test key 3"), Value = Encoding.UTF8.GetBytes("test val 3") }));
+                Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
+            }
+
+            for (int i = 0; i < 2; ++i)
+            {
+                var dr = drs4[i].Result;
+                Assert.Equal(partitionedTopic, dr.Topic);
+                Assert.True(dr.Offset >= 0);
+                Assert.True(dr.Partition == 0 || dr.Partition == 1);
+                Assert.True(dr.Message.Key.HasValue);
+                Assert.Equal($"test key {i+2}", Encoding.UTF8.GetString(dr.Message.Key.Value.Span));
+                Assert.True(dr.Message.Value.HasValue);
+                Assert.Equal($"test val {i+2}", Encoding.UTF8.GetString(dr.Message.Value.Value.Span));
+                Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
+                Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
+            }
+
+            Assert.Equal((Partition)1, drs4[0].Result.Partition);
+
             Assert.Equal(0, Library.HandleCount);
             LogToFile("end Producer_ProduceAsync_Task");
         }
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Error.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Error.cs
index 6b5818b59..40c136147 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Error.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Error.cs
@@ -89,6 +89,29 @@ public void Producer_Produce_Error(string bootstrapServers)
 
             Assert.Equal(1, count);
 
+            // Memory<byte> case.
+
+            count = 0;
+            Action<DeliveryReport<Memory<byte>, ReadOnlyMemory<byte>?>> dh3 = dr =>
+            {
+                Assert.Equal(ErrorCode.Local_UnknownPartition, dr.Error.Code);
+                Assert.Equal((Partition)42, dr.Partition);
+                Assert.Equal(singlePartitionTopic, dr.Topic);
+                Assert.Equal(Offset.Unset, dr.Offset);
+                Assert.Equal(new byte[] { 11 }, dr.Message.Key.ToArray());
+                Assert.Null(dr.Message.Value);
+                Assert.Equal(TimestampType.NotAvailable, dr.Message.Timestamp.Type);
+                count += 1;
+            };
+
+            using (var producer = new TestProducerBuilder<Memory<byte>, ReadOnlyMemory<byte>?>(producerConfig).Build())
+            {
+                producer.Produce(new TopicPartition(singlePartitionTopic, 42), new Message<Memory<byte>, ReadOnlyMemory<byte>?> { Key = new byte[] { 11 } }, dh3);
+                producer.Flush(TimeSpan.FromSeconds(10));
+            }
+
+            Assert.Equal(1, count);
+
             Assert.Equal(0, Library.HandleCount);
             LogToFile("end Producer_Produce_Error");
         }
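A behavioral note the null-handling tests below rely on: a `Memory<byte>?` key or value that is null flows through the identity pass-through unchanged and reaches `SafeKafkaHandle.Produce` as null, so librdkafka receives a NULL pointer rather than an empty buffer. On compacted topics that distinction matters, since only a true null value acts as a tombstone. A hedged sketch of that use case (the helper, topic, and key are illustrative, not part of this change):

```csharp
using System;
using System.Text;
using Confluent.Kafka;

static class TombstoneSketch
{
    // Illustrative only: with TValue = Memory<byte>?, an unset Value is passed
    // to librdkafka as a NULL pointer, which log compaction treats as a tombstone.
    public static void DeleteKey(IProducer<byte[], Memory<byte>?> producer, string topic, string key)
    {
        producer.Produce(topic, new Message<byte[], Memory<byte>?>
        {
            Key = Encoding.UTF8.GetBytes(key),
            Value = null, // null, not empty: only a true null value deletes on compaction
        });
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
```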
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Null.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Null.cs
index fffc515d0..300c30fb2 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Null.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Null.cs
@@ -89,6 +89,31 @@ public void Producer_Produce_Null(string bootstrapServers)
 
             Assert.Equal(2, count);
 
+            // Memory<byte>? case.
+
+            count = 0;
+            Action<DeliveryReport<Memory<byte>?, Memory<byte>?>> dh3 = dr =>
+            {
+                Assert.Equal(ErrorCode.NoError, dr.Error.Code);
+                Assert.Equal((Partition)0, dr.Partition);
+                Assert.Equal(singlePartitionTopic, dr.Topic);
+                Assert.True(dr.Offset >= 0);
+                Assert.Null(dr.Message.Key);
+                Assert.Null(dr.Message.Value);
+                Assert.Equal(TimestampType.CreateTime, dr.Message.Timestamp.Type);
+                Assert.True(Math.Abs((DateTime.UtcNow - dr.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
+                count += 1;
+            };
+
+            using (var producer = new TestProducerBuilder<Memory<byte>?, Memory<byte>?>(producerConfig).Build())
+            {
+                producer.Produce(new TopicPartition(singlePartitionTopic, 0), new Message<Memory<byte>?, Memory<byte>?>(), dh3);
+                producer.Produce(singlePartitionTopic, new Message<Memory<byte>?, Memory<byte>?>(), dh3);
+                producer.Flush(TimeSpan.FromSeconds(10));
+            }
+
+            Assert.Equal(2, count);
+
             Assert.Equal(0, Library.HandleCount);
             LogToFile("end Producer_Produce_Null");
         }
diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs b/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs
index cbab5abd8..191c79ba2 100644
--- a/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs
+++ b/test/Confluent.Kafka.IntegrationTests/Tests/SimpleProduceConsume.cs
@@ -50,21 +50,33 @@ public void SimpleProduceConsume(string bootstrapServers)
 
             string testString1 = "hello world";
             string testString2 = null;
+            string testString3 = "dlrow olleh";
+            string testString4 = null;
 
             DeliveryResult<Null, string> produceResult1;
             DeliveryResult<Null, string> produceResult2;
+            DeliveryResult<Null, Memory<byte>?> produceResult3;
+            DeliveryResult<Null, Memory<byte>?> produceResult4;
             using (var producer = new TestProducerBuilder<Null, string>(producerConfig).Build())
             {
                 produceResult1 = ProduceMessage(singlePartitionTopic, producer, testString1);
                 produceResult2 = ProduceMessage(singlePartitionTopic, producer, testString2);
             }
 
+            using (var producer = new TestProducerBuilder<Null, Memory<byte>?>(producerConfig).Build())
+            {
+                produceResult3 = ProduceMessage(singlePartitionTopic, producer, testString3);
+                produceResult4 = ProduceMessage(singlePartitionTopic, producer, testString4);
+            }
+
             using (var consumer = new TestConsumerBuilder<byte[], byte[]>(consumerConfig).Build())
             {
                 ConsumeMessage(consumer, produceResult1, testString1);
                 ConsumeMessage(consumer, produceResult2, testString2);
+                ConsumeMessage(consumer, produceResult3, testString3);
+                ConsumeMessage(consumer, produceResult4, testString4);
             }
-
+
             Assert.Equal(0, Library.HandleCount);
             LogToFile("end SimpleProduceConsume");
         }
@@ -80,6 +92,17 @@ private static void ConsumeMessage(IConsumer<byte[], byte[]> consumer, DeliveryR
             Assert.Equal(r.Message.Timestamp.UnixTimestampMs, dr.Message.Timestamp.UnixTimestampMs);
         }
 
+        private static void ConsumeMessage(IConsumer<byte[], byte[]> consumer, DeliveryResult<Null, Memory<byte>?> dr, string testString)
+        {
+            consumer.Assign(new List<TopicPartitionOffset> { dr.TopicPartitionOffset });
+            var r = consumer.Consume(TimeSpan.FromSeconds(10));
+            Assert.NotNull(r?.Message);
+            Assert.Equal(testString, r.Message.Value == null ? null : Encoding.UTF8.GetString(r.Message.Value));
+            Assert.Null(r.Message.Key);
+            Assert.Equal(r.Message.Timestamp.Type, dr.Message.Timestamp.Type);
+            Assert.Equal(r.Message.Timestamp.UnixTimestampMs, dr.Message.Timestamp.UnixTimestampMs);
+        }
+
         private static DeliveryResult<Null, string> ProduceMessage(string topic, IProducer<Null, string> producer, string testString)
         {
             var result = producer.ProduceAsync(topic, new Message<Null, string> { Value = testString }).Result;
@@ -91,5 +114,20 @@ private static DeliveryResult<Null, string> ProduceMessage(string topic, IProduc
             Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
             return result;
         }
+
+        private static DeliveryResult<Null, Memory<byte>?> ProduceMessage(string topic, IProducer<Null, Memory<byte>?> producer, string testString)
+        {
+            var result = producer.ProduceAsync(topic, new Message<Null, Memory<byte>?>
+            {
+                Value = testString == null ? null : Encoding.UTF8.GetBytes(testString),
+            }).Result;
+            Assert.NotNull(result?.Message);
+            Assert.Equal(topic, result.Topic);
+            Assert.NotEqual(result.Offset, Offset.Unset);
+            Assert.Equal(TimestampType.CreateTime, result.Message.Timestamp.Type);
+            Assert.True(Math.Abs((DateTime.UtcNow - result.Message.Timestamp.UtcDateTime).TotalMinutes) < 1.0);
+            Assert.Equal(0, producer.Flush(TimeSpan.FromSeconds(10)));
+            return result;
+        }
     }
 }
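Finally, because `SafeKafkaHandle.Produce` still passes `MSG_F_COPY`, librdkafka copies the payload during the call and the pin is released in the `finally` block. That means callers can produce from pooled buffers and return them as soon as `Produce` returns; a sketch of that pattern (the `FillPayload` helper is a placeholder):

```csharp
using System;
using System.Buffers;
using Confluent.Kafka;

static class PooledProduceSketch
{
    public static void ProducePooled(IProducer<Null, ReadOnlyMemory<byte>> producer, string topic, int size)
    {
        byte[] rented = ArrayPool<byte>.Shared.Rent(size);
        try
        {
            FillPayload(rented.AsSpan(0, size)); // hypothetical: write the message body
            producer.Produce(topic, new Message<Null, ReadOnlyMemory<byte>>
            {
                Value = rented.AsMemory(0, size), // slice the rented buffer, no copy here
            });
        }
        finally
        {
            // Safe because MSG_F_COPY means librdkafka copied synchronously.
            ArrayPool<byte>.Shared.Return(rented);
        }
    }

    static void FillPayload(Span<byte> buffer) => buffer.Fill(0x2a); // placeholder payload
}
```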