@@ -422,7 +422,7 @@ public void StoreOffset(ConsumeResult<TKey, TValue> result)
422422 /// `enable.auto.offset.store` must be set to "false" when using this API.
423423 /// </remarks>
424424 /// <param name="offset">
425- /// The offset to be commited .
425+ /// The offset to be committed .
426426 /// </param>
427427 /// <exception cref="Confluent.Kafka.KafkaException">
428428 /// Thrown if the request failed.
@@ -1014,7 +1014,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
10141014 new SerializationContext ( MessageComponentType . Key , topic ) ) ;
10151015 }
10161016 }
1017- catch ( Exception exception )
1017+ catch ( Exception ex )
10181018 {
10191019 throw new ConsumeException (
10201020 new ConsumeResult < byte [ ] , byte [ ] >
@@ -1030,7 +1030,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
10301030 IsPartitionEOF = false
10311031 } ,
10321032 new Error ( ErrorCode . Local_KeyDeserialization ) ,
1033- exception ) ;
1033+ ex ) ;
10341034 }
10351035
10361036 V val ;
@@ -1046,7 +1046,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
10461046 new SerializationContext ( MessageComponentType . Value , topic ) ) ;
10471047 }
10481048 }
1049- catch ( Exception exception )
1049+ catch ( Exception ex )
10501050 {
10511051 throw new ConsumeException (
10521052 new ConsumeResult < byte [ ] , byte [ ] >
@@ -1062,7 +1062,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
10621062 IsPartitionEOF = false
10631063 } ,
10641064 new Error ( ErrorCode . Local_ValueDeserialization ) ,
1065- exception ) ;
1065+ ex ) ;
10661066 }
10671067
10681068 return new ConsumeResult < K , V >
@@ -1100,19 +1100,35 @@ private ConsumeResult<TKey, TValue> ConsumeViaBytes(int millisecondsTimeout)
11001100 } ;
11011101 }
11021102
1103- TKey key = keyDeserializer != null
1104- ? keyDeserializer . Deserialize ( rawResult . Key , rawResult . Key == null , new SerializationContext ( MessageComponentType . Key , rawResult . Topic ) )
1105- : asyncKeyDeserializer . DeserializeAsync ( new ReadOnlyMemory < byte > ( rawResult . Key ) , rawResult . Key == null , new SerializationContext ( MessageComponentType . Key , rawResult . Topic ) )
1106- . ConfigureAwait ( continueOnCapturedContext : false )
1107- . GetAwaiter ( )
1108- . GetResult ( ) ;
1103+ TKey key ;
1104+ try
1105+ {
1106+ key = keyDeserializer != null
1107+ ? keyDeserializer . Deserialize ( rawResult . Key , rawResult . Key == null , new SerializationContext ( MessageComponentType . Key , rawResult . Topic ) )
1108+ : Task . Run ( async ( ) => await asyncKeyDeserializer . DeserializeAsync ( new ReadOnlyMemory < byte > ( rawResult . Key ) , rawResult . Key == null , new SerializationContext ( MessageComponentType . Key , rawResult . Topic ) ) )
1109+ . ConfigureAwait ( continueOnCapturedContext : false )
1110+ . GetAwaiter ( )
1111+ . GetResult ( ) ;
1112+ }
1113+ catch ( Exception ex )
1114+ {
1115+ throw new ConsumeException ( rawResult , new Error ( ErrorCode . Local_KeyDeserialization ) , ex ) ;
1116+ }
11091117
1110- TValue val = valueDeserializer != null
1111- ? valueDeserializer . Deserialize ( rawResult . Value , rawResult . Value == null , new SerializationContext ( MessageComponentType . Value , rawResult . Topic ) )
1112- : asyncValueDeserializer . DeserializeAsync ( new ReadOnlyMemory < byte > ( rawResult . Value ) , rawResult == null , new SerializationContext ( MessageComponentType . Value , rawResult . Topic ) )
1113- . ConfigureAwait ( continueOnCapturedContext : false )
1114- . GetAwaiter ( )
1115- . GetResult ( ) ;
1118+ TValue val ;
1119+ try
1120+ {
1121+ val = valueDeserializer != null
1122+ ? valueDeserializer . Deserialize ( rawResult . Value , rawResult . Value == null , new SerializationContext ( MessageComponentType . Value , rawResult . Topic ) )
1123+ : Task . Run ( async ( ) => await asyncValueDeserializer . DeserializeAsync ( new ReadOnlyMemory < byte > ( rawResult . Value ) , rawResult == null , new SerializationContext ( MessageComponentType . Value , rawResult . Topic ) ) )
1124+ . ConfigureAwait ( continueOnCapturedContext : false )
1125+ . GetAwaiter ( )
1126+ . GetResult ( ) ;
1127+ }
1128+ catch ( Exception ex )
1129+ {
1130+ throw new ConsumeException ( rawResult , new Error ( ErrorCode . Local_ValueDeserialization ) , ex ) ;
1131+ }
11161132
11171133 return new ConsumeResult < TKey , TValue >
11181134 {
@@ -1140,10 +1156,18 @@ private ConsumeResult<TKey, TValue> ConsumeViaBytes(int millisecondsTimeout)
11401156 /// The consume result.
11411157 /// </returns>
11421158 /// <remarks>
1143- /// OnPartitionsAssigned/Revoked and OnOffsetsCommitted events may
1159+ /// The partitions assigned/revoked and offsets committed handlers may
11441160 /// be invoked as a side-effect of calling this method (on the same
11451161 /// thread).
11461162 /// </remarks>
1163+ /// <exception cref="ConsumeException">
1164+ /// Thrown when a call to this method is unsuccessful for any reason
1165+ /// (except cancellation by user). Inspect the Error property of the
1166+ /// exception for detailed information.
1167+ /// </exception>
1168+ /// <exception cref="OperationCanceledException">
1169+ /// Thrown on cancellation.
1170+ /// </exception>
11471171 public ConsumeResult < TKey , TValue > Consume ( CancellationToken cancellationToken = default ( CancellationToken ) )
11481172 {
11491173 while ( true )
@@ -1172,10 +1196,14 @@ private ConsumeResult<TKey, TValue> ConsumeViaBytes(int millisecondsTimeout)
11721196 /// The consume result.
11731197 /// </returns>
11741198 /// <remarks>
1175- /// OnPartitionsAssigned/Revoked and OnOffsetsCommitted events may
1199+ /// The partitions assigned/revoked and offsets committed handlers may
11761200 /// be invoked as a side-effect of calling this method (on the same
11771201 /// thread).
11781202 /// </remarks>
1203+ /// <exception cref="ConsumeException">
1204+ /// Thrown when a call to this method is unsuccessful for any reason.
1205+ /// Inspect the Error property of the exception for detailed information.
1206+ /// </exception>
11791207 public ConsumeResult < TKey , TValue > Consume ( TimeSpan timeout )
11801208 => ( keyDeserializer != null && valueDeserializer != null )
11811209 ? ConsumeImpl < TKey , TValue > ( timeout . TotalMillisecondsAsInt ( ) , keyDeserializer , valueDeserializer ) // fast path for simple case
0 commit comments