Skip to content

Commit 343e7a6

Browse files
committed
Greatly simplify code
1 parent caf216a commit 343e7a6

File tree

1 file changed

+32
-102
lines changed

1 file changed

+32
-102
lines changed

src/Confluent.Kafka/Producer.cs

Lines changed: 32 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ internal class Config
4242
public PartitionerDelegate defaultPartitioner;
4343
}
4444

45-
private ISerializer<TKey> keySerializer;
46-
private ISerializer<TValue> valueSerializer;
45+
private Func<TKey, SerializationContext, ReadOnlyMemory<byte>?> serializeKey;
46+
private Func<TValue, SerializationContext, ReadOnlyMemory<byte>?> serializeValue;
4747
private IAsyncSerializer<TKey> asyncKeySerializer;
4848
private IAsyncSerializer<TValue> asyncValueSerializer;
4949

@@ -58,6 +58,14 @@ internal class Config
5858
{ typeof(byte[]), Serializers.ByteArray }
5959
};
6060

61+
private static readonly Dictionary<Type, object> memorySerializeFuncs = new Dictionary<Type, object>
62+
{
63+
[typeof(Memory<byte>)] = (Memory<byte> x, SerializationContext _) => (ReadOnlyMemory<byte>?)x,
64+
[typeof(Memory<byte>?)] = (Memory<byte>? x, SerializationContext _) => (ReadOnlyMemory<byte>?)x,
65+
[typeof(ReadOnlyMemory<byte>)] = (ReadOnlyMemory<byte> x, SerializationContext _) => (ReadOnlyMemory<byte>?)x,
66+
[typeof(ReadOnlyMemory<byte>?)] = (ReadOnlyMemory<byte>? x, SerializationContext _) => x,
67+
};
68+
6169
private int cancellationDelayMaxMs;
6270
private bool disposeHasBeenCalled = false;
6371
private object disposeHasBeenCalledLockObj = new object();
@@ -510,14 +518,12 @@ private void InitializeSerializers(
510518
{
511519
if (defaultSerializers.TryGetValue(typeof(TKey), out object serializer))
512520
{
513-
this.keySerializer = (ISerializer<TKey>)serializer;
521+
keySerializer = (ISerializer<TKey>)serializer;
522+
this.serializeKey = (k, ctx) => keySerializer.Serialize(k, ctx)?.AsMemory();
514523
}
515-
else if (typeof(TKey) == typeof(Memory<byte>)
516-
|| typeof(TKey) == typeof(ReadOnlyMemory<byte>)
517-
|| typeof(TKey) == typeof(Memory<byte>?)
518-
|| typeof(TKey) == typeof(ReadOnlyMemory<byte>?))
524+
else if (memorySerializeFuncs.TryGetValue(typeof(TKey), out object serialize))
519525
{
520-
// Serializers are not used for Memory<byte>.
526+
this.serializeKey = (Func<TKey, SerializationContext, ReadOnlyMemory<byte>?>)serialize;
521527
}
522528
else
523529
{
@@ -531,7 +537,7 @@ private void InitializeSerializers(
531537
}
532538
else if (keySerializer != null && asyncKeySerializer == null)
533539
{
534-
this.keySerializer = keySerializer;
540+
this.serializeKey = (k, ctx) => keySerializer.Serialize(k, ctx)?.AsMemory();
535541
}
536542
else
537543
{
@@ -543,14 +549,12 @@ private void InitializeSerializers(
543549
{
544550
if (defaultSerializers.TryGetValue(typeof(TValue), out object serializer))
545551
{
546-
this.valueSerializer = (ISerializer<TValue>)serializer;
552+
valueSerializer = (ISerializer<TValue>)serializer;
553+
this.serializeValue = (k, ctx) => valueSerializer.Serialize(k, ctx)?.AsMemory();
547554
}
548-
else if (typeof(TValue) == typeof(Memory<byte>)
549-
|| typeof(TValue) == typeof(ReadOnlyMemory<byte>)
550-
|| typeof(TValue) == typeof(Memory<byte>?)
551-
|| typeof(TValue) == typeof(ReadOnlyMemory<byte>?))
555+
else if (memorySerializeFuncs.TryGetValue(typeof(TKey), out object serialize))
552556
{
553-
// Serializers are not used for Memory<byte>.
557+
this.serializeValue = (Func<TValue, SerializationContext, ReadOnlyMemory<byte>?>)serialize;
554558
}
555559
else
556560
{
@@ -564,7 +568,7 @@ private void InitializeSerializers(
564568
}
565569
else if (valueSerializer != null && asyncValueSerializer == null)
566570
{
567-
this.valueSerializer = valueSerializer;
571+
this.serializeValue = (k, ctx) => valueSerializer.Serialize(k, ctx)?.AsMemory();
568572
}
569573
else
570574
{
@@ -773,28 +777,9 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
773777
ReadOnlyMemory<byte>? keyBytes;
774778
try
775779
{
776-
if (keySerializer != null)
777-
{
778-
SerializationContext ctx = new(MessageComponentType.Key, topicPartition.Topic, headers);
779-
keyBytes = keySerializer.Serialize(message.Key, ctx)?.AsMemory();
780-
}
781-
else if (asyncKeySerializer != null)
782-
{
783-
SerializationContext ctx = new(MessageComponentType.Key, topicPartition.Topic, headers);
784-
keyBytes = (await asyncKeySerializer.SerializeAsync(message.Key, ctx).ConfigureAwait(false))?.AsMemory();
785-
}
786-
else if (message.Key is Memory<byte> memory)
787-
{
788-
keyBytes = memory;
789-
}
790-
else if (message.Key is ReadOnlyMemory<byte> readOnlyMemory)
791-
{
792-
keyBytes = readOnlyMemory;
793-
}
794-
else // Null Memory<byte>?
795-
{
796-
keyBytes = null;
797-
}
780+
keyBytes = (serializeKey != null)
781+
? serializeKey(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
782+
: await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers)).ConfigureAwait(false);
798783
}
799784
catch (Exception ex)
800785
{
@@ -811,28 +796,9 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
811796
ReadOnlyMemory<byte>? valBytes;
812797
try
813798
{
814-
if (valueSerializer != null)
815-
{
816-
SerializationContext ctx = new(MessageComponentType.Value, topicPartition.Topic, headers);
817-
valBytes = valueSerializer.Serialize(message.Value, ctx)?.AsMemory();
818-
}
819-
else if (asyncValueSerializer != null)
820-
{
821-
SerializationContext ctx = new(MessageComponentType.Value, topicPartition.Topic, headers);
822-
valBytes = (await asyncValueSerializer.SerializeAsync(message.Value, ctx).ConfigureAwait(false))?.AsMemory();
823-
}
824-
else if (message.Value is Memory<byte> memory)
825-
{
826-
valBytes = memory;
827-
}
828-
else if (message.Value is ReadOnlyMemory<byte> readOnlyMemory)
829-
{
830-
valBytes = readOnlyMemory;
831-
}
832-
else // Null Memory<byte>?
833-
{
834-
valBytes = null;
835-
}
799+
valBytes = (serializeValue != null)
800+
? serializeValue(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
801+
: await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers)).ConfigureAwait(false);
836802
}
837803
catch (Exception ex)
838804
{
@@ -934,27 +900,9 @@ public void Produce(
934900
ReadOnlyMemory<byte>? keyBytes;
935901
try
936902
{
937-
if (keySerializer != null)
938-
{
939-
SerializationContext ctx = new(MessageComponentType.Key, topicPartition.Topic, headers);
940-
keyBytes = keySerializer.Serialize(message.Key, ctx)?.AsMemory();
941-
}
942-
else if (asyncKeySerializer != null)
943-
{
944-
throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required.");
945-
}
946-
else if (message.Key is Memory<byte> memory)
947-
{
948-
keyBytes = memory;
949-
}
950-
else if (message.Key is ReadOnlyMemory<byte> readOnlyMemory)
951-
{
952-
keyBytes = readOnlyMemory;
953-
}
954-
else // Null Memory<byte>?
955-
{
956-
keyBytes = null;
957-
}
903+
keyBytes = (serializeKey != null)
904+
? serializeKey(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic, headers))
905+
: throw new InvalidOperationException("Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required.");
958906
}
959907
catch (Exception ex)
960908
{
@@ -971,27 +919,9 @@ public void Produce(
971919
ReadOnlyMemory<byte>? valBytes;
972920
try
973921
{
974-
if (valueSerializer != null)
975-
{
976-
SerializationContext ctx = new(MessageComponentType.Value, topicPartition.Topic, headers);
977-
valBytes = valueSerializer.Serialize(message.Value, ctx)?.AsMemory();
978-
}
979-
else if (asyncValueSerializer != null)
980-
{
981-
throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
982-
}
983-
else if (message.Value is Memory<byte> memory)
984-
{
985-
valBytes = memory;
986-
}
987-
else if (message.Value is ReadOnlyMemory<byte> readOnlyMemory)
988-
{
989-
valBytes = readOnlyMemory;
990-
}
991-
else // Null Memory<byte>?
992-
{
993-
valBytes = null;
994-
}
922+
valBytes = (serializeValue != null)
923+
? serializeValue(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic, headers))
924+
: throw new InvalidOperationException("Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required.");
995925
}
996926
catch (Exception ex)
997927
{

0 commit comments

Comments
 (0)