Skip to content

Commit 712f9de

Browse files
authored
Merge pull request #714 from fede1024/delete-records
Add support for deleting records to admin client
2 parents 964d5b8 + 990e2f9 commit 712f9de

File tree

4 files changed

+198
-15
lines changed

4 files changed

+198
-15
lines changed

rdkafka-sys/src/types.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t;
7878
/// Native rdkafka new partitions object.
7979
pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t;
8080

81+
/// Native rdkafka delete records object.
82+
pub type RDKafkaDeleteRecords = bindings::rd_kafka_DeleteRecords_t;
83+
8184
/// Native rdkafka config resource.
8285
pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t;
8386

src/admin.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
2626
use crate::error::{IsError, KafkaError, KafkaResult};
2727
use crate::log::{trace, warn};
2828
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
29+
use crate::TopicPartitionList;
2930

3031
//
3132
// ********** ADMIN CLIENT **********
@@ -218,6 +219,53 @@ impl<C: ClientContext> AdminClient<C> {
218219
Ok(rx)
219220
}
220221

222+
/// Deletes records from a topic.
223+
///
224+
/// The provided `offsets` is a topic partition list specifying which
225+
/// records to delete from a list of topic partitions. For each entry in the
226+
/// list, the messages at offsets before the specified offsets (exclusive)
227+
/// in the specified partition will be deleted. Use offset [`Offset::End`]
228+
/// to delete all records in the partition.
229+
///
230+
/// Returns a topic partition list describing the result of the deletion. If
231+
/// the operation succeeded for a partition, the offset for that partition
232+
/// will be set to the post-deletion low-water mark for that partition. If
233+
/// the operation failed for a partition, there will be an error for that
234+
/// partition's entry in the list.
235+
pub fn delete_records(
236+
&self,
237+
offsets: &TopicPartitionList,
238+
opts: &AdminOptions,
239+
) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
240+
match self.delete_records_inner(offsets, opts) {
241+
Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
242+
Err(err) => Either::Right(future::err(err)),
243+
}
244+
}
245+
246+
fn delete_records_inner(
247+
&self,
248+
offsets: &TopicPartitionList,
249+
opts: &AdminOptions,
250+
) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
251+
let mut err_buf = ErrBuf::new();
252+
let delete_records = unsafe {
253+
NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr()))
254+
}
255+
.ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
256+
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
257+
unsafe {
258+
rdsys::rd_kafka_DeleteRecords(
259+
self.client.native_ptr(),
260+
&mut delete_records.ptr(),
261+
1,
262+
native_opts.ptr(),
263+
self.queue.ptr(),
264+
);
265+
}
266+
Ok(rx)
267+
}
268+
221269
/// Retrieves the configuration parameters for the specified resources.
222270
///
223271
/// Note that while the API supports describing multiple configurations at
@@ -950,6 +998,43 @@ impl Future for CreatePartitionsFuture {
950998
}
951999
}
9521000

1001+
//
1002+
// Delete records handling
1003+
//
1004+
1005+
type NativeDeleteRecords = NativePtr<RDKafkaDeleteRecords>;
1006+
1007+
unsafe impl KafkaDrop for RDKafkaDeleteRecords {
1008+
const TYPE: &'static str = "delete records";
1009+
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy;
1010+
}
1011+
1012+
struct DeleteRecordsFuture {
1013+
rx: oneshot::Receiver<NativeEvent>,
1014+
}
1015+
1016+
impl Future for DeleteRecordsFuture {
1017+
type Output = KafkaResult<TopicPartitionList>;
1018+
1019+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1020+
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1021+
event.check_error()?;
1022+
let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) };
1023+
if res.is_null() {
1024+
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1025+
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1026+
"delete records request received response of incorrect type ({})",
1027+
typ
1028+
))));
1029+
}
1030+
let tpl = unsafe {
1031+
let tpl = rdsys::rd_kafka_DeleteRecords_result_offsets(res);
1032+
TopicPartitionList::from_ptr(rdsys::rd_kafka_topic_partition_list_copy(tpl))
1033+
};
1034+
Poll::Ready(Ok(tpl))
1035+
}
1036+
}
1037+
9531038
//
9541039
// Describe configs handling
9551040
//

src/util.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -318,20 +318,6 @@ where
318318
}
319319
}
320320

321-
#[allow(dead_code)]
322-
pub(crate) struct OnDrop<F>(pub F)
323-
where
324-
F: Fn();
325-
326-
impl<F> Drop for OnDrop<F>
327-
where
328-
F: Fn(),
329-
{
330-
fn drop(&mut self) {
331-
(self.0)()
332-
}
333-
}
334-
335321
/// An abstraction over asynchronous runtimes.
336322
///
337323
/// There are several asynchronous runtimes available for Rust. By default

tests/test_admin.rs

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ use rdkafka::client::DefaultClientContext;
1212
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext};
1313
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
1414
use rdkafka::metadata::Metadata;
15-
use rdkafka::{ClientConfig, TopicPartitionList};
15+
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
16+
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
1617

1718
use crate::utils::*;
1819

@@ -356,6 +357,114 @@ async fn test_topics() {
356357
}
357358
}
358359

360+
/// Test the admin client's delete records functionality.
361+
#[tokio::test]
362+
async fn test_delete_records() {
363+
let producer = create_config().create::<FutureProducer<_>>().unwrap();
364+
let admin_client = create_admin_client();
365+
let timeout = Some(Duration::from_secs(1));
366+
let opts = AdminOptions::new().operation_timeout(timeout);
367+
let topic = rand_test_topic("test_delete_records");
368+
let make_record = || FutureRecord::<str, str>::to(&topic).payload("data");
369+
370+
// Create a topic with a single partition.
371+
admin_client
372+
.create_topics(
373+
&[NewTopic::new(&topic, 1, TopicReplication::Fixed(1))],
374+
&opts,
375+
)
376+
.await
377+
.expect("topic creation failed");
378+
379+
// Ensure that the topic begins with low and high water marks of 0.
380+
let (lo, hi) = producer
381+
.client()
382+
.fetch_watermarks(&topic, 0, timeout)
383+
.unwrap();
384+
assert_eq!(lo, 0);
385+
assert_eq!(hi, 0);
386+
387+
// Produce five messages to the topic.
388+
for _ in 0..5 {
389+
producer.send(make_record(), timeout).await.unwrap();
390+
}
391+
392+
// Ensure that the high water mark has advanced to 5.
393+
let (lo, hi) = producer
394+
.client()
395+
.fetch_watermarks(&topic, 0, timeout)
396+
.unwrap();
397+
assert_eq!(lo, 0);
398+
assert_eq!(hi, 5);
399+
400+
// Delete the record at offset 0.
401+
let mut tpl = TopicPartitionList::new();
402+
tpl.add_partition_offset(&topic, 0, Offset::Offset(1))
403+
.unwrap();
404+
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
405+
assert_eq!(res_tpl.count(), 1);
406+
assert_eq!(res_tpl.elements()[0].topic(), topic);
407+
assert_eq!(res_tpl.elements()[0].partition(), 0);
408+
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(1));
409+
assert_eq!(res_tpl.elements()[0].error(), Ok(()));
410+
411+
// Ensure that the low water mark has advanced to 1.
412+
let (lo, hi) = producer
413+
.client()
414+
.fetch_watermarks(&topic, 0, timeout)
415+
.unwrap();
416+
assert_eq!(lo, 1);
417+
assert_eq!(hi, 5);
418+
419+
// Delete the record at offset 1 and also include an invalid partition in
420+
// the request. The invalid partition should not cause the request to fail,
421+
// but we should be able to see the per-partition error in the returned
422+
// topic partition list.
423+
let mut tpl = TopicPartitionList::new();
424+
tpl.add_partition_offset(&topic, 0, Offset::Offset(2))
425+
.unwrap();
426+
tpl.add_partition_offset(&topic, 1, Offset::Offset(1))
427+
.unwrap();
428+
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
429+
assert_eq!(res_tpl.count(), 2);
430+
assert_eq!(res_tpl.elements()[0].topic(), topic);
431+
assert_eq!(res_tpl.elements()[0].partition(), 0);
432+
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(2));
433+
assert_eq!(res_tpl.elements()[0].error(), Ok(()));
434+
assert_eq!(res_tpl.elements()[1].topic(), topic);
435+
assert_eq!(res_tpl.elements()[1].partition(), 1);
436+
assert_eq!(
437+
res_tpl.elements()[1].error(),
438+
Err(KafkaError::OffsetFetch(RDKafkaErrorCode::UnknownPartition))
439+
);
440+
441+
// Ensure that the low water mark has advanced to 2.
442+
let (lo, hi) = producer
443+
.client()
444+
.fetch_watermarks(&topic, 0, timeout)
445+
.unwrap();
446+
assert_eq!(lo, 2);
447+
assert_eq!(hi, 5);
448+
449+
// Delete all records up to offset 5.
450+
let mut tpl = TopicPartitionList::new();
451+
tpl.add_partition_offset(&topic, 0, Offset::End).unwrap();
452+
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
453+
assert_eq!(res_tpl.count(), 1);
454+
assert_eq!(res_tpl.elements()[0].topic(), topic);
455+
assert_eq!(res_tpl.elements()[0].partition(), 0);
456+
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(5));
457+
assert_eq!(res_tpl.elements()[0].error(), Ok(()));
458+
459+
// Ensure that the low water mark has advanced to 5.
460+
let (lo, hi) = producer
461+
.client()
462+
.fetch_watermarks(&topic, 0, timeout)
463+
.unwrap();
464+
assert_eq!(lo, 5);
465+
assert_eq!(hi, 5);
466+
}
467+
359468
#[tokio::test]
360469
async fn test_configs() {
361470
let admin_client = create_admin_client();

0 commit comments

Comments
 (0)