Skip to content

Commit 50528fb

Browse files
committed
fix: test_configs
1 parent cab0b48 commit 50528fb

File tree

3 files changed

+87
-184
lines changed

3 files changed

+87
-184
lines changed

tests/admin.rs

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use crate::utils::rand::rand_test_topic;
55
use crate::utils::{get_broker_version, KafkaVersion};
66
use backon::{BlockingRetryable, ExponentialBuilder};
77
use rdkafka::admin::{
8-
AdminOptions, ConfigEntry, ConfigSource, NewPartitions, NewTopic, ResourceSpecifier,
9-
TopicReplication,
8+
AdminOptions, AlterConfig, ConfigEntry, ConfigSource, NewPartitions, NewTopic,
9+
OwnedResourceSpecifier, ResourceSpecifier, TopicReplication,
1010
};
1111
use rdkafka::error::KafkaError;
1212
use rdkafka::producer::{FutureRecord, Producer};
@@ -541,3 +541,85 @@ async fn test_delete_records() {
541541
assert_eq!(lo, 5);
542542
assert_eq!(hi, 5);
543543
}
544+
545+
#[tokio::test]
546+
async fn test_configs() {
547+
init_test_logger();
548+
549+
// Get Kafka container context.
550+
let kafka_context = KafkaContext::shared()
551+
.await
552+
.expect("could not create kafka context");
553+
554+
// Create admin client
555+
let admin_client = utils::admin::create_admin_client(&kafka_context.bootstrap_servers)
556+
.await
557+
.expect("could not create admin client");
558+
let opts = AdminOptions::new();
559+
let broker = ResourceSpecifier::Broker(utils::BROKER_ID);
560+
561+
let res = admin_client
562+
.describe_configs(&[broker], &opts)
563+
.await
564+
.expect("describe configs failed");
565+
let config = &res[0].as_ref().expect("describe configs failed");
566+
let orig_val = config
567+
.get("log.flush.interval.messages")
568+
.expect("original config entry missing")
569+
.value
570+
.as_ref()
571+
.expect("original value missing");
572+
573+
let config = AlterConfig::new(broker).set("log.flush.interval.messages", "1234");
574+
let res = admin_client
575+
.alter_configs(&[config], &opts)
576+
.await
577+
.expect("alter configs failed");
578+
assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(utils::BROKER_ID))]);
579+
580+
let mut tries = 0;
581+
loop {
582+
let res = admin_client
583+
.describe_configs(&[broker], &opts)
584+
.await
585+
.expect("describe configs failed");
586+
let config = &res[0].as_ref().expect("describe configs failed");
587+
let entry = config.get("log.flush.interval.messages");
588+
let expected_entry = if get_broker_version(&kafka_context) < KafkaVersion(1, 1, 0, 0) {
589+
// Pre-1.1, the AlterConfig operation will silently fail, and the
590+
// config will remain unchanged, which I guess is worth testing.
591+
ConfigEntry {
592+
name: "log.flush.interval.messages".into(),
593+
value: Some(orig_val.clone()),
594+
source: ConfigSource::Default,
595+
is_read_only: true,
596+
is_default: true,
597+
is_sensitive: false,
598+
}
599+
} else {
600+
ConfigEntry {
601+
name: "log.flush.interval.messages".into(),
602+
value: Some("1234".into()),
603+
source: ConfigSource::DynamicBroker,
604+
is_read_only: false,
605+
is_default: false,
606+
is_sensitive: false,
607+
}
608+
};
609+
if entry == Some(&expected_entry) {
610+
break;
611+
} else if tries >= 5 {
612+
panic!("{:?} != {:?}", entry, Some(&expected_entry));
613+
} else {
614+
tries += 1;
615+
tokio::time::sleep(Duration::from_secs(1)).await;
616+
}
617+
}
618+
619+
let config = AlterConfig::new(broker).set("log.flush.interval.ms", orig_val);
620+
let res = admin_client
621+
.alter_configs(&[config], &opts)
622+
.await
623+
.expect("alter configs failed");
624+
assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(utils::BROKER_ID))]);
625+
}

tests/producer.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::utils::admin::{create_admin_client, create_topic};
22
use crate::utils::consumer::{create_subscribed_base_consumer, poll_x_times_for_messages};
33
use crate::utils::containers::KafkaContext;
44
use crate::utils::logging::init_test_logger;
5-
use crate::utils::producer::{create_producer, send_record};
5+
use crate::utils::producer::base_producer::create_producer;
66
use crate::utils::rand::rand_test_topic;
77
use rdkafka::producer::BaseRecord;
88
use rdkafka::Message;
@@ -58,7 +58,8 @@ pub async fn test_basic_produce() {
5858
let record = BaseRecord::to(&test_topic_name) // destination topic
5959
.key(&[1, 2, 3, 4]) // message key
6060
.payload("content"); // message payload
61-
let send_record_result = send_record(&base_producer, record).await;
61+
let send_record_result =
62+
crate::utils::producer::base_producer::send_record(&base_producer, record).await;
6263
if send_record_result.is_err() {
6364
panic!("could not send record: {}", send_record_result.unwrap_err());
6465
}

tests/test_admin.rs

Lines changed: 0 additions & 180 deletions
Original file line numberDiff line numberDiff line change
@@ -119,186 +119,6 @@ fn verify_delete(topic: &str) {
119119
.unwrap()
120120
}
121121

122-
/// Test the admin client's delete records functionality.
123-
#[tokio::test]
124-
async fn test_delete_records() {
125-
let producer = create_config().create::<FutureProducer<_>>().unwrap();
126-
let admin_client = create_admin_client();
127-
let timeout = Some(Duration::from_secs(1));
128-
let opts = AdminOptions::new().operation_timeout(timeout);
129-
let topic = rand_test_topic("test_delete_records");
130-
let make_record = || FutureRecord::<str, str>::to(&topic).payload("data");
131-
132-
// Create a topic with a single partition.
133-
admin_client
134-
.create_topics(
135-
&[NewTopic::new(&topic, 1, TopicReplication::Fixed(1))],
136-
&opts,
137-
)
138-
.await
139-
.expect("topic creation failed");
140-
141-
// Ensure that the topic begins with low and high water marks of 0.
142-
let (lo, hi) = (|| producer.client().fetch_watermarks(&topic, 0, timeout))
143-
.retry(ExponentialBuilder::default().with_max_delay(Duration::from_secs(5)))
144-
.call()
145-
.unwrap();
146-
assert_eq!(lo, 0);
147-
assert_eq!(hi, 0);
148-
149-
// Produce five messages to the topic.
150-
for _ in 0..5 {
151-
producer.send(make_record(), timeout).await.unwrap();
152-
}
153-
154-
// Ensure that the high water mark has advanced to 5.
155-
let (lo, hi) = producer
156-
.client()
157-
.fetch_watermarks(&topic, 0, timeout)
158-
.unwrap();
159-
assert_eq!(lo, 0);
160-
assert_eq!(hi, 5);
161-
162-
// Delete the record at offset 0.
163-
let mut tpl = TopicPartitionList::new();
164-
tpl.add_partition_offset(&topic, 0, Offset::Offset(1))
165-
.unwrap();
166-
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
167-
assert_eq!(res_tpl.count(), 1);
168-
assert_eq!(res_tpl.elements()[0].topic(), topic);
169-
assert_eq!(res_tpl.elements()[0].partition(), 0);
170-
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(1));
171-
assert_eq!(res_tpl.elements()[0].error(), Ok(()));
172-
173-
// Ensure that the low water mark has advanced to 1.
174-
let (lo, hi) = producer
175-
.client()
176-
.fetch_watermarks(&topic, 0, timeout)
177-
.unwrap();
178-
assert_eq!(lo, 1);
179-
assert_eq!(hi, 5);
180-
181-
// Delete the record at offset 1 and also include an invalid partition in
182-
// the request. The invalid partition should not cause the request to fail,
183-
// but we should be able to see the per-partition error in the returned
184-
// topic partition list.
185-
let mut tpl = TopicPartitionList::new();
186-
tpl.add_partition_offset(&topic, 0, Offset::Offset(2))
187-
.unwrap();
188-
tpl.add_partition_offset(&topic, 1, Offset::Offset(1))
189-
.unwrap();
190-
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
191-
assert_eq!(res_tpl.count(), 2);
192-
assert_eq!(res_tpl.elements()[0].topic(), topic);
193-
assert_eq!(res_tpl.elements()[0].partition(), 0);
194-
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(2));
195-
assert_eq!(res_tpl.elements()[0].error(), Ok(()));
196-
assert_eq!(res_tpl.elements()[1].topic(), topic);
197-
assert_eq!(res_tpl.elements()[1].partition(), 1);
198-
assert_eq!(
199-
res_tpl.elements()[1].error(),
200-
Err(KafkaError::OffsetFetch(RDKafkaErrorCode::UnknownPartition))
201-
);
202-
203-
// Ensure that the low water mark has advanced to 2.
204-
let (lo, hi) = producer
205-
.client()
206-
.fetch_watermarks(&topic, 0, timeout)
207-
.unwrap();
208-
assert_eq!(lo, 2);
209-
assert_eq!(hi, 5);
210-
211-
// Delete all records up to offset 5.
212-
let mut tpl = TopicPartitionList::new();
213-
tpl.add_partition_offset(&topic, 0, Offset::End).unwrap();
214-
let res_tpl = admin_client.delete_records(&tpl, &opts).await.unwrap();
215-
assert_eq!(res_tpl.count(), 1);
216-
assert_eq!(res_tpl.elements()[0].topic(), topic);
217-
assert_eq!(res_tpl.elements()[0].partition(), 0);
218-
assert_eq!(res_tpl.elements()[0].offset(), Offset::Offset(5));
219-
assert_eq!(res_tpl.elements()[0].error(), Ok(()));
220-
221-
// Ensure that the low water mark has advanced to 5.
222-
let (lo, hi) = producer
223-
.client()
224-
.fetch_watermarks(&topic, 0, timeout)
225-
.unwrap();
226-
assert_eq!(lo, 5);
227-
assert_eq!(hi, 5);
228-
}
229-
230-
#[tokio::test]
231-
async fn test_configs() {
232-
let admin_client = create_admin_client();
233-
let opts = AdminOptions::new();
234-
let broker = ResourceSpecifier::Broker(0);
235-
236-
let res = admin_client
237-
.describe_configs(&[broker], &opts)
238-
.await
239-
.expect("describe configs failed");
240-
let config = &res[0].as_ref().expect("describe configs failed");
241-
let orig_val = config
242-
.get("log.flush.interval.messages")
243-
.expect("original config entry missing")
244-
.value
245-
.as_ref()
246-
.expect("original value missing");
247-
248-
let config = AlterConfig::new(broker).set("log.flush.interval.messages", "1234");
249-
let res = admin_client
250-
.alter_configs(&[config], &opts)
251-
.await
252-
.expect("alter configs failed");
253-
assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]);
254-
255-
let mut tries = 0;
256-
loop {
257-
let res = admin_client
258-
.describe_configs(&[broker], &opts)
259-
.await
260-
.expect("describe configs failed");
261-
let config = &res[0].as_ref().expect("describe configs failed");
262-
let entry = config.get("log.flush.interval.messages");
263-
let expected_entry = if get_broker_version() < KafkaVersion(1, 1, 0, 0) {
264-
// Pre-1.1, the AlterConfig operation will silently fail, and the
265-
// config will remain unchanged, which I guess is worth testing.
266-
ConfigEntry {
267-
name: "log.flush.interval.messages".into(),
268-
value: Some(orig_val.clone()),
269-
source: ConfigSource::Default,
270-
is_read_only: true,
271-
is_default: true,
272-
is_sensitive: false,
273-
}
274-
} else {
275-
ConfigEntry {
276-
name: "log.flush.interval.messages".into(),
277-
value: Some("1234".into()),
278-
source: ConfigSource::DynamicBroker,
279-
is_read_only: false,
280-
is_default: false,
281-
is_sensitive: false,
282-
}
283-
};
284-
if entry == Some(&expected_entry) {
285-
break;
286-
} else if tries >= 5 {
287-
panic!("{:?} != {:?}", entry, Some(&expected_entry));
288-
} else {
289-
tries += 1;
290-
tokio::time::sleep(Duration::from_secs(1)).await;
291-
}
292-
}
293-
294-
let config = AlterConfig::new(broker).set("log.flush.interval.ms", orig_val);
295-
let res = admin_client
296-
.alter_configs(&[config], &opts)
297-
.await
298-
.expect("alter configs failed");
299-
assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]);
300-
}
301-
302122
#[tokio::test]
303123
async fn test_groups() {
304124
let admin_client = create_admin_client();

0 commit comments

Comments
 (0)