diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 000000000..19928e17b --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,85 @@ +name: 'confluent-kafka-dotnet build pipeline' + +env: + DOTNET_CLI_TELEMETRY_OPTOUT: 'true' + +on: + push: + pull_request: + +jobs: + + build-test: + strategy: + matrix: + os: [ubuntu-latest, windows-latest, macos-13] # macos-13 for x86_x64 arch + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v4 + with: + dotnet-version: | + 6.0.x + 8.0.x + - name: Build and test + run: | + dotnet nuget add source --username user --password ${{ github.token }} --store-password-in-clear-text --name github https://nuget.pkg.github.com/${{ github.repository_owner }}/index.json + dotnet restore + dotnet test -c Release test/Confluent.Kafka.UnitTests/Confluent.Kafka.UnitTests.csproj + + package: + needs: [build-test] + runs-on: windows-latest + steps: + + - name: Show default environment variables + run: | + echo "The job_id is: $GITHUB_JOB" # reference the default environment variables + echo "The id of this action is: $GITHUB_ACTION" # reference the default environment variables + echo "The run id is: $GITHUB_RUN_ID" + echo "The GitHub Actor's username is: $GITHUB_ACTOR" + echo "GitHub SHA: $GITHUB_SHA" + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v4 + with: + dotnet-version: | + 6.0.x + 8.0.x + - name: Build and create packages + run: | + dotnet restore + dotnet build Confluent.Kafka.sln -c Release + + # Different packaging for tagged vs untagged builds + if ($env:GITHUB_REF -match '^refs/tags/') { + $suffix = "gr-experimental" + } else { + $suffix = "ci-$env:GITHUB_RUN_ID" + } + + dotnet pack src/Confluent.Kafka/Confluent.Kafka.csproj --output dist -c Release --version-suffix $suffix + + - name: Upload artifacts + uses: actions/upload-artifact@v4 + with: + name: build-artifacts + path: dist/ + + # Publish NuGet packages when a tag is pushed. + # Tests need to succeed for all components and on all platforms first, + # including having a tag name that matches the version number. + publish-release: + if: ${{ startsWith(github.ref, 'refs/tags/v') }} + needs: package + runs-on: ubuntu-latest + steps: + - name: Download NuGet package artifacts + uses: actions/download-artifact@v4 + with: + name: build-artifacts + path: dist + - name: Publish to NuGet + run: | + dotnet nuget push "dist/Confluent.Kafka*.nupkg" --source https://nuget.pkg.github.com/${{ github.repository_owner }}/index.json --api-key ${{ github.token }} diff --git a/src/Confluent.Kafka/Confluent.Kafka.csproj b/src/Confluent.Kafka/Confluent.Kafka.csproj index 984bfcede..a226d88ce 100644 --- a/src/Confluent.Kafka/Confluent.Kafka.csproj +++ b/src/Confluent.Kafka/Confluent.Kafka.csproj @@ -25,7 +25,7 @@ - + None diff --git a/src/Confluent.Kafka/Consumer.cs b/src/Confluent.Kafka/Consumer.cs index e75a96ee9..a1003a4bd 100644 --- a/src/Confluent.Kafka/Consumer.cs +++ b/src/Confluent.Kafka/Consumer.cs @@ -999,6 +999,58 @@ public ConsumeResult Consume(int millisecondsTimeout) public ConsumeResult Consume(TimeSpan timeout) => Consume(timeout.TotalMillisecondsAsInt()); + /// + /// Consumes a message and invokes the callback with an allocation-free reader. + /// Returns true if a message was consumed, false if timeout occurred. + /// + /// Timeout in milliseconds + /// Callback to invoke with the message reader + /// True if a message was consumed, false if timeout occurred + /// Thrown when consumption fails + public unsafe bool ConsumeWithCallback(int millisecondsTimeout, Experimental.AllocFreeConsumeCallback callback) + { + if (callback == null) + throw new ArgumentNullException(nameof(callback)); + + var msgPtr = kafkaHandle.ConsumerPoll((IntPtr)millisecondsTimeout); + + if (this.handlerException != null) + { + var ex = this.handlerException; + this.handlerException = null; + if (msgPtr != IntPtr.Zero) + { + Librdkafka.message_destroy(msgPtr); + } + + throw ex; + } + + if (msgPtr == IntPtr.Zero) + { + return false; // Timeout occurred + } + + try + { + var msg = (rd_kafka_message*)msgPtr; + + // Handle message errors before invoking callback + if (msg->err != ErrorCode.NoError && msg->err != ErrorCode.Local_PartitionEOF) + { + throw new KafkaException(msg->err); + } + + // Create the reader and invoke the callback + var reader = new Experimental.MessageReader(msg); + callback(in reader); + return true; + } + finally + { + Librdkafka.message_destroy(msgPtr); + } + } /// public IConsumerGroupMetadata ConsumerGroupMetadata diff --git a/src/Confluent.Kafka/Experimental.cs b/src/Confluent.Kafka/Experimental.cs new file mode 100644 index 000000000..17547a116 --- /dev/null +++ b/src/Confluent.Kafka/Experimental.cs @@ -0,0 +1,135 @@ +using System; +using Confluent.Kafka.Impl; +using Confluent.Kafka.Internal; + +namespace Confluent.Kafka; + +/// +/// Experimental alloc-free APIs. +/// Note that these APIs can be changed substantially or removed entirely in the future versions. +/// +public abstract class Experimental +{ + /// + /// Callback delegate for allocation-free message consumption. + /// + /// The key type + /// The value type + /// Message reader providing allocation-free access to message data + public delegate void AllocFreeConsumeCallback(in MessageReader reader); + + /// + /// Alloc-free delivery handler + /// + public delegate void AllocFreeDeliveryHandler(in MessageReader arg); + + /// + /// Allocation-free message reader that provides access to consumed message data. + /// This is a ref struct to ensure stack allocation and prevent escaping the callback scope. + /// + public readonly unsafe ref struct MessageReader + { + private readonly rd_kafka_message* msg; + private readonly Lazy _hdrsPtr; + + internal MessageReader( + rd_kafka_message* msg) + { + this.msg = msg; + _hdrsPtr = new Lazy(() => + { + var ptr = (IntPtr)msg; + IntPtr hdrsPtr; + Librdkafka.message_headers(ptr, out hdrsPtr); + return hdrsPtr; + }); + } + + /// Gets the topic name + public string Topic => + msg->rkt != IntPtr.Zero + ? Util.Marshal.PtrToStringUTF8(Librdkafka.topic_name(msg->rkt)) + : null; + + /// Gets the partition + public Partition Partition => msg->partition; + + /// Gets the error code + public ErrorCode ErrorCode => msg->err; + + /// Gets the offset + public Offset Offset => msg->offset; + + /// Gets the leader epoch + public int? LeaderEpoch => + msg->rkt != IntPtr.Zero && msg->offset != Offset.Unset + ? Librdkafka.message_leader_epoch((IntPtr)msg) + : null; + + /// Gets the timestamp + public Timestamp Timestamp + { + get + { + var timestampUnix = Librdkafka.message_timestamp((IntPtr)msg, out var timestampType); + return new Timestamp(timestampUnix, (TimestampType)timestampType); + } + } + + /// Gets whether this indicates partition EOF + public bool IsPartitionEOF => msg->err == ErrorCode.Local_PartitionEOF; + + /// + /// Gets the raw key data as a ReadOnlySpan without allocation + /// + public ReadOnlySpan KeySpan => + msg->key == IntPtr.Zero + ? ReadOnlySpan.Empty + : new ReadOnlySpan(msg->key.ToPointer(), (int)msg->key_len); + + /// + /// Gets the raw value data as a ReadOnlySpan without allocation + /// + public ReadOnlySpan ValueSpan => + msg->val == IntPtr.Zero + ? ReadOnlySpan.Empty + : new ReadOnlySpan(msg->val.ToPointer(), (int)msg->len); + + /// + /// Gets a header by index without allocation + /// + /// Zero-based header index + /// Header name (allocated string) + /// Header value as ReadOnlySpan + /// True if header exists at the given index + public bool TryGetHeader(int index, out string name, out ReadOnlySpan value) + { + name = null; + value = ReadOnlySpan.Empty; + + var hdrsPtr = _hdrsPtr.Value; + if (hdrsPtr == IntPtr.Zero) + return false; + + var err = Librdkafka.header_get_all( + hdrsPtr, + (IntPtr)index, + out IntPtr namep, + out IntPtr valuep, + out IntPtr sizep + ); + + if (err != ErrorCode.NoError) + return false; + + name = Util.Marshal.PtrToStringUTF8(namep); + + if (valuep != IntPtr.Zero) + { + value = new ReadOnlySpan(valuep.ToPointer(), (int)sizep); + } + + return true; + } + } +} \ No newline at end of file diff --git a/src/Confluent.Kafka/IConsumer.cs b/src/Confluent.Kafka/IConsumer.cs index 38c3a5a71..caa7168ed 100644 --- a/src/Confluent.Kafka/IConsumer.cs +++ b/src/Confluent.Kafka/IConsumer.cs @@ -110,6 +110,7 @@ public interface IConsumer : IClient /// ConsumeResult Consume(TimeSpan timeout); + bool ConsumeWithCallback(int millisecondsTimeout, Experimental.AllocFreeConsumeCallback callback); /// /// Gets the (dynamic) group member id of diff --git a/src/Confluent.Kafka/IProducer.cs b/src/Confluent.Kafka/IProducer.cs index cb3501cdc..aedbaa391 100644 --- a/src/Confluent.Kafka/IProducer.cs +++ b/src/Confluent.Kafka/IProducer.cs @@ -182,6 +182,55 @@ void Produce( Message message, Action> deliveryHandler = null); + /// + /// Asynchronously send a single message to a + /// Kafka topic partition. + /// + /// + /// The topic partition to produce + /// the message to. + /// + /// + /// A delegate that will be called + /// with a delivery report corresponding to the + /// produce request (if enabled). + /// + /// + /// Value bytes to produce + /// + /// + /// Key bytes to produce + /// + /// + /// Timestamp of the message to be produced + /// + /// + /// Optional headers of the message to be produced + /// + /// + /// Thrown in response to any error that is known + /// immediately (excluding user application logic errors), + /// for example ErrorCode.Local_QueueFull. Asynchronous + /// notification of unsuccessful produce requests is made + /// available via the + /// parameter (if specified). The Error property of the + /// exception / delivery report provides more detailed + /// information. + /// + /// + /// Thrown in response to invalid argument values. + /// + /// + /// Thrown in response to error conditions that reflect + /// an error in the application logic of the calling + /// application. + /// + void Produce( + TopicPartition topicPartition, + ReadOnlySpan val, + ReadOnlySpan key, + Timestamp timestamp, + Headers headers); /// /// Poll for callback events. diff --git a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs index 60d0e34ba..7a9c8ce88 100644 --- a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs +++ b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs @@ -359,69 +359,43 @@ private IntPtr marshalHeaders(IReadOnlyList headers) internal ErrorCode Produce( string topic, - byte[] val, int valOffset, int valLength, - byte[] key, int keyOffset, int keyLength, + ReadOnlySpan val, + ReadOnlySpan key, int partition, long timestamp, IReadOnlyList headers, IntPtr opaque) { - var pValue = IntPtr.Zero; - var pKey = IntPtr.Zero; - - var gchValue = default(GCHandle); - var gchKey = default(GCHandle); - - if (val == null) - { - if (valOffset != 0 || valLength != 0) - { - throw new ArgumentException("valOffset and valLength parameters must be 0 when producing null values."); - } - } - else - { - gchValue = GCHandle.Alloc(val, GCHandleType.Pinned); - pValue = Marshal.UnsafeAddrOfPinnedArrayElement(val, valOffset); - } - - if (key == null) - { - if (keyOffset != 0 || keyLength != 0) - { - throw new ArgumentException("keyOffset and keyLength parameters must be 0 when producing null key values."); - } - } - else - { - gchKey = GCHandle.Alloc(key, GCHandleType.Pinned); - pKey = Marshal.UnsafeAddrOfPinnedArrayElement(key, keyOffset); - } - IntPtr headersPtr = marshalHeaders(headers); try { - var errorCode = Librdkafka.produceva( - handle, - topic, - partition, - (IntPtr)MsgFlags.MSG_F_COPY, - pValue, (UIntPtr)valLength, - pKey, (UIntPtr)keyLength, - timestamp, - headersPtr, - opaque); - - if (errorCode != ErrorCode.NoError) + unsafe { - if (headersPtr != IntPtr.Zero) + fixed (byte* valPtr = val, keyPtr = key) { - Librdkafka.headers_destroy(headersPtr); + var errorCode = Librdkafka.produceva( + handle, + topic, + partition, + (IntPtr)MsgFlags.MSG_F_COPY, + (IntPtr)valPtr, (UIntPtr)val.Length, + (IntPtr)keyPtr, (UIntPtr)key.Length, + timestamp, + headersPtr, + opaque); + + if (errorCode != ErrorCode.NoError) + { + if (headersPtr != IntPtr.Zero) + { + Librdkafka.headers_destroy(headersPtr); + } + } + + return errorCode; } } - - return errorCode; } catch { @@ -431,18 +405,6 @@ internal ErrorCode Produce( } throw; } - finally - { - if (val != null) - { - gchValue.Free(); - } - - if (key != null) - { - gchKey.Free(); - } - } } private static int[] MarshalCopy(IntPtr source, int length) diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 79867514b..e9cc635ea 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -36,6 +36,7 @@ internal class Config public IEnumerable> config; public Action errorHandler; public Action logHandler; + public Experimental.AllocFreeDeliveryHandler AllocFreeAllocFreeDeliveryHandler; public Action statisticsHandler; public Action oAuthBearerTokenRefreshHandler; public Dictionary partitioners; @@ -177,6 +178,7 @@ private void OAuthBearerTokenRefreshCallback(IntPtr rk, IntPtr oauthbearer_confi private Action logHandler; + private Experimental.AllocFreeDeliveryHandler allocFreeAllocFreeDeliveryHandler; private object loggerLockObj = new object(); private Librdkafka.LogDelegate logCallbackDelegate; private void LogCallback(IntPtr rk, SyslogLevel level, string fac, string buf) @@ -196,30 +198,38 @@ private void LogCallback(IntPtr rk, SyslogLevel level, string fac, string buf) private Librdkafka.DeliveryReportDelegate DeliveryReportCallback; - private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaque) - { + private unsafe void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaque) + { // Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios). if (ownedKafkaHandle.IsClosed) { return; } try { - var msg = Util.Marshal.PtrToStructure(rkmessage); + var msg = (rd_kafka_message*)rkmessage; + + if (allocFreeAllocFreeDeliveryHandler != null) + { + // Create the reader and invoke the callback + var reader = new Experimental.MessageReader(msg); + allocFreeAllocFreeDeliveryHandler.Invoke(reader); + return; + } // the msg._private property has dual purpose. Here, it is an opaque pointer set // by Topic.Produce to be an IDeliveryHandler. When Consuming, it's for internal // use (hence the name). - if (msg._private == IntPtr.Zero) + if (msg->_private == IntPtr.Zero) { // Note: this can occur if the ProduceAsync overload that accepts a DeliveryHandler // was used and the delivery handler was set to null. return; } - var gch = GCHandle.FromIntPtr(msg._private); + var gch = GCHandle.FromIntPtr(msg->_private); var deliveryHandler = (IDeliveryHandler) gch.Target; gch.Free(); - Headers headers = null; + Headers headers = null; if (this.enableDeliveryReportHeaders) { headers = new Headers(); @@ -258,14 +268,14 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq messageStatus = Librdkafka.message_status(rkmessage); } - deliveryHandler.HandleDeliveryReport( + deliveryHandler?.HandleDeliveryReport( new DeliveryReport { // Topic is not set here in order to avoid the marshalling cost. // Instead, the delivery handler is expected to cache the topic string. - Partition = msg.partition, - Offset = msg.offset, - Error = KafkaHandle.CreatePossiblyFatalError(msg.err, null), + Partition = msg->partition, + Offset = msg->offset, + Error = KafkaHandle.CreatePossiblyFatalError(msg->err, null), Status = messageStatus, Message = new Message { Timestamp = new Timestamp(timestamp, (TimestampType)timestampType), Headers = headers } } @@ -279,8 +289,8 @@ private void DeliveryReportCallbackImpl(IntPtr rk, IntPtr rkmessage, IntPtr opaq private void ProduceImpl( string topic, - byte[] val, int valOffset, int valLength, - byte[] key, int keyOffset, int keyLength, + ReadOnlySpan val, + ReadOnlySpan key, Timestamp timestamp, Partition partition, IReadOnlyList headers, @@ -308,8 +318,8 @@ private void ProduceImpl( err = KafkaHandle.Produce( topic, - val, valOffset, valLength, - key, keyOffset, keyLength, + val, + key, partition.Value, timestamp.UnixTimestampMs, headers, @@ -325,8 +335,8 @@ private void ProduceImpl( { err = KafkaHandle.Produce( topic, - val, valOffset, valLength, - key, keyOffset, keyLength, + val, + key, partition.Value, timestamp.UnixTimestampMs, headers, @@ -399,8 +409,8 @@ public void Flush(CancellationToken cancellationToken) throw new OperationCanceledException(); } } - } - + } + /// public void Dispose() @@ -578,6 +588,7 @@ internal Producer(ProducerBuilder builder) this.statisticsHandler = baseConfig.statisticsHandler; this.logHandler = baseConfig.logHandler; + this.allocFreeAllocFreeDeliveryHandler = baseConfig.AllocFreeAllocFreeDeliveryHandler; this.errorHandler = baseConfig.errorHandler; this.oAuthBearerTokenRefreshHandler = baseConfig.oAuthBearerTokenRefreshHandler; @@ -805,8 +816,8 @@ public async Task> ProduceAsync( ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, + new ReadOnlySpan(valBytes, 0, valBytes?.Length ?? 0), + new ReadOnlySpan(keyBytes, 0, keyBytes?.Length ?? 0), message.Timestamp, topicPartition.Partition, headers.BackingList, handler); @@ -816,8 +827,8 @@ public async Task> ProduceAsync( { ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, + new ReadOnlySpan(valBytes, 0, valBytes?.Length ?? 0), + new ReadOnlySpan(keyBytes, 0, keyBytes?.Length ?? 0), message.Timestamp, topicPartition.Partition, headers.BackingList, null); @@ -915,8 +926,8 @@ public void Produce( { ProduceImpl( topicPartition.Topic, - valBytes, 0, valBytes == null ? 0 : valBytes.Length, - keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length, + new ReadOnlySpan(valBytes, 0, valBytes?.Length ?? 0), + new ReadOnlySpan(keyBytes, 0, keyBytes?.Length ?? 0), message.Timestamp, topicPartition.Partition, headers.BackingList, deliveryHandler == null @@ -931,13 +942,43 @@ public void Produce( { throw new ProduceException( ex.Error, - new DeliveryReport + new DeliveryReport { Message = message, TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) }); } } + + /// + public void Produce( + TopicPartition topicPartition, + ReadOnlySpan val, + ReadOnlySpan key, + Timestamp timestamp, + Headers headers) + { + try + { + ProduceImpl( + topicPartition.Topic, + val, + key, + timestamp, + topicPartition.Partition, + headers.BackingList, + null); + } + catch (KafkaException ex) + { + throw new ProduceException( + ex.Error, + new DeliveryReport + { + TopicPartitionOffset = new TopicPartitionOffset(topicPartition, Offset.Unset) + }); + } + } private class TypedTaskDeliveryHandlerShim : TaskCompletionSource>, IDeliveryHandler { @@ -1054,7 +1095,7 @@ public void BeginTransaction() /// public void CommitTransaction(TimeSpan timeout) - => KafkaHandle.CommitTransaction(timeout.TotalMillisecondsAsInt()); + => KafkaHandle.CommitTransaction(timeout.TotalMillisecondsAsInt()); /// public void CommitTransaction() diff --git a/src/Confluent.Kafka/ProducerBuilder.cs b/src/Confluent.Kafka/ProducerBuilder.cs index e2a3b488b..b67fd2eb8 100644 --- a/src/Confluent.Kafka/ProducerBuilder.cs +++ b/src/Confluent.Kafka/ProducerBuilder.cs @@ -73,6 +73,11 @@ public class ProducerBuilder /// The configured log handler. /// internal protected Action, LogMessage> LogHandler { get; set; } + + /// + /// The configured alloc-free delivery handler. + /// + internal protected Experimental.AllocFreeDeliveryHandler AllocFreeDeliveryHandler { get; set; } /// /// The configured statistics handler. @@ -125,6 +130,7 @@ internal Producer.Config ConstructBaseConfig(Producer logHandler = this.LogHandler == null ? default(Action) : logMessage => this.LogHandler(producer, logMessage), + AllocFreeAllocFreeDeliveryHandler = this.AllocFreeDeliveryHandler, statisticsHandler = this.StatisticsHandler == null ? default(Action) : stats => this.StatisticsHandler(producer, stats), @@ -259,6 +265,19 @@ public ProducerBuilder SetLogHandler(Action + /// Set a custom alloc-free delivery handler + /// + public ProducerBuilder SetAllocFreeDeliveryHandler(Experimental.AllocFreeDeliveryHandler allocFreeDeliveryHandler) + { + if (this.AllocFreeDeliveryHandler != null) + { + throw new ArgumentException("Alloc-free delivery handler may only be specified once"); + } + this.AllocFreeDeliveryHandler = allocFreeDeliveryHandler; + return this; + } + /// /// Set SASL/OAUTHBEARER token refresh callback in provided /// conf object. The SASL/OAUTHBEARER token refresh callback diff --git a/test/Confluent.Kafka.Benchmark/BenchmarkConsumer.cs b/test/Confluent.Kafka.Benchmark/BenchmarkConsumer.cs index 45fe3a615..d545ccc13 100644 --- a/test/Confluent.Kafka.Benchmark/BenchmarkConsumer.cs +++ b/test/Confluent.Kafka.Benchmark/BenchmarkConsumer.cs @@ -72,5 +72,55 @@ record = consumer.Consume(TimeSpan.FromSeconds(1)); public static void Consume(string bootstrapServers, string topic, string group, long firstMessageOffset, int nMessages, int nHeaders, int nTests, string username, string password) => BenchmarkConsumerImpl(bootstrapServers, topic, group, firstMessageOffset, nMessages, nTests, nHeaders, username, password); + + public static void ConsumeAllocFree(string bootstrapServers, string topic, string group, + long firstMessageOffset, int nMessages, int nHeaders, int nTests, string username, string password) + { + var consumerConfig = new ConsumerConfig + { + GroupId = group, + BootstrapServers = bootstrapServers, + SessionTimeoutMs = 6000, + ConsumeResultFields = nHeaders == 0 ? "none" : "headers", + QueuedMinMessages = 1000000, + SaslUsername = username, + SaslPassword = password, + SecurityProtocol = username == null ? SecurityProtocol.Plaintext : SecurityProtocol.SaslSsl, + SaslMechanism = SaslMechanism.Plain + }; + + using (var consumer = new ConsumerBuilder(consumerConfig).Build()) + { + for (var j=0; j() { new TopicPartitionOffset(topic, 0, firstMessageOffset) }); + + var cnt = 0; + Experimental.AllocFreeConsumeCallback callback = (in Experimental.MessageReader mr) => + { + if (mr.ErrorCode != ErrorCode.NoError) + { + cnt += 1; + } + }; + + // consume 1 message before starting the timer to avoid including potential one-off delays. + consumer.ConsumeWithCallback(1000, callback); + + long startTime = DateTime.Now.Ticks; + while (cnt < nMessages-1) + { + consumer.ConsumeWithCallback(1000, callback); + } + + var duration = DateTime.Now.Ticks - startTime; + + Console.WriteLine($"Consumed (alloc-free) {nMessages-1} messages in {duration/10000.0:F0}ms"); + Console.WriteLine($"{(nMessages-1) / (duration/10000.0):F0}k msg/s"); + } + } + } } } diff --git a/test/Confluent.Kafka.Benchmark/BenchmarkProducer.cs b/test/Confluent.Kafka.Benchmark/BenchmarkProducer.cs index eb2a9a736..17f7b9b2a 100644 --- a/test/Confluent.Kafka.Benchmark/BenchmarkProducer.cs +++ b/test/Confluent.Kafka.Benchmark/BenchmarkProducer.cs @@ -180,5 +180,117 @@ public static long TaskProduce(string bootstrapServers, string topic, int nMessa /// public static long DeliveryHandlerProduce(string bootstrapServers, string topic, int nMessages, int msgSize, int nHeaders, int nTests, string username, string password) => BenchmarkProducerImpl(bootstrapServers, topic, nMessages, msgSize, nTests, nHeaders, true, username, password); + + public static long BenchmarkAllocFree( + string bootstrapServers, + string topic, + int nMessages, + int msgSize, + int nTests, + int nHeaders, + string username, + string password) + { + // mirrors the librdkafka performance test example. + var config = new ProducerConfig + { + BootstrapServers = bootstrapServers, + QueueBufferingMaxMessages = 2000000, + MessageSendMaxRetries = 3, + RetryBackoffMs = 500 , + LingerMs = 100, + SaslUsername = username, + SaslPassword = password, + SecurityProtocol = username == null ? SecurityProtocol.Plaintext : SecurityProtocol.SaslSsl, + SaslMechanism = SaslMechanism.Plain + }; + + DeliveryResult firstDeliveryReport = null; + + Headers headers = null; + if (nHeaders > 0) + { + headers = new Headers(); + for (int i=0; i + { + if (mr.ErrorCode != ErrorCode.NoError) + { + // Not interested in benchmark results in the (unlikely) event there is an error. + Console.WriteLine($"A error occured producing a message: {mr.ErrorCode}"); + Environment.Exit(1); // note: exceptions do not currently propagate to calling code from a deliveryHandler method. + } + + if (--msgCount == 0) + { + autoEvent.Set(); + } + }; + + + using (var producer = new ProducerBuilder(config).SetAllocFreeDeliveryHandler(allocFreeDeliveryHandler).Build()) + { + var topicPartition = new TopicPartition(topic, Partition.Any); + for (var j=0; j ++cnt).ToArray(); + + // this avoids including connection setup, topic creation time, etc.. in result. + firstDeliveryReport = producer.ProduceAsync(topic, new Message { Value = val, Headers = headers }).Result; + + var startTime = DateTime.Now.Ticks; + + for (int i = 0; i < nMessages; i += 1) + { + try + { + producer.Produce(topicPartition, val.AsSpan(), ReadOnlySpan.Empty, Timestamp.Default, null); + } + catch (ProduceException ex) + { + if (ex.Error.Code == ErrorCode.Local_QueueFull) + { + producer.Poll(TimeSpan.FromSeconds(1)); + i -= 1; + } + else + { + throw; + } + } + } + + while (true) + { + if (autoEvent.WaitOne(TimeSpan.FromSeconds(1))) + { + break; + } + Console.WriteLine(msgCount); + } + + + var duration = DateTime.Now.Ticks - startTime; + + Console.WriteLine($"Produced {nMessages} messages in {duration/10000.0:F0}ms"); + Console.WriteLine($"{nMessages / (duration/10000.0):F0}k msg/s"); + } + + producer.Flush(TimeSpan.FromSeconds(10)); + } + + return firstDeliveryReport.Offset; + } } } diff --git a/test/Confluent.Kafka.Benchmark/Program.cs b/test/Confluent.Kafka.Benchmark/Program.cs index d65a09429..edcd4e480 100644 --- a/test/Confluent.Kafka.Benchmark/Program.cs +++ b/test/Confluent.Kafka.Benchmark/Program.cs @@ -121,8 +121,10 @@ public static void Main(string[] args) { const int NUMBER_OF_TESTS = 1; BenchmarkProducer.TaskProduce(bootstrapServers, topicName, numberOfMessages, messageSize, headerCount, NUMBER_OF_TESTS, username, password); + BenchmarkProducer.BenchmarkAllocFree(bootstrapServers, topicName, numberOfMessages, messageSize, headerCount, NUMBER_OF_TESTS, username, password); var firstMessageOffset = BenchmarkProducer.DeliveryHandlerProduce(bootstrapServers, topicName, numberOfMessages, messageSize, headerCount, NUMBER_OF_TESTS, username, password); BenchmarkConsumer.Consume(bootstrapServers, topicName, group, firstMessageOffset, numberOfMessages, headerCount, NUMBER_OF_TESTS, username, password); + BenchmarkConsumer.ConsumeAllocFree(bootstrapServers, topicName, group, firstMessageOffset, numberOfMessages, headerCount, NUMBER_OF_TESTS, username, password); } else if (mode == "latency") {