Skip to content

Commit ffe4019

Browse files
committed
Additional AdminClient operations.
This PR adds three admin operations that were not previously supported: - AdminClient::list_offsets: List offsets for the specified topic_partitions. - AdminClient::describe_consumer_groups: Provides a description on the requested consumer groups. - AdminClient::list_consumer_group_offsets: List offset information for the topics subscribed by a consumer group. Implementation notes: - This mostly my first Rust code, so please comment and critique. The work started because I was in search of something to do after having read rust tutorials and documents. I decided for re-doing a python tool I've recently coded for Kafka administration tasks and I found that some of the operations I was using in python were missing from the admin interface in rdkafka. Although the implementation I did in my rust tool was different, I decided to try my hand at enhancing rdkafka and this PR is the result. - For all three operations I've used a similar pattern for the result: `KafkaResult<Vec<Result<TYPE, (KEY, KafkaError)>`. This is similar to the existing GroupResult. The alternative was to insert the error value in the TYPE record, following closer the librdkafka structure, but I felt it was less in line with rust conventions. - I felt that I was adding many types and that it was best to define them in a different module/file for each operation. What I've done is re-exporting them from the admin module. - I've defined types::RDKafkaXXX for all the bindings::rd_kafka_xxxx types, using the RDKafkaXXX alias in the code, keeping the function calls to rdsys::rd_kafka_xxxx - Regarding GroupResult and ConsumerGroupResult: Technically they both come from the same rd_kafka_group_result_t, GroupResul just extracting the consumer group name. The GroupResult type was already used by the delete_groups operation. In order not to break the existing API, I'm maintaining both. Perhaps in the future they could be enhanced (or perhaps it is well as it is, as the additional information is not meaningful in the delete_groups case). - Added use cases covering the different operations. I've probably not been exhaustive, but I hope the coverage is enough. - Documentation and field and structures descriptions I've taken from a mixture of the java, python and the protocol documentation. Hope it is in line with the rest of the documentation provided.
1 parent a8d089d commit ffe4019

File tree

7 files changed

+1628
-4
lines changed

7 files changed

+1628
-4
lines changed

rdkafka-sys/src/types.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,39 @@ pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t;
9696
/// Native rdkafka group result.
9797
pub type RDKafkaGroupResult = bindings::rd_kafka_group_result_t;
9898

99+
/// Native rdkafka list offsets result.
100+
pub type RDKafkaListOffsetsResult = bindings::rd_kafka_ListOffsets_result_t;
101+
102+
/// Native rdkafka list offsets info returned.
103+
pub type RDKafkaListOffsetsResultInfo = bindings::rd_kafka_ListOffsetsResultInfo_t;
104+
105+
/// Native result of the Describe Consumer Group operation.
106+
pub type RDKafkaDescribeConsumerGroupsResult = bindings::rd_kafka_DescribeConsumerGroups_result_t;
107+
108+
/// Native description of a Consumer Group.
109+
pub type RDKafkaConsumerGroupDescription = bindings::rd_kafka_ConsumerGroupDescription_t;
110+
111+
/// Native ACL Operation enum.
112+
pub type RDKafkaAclOperation = bindings::rd_kafka_AclOperation_t;
113+
114+
/// Native Consumer Group State enum.
115+
pub type RDKafkaConsumerGroupState = bindings::rd_kafka_consumer_group_state_t;
116+
117+
/// Native Node data.
118+
pub type RDKafkaNode = bindings::rd_kafka_Node_t;
119+
120+
/// Native Consumer Group Type enum.
121+
pub type RDKafkaConsumerGroupType = bindings::rd_kafka_consumer_group_type_t;
122+
123+
/// Native Member Description data.
124+
pub type RDKafkaMemberDescription = bindings::rd_kafka_MemberDescription_t;
125+
126+
/// Native Member Assignment data.
127+
pub type RDKafkaMemberAssignment = bindings::rd_kafka_MemberAssignment_t;
128+
129+
/// Native information provided by the List Consumer Group Offsets operation.
130+
pub type RDKafkaListConsumerGroupOffsets = bindings::rd_kafka_ListConsumerGroupOffsets_t;
131+
99132
/// Native rdkafka mock cluster.
100133
pub type RDKafkaMockCluster = bindings::rd_kafka_mock_cluster_t;
101134

src/admin.rs

Lines changed: 224 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,26 @@ use crate::log::{trace, warn};
2828
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
2929
use crate::TopicPartitionList;
3030

31+
// Reexport the symbols defined in the auxiliary files
32+
pub use crate::group_description::{
33+
group_description_result_key, AclOperation, ConsumerGroupDescription,
34+
ConsumerGroupDescriptionResult, ConsumerGroupState, MemberAssignment, MemberDescription,
35+
};
36+
pub use crate::list_consumer_group_offsets::{
37+
group_result_key, ConsumerGroup, ConsumerGroupResult, ListConsumerGroupOffsets,
38+
};
39+
pub use crate::list_offsets_result_info::{
40+
list_offsets_result_key, ListOffsetsResult, ListOffsetsResultInfo,
41+
};
42+
3143
//
3244
// ********** ADMIN CLIENT **********
3345
//
3446

3547
/// A client for the Kafka admin API.
3648
///
3749
/// `AdminClient` provides programmatic access to managing a Kafka cluster,
38-
/// notably manipulating topics, partitions, and configuration paramaters.
50+
/// notably manipulating topics, partitions, and configuration parameters.
3951
pub struct AdminClient<C: ClientContext> {
4052
client: Client<C>,
4153
queue: Arc<NativeQueue>,
@@ -377,6 +389,134 @@ impl<C: ClientContext> AdminClient<C> {
377389
Ok(rx)
378390
}
379391

392+
///
393+
/// List offsets for the specified topic_partitions. This operation enables to find the
394+
/// beginning offset, end offset as well as the offset matching a timestamp in partitions or
395+
/// the offset with max timestamp.
396+
///
397+
/// In order to get the latest or the earliest offset, define the input [TopicPartitionListElem]
398+
/// with [Offset::Beginning] or [Offset::End].
399+
///
400+
pub fn list_offsets(
401+
&self,
402+
offsets: &TopicPartitionList,
403+
opts: &AdminOptions,
404+
) -> impl Future<Output = KafkaResult<Vec<ListOffsetsResult>>> {
405+
match self.list_offsets_inner(offsets, opts) {
406+
Ok(rx) => Either::Left(ListOffsetsFuture { rx }),
407+
Err(err) => Either::Right(future::err(err)),
408+
}
409+
}
410+
411+
fn list_offsets_inner(
412+
&self,
413+
offsets: &TopicPartitionList,
414+
opts: &AdminOptions,
415+
) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
416+
let mut err_buf = ErrBuf::new();
417+
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
418+
unsafe {
419+
rdsys::rd_kafka_ListOffsets(
420+
self.client.native_ptr(),
421+
offsets.ptr(),
422+
native_opts.ptr(),
423+
self.queue.ptr(),
424+
);
425+
}
426+
Ok(rx)
427+
}
428+
429+
///
430+
/// Provides a description on the requested consumer groups.
431+
///
432+
pub fn describe_consumer_groups<'a, I>(
433+
&self,
434+
groups: I,
435+
opts: &AdminOptions,
436+
) -> impl Future<Output = KafkaResult<Vec<ConsumerGroupDescriptionResult>>>
437+
where
438+
I: IntoIterator<Item = &'a &'a str>,
439+
{
440+
match self.describe_consumer_groups_inner(groups, opts) {
441+
Ok(rx) => Either::Left(ConsumerGroupDescriptionFuture { rx }),
442+
Err(err) => Either::Right(future::err(err)),
443+
}
444+
}
445+
446+
fn describe_consumer_groups_inner<'a, I>(
447+
&self,
448+
groups: I,
449+
opts: &AdminOptions,
450+
) -> KafkaResult<oneshot::Receiver<NativeEvent>>
451+
where
452+
I: IntoIterator<Item = &'a &'a str>,
453+
{
454+
let mut native_groups = Vec::new();
455+
for g in groups {
456+
let cstring = CString::new(*g).expect("Failed to create CString");
457+
native_groups.push(cstring.into_raw());
458+
}
459+
460+
let mut err_buf = ErrBuf::new();
461+
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
462+
unsafe {
463+
rdsys::rd_kafka_DescribeConsumerGroups(
464+
self.client.native_ptr(),
465+
native_groups.as_ptr() as *mut _,
466+
native_groups.len(),
467+
native_opts.ptr(),
468+
self.queue.ptr(),
469+
);
470+
}
471+
Ok(rx)
472+
}
473+
474+
///
475+
/// List offset information for the consumer group and (optional) topic partition provided in
476+
/// the request.
477+
///
478+
/// Note that, while the API takes a vector as input, it will only support one group at a time.
479+
pub fn list_consumer_group_offsets<'a, I>(
480+
&self,
481+
groups: I,
482+
opts: &AdminOptions,
483+
) -> impl Future<Output = KafkaResult<Vec<ConsumerGroupResult>>>
484+
where
485+
I: IntoIterator<Item = &'a ListConsumerGroupOffsets<'a>>,
486+
{
487+
match self.list_consumer_group_offsets_inner(groups, opts) {
488+
Ok(rx) => Either::Left(ListConsumerGroupOffsetsFuture { rx }),
489+
Err(err) => Either::Right(future::err(err)),
490+
}
491+
}
492+
493+
fn list_consumer_group_offsets_inner<'a, I>(
494+
&self,
495+
groups: I,
496+
opts: &AdminOptions,
497+
) -> KafkaResult<oneshot::Receiver<NativeEvent>>
498+
where
499+
I: IntoIterator<Item = &'a ListConsumerGroupOffsets<'a>>,
500+
{
501+
let mut native_groups = Vec::new();
502+
for g in groups {
503+
native_groups.push(g.to_native()?);
504+
}
505+
506+
let mut err_buf = ErrBuf::new();
507+
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
508+
unsafe {
509+
rdsys::rd_kafka_ListConsumerGroupOffsets(
510+
self.client.native_ptr(),
511+
native_groups.as_c_array(),
512+
native_groups.len(),
513+
native_opts.ptr(),
514+
self.queue.ptr(),
515+
);
516+
}
517+
Ok(rx)
518+
}
519+
380520
/// Returns the client underlying this admin client.
381521
pub fn inner(&self) -> &Client<C> {
382522
&self.client
@@ -1341,3 +1481,86 @@ impl Future for AlterConfigsFuture {
13411481
Poll::Ready(Ok(out))
13421482
}
13431483
}
1484+
1485+
//
1486+
// List offsets handling
1487+
//
1488+
1489+
struct ListOffsetsFuture {
1490+
rx: oneshot::Receiver<NativeEvent>,
1491+
}
1492+
1493+
impl Future for ListOffsetsFuture {
1494+
type Output = KafkaResult<Vec<ListOffsetsResult>>;
1495+
1496+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1497+
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1498+
event.check_error()?;
1499+
let res = unsafe { rdsys::rd_kafka_event_ListOffsets_result(event.ptr()) };
1500+
if res.is_null() {
1501+
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1502+
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1503+
"list offsets request received response of incorrect type ({})",
1504+
typ
1505+
))));
1506+
}
1507+
let info_list =
1508+
unsafe { ListOffsetsResultInfo::vec_from_ptr(res as *mut RDKafkaListOffsetsResult) };
1509+
Poll::Ready(Ok(info_list))
1510+
}
1511+
}
1512+
1513+
//
1514+
// Describe Consumer Groups handling
1515+
//
1516+
1517+
struct ConsumerGroupDescriptionFuture {
1518+
rx: oneshot::Receiver<NativeEvent>,
1519+
}
1520+
1521+
impl Future for ConsumerGroupDescriptionFuture {
1522+
type Output = KafkaResult<Vec<Result<ConsumerGroupDescription, (String, KafkaError)>>>;
1523+
1524+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1525+
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1526+
event.check_error()?;
1527+
let res = unsafe { rdsys::rd_kafka_event_DescribeConsumerGroups_result(event.ptr()) };
1528+
if res.is_null() {
1529+
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1530+
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1531+
"describe consumer groups request received response of incorrect type ({})",
1532+
typ
1533+
))));
1534+
}
1535+
let group_list = unsafe { ConsumerGroupDescription::vec_from_ptr(res) };
1536+
Poll::Ready(Ok(group_list))
1537+
}
1538+
}
1539+
1540+
//
1541+
// List Consumer Group Offsets handling
1542+
//
1543+
struct ListConsumerGroupOffsetsFuture {
1544+
rx: oneshot::Receiver<NativeEvent>,
1545+
}
1546+
1547+
impl Future for ListConsumerGroupOffsetsFuture {
1548+
type Output = KafkaResult<Vec<ConsumerGroupResult>>;
1549+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1550+
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1551+
event.check_error()?;
1552+
1553+
let res = unsafe { rdsys::rd_kafka_event_ListConsumerGroupOffsets_result(event.ptr()) };
1554+
if res.is_null() {
1555+
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1556+
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1557+
"list consumer group offsets request received response of incorrect type ({})",
1558+
typ
1559+
))));
1560+
}
1561+
1562+
let mut n = 0;
1563+
let groups = unsafe { rdsys::rd_kafka_DeleteGroups_result_groups(res, &mut n) };
1564+
Poll::Ready(Ok(ConsumerGroup::vec_result_from_ptr(groups, n)))
1565+
}
1566+
}

0 commit comments

Comments
 (0)