Skip to content

Commit 0bbc692

Browse files
committed
Add a UTF-8 statistics handler
1 parent cb7c0d7 commit 0bbc692

File tree

3 files changed

+128
-2
lines changed

3 files changed

+128
-2
lines changed

src/Confluent.Kafka/Consumer.cs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
// Refer to LICENSE for more information.
1818

1919
using System;
20+
using System.Buffers;
2021
using System.Collections.Generic;
2122
using System.Linq;
2223
using System.Runtime.InteropServices;
@@ -39,6 +40,9 @@ internal class Config
3940
internal IEnumerable<KeyValuePair<string, string>> config;
4041
internal Action<Error> errorHandler;
4142
internal Action<LogMessage> logHandler;
43+
#if NET6_0_OR_GREATER
44+
internal ReadOnlySpanAction<byte, object> statisticsUtf8Handler;
45+
#endif
4246
internal Action<string> statisticsHandler;
4347
internal Action<CommittedOffsets> offsetsCommittedHandler;
4448
internal Action<string> oAuthBearerTokenRefreshHandler;
@@ -104,6 +108,9 @@ private void ErrorCallback(IntPtr rk, ErrorCode err, string reason, IntPtr opaqu
104108
}
105109
}
106110

111+
#if NET6_0_OR_GREATER
112+
private ReadOnlySpanAction<byte, object> statisticsUtf8Handler;
113+
#endif
107114
private Action<string> statisticsHandler;
108115
private Librdkafka.StatsDelegate statisticsCallbackDelegate;
109116
private int StatisticsCallback(IntPtr rk, IntPtr json, UIntPtr json_len, IntPtr opaque)
@@ -112,7 +119,20 @@ private int StatisticsCallback(IntPtr rk, IntPtr json, UIntPtr json_len, IntPtr
112119
try
113120
{
114121
// Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios).
115-
statisticsHandler?.Invoke(Util.Marshal.PtrToStringUTF8(json, json_len));
122+
#if NET6_0_OR_GREATER
123+
if (statisticsUtf8Handler != null)
124+
{
125+
unsafe
126+
{
127+
statisticsUtf8Handler.Invoke(new ReadOnlySpan<byte>(json.ToPointer(), (int)json_len), null);
128+
}
129+
}
130+
#endif
131+
132+
if (statisticsHandler != null)
133+
{
134+
statisticsHandler.Invoke(Util.Marshal.PtrToStringUTF8(json, json_len));
135+
}
116136
}
117137
catch (Exception e)
118138
{
@@ -645,6 +665,9 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
645665
{
646666
var baseConfig = builder.ConstructBaseConfig(this);
647667

668+
#if NET6_0_OR_GREATER
669+
this.statisticsUtf8Handler = baseConfig.statisticsUtf8Handler;
670+
#endif
648671
this.statisticsHandler = baseConfig.statisticsHandler;
649672
this.logHandler = baseConfig.logHandler;
650673
this.errorHandler = baseConfig.errorHandler;
@@ -727,7 +750,11 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
727750
{
728751
Librdkafka.conf_set_log_cb(configPtr, logCallbackDelegate);
729752
}
753+
#if NET6_0_OR_GREATER
754+
if (statisticsUtf8Handler != null || statisticsHandler != null)
755+
#else
730756
if (statisticsHandler != null)
757+
#endif
731758
{
732759
Librdkafka.conf_set_stats_cb(configPtr, statisticsCallbackDelegate);
733760
}

src/Confluent.Kafka/ConsumerBuilder.cs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// Refer to LICENSE for more information.
1616

1717
using System;
18+
using System.Buffers;
1819
using System.Collections.Generic;
1920
using System.Linq;
2021

@@ -41,8 +42,18 @@ public class ConsumerBuilder<TKey, TValue>
4142
/// </summary>
4243
internal protected Action<IConsumer<TKey, TValue>, LogMessage> LogHandler { get; set; }
4344

45+
#if NET6_0_OR_GREATER
4446
/// <summary>
45-
/// The configured statistics handler.
47+
/// The configured statistics handler. Unlike <see cref="StatisticsHandler"/>, this handler gives access
48+
/// to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as System.Text.Json. Also,
49+
/// if <see cref="StatisticsHandler"/> is not set, the JSON string allocation is completely avoided.
50+
/// </summary>
51+
internal protected ReadOnlySpanAction<byte, IConsumer<TKey, TValue>> StatisticsUtf8Handler { get; set; }
52+
#endif
53+
54+
/// <summary>
55+
/// The configured statistics handler. With .NET 6+, prefer the <see cref="StatisticsUtf8Handler"/> which
56+
/// doesn't allocate the JSON string and gives access to the raw UTF-8 bytes.
4657
/// </summary>
4758
internal protected Action<IConsumer<TKey, TValue>, string> StatisticsHandler { get; set; }
4859

@@ -98,6 +109,11 @@ internal Consumer<TKey,TValue>.Config ConstructBaseConfig(Consumer<TKey, TValue>
98109
logHandler = this.LogHandler == null
99110
? default(Action<LogMessage>)
100111
: logMessage => this.LogHandler(consumer, logMessage),
112+
#if NET6_0_OR_GREATER
113+
statisticsUtf8Handler = this.StatisticsUtf8Handler == null
114+
? default(ReadOnlySpanAction<byte, object>)
115+
: (stats, _) => this.StatisticsUtf8Handler(stats, consumer),
116+
#endif
101117
statisticsHandler = this.StatisticsHandler == null
102118
? default(Action<string>)
103119
: stats => this.StatisticsHandler(consumer, stats),
@@ -136,6 +152,24 @@ public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
136152
this.Config = config;
137153
}
138154

155+
#if NET6_0_OR_GREATER
156+
/// <summary>
157+
/// Set the handler to call on statistics events. Unlike <see cref="SetStatisticsHandler"/>, this handler
158+
/// gives access to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as
159+
/// System.Text.Json. Also, it doesn't allocate a potentially large string for the JSON.
160+
/// </summary>
161+
public ConsumerBuilder<TKey, TValue> SetStatisticsUtf8Handler(
162+
ReadOnlySpanAction<byte, IConsumer<TKey, TValue>> statisticsHandler)
163+
{
164+
if (this.StatisticsUtf8Handler != null)
165+
{
166+
throw new InvalidOperationException("Statistics handler may not be specified more than once.");
167+
}
168+
this.StatisticsUtf8Handler = statisticsHandler;
169+
return this;
170+
}
171+
#endif
172+
139173
/// <summary>
140174
/// Set the handler to call on statistics events. Statistics
141175
/// are provided as a JSON formatted string as defined here:

test/Confluent.Kafka.IntegrationTests/Tests/Transactions_Statistics.cs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// Refer to LICENSE for more information.
1616

1717
using System;
18+
using System.Text.Json;
1819
using System.Threading;
1920
using Newtonsoft.Json.Linq;
2021
using Xunit;
@@ -90,5 +91,69 @@ public void Transactions_Statistics(string bootstrapServers)
9091
Assert.Equal(0, Library.HandleCount);
9192
LogToFile("end Transactions_Statistics");
9293
}
94+
95+
[Theory, MemberData(nameof(KafkaParameters))]
96+
public void Transactions_Statistics_Utf8(string bootstrapServers)
97+
{
98+
LogToFile("start Transactions_Statistics");
99+
100+
var groupName = Guid.NewGuid().ToString();
101+
102+
var cConfig = new ConsumerConfig
103+
{
104+
IsolationLevel = IsolationLevel.ReadCommitted,
105+
BootstrapServers = bootstrapServers,
106+
GroupId = groupName,
107+
EnableAutoCommit = false,
108+
StatisticsIntervalMs = 1000
109+
};
110+
111+
int ls_offset = -1;
112+
int hi_offset = -1;
113+
bool done = false;
114+
115+
using (var topic = new TemporaryTopic(bootstrapServers, 1))
116+
using (var producer = new TestProducerBuilder<string, string>(new ProducerConfig { BootstrapServers = bootstrapServers, TransactionalId = Guid.NewGuid().ToString(), LingerMs = 0 }).Build())
117+
using (var consumer = new TestConsumerBuilder<string, string>(cConfig)
118+
.SetStatisticsUtf8Handler((jsonBytes, _) =>
119+
{
120+
var stats = JsonSerializer.Deserialize<JsonElement>(jsonBytes);
121+
var firstPartitionStats = stats.GetProperty("topics").GetProperty(topic.Name)
122+
.GetProperty("partitions").GetProperty("0");
123+
ls_offset = firstPartitionStats.GetProperty("ls_offset").GetInt32();
124+
hi_offset = firstPartitionStats.GetProperty("hi_offset").GetInt32();
125+
if (hi_offset > 4) { done = true; }
126+
})
127+
.Build())
128+
{
129+
consumer.Assign(new TopicPartitionOffset(topic.Name, 0, 0));
130+
131+
producer.InitTransactions(TimeSpan.FromSeconds(30));
132+
producer.BeginTransaction();
133+
producer.ProduceAsync(topic.Name, new Message<string, string> { Key = "test", Value = "message_a" }).Wait();
134+
producer.CommitTransaction(TimeSpan.FromSeconds(30));
135+
136+
producer.BeginTransaction();
137+
producer.ProduceAsync(topic.Name, new Message<string, string> { Key = "test", Value = "message_b" }).Wait();
138+
producer.CommitTransaction(TimeSpan.FromSeconds(30));
139+
140+
producer.BeginTransaction();
141+
producer.ProduceAsync(topic.Name, new Message<string, string> { Key = "test", Value = "message1" }).Wait();
142+
producer.ProduceAsync(topic.Name, new Message<string, string> { Key = "test", Value = "message2" }).Wait();
143+
producer.ProduceAsync(topic.Name, new Message<string, string> { Key = "test", Value = "message3" }).Wait();
144+
145+
for (int i=0; i<10; ++i)
146+
{
147+
consumer.Consume(TimeSpan.FromMilliseconds(500));
148+
if (done) { break; }
149+
}
150+
151+
Assert.Equal(4, ls_offset);
152+
Assert.Equal(7, hi_offset);
153+
}
154+
155+
Assert.Equal(0, Library.HandleCount);
156+
LogToFile("end Transactions_Statistics");
157+
}
93158
}
94159
}

0 commit comments

Comments
 (0)