Skip to content

Commit cab0b48

Browse files
committed
fix: test_delete_records
1 parent e9d5812 commit cab0b48

File tree

4 files changed

+155
-1
lines changed

4 files changed

+155
-1
lines changed

tests/admin.rs

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ use crate::utils::containers::KafkaContext;
33
use crate::utils::logging::init_test_logger;
44
use crate::utils::rand::rand_test_topic;
55
use crate::utils::{get_broker_version, KafkaVersion};
6+
use backon::{BlockingRetryable, ExponentialBuilder};
67
use rdkafka::admin::{
78
AdminOptions, ConfigEntry, ConfigSource, NewPartitions, NewTopic, ResourceSpecifier,
89
TopicReplication,
910
};
1011
use rdkafka::error::KafkaError;
12+
use rdkafka::producer::{FutureRecord, Producer};
13+
use rdkafka::{Offset, TopicPartitionList};
1114
use rdkafka_sys::RDKafkaErrorCode;
1215
use std::time::Duration;
1316

@@ -51,6 +54,8 @@ pub async fn test_topic_creation() {
5154
/// be deleted.
5255
#[tokio::test]
5356
pub async fn test_topic_create_and_delete() {
57+
init_test_logger();
58+
5459
// Get Kafka container context.
5560
let kafka_context = KafkaContext::shared()
5661
.await
@@ -207,6 +212,8 @@ pub async fn test_topic_create_and_delete() {
207212
/// creating topics.
208213
#[tokio::test]
209214
pub async fn test_incorrect_replication_factors_are_ignored_when_creating_topics() {
215+
init_test_logger();
216+
210217
// Get Kafka container context.
211218
let kafka_context_result = KafkaContext::shared().await;
212219
let Ok(kafka_context) = kafka_context_result else {
@@ -215,7 +222,6 @@ pub async fn test_incorrect_replication_factors_are_ignored_when_creating_topics
215222
kafka_context_result.unwrap_err()
216223
);
217224
};
218-
let test_topic_name = rand_test_topic("testing-topic");
219225

220226
let admin_client_result =
221227
utils::admin::create_admin_client(&kafka_context.bootstrap_servers).await;
@@ -247,6 +253,8 @@ pub async fn test_incorrect_replication_factors_are_ignored_when_creating_topics
247253
/// creating partitions.
248254
#[tokio::test]
249255
pub async fn test_incorrect_replication_factors_are_ignored_when_creating_partitions() {
256+
init_test_logger();
257+
250258
// Get Kafka container context.
251259
let kafka_context_result = KafkaContext::shared().await;
252260
let Ok(kafka_context) = kafka_context_result else {
@@ -310,6 +318,8 @@ pub async fn test_incorrect_replication_factors_are_ignored_when_creating_partit
310318
/// Verify that deleting a non-existent topic fails.
311319
#[tokio::test]
312320
pub async fn test_delete_nonexistent_topics() {
321+
init_test_logger();
322+
313323
// Get Kafka container context.
314324
let kafka_context = KafkaContext::shared()
315325
.await
@@ -336,6 +346,8 @@ pub async fn test_delete_nonexistent_topics() {
336346
/// failing operators.
337347
#[tokio::test]
338348
pub async fn test_mixed_success_results() {
349+
init_test_logger();
350+
339351
// Get Kafka container context.
340352
let kafka_context = KafkaContext::shared()
341353
.await
@@ -401,3 +413,131 @@ pub async fn test_mixed_success_results() {
401413
]
402414
);
403415
}
416+
417+
/// Test the admin client's delete records functionality.
418+
#[tokio::test]
419+
async fn test_delete_records() {
420+
init_test_logger();
421+
422+
// Get Kafka container context.
423+
let kafka_context = KafkaContext::shared()
424+
.await
425+
.expect("could not create kafka context");
426+
427+
// Create admin client
428+
let admin_client = utils::admin::create_admin_client(&kafka_context.bootstrap_servers)
429+
.await
430+
.expect("could not create admin client");
431+
432+
// Create producer client
433+
let producer_client =
434+
utils::producer::future_producer::create_producer(&kafka_context.bootstrap_servers)
435+
.await
436+
.expect("could not create producer_client");
437+
438+
let timeout = Some(Duration::from_secs(1));
439+
let opts = AdminOptions::new().operation_timeout(timeout);
440+
let topic = rand_test_topic("test_delete_records");
441+
let make_record = || FutureRecord::<str, str>::to(&topic).payload("data");
442+
443+
// Create a topic with a single partition.
444+
admin_client
445+
.create_topics(
446+
&[NewTopic::new(&topic, 1, TopicReplication::Fixed(1))],
447+
&opts,
448+
)
449+
.await
450+
.expect("topic creation failed");
451+
452+
// Ensure that the topic begins with low and high water marks of 0.
453+
let (lo, hi) = (|| {
454+
producer_client
455+
.client()
456+
.fetch_watermarks(&topic, 0, timeout)
457+
})
458+
.retry(ExponentialBuilder::default().with_max_delay(Duration::from_secs(5)))
459+
.call()
460+
.unwrap();
461+
assert_eq!(lo, 0);
462+
assert_eq!(hi, 0);
463+
464+
// Produce five messages to the topic.
465+
for _ in 0..5 {
466+
producer_client.send(make_record(), timeout).await.unwrap();
467+
}
468+
469+
// Ensure that the high water mark has advanced to 5.
470+
let (lo, hi) = producer_client
471+
.client()
472+
.fetch_watermarks(&topic, 0, timeout)
473+
.unwrap();
474+
assert_eq!(lo, 0);
475+
assert_eq!(hi, 5);
476+
477+
// Delete the record at offset 0.
478+
let mut tpl = TopicPartitionList::new();
479+
tpl.add_partition_offset(&topic, 0, Offset::Offset(1))
480+
.unwrap();
481+
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
482+
assert_eq!(res_tpl.count(), 1);
483+
assert_eq!(res_tpl.elements()[0].topic(), topic);
484+
assert_eq!(res_tpl.elements()[0].partition(), 0);
485+
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(1));
486+
assert_eq!(res_tpl.elements()[0].error(), Ok(()));
487+
488+
// Ensure that the low water mark has advanced to 1.
489+
let (lo, hi) = producer_client
490+
.client()
491+
.fetch_watermarks(&topic, 0, timeout)
492+
.unwrap();
493+
assert_eq!(lo, 1);
494+
assert_eq!(hi, 5);
495+
496+
// Delete the record at offset 1 and also include an invalid partition in
497+
// the request. The invalid partition should not cause the request to fail,
498+
// but we should be able to see the per-partition error in the returned
499+
// topic partition list.
500+
let mut tpl = TopicPartitionList::new();
501+
tpl.add_partition_offset(&topic, 0, Offset::Offset(2))
502+
.unwrap();
503+
tpl.add_partition_offset(&topic, 1, Offset::Offset(1))
504+
.unwrap();
505+
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
506+
assert_eq!(res_tpl.count(), 2);
507+
assert_eq!(res_tpl.elements()[0].topic(), topic);
508+
assert_eq!(res_tpl.elements()[0].partition(), 0);
509+
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(2));
510+
assert_eq!(res_tpl.elements()[0].error(), Ok(()));
511+
assert_eq!(res_tpl.elements()[1].topic(), topic);
512+
assert_eq!(res_tpl.elements()[1].partition(), 1);
513+
assert_eq!(
514+
res_tpl.elements()[1].error(),
515+
Err(KafkaError::OffsetFetch(RDKafkaErrorCode::UnknownPartition))
516+
);
517+
518+
// Ensure that the low water mark has advanced to 2.
519+
let (lo, hi) = producer_client
520+
.client()
521+
.fetch_watermarks(&topic, 0, timeout)
522+
.unwrap();
523+
assert_eq!(lo, 2);
524+
assert_eq!(hi, 5);
525+
526+
// Delete all records up to offset 5.
527+
let mut tpl = TopicPartitionList::new();
528+
tpl.add_partition_offset(&topic, 0, Offset::End).unwrap();
529+
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
530+
assert_eq!(res_tpl.count(), 1);
531+
assert_eq!(res_tpl.elements()[0].topic(), topic);
532+
assert_eq!(res_tpl.elements()[0].partition(), 0);
533+
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(5));
534+
assert_eq!(res_tpl.elements()[0].error(), Ok(()));
535+
536+
// Ensure that the low water mark has advanced to 5.
537+
let (lo, hi) = producer_client
538+
.client()
539+
.fetch_watermarks(&topic, 0, timeout)
540+
.unwrap();
541+
assert_eq!(lo, 5);
542+
assert_eq!(hi, 5);
543+
}
File renamed without changes.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
use anyhow::Context;
2+
use rdkafka::config::FromClientConfig;
3+
use rdkafka::producer::FutureProducer;
4+
use rdkafka::ClientConfig;
5+
6+
pub async fn create_producer(bootstrap_servers: &str) -> anyhow::Result<FutureProducer> {
7+
let mut producer_client_config = ClientConfig::default();
8+
producer_client_config.set("bootstrap.servers", bootstrap_servers);
9+
let future_producer = FutureProducer::from_config(&producer_client_config)
10+
.context("couldn't create producer client")?;
11+
Ok(future_producer)
12+
}

tests/utils/producer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod base_producer;
2+
pub mod future_producer;

0 commit comments

Comments
 (0)