Skip to content

Commit 4e8cdb8

Browse files
authored
protect rd_kafka_destroy references from GC (#509)
1 parent 473fe00 commit 4e8cdb8

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

src/Confluent.Kafka/Impl/LibRdKafka.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ static bool SetDelegates(Type nativeMethodsClass)
131131
_topic_conf_set_partitioner_cb = (Action<IntPtr, PartitionerDelegate>)methods.Where(m => m.Name == "rd_kafka_topic_conf_set_partitioner_cb").Single().CreateDelegate(typeof(Action<IntPtr, PartitionerDelegate>));
132132
_topic_partition_available = (Func<IntPtr, int, bool>)methods.Where(m => m.Name == "rd_kafka_topic_partition_available").Single().CreateDelegate(typeof(Func<IntPtr, int, bool>));
133133
_new = (Func<RdKafkaType, IntPtr, StringBuilder, UIntPtr, SafeKafkaHandle>)methods.Where(m => m.Name == "rd_kafka_new").Single().CreateDelegate(typeof(Func<RdKafkaType, IntPtr, StringBuilder, UIntPtr, SafeKafkaHandle>));
134-
_destroy = (Action<IntPtr>)methods.Where(m => m.Name == "rd_kafka_destroy").Single().CreateDelegate(typeof(Action<IntPtr>));
135134
_name = (Func<IntPtr, IntPtr>)methods.Where(m => m.Name == "rd_kafka_name").Single().CreateDelegate(typeof(Func<IntPtr, IntPtr>));
136135
_memberid = (Func<IntPtr, IntPtr>)methods.Where(m => m.Name == "rd_kafka_memberid").Single().CreateDelegate(typeof(Func<IntPtr, IntPtr>));
137136
_topic_new = (Func<IntPtr, string, IntPtr, SafeTopicHandle>)methods.Where(m => m.Name == "rd_kafka_topic_new").Single().CreateDelegate(typeof(Func<IntPtr, string, IntPtr, SafeTopicHandle>));
@@ -175,6 +174,9 @@ static bool SetDelegates(Type nativeMethodsClass)
175174
_event_destroy = (Action<IntPtr>)methods.Where(m => m.Name == "rd_kafka_event_destroy").Single().CreateDelegate(typeof(Action<IntPtr>));
176175
_queue_poll = (Func<IntPtr, IntPtr, IntPtr>)methods.Where(m => m.Name == "rd_kafka_queue_poll").Single().CreateDelegate(typeof(Func<IntPtr, IntPtr, IntPtr>));
177176

177+
_destroyMethodInfo = methods.Where(m => m.Name == "rd_kafka_destroy").Single();
178+
_destroy = (Action<IntPtr>)_destroyMethodInfo.CreateDelegate(typeof(Action<IntPtr>));
179+
178180
try
179181
{
180182
// throws if the native library failed to load.
@@ -333,6 +335,14 @@ public static bool Initialize(string userSpecifiedPath)
333335

334336
isInitialized = true;
335337

338+
// Protect the _destroy and _destroyMethodInfo objects from garbage collection. This is
339+
// required since the Producer/Consumer finalizers may reference them, and they might
340+
// have otherwise been cleaned up at that point. To keep things simple, there is no reference
341+
// counting / corresponding Free() call - there is negligible overhead in keeping these
342+
// references around for the lifetime of the process.
343+
GCHandle.Alloc(_destroy, GCHandleType.Normal);
344+
GCHandle.Alloc(_destroyMethodInfo, GCHandleType.Normal);
345+
336346
return isInitialized;
337347
}
338348
}
@@ -506,6 +516,7 @@ internal static SafeKafkaHandle kafka_new(RdKafkaType type, IntPtr conf,
506516
StringBuilder errstr, UIntPtr errstr_size)
507517
=> _new(type, conf, errstr, errstr_size);
508518

519+
private static MethodInfo _destroyMethodInfo;
509520
private static Action<IntPtr> _destroy;
510521
internal static void destroy(IntPtr rk) => _destroy(rk);
511522

0 commit comments

Comments
 (0)