Skip to content

Commit 4c2bc3c

Browse files
authored
run async serdes on thread pool thread (#838)
1 parent d1557b2 commit 4c2bc3c

File tree

2 files changed

+5
-5
lines changed

2 files changed

+5
-5
lines changed

src/Confluent.Kafka/Consumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,14 +1102,14 @@ private ConsumeResult<TKey, TValue> ConsumeViaBytes(int millisecondsTimeout)
11021102

11031103
TKey key = keyDeserializer != null
11041104
? keyDeserializer.Deserialize(rawResult.Key, rawResult.Key == null, new SerializationContext(MessageComponentType.Key, rawResult.Topic))
1105-
: asyncKeyDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Key), rawResult.Key == null, new SerializationContext(MessageComponentType.Key, rawResult.Topic))
1105+
: Task.Run(async () => await asyncKeyDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Key), rawResult.Key == null, new SerializationContext(MessageComponentType.Key, rawResult.Topic)))
11061106
.ConfigureAwait(continueOnCapturedContext: false)
11071107
.GetAwaiter()
11081108
.GetResult();
11091109

11101110
TValue val = valueDeserializer != null
11111111
? valueDeserializer.Deserialize(rawResult.Value, rawResult.Value == null, new SerializationContext(MessageComponentType.Value, rawResult.Topic))
1112-
: asyncValueDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Value), rawResult == null, new SerializationContext(MessageComponentType.Value, rawResult.Topic))
1112+
: Task.Run(async () => await asyncValueDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Value), rawResult == null, new SerializationContext(MessageComponentType.Value, rawResult.Topic)))
11131113
.ConfigureAwait(continueOnCapturedContext: false)
11141114
.GetAwaiter()
11151115
.GetResult();

src/Confluent.Kafka/Producer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -826,8 +826,8 @@ public void BeginProduce(
826826
{
827827
keyBytes = (keySerializer != null)
828828
? keySerializer.Serialize(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic))
829-
: asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic))
830-
.ConfigureAwait(continueOnCapturedContext: false)
829+
: Task.Run(async () => await asyncKeySerializer.SerializeAsync(message.Key, new SerializationContext(MessageComponentType.Key, topicPartition.Topic)))
830+
.ConfigureAwait(false)
831831
.GetAwaiter()
832832
.GetResult();
833833
}
@@ -848,7 +848,7 @@ public void BeginProduce(
848848
{
849849
valBytes = (valueSerializer != null)
850850
? valueSerializer.Serialize(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic))
851-
: asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic))
851+
: Task.Run(async () => await asyncValueSerializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, topicPartition.Topic)))
852852
.ConfigureAwait(continueOnCapturedContext: false)
853853
.GetAwaiter()
854854
.GetResult();

0 commit comments

Comments
 (0)