Skip to content

Commit 856378f

Browse files
committed
fix: We don't actually need new callbacks!
We just need to trigger the existing one in the ClientContext
1 parent 9c2d2a9 commit 856378f

File tree

6 files changed

+3
-38
lines changed

6 files changed

+3
-38
lines changed

src/consumer/base_consumer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ where
255255
let reason = unsafe {
256256
CStr::from_ptr(rdsys::rd_kafka_event_error_string(event.ptr())).to_string_lossy()
257257
};
258-
self.context().error_callback(error.clone(), reason.trim());
258+
self.context().error(error.clone(), reason.trim());
259259
Some(error)
260260
} else {
261261
None

src/consumer/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,6 @@ pub trait ConsumerContext: ClientContext + Sized {
130130
fn main_queue_min_poll_interval(&self) -> Timeout {
131131
Timeout::After(Duration::from_secs(1))
132132
}
133-
134-
/// Error callback
135-
#[allow(unused_variables)]
136-
fn error_callback(&self, error: KafkaError, reason: &str) {}
137133
}
138134

139135
/// An inert [`ConsumerContext`] that can be used when no customizations are

src/producer/base_producer.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ use crate::topic_partition_list::TopicPartitionList;
7070
use crate::util::{Deadline, IntoOpaque, NativePtr, Timeout};
7171

7272
pub use crate::message::DeliveryResult;
73-
use crate::ClientContext;
7473

7574
use super::NoCustomPartitioner;
7675

@@ -418,8 +417,7 @@ where
418417
let reason = unsafe {
419418
CStr::from_ptr(rdsys::rd_kafka_event_error_string(event.ptr())).to_string_lossy()
420419
};
421-
<C as ClientContext>::error(self.context(), error.clone(), reason.trim());
422-
<C as ProducerContext<Part>>::error_callback(self.context(), error, reason.trim());
420+
self.context().error(error.clone(), reason.trim());
423421
}
424422

425423
/// Returns a pointer to the native Kafka client.

src/producer/future_producer.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,6 @@ where
201201
};
202202
let _ = tx.send(owned_delivery_result); // TODO: handle error
203203
}
204-
205-
fn error_callback(&self, error: KafkaError, reason: &str) {
206-
C::error(&self.wrapped_context, error, reason);
207-
}
208204
}
209205

210206
/// A producer that returns a [`Future`] for every message being produced.
@@ -454,10 +450,6 @@ mod tests {
454450
fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {
455451
unimplemented!()
456452
}
457-
458-
fn error_callback(&self, _: KafkaError, _: &str) {
459-
unimplemented!()
460-
}
461453
}
462454

463455
// Verify that the future producer is clone, according to documentation.

src/producer/mod.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ use std::sync::Arc;
163163

164164
use crate::client::{Client, ClientContext};
165165
use crate::consumer::ConsumerGroupMetadata;
166-
use crate::error::{KafkaError, KafkaResult};
166+
use crate::error::KafkaResult;
167167
use crate::topic_partition_list::TopicPartitionList;
168168
use crate::util::{IntoOpaque, Timeout};
169169

@@ -201,9 +201,6 @@ pub trait ProducerContext<Part: Partitioner = NoCustomPartitioner>: ClientContex
201201
/// when calling send.
202202
fn delivery(&self, delivery_result: &DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque);
203203

204-
/// Error callback
205-
fn error_callback(&self, error: KafkaError, reason: &str);
206-
207204
/// This method is called when creating producer in order to optionally register custom partitioner.
208205
/// If custom partitioner is not used then `partitioner` configuration property is used (or its default).
209206
///
@@ -267,8 +264,6 @@ impl ProducerContext<NoCustomPartitioner> for DefaultProducerContext {
267264
type DeliveryOpaque = ();
268265

269266
fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {}
270-
271-
fn error_callback(&self, _: KafkaError, _: &str) {}
272267
}
273268

274269
/// Common trait for all producers.

tests/test_low_producers.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,6 @@ impl ProducerContext for PrintingContext {
4242
fn delivery(&self, delivery_result: &DeliveryResult, delivery_opaque: Self::DeliveryOpaque) {
4343
println!("Delivery: {:?} {:?}", delivery_result, delivery_opaque);
4444
}
45-
46-
fn error_callback(&self, error: KafkaError, reason: &str) {
47-
println!("Error: {reason:?} {error:?}");
48-
}
4945
}
5046

5147
type TestProducerDeliveryResult = (OwnedMessage, Option<KafkaError>, usize);
@@ -104,10 +100,6 @@ impl<Part: Partitioner + Send + Sync> ProducerContext<Part> for CollectingContex
104100
Some(p) => Some(p),
105101
}
106102
}
107-
108-
fn error_callback(&self, _: KafkaError, _: &str) {
109-
unimplemented!()
110-
}
111103
}
112104

113105
// Partitioner sending all messages to single, defined partition.
@@ -348,10 +340,6 @@ impl ProducerContext for HeaderCheckContext {
348340
}
349341
(*self.ids.lock().unwrap()).insert(message_id);
350342
}
351-
352-
fn error_callback(&self, _: KafkaError, _: &str) {
353-
unimplemented!()
354-
}
355343
}
356344

357345
#[test]
@@ -441,10 +429,6 @@ fn test_base_producer_opaque_arc() -> Result<(), Box<dyn Error>> {
441429
let mut shared_count = opaque.lock().unwrap();
442430
*shared_count += 1;
443431
}
444-
445-
fn error_callback(&self, _: KafkaError, _: &str) {
446-
unimplemented!()
447-
}
448432
}
449433

450434
let shared_count = Arc::new(Mutex::new(0));

0 commit comments

Comments
 (0)