Skip to content

Commit 361cd7f

Browse files
authored
Implement FromIterator for TopicPartitionList (fede1024#795)
* Implement FromIterator for TopicPartitionList * Support collect() from topic map iterators * Cargo fmt for check happiness.
1 parent 564f5cc commit 361cd7f

File tree

1 file changed

+82
-0
lines changed

1 file changed

+82
-0
lines changed

src/topic_partition_list.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use std::collections::HashMap;
66
use std::ffi::{CStr, CString};
77
use std::fmt;
8+
use std::iter::FromIterator;
89
use std::slice;
910
use std::str;
1011

@@ -398,6 +399,40 @@ impl Default for TopicPartitionList {
398399
}
399400
}
400401

402+
impl FromIterator<(String, i32, Offset)> for TopicPartitionList {
403+
fn from_iter<I>(iter: I) -> Self
404+
where
405+
I: IntoIterator<Item = (String, i32, Offset)>,
406+
{
407+
let iter = iter.into_iter();
408+
let (lower_bound, _) = iter.size_hint();
409+
let mut tpl = TopicPartitionList::with_capacity(lower_bound);
410+
411+
for (topic, partition, offset) in iter {
412+
let mut elem = tpl.add_partition(topic.as_str(), partition);
413+
elem.set_offset(offset).unwrap_or_else(|err| {
414+
panic!(
415+
"failed to set offset via collect() for {}:{} (offset: {:?}): {:?}",
416+
topic, partition, offset, err
417+
);
418+
});
419+
}
420+
421+
tpl
422+
}
423+
}
424+
425+
impl FromIterator<((String, i32), Offset)> for TopicPartitionList {
426+
fn from_iter<I>(iter: I) -> Self
427+
where
428+
I: IntoIterator<Item = ((String, i32), Offset)>,
429+
{
430+
iter.into_iter()
431+
.map(|((topic, partition), offset)| (topic, partition, offset))
432+
.collect()
433+
}
434+
}
435+
401436
impl fmt::Debug for TopicPartitionList {
402437
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403438
f.debug_list().entries(self.elements()).finish()
@@ -541,4 +576,51 @@ mod tests {
541576
assert_eq!(topic_map, topic_map2);
542577
assert_eq!(tpl, tpl2);
543578
}
579+
580+
#[test]
581+
fn collect_topic_partition_list() {
582+
let tpl: TopicPartitionList = vec![
583+
("t1".to_string(), 0, Offset::Beginning),
584+
("t1".to_string(), 1, Offset::Offset(42)),
585+
("t2".to_string(), 0, Offset::End),
586+
]
587+
.into_iter()
588+
.collect();
589+
590+
assert_eq!(tpl.count(), 3);
591+
592+
let t1_0 = tpl.find_partition("t1", 0).unwrap();
593+
assert_eq!(t1_0.offset(), Offset::Beginning);
594+
595+
let t1_1 = tpl.find_partition("t1", 1).unwrap();
596+
assert_eq!(t1_1.offset(), Offset::Offset(42));
597+
598+
let t2_0 = tpl.find_partition("t2", 0).unwrap();
599+
assert_eq!(t2_0.offset(), Offset::End);
600+
}
601+
602+
#[test]
603+
fn collect_topic_partition_list_pairs() {
604+
let tpl: TopicPartitionList = vec![
605+
(("t1".to_string(), 0), Offset::Beginning),
606+
(("t1".to_string(), 1), Offset::Offset(7)),
607+
(("t2".to_string(), 0), Offset::Stored),
608+
]
609+
.into_iter()
610+
.collect();
611+
612+
assert_eq!(tpl.count(), 3);
613+
assert_eq!(
614+
tpl.find_partition("t1", 0).unwrap().offset(),
615+
Offset::Beginning
616+
);
617+
assert_eq!(
618+
tpl.find_partition("t1", 1).unwrap().offset(),
619+
Offset::Offset(7)
620+
);
621+
assert_eq!(
622+
tpl.find_partition("t2", 0).unwrap().offset(),
623+
Offset::Stored
624+
);
625+
}
544626
}

0 commit comments

Comments
 (0)