Skip to content

Commit ca82c65

Browse files
authored
SerializationContext and IHeader (#784)
* typed header values * adding to the integration test * serde interface changes * minor tweaks * review changes * SerializationContext.Empty
1 parent b7b04fe commit ca82c65

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+343
-232
lines changed

examples/Producer/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public static async Task Main(string[] args)
9292

9393
try
9494
{
95-
// Notes: Awaiting the asynchronous produce request below prevents flow of execution
95+
// Note: Awaiting the asynchronous produce request below prevents flow of execution
9696
// from proceeding until the acknowledgement from the broker is received (at the
9797
// expense of low throughput).
9898
var deliveryReport = await producer.ProduceAsync(

examples/Protobuf/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ namespace Confluent.Kafka.Examples.Protobuf
3535
/// </summary>
3636
public class ProtobufSerializer<T> : ISerializer<T> where T : IMessage<T>, new()
3737
{
38-
public byte[] Serialize(T data, bool isKey, MessageMetadata messageMetadata, TopicPartition destination)
38+
public byte[] Serialize(T data, SerializationContext context)
3939
=> data.ToByteArray();
4040
}
4141

@@ -51,7 +51,7 @@ public ProtobufDeserializer()
5151
parser = new MessageParser<T>(() => new T());
5252
}
5353

54-
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source)
54+
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
5555
=> parser.ParseFrom(data.ToArray());
5656
}
5757

src/Confluent.Kafka/Consumer.cs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -905,7 +905,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
905905
headerValue = new byte[(int)sizep];
906906
Marshal.Copy(valuep, headerValue, 0, (int)sizep);
907907
}
908-
headers.Add(new Header(headerName, headerValue));
908+
headers.Add(headerName, headerValue);
909909
}
910910
}
911911
}
@@ -938,9 +938,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
938938
? ReadOnlySpan<byte>.Empty
939939
: new ReadOnlySpan<byte>(msg.key.ToPointer(), (int)msg.key_len),
940940
msg.key == IntPtr.Zero,
941-
true,
942-
new MessageMetadata { Timestamp = timestamp, Headers = headers },
943-
new TopicPartition(topic, msg.partition));
941+
new SerializationContext(MessageComponentType.Key, topic));
944942
}
945943
}
946944
catch (Exception exception)
@@ -972,9 +970,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
972970
? ReadOnlySpan<byte>.Empty
973971
: new ReadOnlySpan<byte>(msg.val.ToPointer(), (int)msg.len),
974972
msg.val == IntPtr.Zero,
975-
false,
976-
new MessageMetadata { Timestamp = timestamp, Headers = headers },
977-
new TopicPartition(topic, msg.partition));
973+
new SerializationContext(MessageComponentType.Value, topic));
978974
}
979975
}
980976
catch (Exception exception)
@@ -1032,15 +1028,15 @@ private ConsumeResult<TKey, TValue> ConsumeViaBytes(int millisecondsTimeout)
10321028
}
10331029

10341030
TKey key = keyDeserializer != null
1035-
? keyDeserializer.Deserialize(rawResult.Key, rawResult.Key == null, true, rawResult.Message, rawResult.TopicPartition)
1036-
: asyncKeyDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Key), rawResult.Key == null, true, rawResult.Message, rawResult.TopicPartition)
1031+
? keyDeserializer.Deserialize(rawResult.Key, rawResult.Key == null, new SerializationContext(MessageComponentType.Key, rawResult.Topic))
1032+
: asyncKeyDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Key), rawResult.Key == null, new SerializationContext(MessageComponentType.Key, rawResult.Topic))
10371033
.ConfigureAwait(continueOnCapturedContext: false)
10381034
.GetAwaiter()
10391035
.GetResult();
10401036

10411037
TValue val = valueDeserializer != null
1042-
? valueDeserializer.Deserialize(rawResult.Value, rawResult.Value == null, false, rawResult.Message, rawResult.TopicPartition)
1043-
: asyncValueDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Value), rawResult == null, false, rawResult.Message, rawResult.TopicPartition)
1038+
? valueDeserializer.Deserialize(rawResult.Value, rawResult.Value == null, new SerializationContext(MessageComponentType.Value, rawResult.Topic))
1039+
: asyncValueDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Value), rawResult == null, new SerializationContext(MessageComponentType.Value, rawResult.Topic))
10441040
.ConfigureAwait(continueOnCapturedContext: false)
10451041
.GetAwaiter()
10461042
.GetResult();

src/Confluent.Kafka/Deserializers.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public static class Deserializers
3232

3333
private class Utf8Deserializer : IDeserializer<string>
3434
{
35-
public string Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source)
35+
public string Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
3636
{
3737
if (isNull)
3838
{
@@ -65,7 +65,7 @@ public string Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, Mess
6565

6666
private class NullDeserializer : IDeserializer<Null>
6767
{
68-
public Null Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source)
68+
public Null Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
6969
{
7070
if (!isNull)
7171
{
@@ -83,7 +83,7 @@ public Null Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, Messag
8383

8484
private class IgnoreDeserializer : IDeserializer<Ignore>
8585
{
86-
public Ignore Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source)
86+
public Ignore Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
8787
=> null;
8888
}
8989

@@ -94,7 +94,7 @@ public Ignore Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, Mess
9494

9595
private class Int64Deserializer : IDeserializer<long>
9696
{
97-
public long Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source)
97+
public long Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
9898
{
9999
if (isNull)
100100
{
@@ -126,7 +126,7 @@ public long Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, Messag
126126

127127
private class Int32Deserializer : IDeserializer<int>
128128
{
129-
public int Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source)
129+
public int Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
130130
{
131131
if (isNull)
132132
{
@@ -154,7 +154,7 @@ public int Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, Message
154154

155155
private class SingleDeserializer : IDeserializer<float>
156156
{
157-
public float Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source)
157+
public float Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
158158
{
159159
if (isNull)
160160
{
@@ -205,7 +205,7 @@ public float Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, Messa
205205

206206
private class DoubleDeserializer : IDeserializer<double>
207207
{
208-
public double Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source)
208+
public double Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
209209
{
210210
if (isNull)
211211
{
@@ -263,7 +263,7 @@ public double Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, Mess
263263

264264
private class ByteArrayDeserializer : IDeserializer<byte[]>
265265
{
266-
public byte[] Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source)
266+
public byte[] Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
267267
{
268268
if (isNull) { return null; }
269269
return data.ToArray();

src/Confluent.Kafka/Header.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,23 @@ namespace Confluent.Kafka
2525
/// <remarks>
2626
/// Message headers are supported by v0.11 brokers and above.
2727
/// </remarks>
28-
public class Header
28+
public class Header : IHeader
2929
{
30+
private byte[] val;
31+
3032
/// <summary>
3133
/// The header key.
3234
/// </summary>
3335
public string Key { get; private set; }
3436

3537
/// <summary>
36-
/// The header value.
38+
/// Get the serialized header value data.
3739
/// </summary>
38-
public byte[] Value { get; private set; }
39-
40+
public byte[] GetValueBytes()
41+
{
42+
return val;
43+
}
44+
4045
/// <summary>
4146
/// Create a new Header instance.
4247
/// </summary>
@@ -54,7 +59,7 @@ public Header(string key, byte[] value)
5459
}
5560

5661
Key = key;
57-
Value = value;
62+
val = value;
5863
}
5964
}
6065
}

src/Confluent.Kafka/Headers.cs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ namespace Confluent.Kafka
2727
/// <remarks>
2828
/// Message headers are supported by v0.11 brokers and above.
2929
/// </remarks>
30-
public class Headers : IEnumerable<Header>
30+
public class Headers : IEnumerable<IHeader>
3131
{
32-
private readonly List<Header> headers = new List<Header>();
32+
private readonly List<IHeader> headers = new List<IHeader>();
3333

3434
/// <summary>
3535
/// Append a new header to the collection.
@@ -75,9 +75,9 @@ public void Add(Header header)
7575
/// <exception cref="KeyNotFoundException">
7676
/// The key <paramref name="key" /> was not present in the collection.
7777
/// </exception>
78-
public byte[] GetLast(string key)
78+
public byte[] GetLastBytes(string key)
7979
{
80-
if (TryGetLast(key, out byte[] result))
80+
if (TryGetLastBytes(key, out byte[] result))
8181
{
8282
return result;
8383
}
@@ -100,21 +100,22 @@ public byte[] GetLast(string key)
100100
/// true if the a value with the specified key was present in
101101
/// the collection, false otherwise.
102102
/// </returns>
103-
public bool TryGetLast(string key, out byte[] lastHeader)
103+
public bool TryGetLastBytes(string key, out byte[] lastHeader)
104104
{
105105
for (int i=headers.Count-1; i>=0; --i)
106106
{
107107
if (headers[i].Key == key)
108108
{
109-
lastHeader = headers[i].Value;
109+
lastHeader = headers[i].GetValueBytes();
110110
return true;
111111
}
112112
}
113113

114-
lastHeader = null;
114+
lastHeader = default(byte[]);
115115
return false;
116116
}
117117

118+
118119
/// <summary>
119120
/// Removes all headers for the given key.
120121
/// </summary>
@@ -124,7 +125,7 @@ public bool TryGetLast(string key, out byte[] lastHeader)
124125
public void Remove(string key)
125126
=> headers.RemoveAll(a => a.Key == key);
126127

127-
internal class HeadersEnumerator : IEnumerator<Header>
128+
internal class HeadersEnumerator : IEnumerator<IHeader>
128129
{
129130
private Headers headers;
130131

@@ -136,10 +137,10 @@ public HeadersEnumerator(Headers headers)
136137
}
137138

138139
public object Current
139-
=> ((IEnumerator<Header>)this).Current;
140+
=> ((IEnumerator<IHeader>)this).Current;
140141

141-
Header IEnumerator<Header>.Current
142-
=> new Header(headers.headers[location].Key, headers.headers[location].Value);
142+
IHeader IEnumerator<IHeader>.Current
143+
=> headers.headers[location];
143144

144145
public void Dispose() {}
145146

@@ -166,7 +167,7 @@ public void Reset()
166167
/// <returns>
167168
/// An enumerator object that can be used to iterate through the headers collection.
168169
/// </returns>
169-
public IEnumerator<Header> GetEnumerator()
170+
public IEnumerator<IHeader> GetEnumerator()
170171
=> new HeadersEnumerator(this);
171172

172173
/// <summary>
@@ -184,7 +185,7 @@ IEnumerator IEnumerable.GetEnumerator()
184185
/// <param key="index">
185186
/// The zero-based index of the element to get.
186187
/// </param>
187-
public Header this[int index]
188+
public IHeader this[int index]
188189
=> headers[index];
189190

190191
/// <summary>

src/Confluent.Kafka/IAsyncDeserializer.cs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,13 @@ public interface IAsyncDeserializer<T>
3434
/// <param name="isNull">
3535
/// True if this is a null value.
3636
/// </param>
37-
/// <param name="messageMetadata">
38-
/// Properties of the message the data is associated with
39-
/// extra to the key or value.
40-
/// </param>
41-
/// <param name="source">
42-
/// The TopicPartition from which the message was consumed.
43-
/// </param>
44-
/// <param name="isKey">
45-
/// True if deserializing the message key, false if deserializing the
46-
/// message value.
37+
/// <param name="context">
38+
/// Context relevant to the deserialize operation.
4739
/// </param>
4840
/// <returns>
4941
/// A <see cref="System.Threading.Tasks.Task" /> that completes
5042
/// with the deserialized value.
5143
/// </returns>
52-
Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source);
44+
Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context);
5345
}
5446
}

src/Confluent.Kafka/IAsyncSerializer.cs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,13 @@ public interface IAsyncSerializer<T>
3232
/// <param name="data">
3333
/// The value to serialize.
3434
/// </param>
35-
/// <param name="messageMetadata">
36-
/// Properties of the message the data is associated with
37-
/// extra to the key or value.
38-
/// </param>
39-
/// <param name="destination">
40-
/// The TopicPartition to which the message is to be sent
41-
/// (partition may be Partition.Any).
42-
/// </param>
43-
/// <param name="isKey">
44-
/// True if serializing the message key, false if serializing the
45-
/// message value.
35+
/// <param name="context">
36+
/// Context relevant to the serialize operation.
4637
/// </param>
4738
/// <returns>
4839
/// A <see cref="System.Threading.Tasks.Task" /> that
4940
/// completes with the serialized data.
5041
/// </returns>
51-
Task<byte[]> SerializeAsync(T data, bool isKey, MessageMetadata messageMetadata, TopicPartition destination);
42+
Task<byte[]> SerializeAsync(T data, SerializationContext context);
5243
}
5344
}

src/Confluent.Kafka/IDeserializer.cs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,12 @@ public interface IDeserializer<T>
3333
/// <param name="isNull">
3434
/// Whether or not the value is null.
3535
/// </param>
36-
/// <param name="messageMetadata">
37-
/// Properties of the message the data is associated with
38-
/// extra to the key or value.
39-
/// </param>
40-
/// <param name="source">
41-
/// The TopicPartition from which the message was consumed.
42-
/// </param>
43-
/// <param name="isKey">
44-
/// True if deserializing the message key, false if deserializing the
45-
/// message value.
36+
/// <param name="context">
37+
/// Context relevant to the deserialize operation.
4638
/// </param>
4739
/// <returns>
4840
/// The deserialized value.
4941
/// </returns>
50-
T Deserialize(ReadOnlySpan<byte> data, bool isNull, bool isKey, MessageMetadata messageMetadata, TopicPartition source);
42+
T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context);
5143
}
5244
}

src/Confluent.Kafka/IHeader.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2019 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
18+
namespace Confluent.Kafka
19+
{
20+
/// <summary>
21+
/// Defines a Kafka message header.
22+
/// </summary>
23+
public interface IHeader
24+
{
25+
/// <summary>
26+
/// The header key.
27+
/// </summary>
28+
string Key { get; }
29+
30+
/// <summary>
31+
/// The serialized header value data.
32+
/// </summary>
33+
byte[] GetValueBytes();
34+
}
35+
}

0 commit comments

Comments
 (0)