diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
new file mode 100644
index 000000000..19928e17b
--- /dev/null
+++ b/.github/workflows/build.yml
@@ -0,0 +1,85 @@
+name: 'confluent-kafka-dotnet build pipeline'
+
+env:
+ DOTNET_CLI_TELEMETRY_OPTOUT: 'true'
+
+on:
+ push:
+ pull_request:
+
+jobs:
+
+ build-test:
+ strategy:
+ matrix:
+ os: [ubuntu-latest, windows-latest, macos-13] # macos-13 for x86_64 arch
+ runs-on: ${{ matrix.os }}
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup .NET
+ uses: actions/setup-dotnet@v4
+ with:
+ dotnet-version: |
+ 6.0.x
+ 8.0.x
+ - name: Build and test
+ run: |
+ dotnet nuget add source --username user --password ${{ github.token }} --store-password-in-clear-text --name github https://nuget.pkg.github.com/${{ github.repository_owner }}/index.json
+ dotnet restore
+ dotnet test -c Release test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj
+
+ package:
+ needs: [build-test]
+ runs-on: windows-latest
+ steps:
+
+ - name: Show default environment variables
+ run: |
+ echo "The job_id is: $GITHUB_JOB" # reference the default environment variables
+ echo "The id of this action is: $GITHUB_ACTION" # reference the default environment variables
+ echo "The run id is: $GITHUB_RUN_ID"
+ echo "The GitHub Actor's username is: $GITHUB_ACTOR"
+ echo "GitHub SHA: $GITHUB_SHA"
+ - uses: actions/checkout@v4
+ - name: Setup .NET
+ uses: actions/setup-dotnet@v4
+ with:
+ dotnet-version: |
+ 6.0.x
+ 8.0.x
+ - name: Build and create packages
+ run: |
+ dotnet restore
+ dotnet build Confluent.Kafka.sln -c Release
+
+ # Different packaging for tagged vs untagged builds
+ if ($env:GITHUB_REF -match '^refs/tags/') {
+ $suffix = "gr-experimental"
+ } else {
+ $suffix = "ci-$env:GITHUB_RUN_ID"
+ }
+
+ dotnet pack src/Confluent.Kafka/Confluent.Kafka.csproj --output dist -c Release --version-suffix $suffix
+
+ - name: Upload artifacts
+ uses: actions/upload-artifact@v4
+ with:
+ name: build-artifacts
+ path: dist/
+
+ # Publish NuGet packages when a tag is pushed.
+ # The build, test and package jobs must succeed on all platforms first,
+ # and the tag name is expected to match the package version.
+ publish-release:
+ if: ${{ startsWith(github.ref, 'refs/tags/v') }}
+ needs: package
+ runs-on: ubuntu-latest
+ steps:
+ - name: Download NuGet package artifacts
+ uses: actions/download-artifact@v4
+ with:
+ name: build-artifacts
+ path: dist
+ - name: Publish to NuGet
+ run: |
+ dotnet nuget push "dist/Confluent.Kafka*.nupkg" --source https://nuget.pkg.github.com/${{ github.repository_owner }}/index.json --api-key ${{ github.token }}
diff --git a/src/Confluent.Kafka/Confluent.Kafka.csproj b/src/Confluent.Kafka/Confluent.Kafka.csproj
index 984bfcede..a226d88ce 100644
--- a/src/Confluent.Kafka/Confluent.Kafka.csproj
+++ b/src/Confluent.Kafka/Confluent.Kafka.csproj
@@ -25,7 +25,7 @@
-
+
None
diff --git a/src/Confluent.Kafka/Consumer.cs b/src/Confluent.Kafka/Consumer.cs
index e75a96ee9..e216b7bfc 100644
--- a/src/Confluent.Kafka/Consumer.cs
+++ b/src/Confluent.Kafka/Consumer.cs
@@ -1000,6 +1000,52 @@ public ConsumeResult<TKey, TValue> Consume(TimeSpan timeout)
=> Consume(timeout.TotalMillisecondsAsInt());
+ /// <inheritdoc/>
+ public unsafe bool ConsumeWithCallback(int millisecondsTimeout, Experimental.AllocFreeConsumeCallback callback)
+ {
+ if (callback == null)
+ throw new ArgumentNullException(nameof(callback));
+
+ var msgPtr = kafkaHandle.ConsumerPoll((IntPtr)millisecondsTimeout);
+
+ if (this.handlerException != null)
+ {
+ var ex = this.handlerException;
+ this.handlerException = null;
+ if (msgPtr != IntPtr.Zero)
+ {
+ Librdkafka.message_destroy(msgPtr);
+ }
+
+ throw ex;
+ }
+
+ if (msgPtr == IntPtr.Zero)
+ {
+ return false; // Timeout occurred
+ }
+
+ try
+ {
+ var msg = (rd_kafka_message*)msgPtr;
+
+ // Handle message errors before invoking callback
+ if (msg->err != ErrorCode.NoError && msg->err != ErrorCode.Local_PartitionEOF)
+ {
+ throw new KafkaException(msg->err);
+ }
+
+ // Create the reader and invoke the callback
+ var reader = new Experimental.MessageReader(msg);
+ callback(in reader);
+ return true;
+ }
+ finally
+ {
+ Librdkafka.message_destroy(msgPtr);
+ }
+ }
+
/// <inheritdoc/>
public IConsumerGroupMetadata ConsumerGroupMetadata
{
diff --git a/src/Confluent.Kafka/Experimental.cs b/src/Confluent.Kafka/Experimental.cs
new file mode 100644
index 000000000..1e1a3765b
--- /dev/null
+++ b/src/Confluent.Kafka/Experimental.cs
@@ -0,0 +1,129 @@
+using System;
+using Confluent.Kafka.Impl;
+using Confluent.Kafka.Internal;
+
+namespace Confluent.Kafka;
+
+/// <summary>
+/// Experimental alloc-free APIs.
+/// Note that these APIs may change substantially or be removed entirely in future versions.
+/// </summary>
+public abstract class Experimental
+{
+ /// <summary>
+ /// Callback delegate for allocation-free message consumption.
+ /// </summary>
+ /// <param name="reader">Message reader providing allocation-free access to message data</param>
+ public delegate void AllocFreeConsumeCallback(in MessageReader reader);
+
+ /// <summary>
+ /// Alloc-free delivery handler.
+ /// </summary>
+ public delegate void AllocFreeDeliveryHandler(in MessageReader arg);
+
+ /// <summary>
+ /// Allocation-free message reader that provides access to consumed message data.
+ /// This is a ref struct to ensure stack allocation and prevent escaping the callback scope.
+ /// </summary>
+ public unsafe ref struct MessageReader
+ {
+ private readonly rd_kafka_message* msg;
+ private IntPtr hdrsPtr = IntPtr.Zero;
+
+ internal MessageReader(
+ rd_kafka_message* msg)
+ {
+ this.msg = msg;
+ }
+
+ /// Gets the topic name
+ public string Topic =>
+ msg->rkt != IntPtr.Zero
+ ? Util.Marshal.PtrToStringUTF8(Librdkafka.topic_name(msg->rkt))
+ : null;
+
+ /// Gets the partition
+ public Partition Partition => msg->partition;
+
+ /// Gets the error code
+ public ErrorCode ErrorCode => msg->err;
+
+ /// Gets the offset
+ public Offset Offset => msg->offset;
+
+ /// Gets the leader epoch
+ public int? LeaderEpoch =>
+ msg->rkt != IntPtr.Zero && msg->offset != Offset.Unset
+ ? Librdkafka.message_leader_epoch((IntPtr)msg)
+ : null;
+
+ /// Gets the timestamp
+ public Timestamp Timestamp
+ {
+ get
+ {
+ var timestampUnix = Librdkafka.message_timestamp((IntPtr)msg, out var timestampType);
+ return new Timestamp(timestampUnix, (TimestampType)timestampType);
+ }
+ }
+
+ /// Gets whether this indicates partition EOF
+ public bool IsPartitionEOF => msg->err == ErrorCode.Local_PartitionEOF;
+
+ /// <summary>
+ /// Gets the raw key data as a ReadOnlySpan without allocation
+ /// </summary>
+ public ReadOnlySpan<byte> KeySpan =>
+ msg->key == IntPtr.Zero
+ ? ReadOnlySpan<byte>.Empty
+ : new ReadOnlySpan<byte>(msg->key.ToPointer(), (int)msg->key_len);
+
+ /// <summary>
+ /// Gets the raw value data as a ReadOnlySpan without allocation
+ /// </summary>
+ public ReadOnlySpan<byte> ValueSpan =>
+ msg->val == IntPtr.Zero
+ ? ReadOnlySpan<byte>.Empty
+ : new ReadOnlySpan<byte>(msg->val.ToPointer(), (int)msg->len);
+
+ /// <summary>
+ /// Gets a header by index without allocation
+ /// </summary>
+ /// <param name="index">Zero-based header index</param>
+ /// <param name="name">Header name (allocated string)</param>
+ /// <param name="value">Header value as ReadOnlySpan</param>
+ /// <returns>True if header exists at the given index</returns>
+ public bool TryGetHeader(int index, out string name, out ReadOnlySpan<byte> value)
+ {
+ name = null;
+ value = ReadOnlySpan<byte>.Empty;
+
+ if (hdrsPtr == IntPtr.Zero)
+ {
+ Librdkafka.message_headers((IntPtr)msg, out hdrsPtr);
+ if (hdrsPtr == IntPtr.Zero)
+ return false;
+ }
+
+ var err = Librdkafka.header_get_all(
+ hdrsPtr,
+ (IntPtr)index,
+ out IntPtr namep,
+ out IntPtr valuep,
+ out IntPtr sizep
+ );
+
+ if (err != ErrorCode.NoError)
+ return false;
+
+ name = Util.Marshal.PtrToStringUTF8(namep);
+
+ if (valuep != IntPtr.Zero)
+ {
+ value = new ReadOnlySpan<byte>(valuep.ToPointer(), (int)sizep);
+ }
+
+ return true;
+ }
+ }
+}
\ No newline at end of file
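
For orientation, here is a minimal consumer-side sketch of how the new MessageReader / ConsumeWithCallback API is intended to be used. It is illustrative only: the topic name, config and header/value handling are assumptions, not part of this patch.

    using System;
    using System.Text;
    using Confluent.Kafka;

    class AllocFreeConsumeExample
    {
        public static void Run(ConsumerConfig config)
        {
            using var consumer = new ConsumerBuilder<Ignore, Ignore>(config).Build();
            consumer.Subscribe("my-topic"); // placeholder topic

            // The reader (and its spans) is only valid inside the callback;
            // copy out anything that must outlive it.
            Experimental.AllocFreeConsumeCallback onMessage = (in Experimental.MessageReader reader) =>
            {
                if (reader.IsPartitionEOF)
                    return;

                Console.WriteLine($"{reader.Topic} [{reader.Partition}] @{reader.Offset}: {reader.ValueSpan.Length} bytes");

                // Headers are walked by index, without materialising a Headers collection.
                for (int i = 0; reader.TryGetHeader(i, out var name, out var value); i++)
                    Console.WriteLine($"  {name} = {Encoding.UTF8.GetString(value)}");
            };

            while (true)
            {
                // Returns false on timeout; librdkafka's message is destroyed after the callback returns.
                consumer.ConsumeWithCallback(100, onMessage);
            }
        }
    }
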
diff --git a/src/Confluent.Kafka/IConsumer.cs b/src/Confluent.Kafka/IConsumer.cs
index 38c3a5a71..f9b571fde 100644
--- a/src/Confluent.Kafka/IConsumer.cs
+++ b/src/Confluent.Kafka/IConsumer.cs
@@ -110,6 +110,15 @@ public interface IConsumer<TKey, TValue> : IClient
///
ConsumeResult<TKey, TValue> Consume(TimeSpan timeout);
+ /// <summary>
+ /// Consumes a message and invokes the callback with an allocation-free reader.
+ /// Returns true if a message was consumed, false if timeout occurred.
+ /// </summary>
+ /// <param name="millisecondsTimeout">Timeout in milliseconds</param>
+ /// <param name="callback">Callback to invoke with the message reader</param>
+ /// <returns>True if a message was consumed, false if timeout occurred</returns>
+ /// <exception cref="KafkaException">Thrown when consumption fails</exception>
+ bool ConsumeWithCallback(int millisecondsTimeout, Experimental.AllocFreeConsumeCallback callback);
/// <summary>
/// Gets the (dynamic) group member id of
diff --git a/src/Confluent.Kafka/IProducer.cs b/src/Confluent.Kafka/IProducer.cs
index cb3501cdc..2967ed451 100644
--- a/src/Confluent.Kafka/IProducer.cs
+++ b/src/Confluent.Kafka/IProducer.cs
@@ -182,6 +182,48 @@ void Produce(
Message<TKey, TValue> message,
Action<DeliveryReport<TKey, TValue>> deliveryHandler = null);
+ /// <summary>
+ /// Asynchronously send a single message to a
+ /// Kafka topic partition.
+ /// </summary>
+ /// <param name="topic">
+ /// Topic of the message to produce to
+ /// </param>
+ /// <param name="val">
+ /// Value bytes to produce
+ /// </param>
+ /// <param name="key">
+ /// Key bytes to produce
+ /// </param>
+ /// <param name="timestamp">
+ /// Timestamp of the message to be produced
+ /// </param>
+ /// <param name="partition">
+ /// Partition of the message to produce to
+ /// </param>
+ /// <param name="headers">
+ /// Optional headers of the message to be produced
+ /// </param>
+ /// <exception cref="ProduceException{TKey,TValue}">
+ /// Thrown in response to any error that is known
+ /// immediately (excluding user application logic errors),
+ /// for example ErrorCode.Local_QueueFull.
+ /// </exception>
+ /// <exception cref="ArgumentException">
+ /// Thrown in response to invalid argument values.
+ /// </exception>
+ /// <exception cref="InvalidOperationException">
+ /// Thrown in response to error conditions that reflect
+ /// an error in the application logic of the calling
+ /// application.
+ /// </exception>
+ void Produce(
+ string topic,
+ ReadOnlySpan<byte> val,
+ ReadOnlySpan<byte> key,
+ Timestamp timestamp,
+ Partition partition,
+ Headers headers);
/// <summary>
/// Poll for callback events.
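
For illustration, a sketch of calling the new span-based overload from application code follows. The topic name and payload are assumptions; note that no per-message delivery handler can be attached to this overload, so feedback only arrives through the global alloc-free delivery handler (if configured), and Poll/Flush must still be called to drive callbacks.

    using System;
    using Confluent.Kafka;

    static class SpanProduceExample
    {
        public static void Send(IProducer<Null, byte[]> producer)
        {
            // Build the payload on the stack; nothing here allocates on the managed heap.
            Span<byte> payload = stackalloc byte[16];
            BitConverter.TryWriteBytes(payload, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

            // librdkafka copies the bytes (MSG_F_COPY), so the stack buffer may be reused immediately.
            producer.Produce(
                "events",                  // placeholder topic
                payload,
                ReadOnlySpan<byte>.Empty,  // empty key
                Timestamp.Default,
                Partition.Any,
                null);                     // no headers

            producer.Poll(TimeSpan.Zero);  // serve delivery callbacks without blocking
        }
    }
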
diff --git a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs
index 60d0e34ba..7a9c8ce88 100644
--- a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs
+++ b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs
@@ -359,69 +359,43 @@ private IntPtr marshalHeaders(IReadOnlyList<IHeader> headers)
internal ErrorCode Produce(
string topic,
- byte[] val, int valOffset, int valLength,
- byte[] key, int keyOffset, int keyLength,
+ ReadOnlySpan<byte> val,
+ ReadOnlySpan<byte> key,
int partition,
long timestamp,
IReadOnlyList<IHeader> headers,
IntPtr opaque)
{
- var pValue = IntPtr.Zero;
- var pKey = IntPtr.Zero;
-
- var gchValue = default(GCHandle);
- var gchKey = default(GCHandle);
-
- if (val == null)
- {
- if (valOffset != 0 || valLength != 0)
- {
- throw new ArgumentException("valOffset and valLength parameters must be 0 when producing null values.");
- }
- }
- else
- {
- gchValue = GCHandle.Alloc(val, GCHandleType.Pinned);
- pValue = Marshal.UnsafeAddrOfPinnedArrayElement(val, valOffset);
- }
-
- if (key == null)
- {
- if (keyOffset != 0 || keyLength != 0)
- {
- throw new ArgumentException("keyOffset and keyLength parameters must be 0 when producing null key values.");
- }
- }
- else
- {
- gchKey = GCHandle.Alloc(key, GCHandleType.Pinned);
- pKey = Marshal.UnsafeAddrOfPinnedArrayElement(key, keyOffset);
- }
-
IntPtr headersPtr = marshalHeaders(headers);
try
{
- var errorCode = Librdkafka.produceva(
- handle,
- topic,
- partition,
- (IntPtr)MsgFlags.MSG_F_COPY,
- pValue, (UIntPtr)valLength,
- pKey, (UIntPtr)keyLength,
- timestamp,
- headersPtr,
- opaque);
-
- if (errorCode != ErrorCode.NoError)
+ unsafe
{
- if (headersPtr != IntPtr.Zero)
+ fixed (byte* valPtr = val, keyPtr = key)
{
- Librdkafka.headers_destroy(headersPtr);
+ var errorCode = Librdkafka.produceva(
+ handle,
+ topic,
+ partition,
+ (IntPtr)MsgFlags.MSG_F_COPY,
+ (IntPtr)valPtr, (UIntPtr)val.Length,
+ (IntPtr)keyPtr, (UIntPtr)key.Length,
+ timestamp,
+ headersPtr,
+ opaque);
+
+ if (errorCode != ErrorCode.NoError)
+ {
+ if (headersPtr != IntPtr.Zero)
+ {
+ Librdkafka.headers_destroy(headersPtr);
+ }
+ }
+
+ return errorCode;
}
}
-
- return errorCode;
}
catch
{
@@ -431,18 +405,6 @@ internal ErrorCode Produce(
}
throw;
}
- finally
- {
- if (val != null)
- {
- gchValue.Free();
- }
-
- if (key != null)
- {
- gchKey.Free();
- }
- }
}
private static int[] MarshalCopy(IntPtr source, int length)
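
Worth spelling out why the old null/offset validation and GCHandle pinning could be dropped: `fixed` over a span pins it for the duration of the call, and pinning an empty or default span yields a null pointer, so a missing key or value still reaches librdkafka as (NULL, 0). A small stand-alone check of that behaviour, illustrative only and compiled with unsafe enabled:

    using System;

    static class EmptySpanPinning
    {
        static unsafe void Main()
        {
            ReadOnlySpan<byte> none = default;                       // stands in for a null key/value
            ReadOnlySpan<byte> some = stackalloc byte[] { 1, 2, 3 };

            fixed (byte* pNone = none, pSome = some)
            {
                // Span<T>.GetPinnableReference() returns a null reference for empty spans,
                // so librdkafka still receives NULL with length 0 on this path.
                Console.WriteLine(pNone == null);   // True
                Console.WriteLine(pSome == null);   // False
            }
        }
    }
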
diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs
index 79867514b..0cea9e6ea 100644
--- a/src/Confluent.Kafka/Producer.cs
+++ b/src/Confluent.Kafka/Producer.cs
@@ -36,6 +36,7 @@ internal class Config
public IEnumerable<KeyValuePair<string, string>> config;
public Action<Error> errorHandler;
public Action<LogMessage> logHandler;
+ public Experimental.AllocFreeDeliveryHandler allocFreeDeliveryHandler;
public Action<string> statisticsHandler;
public Action<string> oAuthBearerTokenRefreshHandler;
public Dictionary<string, PartitionerDelegate> partitioners;
@@ -177,6 +178,7 @@ private void OAuthBearerTokenRefreshCallback(IntPtr rk, IntPtr oauthbearer_confi
private Action<LogMessage> logHandler;
+ private Experimental.AllocFreeDeliveryHandler allocFreeDeliveryHandler;
private object loggerLockObj = new object();
private Librdkafka.LogDelegate logCallbackDelegate;
private void LogCallback(IntPtr rk, SyslogLevel level, string fac, string buf)
@@ -196,26 +198,34 @@ private void LogCallback(IntPtr rk, SyslogLevel level, string fac, string buf)
private Librdkafka.DeliveryReportDelegate DeliveryReportCallback;
- private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaque)
+ private unsafe void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaque)
{
// Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios).
if (ownedKafkaHandle.IsClosed) { return; }
try
{
- var msg = Util.Marshal.PtrToStructure<rd_kafka_message>(rkmessage);
+ var msg = (rd_kafka_message*)rkmessage;
+
+ // Delivery reports that carry no per-message handler (e.g. from the span-based
+ // Produce overload) are routed to the global alloc-free handler, if one is set.
+ // Reports that do carry a handler fall through to the normal path below so that
+ // ProduceAsync tasks still complete and their GCHandles are freed.
+ if (msg->_private == IntPtr.Zero && allocFreeDeliveryHandler != null)
+ {
+ var reader = new Experimental.MessageReader(msg);
+ allocFreeDeliveryHandler(in reader);
+ return;
+ }
// the msg._private property has dual purpose. Here, it is an opaque pointer set
// by Topic.Produce to be an IDeliveryHandler. When Consuming, it's for internal
// use (hence the name).
- if (msg._private == IntPtr.Zero)
+ if (msg->_private == IntPtr.Zero)
{
// Note: this can occur if the ProduceAsync overload that accepts a DeliveryHandler
// was used and the delivery handler was set to null.
return;
}
- var gch = GCHandle.FromIntPtr(msg._private);
+ var gch = GCHandle.FromIntPtr(msg->_private);
var deliveryHandler = (IDeliveryHandler) gch.Target;
gch.Free();
@@ -258,14 +268,14 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq
messageStatus = Librdkafka.message_status(rkmessage);
}
- deliveryHandler.HandleDeliveryReport(
+ deliveryHandler?.HandleDeliveryReport(
new DeliveryReport<Null, Null>
{
// Topic is not set here in order to avoid the marshalling cost.
// Instead, the delivery handler is expected to cache the topic string.
- Partition = msg.partition,
- Offset = msg.offset,
- Error = KafkaHandle.CreatePossiblyFatalError(msg.err, null),
+ Partition = msg->partition,
+ Offset = msg->offset,
+ Error = KafkaHandle.CreatePossiblyFatalError(msg->err, null),
Status = messageStatus,
Message = new Message<Null, Null> { Timestamp = new Timestamp(timestamp, (TimestampType)timestampType), Headers = headers }
}
@@ -279,8 +289,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq
private void ProduceImpl(
string topic,
- byte[] val, int valOffset, int valLength,
- byte[] key, int keyOffset, int keyLength,
+ ReadOnlySpan<byte> val,
+ ReadOnlySpan<byte> key,
Timestamp timestamp,
Partition partition,
IReadOnlyList headers,
@@ -308,8 +318,8 @@ private void ProduceImpl(
err = KafkaHandle.Produce(
topic,
- val, valOffset, valLength,
- key, keyOffset, keyLength,
+ val,
+ key,
partition.Value,
timestamp.UnixTimestampMs,
headers,
@@ -325,8 +335,8 @@ private void ProduceImpl(
{
err = KafkaHandle.Produce(
topic,
- val, valOffset, valLength,
- key, keyOffset, keyLength,
+ val,
+ key,
partition.Value,
timestamp.UnixTimestampMs,
headers,
@@ -399,8 +409,8 @@ public void Flush(CancellationToken cancellationToken)
throw new OperationCanceledException();
}
}
- }
-
+ }
+
///
public void Dispose()
@@ -578,6 +588,7 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
this.statisticsHandler = baseConfig.statisticsHandler;
this.logHandler = baseConfig.logHandler;
+ this.allocFreeDeliveryHandler = baseConfig.allocFreeDeliveryHandler;
this.errorHandler = baseConfig.errorHandler;
this.oAuthBearerTokenRefreshHandler = baseConfig.oAuthBearerTokenRefreshHandler;
@@ -805,8 +816,8 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
ProduceImpl(
topicPartition.Topic,
- valBytes, 0, valBytes == null ? 0 : valBytes.Length,
- keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
+ new ReadOnlySpan<byte>(valBytes, 0, valBytes?.Length ?? 0),
+ new ReadOnlySpan<byte>(keyBytes, 0, keyBytes?.Length ?? 0),
message.Timestamp, topicPartition.Partition, headers.BackingList,
handler);
@@ -816,8 +827,8 @@ public async Task> ProduceAsync(
{
ProduceImpl(
topicPartition.Topic,
- valBytes, 0, valBytes == null ? 0 : valBytes.Length,
- keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
+ new ReadOnlySpan<byte>(valBytes, 0, valBytes?.Length ?? 0),
+ new ReadOnlySpan<byte>(keyBytes, 0, keyBytes?.Length ?? 0),
message.Timestamp, topicPartition.Partition, headers.BackingList,
null);
@@ -915,8 +926,8 @@ public void Produce(
{
ProduceImpl(
topicPartition.Topic,
- valBytes, 0, valBytes == null ? 0 : valBytes.Length,
- keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
+ new ReadOnlySpan<byte>(valBytes, 0, valBytes?.Length ?? 0),
+ new ReadOnlySpan<byte>(keyBytes, 0, keyBytes?.Length ?? 0),
message.Timestamp, topicPartition.Partition,
headers.BackingList,
deliveryHandler == null
@@ -931,13 +942,44 @@ public void Produce(
{
throw new ProduceException<TKey, TValue>(
ex.Error,
- new DeliveryReport
+ new DeliveryReport
{
Message = message,
TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset)
});
}
}
+
+ /// <inheritdoc/>
+ public void Produce(
+ string topic,
+ ReadOnlySpan<byte> val,
+ ReadOnlySpan<byte> key,
+ Timestamp timestamp,
+ Partition partition,
+ Headers headers)
+ {
+ try
+ {
+ ProduceImpl(
+ topic,
+ val,
+ key,
+ timestamp,
+ partition,
+ headers?.BackingList,
+ null);
+ }
+ catch (KafkaException ex)
+ {
+ throw new ProduceException<TKey, TValue>(
+ ex.Error,
+ new DeliveryReport<TKey, TValue>
+ {
+ TopicPartitionOffset = new TopicPartitionOffset(new TopicPartition(topic, partition), Offset.Unset)
+ });
+ }
+ }
private class TypedTaskDeliveryHandlerShim : TaskCompletionSource<DeliveryResult<TKey, TValue>>, IDeliveryHandler
{
@@ -1054,7 +1096,7 @@ public void BeginTransaction()
///
public void CommitTransaction(TimeSpan timeout)
- => KafkaHandle.CommitTransaction(timeout.TotalMillisecondsAsInt());
+ => KafkaHandle.CommitTransaction(timeout.TotalMillisecondsAsInt());
///
public void CommitTransaction()
diff --git a/src/Confluent.Kafka/ProducerBuilder.cs b/src/Confluent.Kafka/ProducerBuilder.cs
index e2a3b488b..b67fd2eb8 100644
--- a/src/Confluent.Kafka/ProducerBuilder.cs
+++ b/src/Confluent.Kafka/ProducerBuilder.cs
@@ -73,6 +73,11 @@ public class ProducerBuilder<TKey, TValue>
/// The configured log handler.
///
internal protected Action<IProducer<TKey, TValue>, LogMessage> LogHandler { get; set; }
+
+ /// <summary>
+ /// The configured alloc-free delivery handler.
+ /// </summary>
+ internal protected Experimental.AllocFreeDeliveryHandler AllocFreeDeliveryHandler { get; set; }
/// <summary>
/// The configured statistics handler.
@@ -125,6 +130,7 @@ internal Producer<TKey, TValue>.Config ConstructBaseConfig(Producer<TKey, TValue> producer)
logHandler = this.LogHandler == null
? default(Action<LogMessage>)
: logMessage => this.LogHandler(producer, logMessage),
+ allocFreeDeliveryHandler = this.AllocFreeDeliveryHandler,
statisticsHandler = this.StatisticsHandler == null
? default(Action<string>)
: stats => this.StatisticsHandler(producer, stats),
@@ -259,6 +265,19 @@ public ProducerBuilder<TKey, TValue> SetLogHandler(Action<IProducer<TKey, TValue>, LogMessage> logHandler)
+ /// <summary>
+ /// Set a custom alloc-free delivery handler.
+ /// </summary>
+ public ProducerBuilder<TKey, TValue> SetAllocFreeDeliveryHandler(Experimental.AllocFreeDeliveryHandler allocFreeDeliveryHandler)
+ {
+ if (this.AllocFreeDeliveryHandler != null)
+ {
+ throw new ArgumentException("Alloc-free delivery handler may only be specified once");
+ }
+ this.AllocFreeDeliveryHandler = allocFreeDeliveryHandler;
+ return this;
+ }
+
/// <summary>
/// Set SASL/OAUTHBEARER token refresh callback in provided
/// conf object. The SASL/OAUTHBEARER token refresh callback
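
To show how the builder hook above is meant to be wired up, here is a hedged end-to-end sketch. The handler body, topic and counters are illustrative assumptions; the handler only sees reports for messages produced without a per-message handler (i.e. the span-based overload).

    using System;
    using Confluent.Kafka;

    static class AllocFreeDeliveryExample
    {
        public static void Run(ProducerConfig config, byte[] payload)
        {
            long delivered = 0, failed = 0;

            using var producer = new ProducerBuilder<Null, byte[]>(config)
                .SetAllocFreeDeliveryHandler((in Experimental.MessageReader report) =>
                {
                    // No DeliveryReport/Message objects are allocated for these reports.
                    if (report.ErrorCode == ErrorCode.NoError) delivered++; else failed++;
                })
                .Build();

            producer.Produce("events", payload, ReadOnlySpan<byte>.Empty,
                             Timestamp.Default, Partition.Any, null); // placeholder topic
            producer.Flush(TimeSpan.FromSeconds(10));

            Console.WriteLine($"delivered={delivered} failed={failed}");
        }
    }
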
diff --git a/test/Confluent.Kafka.Benchmark/BenchmarkConsumer.cs b/test/Confluent.Kafka.Benchmark/BenchmarkConsumer.cs
index 45fe3a615..d545ccc13 100644
--- a/test/Confluent.Kafka.Benchmark/BenchmarkConsumer.cs
+++ b/test/Confluent.Kafka.Benchmark/BenchmarkConsumer.cs
@@ -72,5 +72,55 @@ record = consumer.Consume(TimeSpan.FromSeconds(1));
public static void Consume(string bootstrapServers, string topic, string group, long firstMessageOffset, int nMessages, int nHeaders, int nTests, string username, string password)
=> BenchmarkConsumerImpl(bootstrapServers, topic, group, firstMessageOffset, nMessages, nTests, nHeaders, username, password);
+
+ public static void ConsumeAllocFree(string bootstrapServers, string topic, string group,
+ long firstMessageOffset, int nMessages, int nHeaders, int nTests, string username, string password)
+ {
+ var consumerConfig = new ConsumerConfig
+ {
+ GroupId = group,
+ BootstrapServers = bootstrapServers,
+ SessionTimeoutMs = 6000,
+ ConsumeResultFields = nHeaders == 0 ? "none" : "headers",
+ QueuedMinMessages = 1000000,
+ SaslUsername = username,
+ SaslPassword = password,
+ SecurityProtocol = username == null ? SecurityProtocol.Plaintext : SecurityProtocol.SaslSsl,
+ SaslMechanism = SaslMechanism.Plain
+ };
+
+ using (var consumer = new ConsumerBuilder<Ignore, Ignore>(consumerConfig).Build())
+ {
+ for (var j=0; j<nTests; j += 1)
+ {
+ Console.WriteLine($"{consumer.Name} consuming from {topic}");
+ consumer.Assign(new List<TopicPartitionOffset>() { new TopicPartitionOffset(topic, 0, firstMessageOffset) });
+
+ var cnt = 0;
+ Experimental.AllocFreeConsumeCallback callback = (in Experimental.MessageReader mr) =>
+ {
+ if (mr.ErrorCode == ErrorCode.NoError)
+ {
+ cnt += 1;
+ }
+ };
+
+ // consume 1 message before starting the timer to avoid including potential one-off delays.
+ consumer.ConsumeWithCallback(1000, callback);
+
+ long startTime = DateTime.Now.Ticks;
+ while (cnt < nMessages-1)
+ {
+ consumer.ConsumeWithCallback(1000, callback);
+ }
+
+ var duration = DateTime.Now.Ticks - startTime;
+
+ Console.WriteLine($"Consumed (alloc-free) {nMessages-1} messages in {duration/10000.0:F0}ms");
+ Console.WriteLine($"{(nMessages-1) / (duration/10000.0):F0}k msg/s");
+ }
+ }
+ }
}
}
diff --git a/test/Confluent.Kafka.Benchmark/BenchmarkProducer.cs b/test/Confluent.Kafka.Benchmark/BenchmarkProducer.cs
index eb2a9a736..4fda3d24d 100644
--- a/test/Confluent.Kafka.Benchmark/BenchmarkProducer.cs
+++ b/test/Confluent.Kafka.Benchmark/BenchmarkProducer.cs
@@ -180,5 +180,116 @@ public static long TaskProduce(string bootstrapServers, string topic, int nMessa
///
public static long DeliveryHandlerProduce(string bootstrapServers, string topic, int nMessages, int msgSize, int nHeaders, int nTests, string username, string password)
=> BenchmarkProducerImpl(bootstrapServers, topic, nMessages, msgSize, nTests, nHeaders, true, username, password);
+
+ public static long BenchmarkAllocFree(
+ string bootstrapServers,
+ string topic,
+ int nMessages,
+ int msgSize,
+ int nHeaders,
+ int nTests,
+ string username,
+ string password)
+ {
+ // mirrors the librdkafka performance test example.
+ var config = new ProducerConfig
+ {
+ BootstrapServers = bootstrapServers,
+ QueueBufferingMaxMessages = 2000000,
+ MessageSendMaxRetries = 3,
+ RetryBackoffMs = 500,
+ LingerMs = 100,
+ SaslUsername = username,
+ SaslPassword = password,
+ SecurityProtocol = username == null ? SecurityProtocol.Plaintext : SecurityProtocol.SaslSsl,
+ SaslMechanism = SaslMechanism.Plain
+ };
+
+ DeliveryResult<Null, byte[]> firstDeliveryReport = null;
+
+ Headers headers = null;
+ if (nHeaders > 0)
+ {
+ headers = new Headers();
+ for (int i=0; i<nHeaders; ++i)
+ {
+ headers.Add($"header-{i+1}", new byte[] { (byte)i, (byte)(i+1), (byte)(i+2), (byte)(i+3) });
+ }
+ }
+
+ var autoEvent = new AutoResetEvent(false);
+ var msgCount = nMessages;
+
+ Experimental.AllocFreeDeliveryHandler allocFreeDeliveryHandler = (in Experimental.MessageReader mr) =>
+ {
+ if (mr.ErrorCode != ErrorCode.NoError)
+ {
+ // Not interested in benchmark results in the (unlikely) event there is an error.
+ Console.WriteLine($"A error occured producing a message: {mr.ErrorCode}");
+ Environment.Exit(1); // note: exceptions do not currently propagate to calling code from a deliveryHandler method.
+ }
+
+ if (--msgCount == 0)
+ {
+ autoEvent.Set();
+ }
+ };
+
+
+ using (var producer = new ProducerBuilder<Null, byte[]>(config).SetAllocFreeDeliveryHandler(allocFreeDeliveryHandler).Build())
+ {
+ for (var j=0; j<nTests; j += 1)
+ {
+ Console.WriteLine($"{producer.Name} producing on {topic}");
+
+ byte cnt = 0;
+ var val = new byte[msgSize].Select(a => ++cnt).ToArray();
+
+ // this avoids including connection setup, topic creation time, etc. in the result.
+ firstDeliveryReport = producer.ProduceAsync(topic, new Message<Null, byte[]> { Value = val, Headers = headers }).Result;
+
+ var startTime = DateTime.Now.Ticks;
+
+ for (int i = 0; i < nMessages; i += 1)
+ {
+ try
+ {
+ producer.Produce(topic, val.AsSpan(), ReadOnlySpan<byte>.Empty, Timestamp.Default, Partition.Any, headers);
+ }
+ catch (ProduceException<Null, byte[]> ex)
+ {
+ if (ex.Error.Code == ErrorCode.Local_QueueFull)
+ {
+ producer.Poll(TimeSpan.FromSeconds(1));
+ i -= 1;
+ }
+ else
+ {
+ throw;
+ }
+ }
+ }
+
+ while (true)
+ {
+ if (autoEvent.WaitOne(TimeSpan.FromSeconds(1)))
+ {
+ break;
+ }
+ Console.WriteLine(msgCount);
+ }
+
+
+ var duration = DateTime.Now.Ticks - startTime;
+
+ Console.WriteLine($"Produced {nMessages} messages in {duration/10000.0:F0}ms");
+ Console.WriteLine($"{nMessages / (duration/10000.0):F0}k msg/s");
+ }
+
+ producer.Flush(TimeSpan.FromSeconds(10));
+ }
+
+ return firstDeliveryReport.Offset;
+ }
}
}
diff --git a/test/Confluent.Kafka.Benchmark/Program.cs b/test/Confluent.Kafka.Benchmark/Program.cs
index d65a09429..edcd4e480 100644
--- a/test/Confluent.Kafka.Benchmark/Program.cs
+++ b/test/Confluent.Kafka.Benchmark/Program.cs
@@ -121,8 +121,10 @@ public static void Main(string[] args)
{
const int NUMBER_OF_TESTS = 1;
BenchmarkProducer.TaskProduce(bootstrapServers, topicName, numberOfMessages, messageSize, headerCount, NUMBER_OF_TESTS, username, password);
+ BenchmarkProducer.BenchmarkAllocFree(bootstrapServers, topicName, numberOfMessages, messageSize, headerCount, NUMBER_OF_TESTS, username, password);
var firstMessageOffset = BenchmarkProducer.DeliveryHandlerProduce(bootstrapServers, topicName, numberOfMessages, messageSize, headerCount, NUMBER_OF_TESTS, username, password);
BenchmarkConsumer.Consume(bootstrapServers, topicName, group, firstMessageOffset, numberOfMessages, headerCount, NUMBER_OF_TESTS, username, password);
+ BenchmarkConsumer.ConsumeAllocFree(bootstrapServers, topicName, group, firstMessageOffset, numberOfMessages, headerCount, NUMBER_OF_TESTS, username, password);
}
else if (mode == "latency")
{