Skip to content

Commit 4ffdad9

Browse files
committed
Natively support Memory<byte>
1 parent 6b281a8 commit 4ffdad9

File tree

3 files changed

+141
-68
lines changed

3 files changed

+141
-68
lines changed

src/Confluent.Kafka/Impl/SafeKafkaHandle.cs

Lines changed: 22 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
// Refer to LICENSE for more information.
1919

2020
using System;
21+
using System.Buffers;
2122
using System.Collections.Generic;
2223
using System.Linq;
2324
using System.Runtime.InteropServices;
@@ -357,45 +358,35 @@ private IntPtr marshalHeaders(IReadOnlyList<IHeader> headers)
357358
return headersPtr;
358359
}
359360

360-
internal ErrorCode Produce(
361+
internal unsafe ErrorCode Produce(
361362
string topic,
362-
byte[] val, int valOffset, int valLength,
363-
byte[] key, int keyOffset, int keyLength,
363+
ReadOnlyMemory<byte>? val,
364+
ReadOnlyMemory<byte>? key,
364365
int partition,
365366
long timestamp,
366367
IReadOnlyList<IHeader> headers,
367368
IntPtr opaque)
368369
{
369-
var pValue = IntPtr.Zero;
370-
var pKey = IntPtr.Zero;
370+
MemoryHandle? valueHandle = null;
371+
IntPtr valuePtr = IntPtr.Zero;
372+
UIntPtr valueLength = UIntPtr.Zero;
371373

372-
var gchValue = default(GCHandle);
373-
var gchKey = default(GCHandle);
374+
MemoryHandle? keyHandle = null;
375+
IntPtr keyPtr = IntPtr.Zero;
376+
UIntPtr keyLength = UIntPtr.Zero;
374377

375-
if (val == null)
378+
if (val != null)
376379
{
377-
if (valOffset != 0 || valLength != 0)
378-
{
379-
throw new ArgumentException("valOffset and valLength parameters must be 0 when producing null values.");
380-
}
381-
}
382-
else
383-
{
384-
gchValue = GCHandle.Alloc(val, GCHandleType.Pinned);
385-
pValue = Marshal.UnsafeAddrOfPinnedArrayElement(val, valOffset);
380+
valueHandle = val.Value.Pin();
381+
valuePtr = (IntPtr)valueHandle.Value.Pointer;
382+
valueLength = (UIntPtr)val.Value.Length;
386383
}
387384

388-
if (key == null)
385+
if (key != null)
389386
{
390-
if (keyOffset != 0 || keyLength != 0)
391-
{
392-
throw new ArgumentException("keyOffset and keyLength parameters must be 0 when producing null key values.");
393-
}
394-
}
395-
else
396-
{
397-
gchKey = GCHandle.Alloc(key, GCHandleType.Pinned);
398-
pKey = Marshal.UnsafeAddrOfPinnedArrayElement(key, keyOffset);
387+
keyHandle = key.Value.Pin();
388+
keyPtr = (IntPtr)keyHandle.Value.Pointer;
389+
keyLength = (UIntPtr)key.Value.Length;
399390
}
400391

401392
IntPtr headersPtr = marshalHeaders(headers);
@@ -407,8 +398,8 @@ internal ErrorCode Produce(
407398
topic,
408399
partition,
409400
(IntPtr)MsgFlags.MSG_F_COPY,
410-
pValue, (UIntPtr)valLength,
411-
pKey, (UIntPtr)keyLength,
401+
valuePtr, valueLength,
402+
keyPtr, keyLength,
412403
timestamp,
413404
headersPtr,
414405
opaque);
@@ -433,15 +424,8 @@ internal ErrorCode Produce(
433424
}
434425
finally
435426
{
436-
if (val != null)
437-
{
438-
gchValue.Free();
439-
}
440-
441-
if (key != null)
442-
{
443-
gchKey.Free();
444-
}
427+
valueHandle?.Dispose();
428+
keyHandle?.Dispose();
445429
}
446430
}
447431

src/Confluent.Kafka/Producer.cs

Lines changed: 88 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq
279279

280280
private void ProduceImpl(
281281
string topic,
282-
byte[] val, int valOffset, int valLength,
283-
byte[] key, int keyOffset, int keyLength,
282+
ReadOnlyMemory<byte> val,
283+
ReadOnlyMemory<byte> key,
284284
Timestamp timestamp,
285285
Partition partition,
286286
IReadOnlyList<IHeader> headers,
@@ -308,8 +308,8 @@ private void ProduceImpl(
308308

309309
err = KafkaHandle.Produce(
310310
topic,
311-
val, valOffset, valLength,
312-
key, keyOffset, keyLength,
311+
val,
312+
key,
313313
partition.Value,
314314
timestamp.UnixTimestampMs,
315315
headers,
@@ -325,8 +325,8 @@ private void ProduceImpl(
325325
{
326326
err = KafkaHandle.Produce(
327327
topic,
328-
val, valOffset, valLength,
329-
key, keyOffset, keyLength,
328+
val,
329+
key,
330330
partition.Value,
331331
timestamp.UnixTimestampMs,
332332
headers,
@@ -506,7 +506,14 @@ private void InitializeSerializers(
506506
IAsyncSerializer<TValue> asyncValueSerializer)
507507
{
508508
// setup key serializer.
509-
if (keySerializer == null && asyncKeySerializer == null)
509+
if (typeof(TKey) == typeof(Memory<byte>) || typeof(TKey) == typeof(ReadOnlyMemory<byte>))
510+
{
511+
if (keySerializer != null || asyncKeySerializer != null)
512+
{
513+
throw new ArgumentNullException(null, "Key serializer should not be specified for Memory<byte>");
514+
}
515+
}
516+
else if (keySerializer == null && asyncKeySerializer == null)
510517
{
511518
if (!defaultSerializers.TryGetValue(typeof(TKey), out object serializer))
512519
{
@@ -529,7 +536,14 @@ private void InitializeSerializers(
529536
}
530537

531538
// setup value serializer.
532-
if (valueSerializer == null && asyncValueSerializer == null)
539+
if (typeof(TValue) == typeof(Memory<byte>) || typeof(TValue) == typeof(ReadOnlyMemory<byte>))
540+
{
541+
if (valueSerializer != null || asyncValueSerializer != null)
542+
{
543+
throw new ArgumentNullException(null, "Value serializer should not be specified for Memory<byte>");
544+
}
545+
}
546+
else if (valueSerializer == null && asyncValueSerializer == null)
533547
{
534548
if (!defaultSerializers.TryGetValue(typeof(TValue), out object serializer))
535549
{
@@ -750,12 +764,23 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
750764
{
751765
Headers headers = message.Headers ?? new Headers();
752766

753-
byte[] keyBytes;
767+
ReadOnlyMemory<byte> keyBytes;
754768
try
755769
{
756-
keyBytes = (keySerializer != null)
757-
? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
758-
: await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false);
770+
if (message.Key is Memory<byte> memory)
771+
{
772+
keyBytes = memory;
773+
}
774+
else if (message.Key is ReadOnlyMemory<byte> readOnlyMemory)
775+
{
776+
keyBytes = readOnlyMemory;
777+
}
778+
else
779+
{
780+
keyBytes = (keySerializer != null)
781+
? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
782+
: await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false);
783+
}
759784
}
760785
catch (Exception ex)
761786
{
@@ -769,12 +794,23 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
769794
ex);
770795
}
771796

772-
byte[] valBytes;
797+
ReadOnlyMemory<byte> valBytes;
773798
try
774799
{
775-
valBytes = (valueSerializer != null)
776-
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
777-
: await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false);
800+
if (message.Value is Memory<byte> memory)
801+
{
802+
valBytes = memory;
803+
}
804+
else if (message.Value is ReadOnlyMemory<byte> readOnlyMemory)
805+
{
806+
valBytes = readOnlyMemory;
807+
}
808+
else
809+
{
810+
valBytes = (valueSerializer != null)
811+
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
812+
: await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false);
813+
}
778814
}
779815
catch (Exception ex)
780816
{
@@ -805,8 +841,8 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
805841

806842
ProduceImpl(
807843
topicPartition.Topic,
808-
valBytes, 0, valBytes == null ? 0 : valBytes.Length,
809-
keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
844+
valBytes,
845+
keyBytes,
810846
message.Timestamp, topicPartition.Partition, headers.BackingList,
811847
handler);
812848

@@ -816,8 +852,8 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
816852
{
817853
ProduceImpl(
818854
topicPartition.Topic,
819-
valBytes, 0, valBytes == null ? 0 : valBytes.Length,
820-
keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
855+
valBytes,
856+
keyBytes,
821857
message.Timestamp, topicPartition.Partition, headers.BackingList,
822858
null);
823859

@@ -873,12 +909,23 @@ public void Produce(
873909

874910
Headers headers = message.Headers ?? new Headers();
875911

876-
byte[] keyBytes;
912+
ReadOnlyMemory<byte> keyBytes;
877913
try
878914
{
879-
keyBytes = (keySerializer != null)
880-
? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
881-
: throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required.");
915+
if (message.Key is Memory<byte> memory)
916+
{
917+
keyBytes = memory;
918+
}
919+
else if (message.Key is ReadOnlyMemory<byte> readOnlyMemory)
920+
{
921+
keyBytes = readOnlyMemory;
922+
}
923+
else
924+
{
925+
keyBytes = (keySerializer != null)
926+
? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
927+
: throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required.");
928+
}
882929
}
883930
catch (Exception ex)
884931
{
@@ -892,12 +939,23 @@ public void Produce(
892939
ex);
893940
}
894941

895-
byte[] valBytes;
942+
ReadOnlyMemory<byte> valBytes;
896943
try
897944
{
898-
valBytes = (valueSerializer != null)
899-
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
900-
: throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
945+
if (message.Value is Memory<byte> memory)
946+
{
947+
valBytes = memory;
948+
}
949+
else if (message.Value is ReadOnlyMemory<byte> readOnlyMemory)
950+
{
951+
valBytes = readOnlyMemory;
952+
}
953+
else
954+
{
955+
valBytes = (valueSerializer != null)
956+
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
957+
: throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
958+
}
901959
}
902960
catch (Exception ex)
903961
{
@@ -915,8 +973,8 @@ public void Produce(
915973
{
916974
ProduceImpl(
917975
topicPartition.Topic,
918-
valBytes, 0, valBytes == null ? 0 : valBytes.Length,
919-
keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
976+
valBytes,
977+
keyBytes,
920978
message.Timestamp, topicPartition.Partition,
921979
headers.BackingList,
922980
deliveryHandler == null

test/Confluent.Kafka.IntegrationTests/Tests/Producer_Produce_Async.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,36 @@ public void Producer_Produce_Async(string bootstrapServers)
6060
Assert.Equal(0, Library.HandleCount);
6161
LogToFile("end Producer_Produce_Async");
6262
}
63+
64+
[Theory, MemberData(nameof(KafkaParameters))]
65+
public void Producer_Produce_Memory_Async(string bootstrapServers)
66+
{
67+
LogToFile("start Producer_Produce_Memory_Async");
68+
69+
var producerConfig = new ProducerConfig { BootstrapServers = bootstrapServers };
70+
71+
using (var testTopic = new TemporaryTopic(bootstrapServers, 1))
72+
using (var producer = new TestProducerBuilder<Null, Memory<byte>>(producerConfig)
73+
.Build())
74+
using (var dProducer = new DependentProducerBuilder<ReadOnlyMemory<byte>, Null>(producer.Handle)
75+
.Build())
76+
{
77+
Memory<byte> data = new byte[] { 1, 2, 3, 4 };
78+
Assert.Throws<ProduceException<Null, Memory<byte>>>(
79+
() => producer.Produce(testTopic.Name, new Message<Null, Memory<byte>> { Value = data }));
80+
81+
Assert.Throws<ProduceException<Null, Memory<byte>>>(
82+
() => producer.Produce(testTopic.Name, new Message<Null, Memory<byte>> { Value = data }, dr => { Assert.True(false); }));
83+
84+
Assert.Throws<ProduceException<ReadOnlyMemory<byte>, Null>>(
85+
() => dProducer.Produce(testTopic.Name, new Message<ReadOnlyMemory<byte>, Null> { Key = data }));
86+
87+
Assert.Throws<ProduceException<ReadOnlyMemory<byte>, Null>>(
88+
() => dProducer.Produce(testTopic.Name, new Message<ReadOnlyMemory<byte>, Null> { Key = data }, dr => { Assert.True(false); }));
89+
}
90+
91+
Assert.Equal(0, Library.HandleCount);
92+
LogToFile("end Producer_Produce_Memory_Async");
93+
}
6394
}
6495
}

0 commit comments

Comments
 (0)