diff --git a/src/Confluent.Kafka/AdminClient.cs b/src/Confluent.Kafka/AdminClient.cs index 64e67445b..372c9be16 100644 --- a/src/Confluent.Kafka/AdminClient.cs +++ b/src/Confluent.Kafka/AdminClient.cs @@ -1577,6 +1577,9 @@ internal AdminClient(AdminClientBuilder builder) var producerBuilder = new ProducerBuilder(config); if (builder.LogHandler != null) { producerBuilder.SetLogHandler((_, logMessage) => builder.LogHandler(this, logMessage)); } if (builder.ErrorHandler != null) { producerBuilder.SetErrorHandler((_, error) => builder.ErrorHandler(this, error)); } +#if NET6_0_OR_GREATER + if (builder.StatisticsUtf8Handler != null) { producerBuilder.SetStatisticsUtf8Handler((stats, _) => builder.StatisticsUtf8Handler(stats, this)); } +#endif if (builder.StatisticsHandler != null) { producerBuilder.SetStatisticsHandler((_, stats) => builder.StatisticsHandler(this, stats)); } if (builder.OAuthBearerTokenRefreshHandler != null) { producerBuilder.SetOAuthBearerTokenRefreshHandler(builder.OAuthBearerTokenRefreshHandler); } this.ownedClient = producerBuilder.Build(); diff --git a/src/Confluent.Kafka/AdminClientBuilder.cs b/src/Confluent.Kafka/AdminClientBuilder.cs index 6f0e9e712..98f3b5ffe 100644 --- a/src/Confluent.Kafka/AdminClientBuilder.cs +++ b/src/Confluent.Kafka/AdminClientBuilder.cs @@ -15,6 +15,7 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Collections.Generic; @@ -40,6 +41,15 @@ public class AdminClientBuilder /// internal protected Action LogHandler { get; set; } +#if NET6_0_OR_GREATER + /// + /// The configured statistics handler. Unlike , this handler gives access + /// to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as System.Text.Json. Also, + /// if is not set, the JSON string allocation is completely avoided. + /// + internal protected ReadOnlySpanAction StatisticsUtf8Handler { get; set; } +#endif + /// /// The configured statistics handler. /// @@ -65,6 +75,24 @@ public AdminClientBuilder(IEnumerable> config) this.Config = config; } +#if NET6_0_OR_GREATER + /// + /// Set the handler to call on statistics events. Unlike , this handler + /// gives access to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as + /// System.Text.Json. Also, it doesn't allocate a potentially large string for the JSON. + /// + public AdminClientBuilder SetStatisticsUtf8Handler( + ReadOnlySpanAction statisticsHandler) + { + if (this.StatisticsUtf8Handler != null) + { + throw new InvalidOperationException("Statistics handler may not be specified more than once."); + } + this.StatisticsUtf8Handler = statisticsHandler; + return this; + } +#endif + /// /// Set the handler to call on statistics events. Statistics are provided /// as a JSON formatted string as defined here: diff --git a/src/Confluent.Kafka/Consumer.cs b/src/Confluent.Kafka/Consumer.cs index e75a96ee9..69597c38f 100644 --- a/src/Confluent.Kafka/Consumer.cs +++ b/src/Confluent.Kafka/Consumer.cs @@ -17,6 +17,7 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; @@ -39,6 +40,9 @@ internal class Config internal IEnumerable> config; internal Action errorHandler; internal Action logHandler; +#if NET6_0_OR_GREATER + internal ReadOnlySpanAction statisticsUtf8Handler; +#endif internal Action statisticsHandler; internal Action offsetsCommittedHandler; internal Action oAuthBearerTokenRefreshHandler; @@ -104,6 +108,9 @@ private void ErrorCallback(IntPtr rk, ErrorCode err, string reason, IntPtr opaqu } } +#if NET6_0_OR_GREATER + private ReadOnlySpanAction statisticsUtf8Handler; +#endif private Action statisticsHandler; private Librdkafka.StatsDelegate statisticsCallbackDelegate; private int StatisticsCallback(IntPtr rk, IntPtr json, UIntPtr json_len, IntPtr opaque) @@ -112,7 +119,20 @@ private int StatisticsCallback(IntPtr rk, IntPtr json, UIntPtr json_len, IntPtr try { // Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios). - statisticsHandler?.Invoke(Util.Marshal.PtrToStringUTF8(json, json_len)); +#if NET6_0_OR_GREATER + if (statisticsUtf8Handler != null) + { + unsafe + { + statisticsUtf8Handler.Invoke(new ReadOnlySpan(json.ToPointer(), (int)json_len), null); + } + } +#endif + + if (statisticsHandler != null) + { + statisticsHandler.Invoke(Util.Marshal.PtrToStringUTF8(json, json_len)); + } } catch (Exception e) { @@ -645,6 +665,9 @@ internal Consumer(ConsumerBuilder builder) { var baseConfig = builder.ConstructBaseConfig(this); +#if NET6_0_OR_GREATER + this.statisticsUtf8Handler = baseConfig.statisticsUtf8Handler; +#endif this.statisticsHandler = baseConfig.statisticsHandler; this.logHandler = baseConfig.logHandler; this.errorHandler = baseConfig.errorHandler; @@ -727,7 +750,11 @@ internal Consumer(ConsumerBuilder builder) { Librdkafka.conf_set_log_cb(configPtr, logCallbackDelegate); } +#if NET6_0_OR_GREATER + if (statisticsUtf8Handler != null || statisticsHandler != null) +#else if (statisticsHandler != null) +#endif { Librdkafka.conf_set_stats_cb(configPtr, statisticsCallbackDelegate); } diff --git a/src/Confluent.Kafka/ConsumerBuilder.cs b/src/Confluent.Kafka/ConsumerBuilder.cs index 56549e783..bdb93ae14 100644 --- a/src/Confluent.Kafka/ConsumerBuilder.cs +++ b/src/Confluent.Kafka/ConsumerBuilder.cs @@ -15,6 +15,7 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.Linq; @@ -41,8 +42,18 @@ public class ConsumerBuilder /// internal protected Action, LogMessage> LogHandler { get; set; } +#if NET6_0_OR_GREATER /// - /// The configured statistics handler. + /// The configured statistics handler. Unlike , this handler gives access + /// to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as System.Text.Json. Also, + /// if is not set, the JSON string allocation is completely avoided. + /// + internal protected ReadOnlySpanAction> StatisticsUtf8Handler { get; set; } +#endif + + /// + /// The configured statistics handler. With .NET 6+, prefer the which + /// doesn't allocate the JSON string and gives access to the raw UTF-8 bytes. /// internal protected Action, string> StatisticsHandler { get; set; } @@ -98,6 +109,11 @@ internal Consumer.Config ConstructBaseConfig(Consumer logHandler = this.LogHandler == null ? default(Action) : logMessage => this.LogHandler(consumer, logMessage), +#if NET6_0_OR_GREATER + statisticsUtf8Handler = this.StatisticsUtf8Handler == null + ? default(ReadOnlySpanAction) + : (stats, _) => this.StatisticsUtf8Handler(stats, consumer), +#endif statisticsHandler = this.StatisticsHandler == null ? default(Action) : stats => this.StatisticsHandler(consumer, stats), @@ -136,6 +152,24 @@ public ConsumerBuilder(IEnumerable> config) this.Config = config; } +#if NET6_0_OR_GREATER + /// + /// Set the handler to call on statistics events. Unlike , this handler + /// gives access to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as + /// System.Text.Json. Also, it doesn't allocate a potentially large string for the JSON. + /// + public ConsumerBuilder SetStatisticsUtf8Handler( + ReadOnlySpanAction> statisticsHandler) + { + if (this.StatisticsUtf8Handler != null) + { + throw new InvalidOperationException("Statistics handler may not be specified more than once."); + } + this.StatisticsUtf8Handler = statisticsHandler; + return this; + } +#endif + /// /// Set the handler to call on statistics events. Statistics /// are provided as a JSON formatted string as defined here: diff --git a/src/Confluent.Kafka/Producer.cs b/src/Confluent.Kafka/Producer.cs index 79867514b..bd71ee32d 100644 --- a/src/Confluent.Kafka/Producer.cs +++ b/src/Confluent.Kafka/Producer.cs @@ -15,6 +15,7 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; @@ -36,6 +37,9 @@ internal class Config public IEnumerable> config; public Action errorHandler; public Action logHandler; +#if NET6_0_OR_GREATER + internal ReadOnlySpanAction statisticsUtf8Handler; +#endif public Action statisticsHandler; public Action oAuthBearerTokenRefreshHandler; public Dictionary partitioners; @@ -139,23 +143,38 @@ private void ErrorCallback(IntPtr rk, ErrorCode err, string reason, IntPtr opaqu } +#if NET6_0_OR_GREATER + private ReadOnlySpanAction statisticsUtf8Handler; +#endif private Action statisticsHandler; private Librdkafka.StatsDelegate statisticsCallbackDelegate; private int StatisticsCallback(IntPtr rk, IntPtr json, UIntPtr json_len, IntPtr opaque) { - // Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios). if (ownedKafkaHandle.IsClosed) { return 0; } - try { - statisticsHandler?.Invoke(Util.Marshal.PtrToStringUTF8(json)); + // Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios). +#if NET6_0_OR_GREATER + if (statisticsUtf8Handler != null) + { + unsafe + { + statisticsUtf8Handler.Invoke(new ReadOnlySpan(json.ToPointer(), (int)json_len), null); + } + } +#endif + + if (statisticsHandler != null) + { + statisticsHandler.Invoke(Util.Marshal.PtrToStringUTF8(json, json_len)); + } } catch (Exception e) { handlerException = e; } - return 0; // instruct librdkafka to immediately free the json ptr. + return 0; // instruct librdkafka to immediately free the json ptr } private Action oAuthBearerTokenRefreshHandler; @@ -399,8 +418,8 @@ public void Flush(CancellationToken cancellationToken) throw new OperationCanceledException(); } } - } - + } + /// public void Dispose() @@ -576,6 +595,9 @@ internal Producer(ProducerBuilder builder) // TODO: Hijack the "delivery.report.only.error" configuration parameter and add functionality to enforce that Tasks // that never complete are never created when this is set to true. +#if NET6_0_OR_GREATER + this.statisticsUtf8Handler = baseConfig.statisticsUtf8Handler; +#endif this.statisticsHandler = baseConfig.statisticsHandler; this.logHandler = baseConfig.logHandler; this.errorHandler = baseConfig.errorHandler; @@ -675,7 +697,11 @@ internal Producer(ProducerBuilder builder) { Librdkafka.conf_set_log_cb(configPtr, logCallbackDelegate); } +#if NET6_0_OR_GREATER + if (statisticsUtf8Handler != null || statisticsHandler != null) +#else if (statisticsHandler != null) +#endif { Librdkafka.conf_set_stats_cb(configPtr, statisticsCallbackDelegate); } diff --git a/src/Confluent.Kafka/ProducerBuilder.cs b/src/Confluent.Kafka/ProducerBuilder.cs index e2a3b488b..7a7c2d90e 100644 --- a/src/Confluent.Kafka/ProducerBuilder.cs +++ b/src/Confluent.Kafka/ProducerBuilder.cs @@ -15,6 +15,7 @@ // Refer to LICENSE for more information. using System; +using System.Buffers; using System.Collections.Generic; @@ -74,6 +75,15 @@ public class ProducerBuilder /// internal protected Action, LogMessage> LogHandler { get; set; } +#if NET6_0_OR_GREATER + /// + /// The configured statistics handler. Unlike , this handler gives access + /// to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as System.Text.Json. Also, + /// if is not set, the JSON string allocation is completely avoided. + /// + internal protected ReadOnlySpanAction> StatisticsUtf8Handler { get; set; } +#endif + /// /// The configured statistics handler. /// @@ -125,6 +135,11 @@ internal Producer.Config ConstructBaseConfig(Producer logHandler = this.LogHandler == null ? default(Action) : logMessage => this.LogHandler(producer, logMessage), +#if NET6_0_OR_GREATER + statisticsUtf8Handler = this.StatisticsUtf8Handler == null + ? default(ReadOnlySpanAction) + : (stats, _) => this.StatisticsUtf8Handler(stats, producer), +#endif statisticsHandler = this.StatisticsHandler == null ? default(Action) : stats => this.StatisticsHandler(producer, stats), @@ -148,6 +163,24 @@ public ProducerBuilder(IEnumerable> config) this.Config = config; } +#if NET6_0_OR_GREATER + /// + /// Set the handler to call on statistics events. Unlike , this handler + /// gives access to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as + /// System.Text.Json. Also, it doesn't allocate a potentially large string for the JSON. + /// + public ProducerBuilder SetStatisticsUtf8Handler( + ReadOnlySpanAction> statisticsHandler) + { + if (this.StatisticsUtf8Handler != null) + { + throw new InvalidOperationException("Statistics handler may not be specified more than once."); + } + this.StatisticsUtf8Handler = statisticsHandler; + return this; + } +#endif + /// /// Set the handler to call on statistics events. Statistics are provided as /// a JSON formatted string as defined here: diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Transactions_Statistics.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Transactions_Statistics.cs index 3e3115ccb..0ba91ec91 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Transactions_Statistics.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Transactions_Statistics.cs @@ -15,6 +15,7 @@ // Refer to LICENSE for more information. using System; +using System.Text.Json; using System.Threading; using Newtonsoft.Json.Linq; using Xunit; @@ -90,5 +91,69 @@ public void Transactions_Statistics(string bootstrapServers) Assert.Equal(0, Library.HandleCount); LogToFile("end Transactions_Statistics"); } + + [Theory, MemberData(nameof(KafkaParameters))] + public void Transactions_Statistics_Utf8(string bootstrapServers) + { + LogToFile("start Transactions_Statistics"); + + var groupName = Guid.NewGuid().ToString(); + + var cConfig = new ConsumerConfig + { + IsolationLevel = IsolationLevel.ReadCommitted, + BootstrapServers = bootstrapServers, + GroupId = groupName, + EnableAutoCommit = false, + StatisticsIntervalMs = 1000 + }; + + int ls_offset = -1; + int hi_offset = -1; + bool done = false; + + using (var topic = new TemporaryTopic(bootstrapServers, 1)) + using (var producer = new TestProducerBuilder(new ProducerConfig { BootstrapServers = bootstrapServers, TransactionalId = Guid.NewGuid().ToString(), LingerMs = 0 }).Build()) + using (var consumer = new TestConsumerBuilder(cConfig) + .SetStatisticsUtf8Handler((jsonBytes, _) => + { + var stats = JsonSerializer.Deserialize(jsonBytes); + var firstPartitionStats = stats.GetProperty("topics").GetProperty(topic.Name) + .GetProperty("partitions").GetProperty("0"); + ls_offset = firstPartitionStats.GetProperty("ls_offset").GetInt32(); + hi_offset = firstPartitionStats.GetProperty("hi_offset").GetInt32(); + if (hi_offset > 4) { done = true; } + }) + .Build()) + { + consumer.Assign(new TopicPartitionOffset(topic.Name, 0, 0)); + + producer.InitTransactions(TimeSpan.FromSeconds(30)); + producer.BeginTransaction(); + producer.ProduceAsync(topic.Name, new Message { Key = "test", Value = "message_a" }).Wait(); + producer.CommitTransaction(TimeSpan.FromSeconds(30)); + + producer.BeginTransaction(); + producer.ProduceAsync(topic.Name, new Message { Key = "test", Value = "message_b" }).Wait(); + producer.CommitTransaction(TimeSpan.FromSeconds(30)); + + producer.BeginTransaction(); + producer.ProduceAsync(topic.Name, new Message { Key = "test", Value = "message1" }).Wait(); + producer.ProduceAsync(topic.Name, new Message { Key = "test", Value = "message2" }).Wait(); + producer.ProduceAsync(topic.Name, new Message { Key = "test", Value = "message3" }).Wait(); + + for (int i=0; i<10; ++i) + { + consumer.Consume(TimeSpan.FromMilliseconds(500)); + if (done) { break; } + } + + Assert.Equal(4, ls_offset); + Assert.Equal(7, hi_offset); + } + + Assert.Equal(0, Library.HandleCount); + LogToFile("end Transactions_Statistics"); + } } }