Skip to content

Commit d739960

Browse files
manifestfede1024
authored andcommitted
Enable the development of custom consumers
It is currently impossible to develop a custom consumer based on `BaseConsumer` because its `queue` property, which is necessary to receive notifications about new incoming messages, is private. This defines `set_nonempty_callback` method on `BaseConsumer` similarly to how it has already been done for `PartitionQueue`. That will allow setting `rdkafka_sys::rd_kafka_queue_cb_event_enable` callback from within a custom consumer implementation.
1 parent 52546d0 commit d739960

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed

src/consumer/base_consumer.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ where
3838
client: Client<C>,
3939
queue: NativeQueue,
4040
group_id: Option<String>,
41+
nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
4142
}
4243

4344
impl FromClientConfig for BaseConsumer {
@@ -98,6 +99,7 @@ where
9899
client,
99100
queue,
100101
group_id,
102+
nonempty_callback: None,
101103
})
102104
}
103105

@@ -360,6 +362,36 @@ where
360362
pub(crate) fn native_client(&self) -> &NativeClient {
361363
self.client.native_client()
362364
}
365+
366+
/// Sets a callback that will be invoked whenever the queue becomes
367+
/// nonempty.
368+
pub fn set_nonempty_callback<F>(&mut self, f: F)
369+
where
370+
F: Fn() + Send + Sync + 'static,
371+
{
372+
// SAFETY: we keep `F` alive until the next call to
373+
// `rd_kafka_queue_cb_event_enable`. That might be the next call to
374+
// `set_nonempty_callback` or it might be when the queue is dropped. The
375+
// double indirection is required because `&dyn Fn` is a fat pointer.
376+
377+
unsafe extern "C" fn native_message_queue_nonempty_cb(
378+
_: *mut RDKafka,
379+
opaque_ptr: *mut c_void,
380+
) {
381+
let f = opaque_ptr as *const *const (dyn Fn() + Send + Sync);
382+
(**f)();
383+
}
384+
385+
let f: Box<Box<dyn Fn() + Send + Sync>> = Box::new(Box::new(f));
386+
unsafe {
387+
rdsys::rd_kafka_queue_cb_event_enable(
388+
self.queue.ptr(),
389+
Some(native_message_queue_nonempty_cb),
390+
&*f as *const _ as *mut c_void,
391+
)
392+
}
393+
self.nonempty_callback = Some(f);
394+
}
363395
}
364396

365397
impl<C> Consumer<C> for BaseConsumer<C>
@@ -722,6 +754,8 @@ where
722754
C: ConsumerContext,
723755
{
724756
fn drop(&mut self) {
757+
unsafe { rdsys::rd_kafka_queue_cb_event_enable(self.queue.ptr(), None, ptr::null_mut()) }
758+
725759
trace!("Destroying consumer: {:?}", self.client.native_ptr());
726760
if self.group_id.is_some() {
727761
if let Err(err) = self.close_queue() {

tests/test_low_consumers.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,101 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
447447
assert_eq!(wakeups.load(Ordering::SeqCst), 2);
448448
}
449449

450+
#[tokio::test]
451+
async fn test_produce_consume_consumer_nonempty_callback() {
452+
let _r = env_logger::try_init();
453+
454+
let topic_name = rand_test_topic("test_produce_consume_consumer_nonempty_callback");
455+
456+
create_topic(&topic_name, 1).await;
457+
458+
// Turn off statistics to prevent interference with the wakeups counter.
459+
let mut config_overrides = HashMap::new();
460+
config_overrides.insert("statistics.interval.ms", "0");
461+
462+
let mut consumer: BaseConsumer<_> = consumer_config(&rand_test_group(), Some(config_overrides))
463+
.create_with_context(ConsumerTestContext { _n: 64 })
464+
.expect("Consumer creation failed");
465+
466+
let mut tpl = TopicPartitionList::new();
467+
tpl.add_partition_offset(&topic_name, 0, Offset::Beginning)
468+
.unwrap();
469+
consumer.assign(&tpl).unwrap();
470+
471+
let wakeups = Arc::new(AtomicUsize::new(0));
472+
consumer.set_nonempty_callback({
473+
let wakeups = wakeups.clone();
474+
move || {
475+
wakeups.fetch_add(1, Ordering::SeqCst);
476+
}
477+
});
478+
479+
let wait_for_wakeups = |target| {
480+
let start = Instant::now();
481+
let timeout = Duration::from_secs(15);
482+
loop {
483+
let w = wakeups.load(Ordering::SeqCst);
484+
match w.cmp(&target) {
485+
std::cmp::Ordering::Equal => break,
486+
std::cmp::Ordering::Greater => panic!("wakeups {} exceeds target {}", w, target),
487+
std::cmp::Ordering::Less => (),
488+
};
489+
thread::sleep(Duration::from_millis(100));
490+
if start.elapsed() > timeout {
491+
panic!("timeout exceeded while waiting for wakeup");
492+
}
493+
}
494+
};
495+
496+
// Initiate connection.
497+
assert!(consumer.poll(Duration::from_secs(0)).is_none());
498+
499+
// Expect no wakeups for 1s.
500+
thread::sleep(Duration::from_secs(1));
501+
assert_eq!(wakeups.load(Ordering::SeqCst), 0);
502+
503+
// Verify there are no messages waiting.
504+
assert!(consumer.poll(Duration::from_secs(0)).is_none());
505+
506+
// Populate the topic, and expect a wakeup notifying us of the new messages.
507+
populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None).await;
508+
wait_for_wakeups(1);
509+
510+
// Read one of the messages.
511+
assert!(consumer.poll(Duration::from_secs(0)).is_some());
512+
513+
// Add more messages to the topic. Expect no additional wakeups, as the
514+
// queue is not fully drained, for 1s.
515+
populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None).await;
516+
thread::sleep(Duration::from_secs(1));
517+
assert_eq!(wakeups.load(Ordering::SeqCst), 1);
518+
519+
// Drain the queue.
520+
assert!(consumer.poll(None).is_some());
521+
assert!(consumer.poll(None).is_some());
522+
assert!(consumer.poll(None).is_some());
523+
524+
// Expect no additional wakeups for 1s.
525+
thread::sleep(Duration::from_secs(1));
526+
assert_eq!(wakeups.load(Ordering::SeqCst), 1);
527+
528+
// Add another message, and expect a wakeup.
529+
populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None).await;
530+
wait_for_wakeups(2);
531+
532+
// Expect no additional wakeups for 1s.
533+
thread::sleep(Duration::from_secs(1));
534+
assert_eq!(wakeups.load(Ordering::SeqCst), 2);
535+
536+
// Disable the queue and add another message.
537+
consumer.set_nonempty_callback(|| ());
538+
populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None).await;
539+
540+
// Expect no additional wakeups for 1s.
541+
thread::sleep(Duration::from_secs(1));
542+
assert_eq!(wakeups.load(Ordering::SeqCst), 2);
543+
}
544+
450545
#[tokio::test]
451546
async fn test_invalid_consumer_position() {
452547
// Regression test for #360, in which calling `position` on a consumer which

0 commit comments

Comments
 (0)