Skip to content

Commit 5a28fb7

Browse files
committed
Support nullable memory too
1 parent 7106e8d commit 5a28fb7

File tree

1 file changed

+54
-38
lines changed

1 file changed

+54
-38
lines changed

src/Confluent.Kafka/Producer.cs

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,10 @@ private void InitializeSerializers(
512512
{
513513
this.keySerializer = (ISerializer<TKey>)serializer;
514514
}
515-
else if (typeof(TValue) == typeof(Memory<byte>) || typeof(TValue) == typeof(ReadOnlyMemory<byte>))
515+
else if (typeof(TKey) == typeof(Memory<byte>)
516+
|| typeof(TKey) == typeof(ReadOnlyMemory<byte>)
517+
|| typeof(TKey) == typeof(Memory<byte>?)
518+
|| typeof(TKey) == typeof(ReadOnlyMemory<byte>?))
516519
{
517520
// Serializers are not used for Memory<byte>.
518521
}
@@ -542,7 +545,10 @@ private void InitializeSerializers(
542545
{
543546
this.valueSerializer = (ISerializer<TValue>)serializer;
544547
}
545-
else if (typeof(TValue) == typeof(Memory<byte>) || typeof(TValue) == typeof(ReadOnlyMemory<byte>))
548+
else if (typeof(TValue) == typeof(Memory<byte>)
549+
|| typeof(TValue) == typeof(ReadOnlyMemory<byte>)
550+
|| typeof(TValue) == typeof(Memory<byte>?)
551+
|| typeof(TValue) == typeof(ReadOnlyMemory<byte>?))
546552
{
547553
// Serializers are not used for Memory<byte>.
548554
}
@@ -765,24 +771,27 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
765771
{
766772
Headers headers = message.Headers ?? new Headers();
767773

768-
ReadOnlyMemory<byte>? keyBytes;
774+
ReadOnlyMemory<byte>? keyBytes = null;
769775
try
770776
{
771-
if (message.Key is Memory<byte> memory)
777+
if (keySerializer != null)
778+
{
779+
byte[] keyBytesArray = keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers));
780+
keyBytes = keyBytesArray == null ? (ReadOnlyMemory<byte>?)null : keyBytesArray;
781+
}
782+
else if (asyncKeySerializer != null)
783+
{
784+
byte[] keyBytesArray = await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false);
785+
keyBytes = keyBytesArray == null ? (ReadOnlyMemory<byte>?)null : keyBytesArray;
786+
}
787+
else if (message.Key is Memory<byte> memory)
772788
{
773789
keyBytes = memory;
774790
}
775791
else if (message.Key is ReadOnlyMemory<byte> readOnlyMemory)
776792
{
777793
keyBytes = readOnlyMemory;
778794
}
779-
else
780-
{
781-
byte[] keyBytesArray = keySerializer != null
782-
? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
783-
: await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false);
784-
keyBytes = keyBytesArray == null ? (ReadOnlyMemory<byte>?)null : keyBytesArray;
785-
}
786795
}
787796
catch (Exception ex)
788797
{
@@ -796,24 +805,27 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
796805
ex);
797806
}
798807

799-
ReadOnlyMemory<byte>? valBytes;
808+
ReadOnlyMemory<byte>? valBytes = null;
800809
try
801810
{
802-
if (message.Value is Memory<byte> memory)
811+
if (valueSerializer != null)
812+
{
813+
byte[] valueBytesArray = valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers));
814+
valBytes = valueBytesArray == null ? (ReadOnlyMemory<byte>?)null : valueBytesArray;
815+
}
816+
else if (asyncValueSerializer != null)
817+
{
818+
byte[] valueBytesArray = await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false);
819+
valBytes = valueBytesArray == null ? (ReadOnlyMemory<byte>?)null : valueBytesArray;
820+
}
821+
else if (message.Value is Memory<byte> memory)
803822
{
804823
valBytes = memory;
805824
}
806825
else if (message.Value is ReadOnlyMemory<byte> readOnlyMemory)
807826
{
808827
valBytes = readOnlyMemory;
809828
}
810-
else
811-
{
812-
byte[] valBytesArray = valueSerializer != null
813-
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
814-
: await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false);
815-
valBytes = valBytesArray == null ? (ReadOnlyMemory<byte>?)null : valBytesArray;
816-
}
817829
}
818830
catch (Exception ex)
819831
{
@@ -912,24 +924,26 @@ public void Produce(
912924

913925
Headers headers = message.Headers ?? new Headers();
914926

915-
ReadOnlyMemory<byte>? keyBytes;
927+
ReadOnlyMemory<byte>? keyBytes = null;
916928
try
917929
{
918-
if (message.Key is Memory<byte> memory)
930+
if (keySerializer != null)
931+
{
932+
byte[] keyBytesArray = keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers));
933+
keyBytes = keyBytesArray == null ? (ReadOnlyMemory<byte>?)null : keyBytesArray;
934+
}
935+
else if (asyncKeySerializer != null)
936+
{
937+
throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required.");
938+
}
939+
else if (message.Key is Memory<byte> memory)
919940
{
920941
keyBytes = memory;
921942
}
922943
else if (message.Key is ReadOnlyMemory<byte> readOnlyMemory)
923944
{
924945
keyBytes = readOnlyMemory;
925946
}
926-
else
927-
{
928-
byte[] keyBytesArray = keySerializer != null
929-
? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
930-
: throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required.");
931-
keyBytes = keyBytesArray == null ? (ReadOnlyMemory<byte>?)null : keyBytesArray;
932-
}
933947
}
934948
catch (Exception ex)
935949
{
@@ -943,24 +957,26 @@ public void Produce(
943957
ex);
944958
}
945959

946-
ReadOnlyMemory<byte>? valBytes;
960+
ReadOnlyMemory<byte>? valBytes = null;
947961
try
948962
{
949-
if (message.Value is Memory<byte> memory)
963+
if (valueSerializer != null)
964+
{
965+
byte[] valueBytesArray = valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers));
966+
valBytes = valueBytesArray == null ? (ReadOnlyMemory<byte>?)null : valueBytesArray;
967+
}
968+
else if (asyncValueSerializer != null)
969+
{
970+
throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
971+
}
972+
else if (message.Value is Memory<byte> memory)
950973
{
951974
valBytes = memory;
952975
}
953976
else if (message.Value is ReadOnlyMemory<byte> readOnlyMemory)
954977
{
955978
valBytes = readOnlyMemory;
956979
}
957-
else
958-
{
959-
byte[] valBytesArray = valueSerializer != null
960-
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
961-
: throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
962-
valBytes = valBytesArray == null ? (ReadOnlyMemory<byte>?)null : valBytesArray;
963-
}
964980
}
965981
catch (Exception ex)
966982
{

0 commit comments

Comments
 (0)