Skip to content

Commit 16edbaa

Browse files
committed
Same for producer and admin client
1 parent 39764a3 commit 16edbaa

File tree

4 files changed

+96
-6
lines changed

4 files changed

+96
-6
lines changed

src/Confluent.Kafka/AdminClient.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1487,6 +1487,9 @@ internal AdminClient(AdminClientBuilder builder)
14871487
var producerBuilder = new ProducerBuilder<Null, Null>(config);
14881488
if (builder.LogHandler != null) { producerBuilder.SetLogHandler((_, logMessage) => builder.LogHandler(this, logMessage)); }
14891489
if (builder.ErrorHandler != null) { producerBuilder.SetErrorHandler((_, error) => builder.ErrorHandler(this, error)); }
1490+
#if NET6_0_OR_GREATER
1491+
if (builder.StatisticsUtf8Handler != null) { producerBuilder.SetStatisticsUtf8Handler((stats, _) => builder.StatisticsUtf8Handler(stats, this)); }
1492+
#endif
14901493
if (builder.StatisticsHandler != null) { producerBuilder.SetStatisticsHandler((_, stats) => builder.StatisticsHandler(this, stats)); }
14911494
if (builder.OAuthBearerTokenRefreshHandler != null) { producerBuilder.SetOAuthBearerTokenRefreshHandler(builder.OAuthBearerTokenRefreshHandler); }
14921495
this.ownedClient = producerBuilder.Build();

src/Confluent.Kafka/AdminClientBuilder.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// Refer to LICENSE for more information.
1616

1717
using System;
18+
using System.Buffers;
1819
using System.Collections.Generic;
1920

2021

@@ -40,6 +41,15 @@ public class AdminClientBuilder
4041
/// </summary>
4142
internal protected Action<IAdminClient, LogMessage> LogHandler { get; set; }
4243

44+
#if NET6_0_OR_GREATER
45+
/// <summary>
46+
/// The configured statistics handler. Unlike <see cref="StatisticsHandler"/>, this handler gives access
47+
/// to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as System.Text.Json. Also,
48+
/// if <see cref="StatisticsHandler"/> is not set, the JSON string allocation is completely avoided.
49+
/// </summary>
50+
internal protected ReadOnlySpanAction<byte, IAdminClient> StatisticsUtf8Handler { get; set; }
51+
#endif
52+
4353
/// <summary>
4454
/// The configured statistics handler.
4555
/// </summary>
@@ -65,6 +75,24 @@ public AdminClientBuilder(IEnumerable<KeyValuePair<string, string>> config)
6575
this.Config = config;
6676
}
6777

78+
#if NET6_0_OR_GREATER
79+
/// <summary>
80+
/// Set the handler to call on statistics events. Unlike <see cref="SetStatisticsHandler"/>, this handler
81+
/// gives access to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as
82+
/// System.Text.Json. Also, it doesn't allocate a potentially large string for the JSON.
83+
/// </summary>
84+
public AdminClientBuilder SetStatisticsUtf8Handler(
85+
ReadOnlySpanAction<byte, IAdminClient> statisticsHandler)
86+
{
87+
if (this.StatisticsUtf8Handler != null)
88+
{
89+
throw new InvalidOperationException("Statistics handler may not be specified more than once.");
90+
}
91+
this.StatisticsUtf8Handler = statisticsHandler;
92+
return this;
93+
}
94+
#endif
95+
6896
/// <summary>
6997
/// Set the handler to call on statistics events. Statistics are provided
7098
/// as a JSON formatted string as defined here:

src/Confluent.Kafka/Producer.cs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// Refer to LICENSE for more information.
1616

1717
using System;
18+
using System.Buffers;
1819
using System.Collections.Generic;
1920
using System.Linq;
2021
using System.Runtime.InteropServices;
@@ -36,6 +37,9 @@ internal class Config
3637
public IEnumerable<KeyValuePair<string, string>> config;
3738
public Action<Error> errorHandler;
3839
public Action<LogMessage> logHandler;
40+
#if NET6_0_OR_GREATER
41+
internal ReadOnlySpanAction<byte, object> statisticsUtf8Handler;
42+
#endif
3943
public Action<string> statisticsHandler;
4044
public Action<string> oAuthBearerTokenRefreshHandler;
4145
public Dictionary<string, PartitionerDelegate> partitioners;
@@ -139,23 +143,38 @@ private void ErrorCallback(IntPtr rk, ErrorCode err, string reason, IntPtr opaqu
139143
}
140144

141145

146+
#if NET6_0_OR_GREATER
147+
private ReadOnlySpanAction<byte, object> statisticsUtf8Handler;
148+
#endif
142149
private Action<string> statisticsHandler;
143150
private Librdkafka.StatsDelegate statisticsCallbackDelegate;
144151
private int StatisticsCallback(IntPtr rk, IntPtr json, UIntPtr json_len, IntPtr opaque)
145152
{
146-
// Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios).
147153
if (ownedKafkaHandle.IsClosed) { return 0; }
148-
149154
try
150155
{
151-
statisticsHandler?.Invoke(Util.Marshal.PtrToStringUTF8(json));
156+
// Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios).
157+
#if NET6_0_OR_GREATER
158+
if (statisticsUtf8Handler != null)
159+
{
160+
unsafe
161+
{
162+
statisticsUtf8Handler.Invoke(new ReadOnlySpan<byte>(json.ToPointer(), (int)json_len), null);
163+
}
164+
}
165+
#endif
166+
167+
if (statisticsHandler != null)
168+
{
169+
statisticsHandler.Invoke(Util.Marshal.PtrToStringUTF8(json, json_len));
170+
}
152171
}
153172
catch (Exception e)
154173
{
155174
handlerException = e;
156175
}
157176

158-
return 0; // instruct librdkafka to immediately free the json ptr.
177+
return 0; // instruct librdkafka to immediately free the json ptr
159178
}
160179

161180
private Action<string> oAuthBearerTokenRefreshHandler;
@@ -399,8 +418,8 @@ public void Flush(CancellationToken cancellationToken)
399418
throw new OperationCanceledException();
400419
}
401420
}
402-
}
403-
421+
}
422+
404423

405424
/// <inheritdoc/>
406425
public void Dispose()
@@ -576,6 +595,9 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
576595
// TODO: Hijack the "delivery.report.only.error" configuration parameter and add functionality to enforce that Tasks
577596
// that never complete are never created when this is set to true.
578597

598+
#if NET6_0_OR_GREATER
599+
this.statisticsUtf8Handler = baseConfig.statisticsUtf8Handler;
600+
#endif
579601
this.statisticsHandler = baseConfig.statisticsHandler;
580602
this.logHandler = baseConfig.logHandler;
581603
this.errorHandler = baseConfig.errorHandler;
@@ -675,7 +697,11 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
675697
{
676698
Librdkafka.conf_set_log_cb(configPtr, logCallbackDelegate);
677699
}
700+
#if NET6_0_OR_GREATER
701+
if (statisticsUtf8Handler != null || statisticsHandler != null)
702+
#else
678703
if (statisticsHandler != null)
704+
#endif
679705
{
680706
Librdkafka.conf_set_stats_cb(configPtr, statisticsCallbackDelegate);
681707
}

src/Confluent.Kafka/ProducerBuilder.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// Refer to LICENSE for more information.
1616

1717
using System;
18+
using System.Buffers;
1819
using System.Collections.Generic;
1920

2021

@@ -74,6 +75,15 @@ public class ProducerBuilder<TKey, TValue>
7475
/// </summary>
7576
internal protected Action<IProducer<TKey, TValue>, LogMessage> LogHandler { get; set; }
7677

78+
#if NET6_0_OR_GREATER
79+
/// <summary>
80+
/// The configured statistics handler. Unlike <see cref="StatisticsHandler"/>, this handler gives access
81+
/// to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as System.Text.Json. Also,
82+
/// if <see cref="StatisticsHandler"/> is not set, the JSON string allocation is completely avoided.
83+
/// </summary>
84+
internal protected ReadOnlySpanAction<byte, IProducer<TKey, TValue>> StatisticsUtf8Handler { get; set; }
85+
#endif
86+
7787
/// <summary>
7888
/// The configured statistics handler.
7989
/// </summary>
@@ -125,6 +135,11 @@ internal Producer<TKey,TValue>.Config ConstructBaseConfig(Producer<TKey, TValue>
125135
logHandler = this.LogHandler == null
126136
? default(Action<LogMessage>)
127137
: logMessage => this.LogHandler(producer, logMessage),
138+
#if NET6_0_OR_GREATER
139+
statisticsUtf8Handler = this.StatisticsUtf8Handler == null
140+
? default(ReadOnlySpanAction<byte, object>)
141+
: (stats, _) => this.StatisticsUtf8Handler(stats, producer),
142+
#endif
128143
statisticsHandler = this.StatisticsHandler == null
129144
? default(Action<string>)
130145
: stats => this.StatisticsHandler(producer, stats),
@@ -148,6 +163,24 @@ public ProducerBuilder(IEnumerable<KeyValuePair<string, string>> config)
148163
this.Config = config;
149164
}
150165

166+
#if NET6_0_OR_GREATER
167+
/// <summary>
168+
/// Set the handler to call on statistics events. Unlike <see cref="SetStatisticsHandler"/>, this handler
169+
/// gives access to the raw UTF-8 which is more performance friendly to UTF-8 parsers such as
170+
/// System.Text.Json. Also, it doesn't allocate a potentially large string for the JSON.
171+
/// </summary>
172+
public ProducerBuilder<TKey, TValue> SetStatisticsUtf8Handler(
173+
ReadOnlySpanAction<byte, IProducer<TKey, TValue>> statisticsHandler)
174+
{
175+
if (this.StatisticsUtf8Handler != null)
176+
{
177+
throw new InvalidOperationException("Statistics handler may not be specified more than once.");
178+
}
179+
this.StatisticsUtf8Handler = statisticsHandler;
180+
return this;
181+
}
182+
#endif
183+
151184
/// <summary>
152185
/// Set the handler to call on statistics events. Statistics are provided as
153186
/// a JSON formatted string as defined here:

0 commit comments

Comments
 (0)