Skip to content

Commit b75372b

Browse files
authored
don't set callbacks if not configured (#900)
1 parent 006c7b5 commit b75372b

File tree

2 files changed

+32
-8
lines changed

2 files changed

+32
-8
lines changed

src/Confluent.Kafka/Consumer.cs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -590,12 +590,27 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
590590

591591
IntPtr configPtr = configHandle.DangerousGetHandle();
592592

593-
Librdkafka.conf_set_rebalance_cb(configPtr, rebalanceDelegate);
594-
Librdkafka.conf_set_offset_commit_cb(configPtr, commitDelegate);
593+
if (partitionsAssignedHandler != null || partitionsRevokedHandler != null)
594+
{
595+
Librdkafka.conf_set_rebalance_cb(configPtr, rebalanceDelegate);
596+
}
597+
if (offsetsCommittedHandler != null)
598+
{
599+
Librdkafka.conf_set_offset_commit_cb(configPtr, commitDelegate);
600+
}
595601

596-
Librdkafka.conf_set_error_cb(configPtr, errorCallbackDelegate);
597-
Librdkafka.conf_set_log_cb(configPtr, logCallbackDelegate);
598-
Librdkafka.conf_set_stats_cb(configPtr, statisticsCallbackDelegate);
602+
if (errorHandler != null)
603+
{
604+
Librdkafka.conf_set_error_cb(configPtr, errorCallbackDelegate);
605+
}
606+
if (logHandler != null)
607+
{
608+
Librdkafka.conf_set_log_cb(configPtr, logCallbackDelegate);
609+
}
610+
if (statisticsHandler != null)
611+
{
612+
Librdkafka.conf_set_stats_cb(configPtr, statisticsCallbackDelegate);
613+
}
599614

600615
this.kafkaHandle = SafeKafkaHandle.Create(RdKafkaType.Consumer, configPtr, this);
601616
configHandle.SetHandleAsInvalid(); // config object is no longer useable.

src/Confluent.Kafka/Producer.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -587,9 +587,18 @@ internal Producer(ProducerBuilder<TKey, TValue> builder)
587587
logCallbackDelegate = LogCallback;
588588
statisticsCallbackDelegate = StatisticsCallback;
589589

590-
Librdkafka.conf_set_error_cb(configPtr, errorCallbackDelegate);
591-
Librdkafka.conf_set_log_cb(configPtr, logCallbackDelegate);
592-
Librdkafka.conf_set_stats_cb(configPtr, statisticsCallbackDelegate);
590+
if (errorHandler != null)
591+
{
592+
Librdkafka.conf_set_error_cb(configPtr, errorCallbackDelegate);
593+
}
594+
if (logHandler != null)
595+
{
596+
Librdkafka.conf_set_log_cb(configPtr, logCallbackDelegate);
597+
}
598+
if (statisticsHandler != null)
599+
{
600+
Librdkafka.conf_set_stats_cb(configPtr, statisticsCallbackDelegate);
601+
}
593602

594603
this.ownedKafkaHandle = SafeKafkaHandle.Create(RdKafkaType.Producer, configPtr, this);
595604
configHandle.SetHandleAsInvalid(); // config object is no longer useable.

0 commit comments

Comments
 (0)