Skip to content

Commit 2293471

Browse files
committed
admin: add DescribeLogDirs admin API (KIP-113) (#5192)
Implement the DescribeLogDirs admin API (Kafka API key 35) to allow querying partition sizes in bytes on disk for capacity planning and monitoring. Adds full C admin API following the ElectLeaders pattern with self-contained protocol request/response. Supports protocol versions 0-4 with flexible version negotiation (v2+). The topics parameter accepts a topic-partition list to filter, or NULL for all topics. Request targets a specific broker via AdminOptions_set_broker(), defaults to controller. No fanout: single broker per call, same as Java AdminClient.
1 parent f21766f commit 2293471

File tree

11 files changed

+720
-2
lines changed

11 files changed

+720
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
librdkafka v2.14.0 is a feature release:
44

55
* [KIP-768](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575#KIP768:ExtendSASL/OAUTHBEARERwithSupportforOIDC-ClientConfiguration) Extend SASL/OAUTHBEARER to support OIDC claim mapping beyond the default `sub` claim (#5336).
6+
* [KIP-113](https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories) Add support for AdminAPI `DescribeLogDirs()` (#5333, @piochelepiotr).
67

78
# librdkafka v2.13.2
89

src/rdkafka.h

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5610,6 +5610,8 @@ typedef int rd_kafka_event_type_t;
56105610
#define RD_KAFKA_EVENT_LISTOFFSETS_RESULT 0x400000
56115611
/** ElectLeaders_result_t */
56125612
#define RD_KAFKA_EVENT_ELECTLEADERS_RESULT 0x800000
5613+
/** DescribeLogDirs_result_t */
5614+
#define RD_KAFKA_EVENT_DESCRIBELOGDIRS_RESULT 0x1000000
56135615

56145616
/**
56155617
* @returns the event type for the given event.
@@ -5894,6 +5896,8 @@ typedef rd_kafka_event_t rd_kafka_AlterUserScramCredentials_result_t;
58945896
typedef rd_kafka_event_t rd_kafka_ListOffsets_result_t;
58955897
/*! ElectLeaders result type */
58965898
typedef rd_kafka_event_t rd_kafka_ElectLeaders_result_t;
5899+
/*! DescribeLogDirs result type */
5900+
typedef rd_kafka_event_t rd_kafka_DescribeLogDirs_result_t;
58975901

58985902
/**
58995903
* @brief Get CreateTopics result.
@@ -7118,6 +7122,7 @@ typedef enum rd_kafka_admin_op_t {
71187122
RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */
71197123
RD_KAFKA_ADMIN_OP_LISTOFFSETS, /**< ListOffsets */
71207124
RD_KAFKA_ADMIN_OP_ELECTLEADERS, /**< ElectLeaders */
7125+
RD_KAFKA_ADMIN_OP_DESCRIBELOGDIRS, /**< DescribeLogDirs */
71217126
RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */
71227127
} rd_kafka_admin_op_t;
71237128

@@ -10140,6 +10145,190 @@ rd_kafka_ElectLeaders_result_partitions(
1014010145

1014110146
/**@}*/
1014210147

10148+
/**
10149+
* @name Admin API - DescribeLogDirs
10150+
* @{
10151+
*/
10152+
10153+
/**
10154+
* @brief Opaque type for a log directory description.
10155+
*/
10156+
typedef struct rd_kafka_LogDirDescription_s rd_kafka_LogDirDescription_t;
10157+
10158+
/**
10159+
* @brief Opaque type for a log directory topic description.
10160+
*/
10161+
typedef struct rd_kafka_LogDirTopicDescription_s
10162+
rd_kafka_LogDirTopicDescription_t;
10163+
10164+
/**
10165+
* @brief Opaque type for a log directory partition description.
10166+
*/
10167+
typedef struct rd_kafka_LogDirPartitionDescription_s
10168+
rd_kafka_LogDirPartitionDescription_t;
10169+
10170+
/**
10171+
* @brief Describe log directories on a broker.
10172+
*
10173+
* @param rk Client instance.
10174+
* @param topics Topic-partition list to filter results, or NULL
10175+
* to describe all topics on the target broker.
10176+
* @param options Optional admin options, or NULL for defaults.
10177+
* Valid options:
10178+
* - rd_kafka_AdminOptions_set_broker()
10179+
* @param rkqu Queue to emit result on.
10180+
*
10181+
* @remark The result event type emitted on the supplied queue is of type
10182+
* \c RD_KAFKA_EVENT_DESCRIBELOGDIRS_RESULT
10183+
*/
10184+
RD_EXPORT void
10185+
rd_kafka_DescribeLogDirs(rd_kafka_t *rk,
10186+
const rd_kafka_topic_partition_list_t *topics,
10187+
const rd_kafka_AdminOptions_t *options,
10188+
rd_kafka_queue_t *rkqu);
10189+
10190+
/**
10191+
* @brief Get an array of log dir descriptions from a
10192+
* DescribeLogDirs result.
10193+
*
10194+
* @param result Result to get log dir descriptions from.
10195+
* @param cntp is updated to the number of elements in the array.
10196+
*
10197+
* @remark The lifetime of the returned memory is the same
10198+
* as the lifetime of the \p result object.
10199+
*/
10200+
RD_EXPORT const rd_kafka_LogDirDescription_t **
10201+
rd_kafka_DescribeLogDirs_result_descriptions(
10202+
const rd_kafka_DescribeLogDirs_result_t *result,
10203+
size_t *cntp);
10204+
10205+
/**
10206+
* @brief Get DescribeLogDirs result.
10207+
*
10208+
* @returns the result of a DescribeLogDirs request, or NULL if
10209+
* event is of different type.
10210+
*
10211+
* @remark The lifetime of the returned memory is the same
10212+
* as the lifetime of the \p rkev object.
10213+
*
10214+
* Event types:
10215+
* RD_KAFKA_EVENT_DESCRIBELOGDIRS_RESULT
10216+
*/
10217+
RD_EXPORT const rd_kafka_DescribeLogDirs_result_t *
10218+
rd_kafka_event_DescribeLogDirs_result(rd_kafka_event_t *rkev);
10219+
10220+
/**
10221+
* @brief Get the error for a log dir description.
10222+
*
10223+
* @param logdir The log dir description.
10224+
*
10225+
* @returns the error associated with the log dir, or NULL
10226+
* on success.
10227+
*
10228+
* @remark The lifetime of the returned memory is the same
10229+
* as the lifetime of the \p logdir object.
10230+
*/
10231+
RD_EXPORT const rd_kafka_error_t *
10232+
rd_kafka_LogDirDescription_error(const rd_kafka_LogDirDescription_t *logdir);
10233+
10234+
/**
10235+
* @brief Get the log directory path from a log dir description.
10236+
*
10237+
* @param logdir The log dir description.
10238+
*
10239+
* @remark The lifetime of the returned memory is the same
10240+
* as the lifetime of the \p logdir object.
10241+
*/
10242+
RD_EXPORT const char *
10243+
rd_kafka_LogDirDescription_log_dir(const rd_kafka_LogDirDescription_t *logdir);
10244+
10245+
/**
10246+
* @brief Get the array of topic descriptions from a log dir
10247+
* description.
10248+
*
10249+
* @param logdir The log dir description.
10250+
* @param cntp is updated to the number of topics.
10251+
*
10252+
* @remark The lifetime of the returned memory is the same
10253+
* as the lifetime of the \p logdir object.
10254+
*/
10255+
RD_EXPORT const rd_kafka_LogDirTopicDescription_t **
10256+
rd_kafka_LogDirDescription_topics(const rd_kafka_LogDirDescription_t *logdir,
10257+
size_t *cntp);
10258+
10259+
/**
10260+
* @brief Get the topic name from a log dir topic description.
10261+
*
10262+
* @param topic_desc The topic description.
10263+
*
10264+
* @remark The lifetime of the returned memory is the same
10265+
* as the lifetime of the \p topic_desc object.
10266+
*/
10267+
RD_EXPORT const char *rd_kafka_LogDirTopicDescription_topic(
10268+
const rd_kafka_LogDirTopicDescription_t *topic_desc);
10269+
10270+
/**
10271+
* @brief Get the array of partition descriptions from a log dir
10272+
* topic description.
10273+
*
10274+
* @param topic_desc The topic description.
10275+
* @param cntp is updated to the number of partitions.
10276+
*
10277+
* @remark The lifetime of the returned memory is the same
10278+
* as the lifetime of the \p topic_desc object.
10279+
*/
10280+
RD_EXPORT const rd_kafka_LogDirPartitionDescription_t **
10281+
rd_kafka_LogDirTopicDescription_partitions(
10282+
const rd_kafka_LogDirTopicDescription_t *topic_desc,
10283+
size_t *cntp);
10284+
10285+
/**
10286+
* @brief Get the partition index from a log dir partition
10287+
* description.
10288+
*
10289+
* @param partition_desc The partition description.
10290+
*
10291+
* @returns the partition index.
10292+
*/
10293+
RD_EXPORT int32_t rd_kafka_LogDirPartitionDescription_partition(
10294+
const rd_kafka_LogDirPartitionDescription_t *partition_desc);
10295+
10296+
/**
10297+
* @brief Get the partition size in bytes from a log dir partition
10298+
* description.
10299+
*
10300+
* @param partition_desc The partition description.
10301+
*
10302+
* @returns the size in bytes on disk.
10303+
*/
10304+
RD_EXPORT int64_t rd_kafka_LogDirPartitionDescription_size(
10305+
const rd_kafka_LogDirPartitionDescription_t *partition_desc);
10306+
10307+
/**
10308+
* @brief Get the offset lag from a log dir partition description.
10309+
*
10310+
* @param partition_desc The partition description.
10311+
*
10312+
* @returns the offset lag.
10313+
*/
10314+
RD_EXPORT int64_t rd_kafka_LogDirPartitionDescription_offset_lag(
10315+
const rd_kafka_LogDirPartitionDescription_t *partition_desc);
10316+
10317+
/**
10318+
* @brief Get the is_future flag from a log dir partition
10319+
* description.
10320+
*
10321+
* @param partition_desc The partition description.
10322+
*
10323+
* @returns 1 if this is a future log directory (i.e., the
10324+
* partition is being moved to this log dir),
10325+
* 0 otherwise.
10326+
*/
10327+
RD_EXPORT int rd_kafka_LogDirPartitionDescription_is_future(
10328+
const rd_kafka_LogDirPartitionDescription_t *partition_desc);
10329+
10330+
/**@}*/
10331+
1014310332
/**
1014410333
* @name Security APIs
1014510334
* @{

0 commit comments

Comments
 (0)