Skip to content

feat: provide support for Kafka message keys different than "string?" #314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
<Description>Kafka extensions for CloudNative.CloudEvents</Description>
<PackageTags>cncf;cloudnative;cloudevents;events;kafka</PackageTags>
<LangVersion>8.0</LangVersion>
<LangVersion>9.0</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>

Expand Down
56 changes: 41 additions & 15 deletions src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright (c) Cloud Native Foundation.
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.

using CloudNative.CloudEvents.Core;
using CloudNative.CloudEvents.Extensions;
using CloudNative.CloudEvents.Kafka.PartitionKeyAdapters;
using Confluent.Kafka;
using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -32,7 +33,7 @@ public static class KafkaExtensions
/// </remarks>
/// <param name="message">The message to check for the presence of a CloudEvent. Must not be null.</param>
/// <returns>true, if the request is a CloudEvent</returns>
public static bool IsCloudEvent(this Message<string?, byte[]> message) =>
public static bool IsCloudEvent<TKey>(this Message<TKey, byte[]> message) =>
GetHeaderValue(message, SpecVersionKafkaHeader) is object ||
MimeUtilities.IsCloudEventsContentType(GetHeaderValue(message, KafkaContentTypeAttributeName));

Expand All @@ -56,6 +57,21 @@ public static CloudEvent ToCloudEvent(this Message<string?, byte[]> message,
/// <returns>A reference to a validated CloudEvent instance.</returns>
public static CloudEvent ToCloudEvent(this Message<string?, byte[]> message,
CloudEventFormatter formatter, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
return ToCloudEvent(message, formatter, extensionAttributes, new StringPartitionKeyAdapter());
}

/// <summary>
/// Converts this Kafka message into a CloudEvent object.
/// </summary>
/// <param name="message">The Kafka message to convert. Must not be null.</param>
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
/// <param name="partitionKeyAdapter">The PartitionKey Adapter responsible for determining wether to set the partitionKey attribute and its value.</param>
/// <typeparam name="TKey">The type of key of the Kafka message.</typeparam>
/// <returns>A reference to a validated CloudEvent instance.</returns>
public static CloudEvent ToCloudEvent<TKey>(this Message<TKey, byte[]> message,
CloudEventFormatter formatter, IEnumerable<CloudEventAttribute>? extensionAttributes, IPartitionKeyAdapter<TKey> partitionKeyAdapter)
{
Validation.CheckNotNull(message, nameof(message));
Validation.CheckNotNull(formatter, nameof(formatter));
Expand Down Expand Up @@ -109,16 +125,11 @@ public static CloudEvent ToCloudEvent(this Message<string?, byte[]> message,
formatter.DecodeBinaryModeEventData(message.Value, cloudEvent);
}

InitPartitioningKey(message, cloudEvent);
return Validation.CheckCloudEventArgument(cloudEvent, nameof(message));
}

private static void InitPartitioningKey(Message<string?, byte[]> message, CloudEvent cloudEvent)
{
if (!string.IsNullOrEmpty(message.Key))
if (partitionKeyAdapter.ConvertKeyToPartitionKeyAttributeValue(message.Key, out var partitionKeyAttributeValue))
{
cloudEvent[Partitioning.PartitionKeyAttribute] = message.Key;
cloudEvent[Partitioning.PartitionKeyAttribute] = partitionKeyAttributeValue;
}
return Validation.CheckCloudEventArgument(cloudEvent, nameof(message));
}

/// <summary>
Expand All @@ -136,12 +147,22 @@ private static void InitPartitioningKey(Message<string?, byte[]> message, CloudE
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
public static Message<string?, byte[]> ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
=> ToKafkaMessage(cloudEvent, contentMode, formatter, new StringPartitionKeyAdapter());

/// <summary>
/// Converts a CloudEvent to a Kafka message.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
/// <param name="partitionKeyAdapter">The partition key adapter responsible for transforming the cloud event partitioning key into the desired Kafka key type.</param>
/// <typeparam name="TKey">The Kafka Key type to be used </typeparam>
public static Message<TKey, byte[]> ToKafkaMessage<TKey>(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter, IPartitionKeyAdapter<TKey> partitionKeyAdapter)
{
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
Validation.CheckNotNull(formatter, nameof(formatter));

var headers = MapHeaders(cloudEvent);
string? key = (string?) cloudEvent[Partitioning.PartitionKeyAttribute];
byte[] value;
string? contentTypeHeaderValue;

Expand All @@ -163,12 +184,17 @@ private static void InitPartitioningKey(Message<string?, byte[]> message, CloudE
{
headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentTypeHeaderValue));
}
return new Message<string?, byte[]>
var message = new Message<TKey, byte[]>
{
Headers = headers,
Value = value,
Key = key
Value = value
};
if (partitionKeyAdapter.ConvertPartitionKeyAttributeValueToKey((string?)cloudEvent[Partitioning.PartitionKeyAttribute], out var keyValue)
&& keyValue != null)
{
message.Key = keyValue;
}
return message;
}

private static Headers MapHeaders(CloudEvent cloudEvent)
Expand All @@ -191,4 +217,4 @@ private static Headers MapHeaders(CloudEvent cloudEvent)
return headers;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;

namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters
{
/// <summary>
/// Partion Key Adapter that converts to and from Guids in binary representation.
/// </summary>
public class BinaryGuidPartitionKeyAdapter : IPartitionKeyAdapter<byte[]?>
{
/// <inheritdoc/>
public bool ConvertKeyToPartitionKeyAttributeValue(byte[]? keyValue, out string? attributeValue)
{
if (keyValue == null)
{
attributeValue = null;
return false;
}

attributeValue = new Guid(keyValue).ToString();
return true;
}

/// <inheritdoc/>
public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out byte[]? keyValue)
{
if (string.IsNullOrEmpty(attributeValue))
{
keyValue = default;
return false;
}

keyValue = Guid.Parse(attributeValue).ToByteArray();
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters
{
/// <summary>
/// Defines the methods of the adapters responsible for transforming from cloud event
/// PartitionKey Attribute to Kafka Message Key.
/// </summary>
/// <typeparam name="TKey"></typeparam>
public interface IPartitionKeyAdapter<TKey>
{
/// <summary>
/// Converts a Message Key to PartionKey Attribute Value.
/// </summary>
/// <param name="keyValue">The key value to transform.</param>
/// <param name="attributeValue">The transformed attribute value (output).</param>
/// <returns>Whether the attribute should be set.</returns>
bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue);

/// <summary>
/// Converts a PartitionKey Attribute value to a Message Key.
/// </summary>
/// <param name="attributeValue">The attribute value to transform.</param>
/// <param name="keyValue">The transformed key value (output)</param>
/// <returns>Whether the key should be set.</returns>
bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters
{
/// <summary>
/// Partion Key Adapter that skips handling the key.
/// </summary>
/// <typeparam name="TKey">The type of Kafka Message Key</typeparam>
public class NullPartitionKeyAdapter<TKey> : IPartitionKeyAdapter<TKey>
{
/// <inheritdoc/>
public bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue)
{
attributeValue = null;
return false;
}

/// <inheritdoc/>
public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue)
{
keyValue = default;
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters
{
/// <summary>
/// Partion Key Adapter that skips handling the key.
/// </summary>
public class StringPartitionKeyAdapter : IPartitionKeyAdapter<string?>
{
/// <inheritdoc/>
public bool ConvertKeyToPartitionKeyAttributeValue(string? keyValue, out string? attributeValue)
{
attributeValue = keyValue;
return true;
}

/// <inheritdoc/>
public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out string? keyValue)
{
keyValue = attributeValue;
return true;
}
}
}
131 changes: 109 additions & 22 deletions test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,9 @@ public void IsCloudEvent(string headerName, string headerValue, bool expectedRes
public void IsCloudEvent_NoHeaders() =>
Assert.False(new Message<string?, byte[]>().IsCloudEvent());

[Fact]
public void KafkaStructuredMessageTest()
private static CloudEvent CreateTestCloudEvent()
{
// Kafka doesn't provide any way to get to the message transport level to do the test properly
// and it doesn't have an embedded version of a server for .Net so the lowest we can get is
// the `Message<T, K>`

var jsonEventFormatter = new JsonEventFormatter();

var cloudEvent = new CloudEvent
return new CloudEvent
{
Type = "com.github.pull.create",
Source = new Uri("https://github.com/cloudevents/spec/pull"),
Expand All @@ -55,21 +48,12 @@ public void KafkaStructuredMessageTest()
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>",
["comexampleextension1"] = "value"
["comexampleextension1"] = "value",
};
}

var message = cloudEvent.ToKafkaMessage(ContentMode.Structured, new JsonEventFormatter());

Assert.True(message.IsCloudEvent());

// Using serialization to create fully independent copy thus simulating message transport.
// The real transport will work in a similar way.
var serialized = JsonConvert.SerializeObject(message, new HeaderConverter());
var messageCopy = JsonConvert.DeserializeObject<Message<string?, byte[]>>(serialized, new HeadersConverter(), new HeaderConverter())!;

Assert.True(messageCopy.IsCloudEvent());
var receivedCloudEvent = messageCopy.ToCloudEvent(jsonEventFormatter);

private static void VerifyTestCloudEvent(CloudEvent receivedCloudEvent)
{
Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull"), receivedCloudEvent.Source);
Expand All @@ -82,6 +66,109 @@ public void KafkaStructuredMessageTest()
Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]);
}

private static Message<TKey, byte[]>? SimulateMessageTransport<TKey>(Message<TKey, byte[]> message)
{
// Using serialization to create fully independent copy thus simulating message transport.
// The real transport will work in a similar way.
var serialized = JsonConvert.SerializeObject(message, new HeaderConverter());
var messageCopy = JsonConvert.DeserializeObject<Message<TKey, byte[]>>(serialized, new HeadersConverter(), new HeaderConverter())!;
return messageCopy;
}

[Fact]
public void KafkaStructuredMessageTest()
{
// Kafka doesn't provide any way to get to the message transport level to do the test properly
// and it doesn't have an embedded version of a server for .Net so the lowest we can get is
// the `Message<T, K>`

var jsonEventFormatter = new JsonEventFormatter();
var key = "Test";
var cloudEvent = CreateTestCloudEvent();
cloudEvent[Partitioning.PartitionKeyAttribute] = key;

var message = cloudEvent.ToKafkaMessage(ContentMode.Structured, jsonEventFormatter);

Assert.True(message.IsCloudEvent());

var messageCopy = SimulateMessageTransport(message);

Assert.NotNull(messageCopy);
Assert.Equal(key, messageCopy.Key);
Assert.True(messageCopy.IsCloudEvent());
var receivedCloudEvent = messageCopy.ToCloudEvent(jsonEventFormatter, null);

VerifyTestCloudEvent(receivedCloudEvent);
}

[Fact]
public void KafkaBinaryGuidKeyedStructuredMessageTest()
{
// In order to test the most extreme case of key management we will simulate
// using Guid Keys serialized in their binary form in kafka that are converted
// back to their string representation in the cloudEvent.
var partitionKeyAdapter = new PartitionKeyAdapters.BinaryGuidPartitionKeyAdapter();
var jsonEventFormatter = new JsonEventFormatter();
var key = Guid.NewGuid();
var cloudEvent = CreateTestCloudEvent();
cloudEvent[Partitioning.PartitionKeyAttribute] = key.ToString();

var message = cloudEvent.ToKafkaMessage<byte[]?>(
ContentMode.Structured,
jsonEventFormatter,
partitionKeyAdapter);

Assert.True(message.IsCloudEvent());

var messageCopy = SimulateMessageTransport(message);

Assert.NotNull(messageCopy);
Assert.True(messageCopy.IsCloudEvent());

var receivedCloudEvent = messageCopy.ToCloudEvent<byte[]?>(
jsonEventFormatter,
null,
partitionKeyAdapter);

Assert.NotNull(message.Key);
// The key should be the original Guid in the binary representation.
Assert.Equal(key, new Guid(messageCopy.Key!));
VerifyTestCloudEvent(receivedCloudEvent);
}

[Fact]
public void KafkaNullKeyedStructuredMessageTest()
{
// It will test the serialization using Confluent's Confluent.Kafka.Null type for the key.
// As the default behavior without adapter is to skip the key it will work properly.
var partitionKeyAdapter = new PartitionKeyAdapters.NullPartitionKeyAdapter<Confluent.Kafka.Null>();
var jsonEventFormatter = new JsonEventFormatter();
var cloudEvent = CreateTestCloudEvent();
// Even if the key is established in the cloud event it won't flow.
cloudEvent[Partitioning.PartitionKeyAttribute] = "Test";

var message = cloudEvent.ToKafkaMessage<Confluent.Kafka.Null>(
ContentMode.Structured,
jsonEventFormatter,
partitionKeyAdapter);

Assert.True(message.IsCloudEvent());

var messageCopy = SimulateMessageTransport(message);

Assert.NotNull(messageCopy);
Assert.True(messageCopy.IsCloudEvent());

var receivedCloudEvent = messageCopy.ToCloudEvent<Confluent.Kafka.Null>(
jsonEventFormatter,
null,
partitionKeyAdapter);

//The Message key will continue to be null.
Assert.Null(message.Key);
VerifyTestCloudEvent(receivedCloudEvent);
}

[Fact]
public void KafkaBinaryMessageTest()
{
Expand Down