15
15
// Refer to LICENSE for more information.
16
16
17
17
using System ;
18
+ using System . Buffers ;
18
19
using System . Collections . Generic ;
19
20
using System . Linq ;
20
21
using System . Runtime . InteropServices ;
@@ -36,6 +37,9 @@ internal class Config
36
37
public IEnumerable < KeyValuePair < string , string > > config ;
37
38
public Action < Error > errorHandler ;
38
39
public Action < LogMessage > logHandler ;
40
+ #if NET6_0_OR_GREATER
41
+ internal ReadOnlySpanAction < byte , object > statisticsUtf8Handler ;
42
+ #endif
39
43
public Action < string > statisticsHandler ;
40
44
public Action < string > oAuthBearerTokenRefreshHandler ;
41
45
public Dictionary < string , PartitionerDelegate > partitioners ;
@@ -139,23 +143,38 @@ private void ErrorCallback(IntPtr rk, ErrorCode err, string reason, IntPtr opaqu
139
143
}
140
144
141
145
146
+ #if NET6_0_OR_GREATER
147
+ private ReadOnlySpanAction < byte , object > statisticsUtf8Handler ;
148
+ #endif
142
149
private Action < string > statisticsHandler ;
143
150
private Librdkafka . StatsDelegate statisticsCallbackDelegate ;
144
151
private int StatisticsCallback ( IntPtr rk , IntPtr json , UIntPtr json_len , IntPtr opaque )
145
152
{
146
- // Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios).
147
153
if ( ownedKafkaHandle . IsClosed ) { return 0 ; }
148
-
149
154
try
150
155
{
151
- statisticsHandler ? . Invoke ( Util . Marshal . PtrToStringUTF8 ( json ) ) ;
156
+ // Ensure registered handlers are never called as a side-effect of Dispose/Finalize (prevents deadlocks in common scenarios).
157
+ #if NET6_0_OR_GREATER
158
+ if ( statisticsUtf8Handler != null )
159
+ {
160
+ unsafe
161
+ {
162
+ statisticsUtf8Handler . Invoke ( new ReadOnlySpan < byte > ( json . ToPointer ( ) , ( int ) json_len ) , null ) ;
163
+ }
164
+ }
165
+ #endif
166
+
167
+ if ( statisticsHandler != null )
168
+ {
169
+ statisticsHandler . Invoke ( Util . Marshal . PtrToStringUTF8 ( json , json_len ) ) ;
170
+ }
152
171
}
153
172
catch ( Exception e )
154
173
{
155
174
handlerException = e ;
156
175
}
157
176
158
- return 0 ; // instruct librdkafka to immediately free the json ptr.
177
+ return 0 ; // instruct librdkafka to immediately free the json ptr
159
178
}
160
179
161
180
private Action < string > oAuthBearerTokenRefreshHandler ;
@@ -399,8 +418,8 @@ public void Flush(CancellationToken cancellationToken)
399
418
throw new OperationCanceledException ( ) ;
400
419
}
401
420
}
402
- }
403
-
421
+ }
422
+
404
423
405
424
/// <inheritdoc/>
406
425
public void Dispose ( )
@@ -576,6 +595,9 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
576
595
// TODO: Hijack the "delivery.report.only.error" configuration parameter and add functionality to enforce that Tasks
577
596
// that never complete are never created when this is set to true.
578
597
598
+ #if NET6_0_OR_GREATER
599
+ this . statisticsUtf8Handler = baseConfig . statisticsUtf8Handler ;
600
+ #endif
579
601
this . statisticsHandler = baseConfig . statisticsHandler ;
580
602
this . logHandler = baseConfig . logHandler ;
581
603
this . errorHandler = baseConfig . errorHandler ;
@@ -675,7 +697,11 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
675
697
{
676
698
Librdkafka . conf_set_log_cb ( configPtr , logCallbackDelegate ) ;
677
699
}
700
+ #if NET6_0_OR_GREATER
701
+ if ( statisticsUtf8Handler != null || statisticsHandler != null )
702
+ #else
678
703
if ( statisticsHandler != null )
704
+ #endif
679
705
{
680
706
Librdkafka . conf_set_stats_cb ( configPtr , statisticsCallbackDelegate ) ;
681
707
}
0 commit comments