Skip to content

Commit 7106e8d

Browse files
committed
Fix null values beingconverted to empty array
1 parent b19ee6a commit 7106e8d

File tree

1 file changed

+14
-10
lines changed

1 file changed

+14
-10
lines changed

src/Confluent.Kafka/Producer.cs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq
279279

280280
private void ProduceImpl(
281281
string topic,
282-
ReadOnlyMemory<byte> val,
283-
ReadOnlyMemory<byte> key,
282+
ReadOnlyMemory<byte>? val,
283+
ReadOnlyMemory<byte>? key,
284284
Timestamp timestamp,
285285
Partition partition,
286286
IReadOnlyList<IHeader> headers,
@@ -765,7 +765,7 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
765765
{
766766
Headers headers = message.Headers ?? new Headers();
767767

768-
ReadOnlyMemory<byte> keyBytes;
768+
ReadOnlyMemory<byte>? keyBytes;
769769
try
770770
{
771771
if (message.Key is Memory<byte> memory)
@@ -778,9 +778,10 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
778778
}
779779
else
780780
{
781-
keyBytes = (keySerializer != null)
781+
byte[] keyBytesArray = keySerializer != null
782782
? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
783783
: await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false);
784+
keyBytes = keyBytesArray == null ? (ReadOnlyMemory<byte>?)null : keyBytesArray;
784785
}
785786
}
786787
catch (Exception ex)
@@ -795,7 +796,7 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
795796
ex);
796797
}
797798

798-
ReadOnlyMemory<byte> valBytes;
799+
ReadOnlyMemory<byte>? valBytes;
799800
try
800801
{
801802
if (message.Value is Memory<byte> memory)
@@ -808,9 +809,10 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
808809
}
809810
else
810811
{
811-
valBytes = (valueSerializer != null)
812+
byte[] valBytesArray = valueSerializer != null
812813
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
813814
: await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false);
815+
valBytes = valBytesArray == null ? (ReadOnlyMemory<byte>?)null : valBytesArray;
814816
}
815817
}
816818
catch (Exception ex)
@@ -910,7 +912,7 @@ public void Produce(
910912

911913
Headers headers = message.Headers ?? new Headers();
912914

913-
ReadOnlyMemory<byte> keyBytes;
915+
ReadOnlyMemory<byte>? keyBytes;
914916
try
915917
{
916918
if (message.Key is Memory<byte> memory)
@@ -923,9 +925,10 @@ public void Produce(
923925
}
924926
else
925927
{
926-
keyBytes = (keySerializer != null)
928+
byte[] keyBytesArray = keySerializer != null
927929
? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
928930
: throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required.");
931+
keyBytes = keyBytesArray == null ? (ReadOnlyMemory<byte>?)null : keyBytesArray;
929932
}
930933
}
931934
catch (Exception ex)
@@ -940,7 +943,7 @@ public void Produce(
940943
ex);
941944
}
942945

943-
ReadOnlyMemory<byte> valBytes;
946+
ReadOnlyMemory<byte>? valBytes;
944947
try
945948
{
946949
if (message.Value is Memory<byte> memory)
@@ -953,9 +956,10 @@ public void Produce(
953956
}
954957
else
955958
{
956-
valBytes = (valueSerializer != null)
959+
byte[] valBytesArray = valueSerializer != null
957960
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
958961
: throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
962+
valBytes = valBytesArray == null ? (ReadOnlyMemory<byte>?)null : valBytesArray;
959963
}
960964
}
961965
catch (Exception ex)

0 commit comments

Comments
 (0)