Skip to content

Commit 81c82e0

Browse files
authored
feat: Allow empty payloads in Kafka, in order to support compaction tombstones (#224)
Signed-off-by: Magne Helleborg <[email protected]>
1 parent d9e10ea commit 81c82e0

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,6 @@ private static void InitPartitioningKey(Message<string?, byte[]> message, CloudE
140140
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
141141
Validation.CheckNotNull(formatter, nameof(formatter));
142142

143-
// TODO: Is this appropriate? Why can't we transport a CloudEvent without data in Kafka?
144-
Validation.CheckArgument(cloudEvent.Data is object, nameof(cloudEvent), "Only CloudEvents with data can be converted to Kafka messages");
145143
var headers = MapHeaders(cloudEvent);
146144
string? key = (string?) cloudEvent[Partitioning.PartitionKeyAttribute];
147145
byte[] value;

test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,57 @@ public void KafkaBinaryMessageTest()
129129
Assert.Equal("value", (string?)receivedCloudEvent["comexampleextension1"]);
130130
}
131131

132+
[Theory]
133+
[InlineData(MediaTypeNames.Application.Octet, new byte[0])]
134+
[InlineData(MediaTypeNames.Application.Json, null)]
135+
[InlineData(MediaTypeNames.Application.Xml, "")]
136+
[InlineData(MediaTypeNames.Text.Plain, "")]
137+
[InlineData(null, null)]
138+
public void KafkaBinaryMessageTombstoneTest(string contentType, object? expectedDecodedResult)
139+
{
140+
var jsonEventFormatter = new JsonEventFormatter();
141+
var cloudEvent = new CloudEvent(Partitioning.AllAttributes)
142+
{
143+
Type = "com.github.pull.create",
144+
Source = new Uri("https://github.com/cloudevents/spec/pull/123"),
145+
Id = "A234-1234-1234",
146+
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
147+
DataContentType = contentType,
148+
Data = null,
149+
["comexampleextension1"] = "value",
150+
[Partitioning.PartitionKeyAttribute] = "hello much wow"
151+
};
152+
153+
var message = cloudEvent.ToKafkaMessage(ContentMode.Binary, new JsonEventFormatter());
154+
Assert.True(message.IsCloudEvent());
155+
156+
// Sending an empty message is equivalent to a delete (tombstone) for that partition key, when using compacted topics in Kafka.
157+
// This is the main use case for empty data messages with Kafka.
158+
Assert.Empty(message.Value);
159+
160+
// Using serialization to create fully independent copy thus simulating message transport.
161+
// The real transport will work in a similar way.
162+
var serialized = JsonConvert.SerializeObject(message, new HeaderConverter());
163+
var settings = new JsonSerializerSettings
164+
{
165+
Converters = { new HeadersConverter(), new HeaderConverter() }
166+
};
167+
var messageCopy = JsonConvert.DeserializeObject<Message<string?, byte[]>>(serialized, settings)!;
168+
169+
Assert.True(messageCopy.IsCloudEvent());
170+
var receivedCloudEvent = messageCopy.ToCloudEvent(jsonEventFormatter, Partitioning.AllAttributes);
171+
172+
Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
173+
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
174+
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
175+
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
176+
AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time!.Value);
177+
Assert.Equal(contentType, receivedCloudEvent.DataContentType);
178+
Assert.Equal(expectedDecodedResult, receivedCloudEvent.Data);
179+
Assert.Equal("hello much wow", (string?)receivedCloudEvent[Partitioning.PartitionKeyAttribute]);
180+
Assert.Equal("value", (string?)receivedCloudEvent["comexampleextension1"]);
181+
}
182+
132183
[Fact]
133184
public void ContentTypeCanBeInferredByFormatter()
134185
{

0 commit comments

Comments
 (0)