Skip to content

Commit 36dc43c

Browse files
committed
Enqueue log messages
1 parent bb76b5b commit 36dc43c

File tree

2 files changed

+32
-20
lines changed

2 files changed

+32
-20
lines changed

src/client.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ impl<C: ClientContext> Client<C> {
239239
Arc::as_ptr(&context) as *mut c_void,
240240
)
241241
};
242+
native_config.set("log.queue", "true")?;
242243

243244
let client_ptr = unsafe {
244245
let native_config = ManuallyDrop::new(native_config);
@@ -255,6 +256,12 @@ impl<C: ClientContext> Client<C> {
255256
return Err(KafkaError::ClientCreation(err_buf.to_string()));
256257
}
257258

259+
let ret = unsafe {
260+
rdsys::rd_kafka_set_log_queue(client_ptr, rdsys::rd_kafka_queue_get_main(client_ptr))
261+
};
262+
if ret.is_error() {
263+
return Err(KafkaError::Global(ret.into()));
264+
}
258265
unsafe { rdsys::rd_kafka_set_log_level(client_ptr, config.log_level as i32) };
259266

260267
Ok(Client {

src/config.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,30 @@ impl NativeClientConfig {
155155
.to_string_lossy()
156156
.into())
157157
}
158+
159+
pub(crate) fn set(&self, key: &str, value: &str) -> KafkaResult<()> {
160+
let mut err_buf = ErrBuf::new();
161+
let key_c = CString::new(key)?;
162+
let value_c = CString::new(value)?;
163+
let ret = unsafe {
164+
rdsys::rd_kafka_conf_set(
165+
self.ptr(),
166+
key_c.as_ptr(),
167+
value_c.as_ptr(),
168+
err_buf.as_mut_ptr(),
169+
err_buf.capacity(),
170+
)
171+
};
172+
if ret.is_error() {
173+
return Err(KafkaError::ClientConfig(
174+
ret,
175+
err_buf.to_string(),
176+
key.to_string(),
177+
value.to_string(),
178+
));
179+
}
180+
Ok(())
181+
}
158182
}
159183

160184
/// Client configuration.
@@ -228,27 +252,8 @@ impl ClientConfig {
228252
/// Builds a native librdkafka configuration.
229253
pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
230254
let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
231-
let mut err_buf = ErrBuf::new();
232255
for (key, value) in &self.conf_map {
233-
let key_c = CString::new(key.to_string())?;
234-
let value_c = CString::new(value.to_string())?;
235-
let ret = unsafe {
236-
rdsys::rd_kafka_conf_set(
237-
conf.ptr(),
238-
key_c.as_ptr(),
239-
value_c.as_ptr(),
240-
err_buf.as_mut_ptr(),
241-
err_buf.capacity(),
242-
)
243-
};
244-
if ret.is_error() {
245-
return Err(KafkaError::ClientConfig(
246-
ret,
247-
err_buf.to_string(),
248-
key.to_string(),
249-
value.to_string(),
250-
));
251-
}
256+
conf.set(key, value)?;
252257
}
253258
Ok(conf)
254259
}

0 commit comments

Comments
 (0)