Skip to content

Commit e088f13

Browse files
authored
Implement Log Dir API (#142)
1 parent c179d1b commit e088f13

File tree

9 files changed

+595
-137
lines changed

9 files changed

+595
-137
lines changed

docs/admin.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,18 @@ Options:
116116
| entries | `AlterClientQuotasRequestEntry[]` | Array of entries specifying the entities and quotas to change. |
117117
| validateOnly | `boolean` | Whether to only validate the request without applying changes. Defaults to `false`. |
118118

119+
### `describeLogDirs(options[, callback])`
120+
121+
Describes log directories for specified topics across all brokers.
122+
123+
The return value is an array of broker log directory descriptions, each containing information one broker's log directories.
124+
125+
Options:
126+
127+
| Property | Type | Description |
128+
| -------- | ------------------------------- | -------------------------------------------------------------------------------- |
129+
| topics | `DescribeLogDirsRequestTopic[]` | Array of topics specifying the topics and partitions for which to describe logs. |
130+
119131
### `close([callback])`
120132

121133
Closes the admin and all its connections.

docs/diagnostic.md

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,22 +61,23 @@ Each tracing channel publishes events with the following common properties:
6161

6262
## Published tracing channels
6363

64-
| Name | Target | Description |
65-
| ------------------------------------ | ---------------- | ------------------------------------------------------------------------------------------------- |
66-
| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. |
67-
| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. |
68-
| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. |
69-
| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. |
70-
| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. |
71-
| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. |
72-
| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. |
73-
| `plt:kafka:admin:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. |
74-
| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. |
75-
| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. |
76-
| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. |
77-
| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. |
78-
| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. |
79-
| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. |
80-
| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. |
81-
| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. |
82-
| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. |
64+
| Name | Target | Description |
65+
| ----------------------------------- | ---------------- | ------------------------------------------------------------------------------------------------- |
66+
| `plt:kafka:connections:connects` | `Connection` | Traces a connection attempt to a broker. |
67+
| `plt:kafka:connections:api` | `Connection` | Traces a low level API request. |
68+
| `plt:kafka:connections:pools:gets` | `ConnectionPool` | Traces a connection retrieval attempt from a connection pool. |
69+
| `plt:kafka:base:apis` | `Base` | Traces a `Base.listApis` request. |
70+
| `plt:kafka:base:metadata` | `Base` | Traces a `Base.metadata` request. |
71+
| `plt:kafka:admin:topics` | `Admin` | Traces a `Admin.createTopics` or `Admin.deleteTopics` request. |
72+
| `plt:kafka:admin:groups` | `Admin` | Traces a `Admin.listGroups`, `Admin.describeGroups` or `Admin.deleteGroups` request. |
73+
| `plt:kafka:admin:clientQuotas` | `Admin` | Traces a `Admin.describeClientQuotas` or `Admin.alterClientQuotas` request. |
74+
| `plt:kafka:admin:logDirs` | `Admin` | Traces a `Admin.describeLogDirs` request. |
75+
| `plt:kafka:producer:initIdempotent` | `Producer` | Traces a `Producer.initIdempotentProducer` request. |
76+
| `plt:kafka:producer:sends` | `Producer` | Traces a `Producer.send` request. |
77+
| `plt:kafka:consumer:group` | `Consumer` | Traces a `Consumer.findGroupCoordinator`, `Consumer.joinGroup` or `Consumer.leaveGroup` requests. |
78+
| `plt:kafka:consumer:heartbeat` | `Consumer` | Traces the `Consumer` heartbeat requests. |
79+
| `plt:kafka:consumer:receives` | `Consumer` | Traces processing of every message. |
80+
| `plt:kafka:consumer:fetches` | `Consumer` | Traces a `Consumer.fetch` request. |
81+
| `plt:kafka:consumer:consumes` | `Consumer` | Traces a `Consumer.consume` request. |
82+
| `plt:kafka:consumer:commits` | `Consumer` | Traces a `Consumer.commit` request. |
83+
| `plt:kafka:consumer:offsets` | `Consumer` | Traces a `Consumer.listOffsets` or `Consumer.listCommittedOffsets` request. |

src/clients/admin/admin.ts

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { type DescribeLogDirsRequest, type DescribeLogDirsResponse } from '../../apis/admin/describe-log-dirs-v4.ts'
12
import {
23
type AlterClientQuotasRequest,
34
type AlterClientQuotasResponse,
@@ -40,6 +41,7 @@ import { MultipleErrors } from '../../errors.ts'
4041
import {
4142
adminClientQuotasChannel,
4243
adminGroupsChannel,
44+
adminLogDirsChannel,
4345
adminTopicsChannel,
4446
createDiagnosticContext
4547
} from '../../diagnostic.ts'
@@ -67,7 +69,8 @@ import {
6769
listGroupsOptionsValidator,
6870
listTopicsOptionsValidator,
6971
describeClientQuotasOptionsValidator,
70-
alterClientQuotasOptionsValidator
72+
alterClientQuotasOptionsValidator,
73+
describeLogDirsOptionsValidator
7174
} from './options.ts'
7275
import {
7376
type AdminOptions,
@@ -77,11 +80,13 @@ import {
7780
type DeleteTopicsOptions,
7881
type DescribeClientQuotasOptions,
7982
type AlterClientQuotasOptions,
83+
type DescribeLogDirsOptions,
8084
type DescribeGroupsOptions,
8185
type Group,
8286
type GroupBase,
8387
type ListGroupsOptions,
84-
type ListTopicsOptions
88+
type ListTopicsOptions,
89+
type BrokerLogDirDescription
8590
} from './types.ts'
8691

8792
export class Admin extends Base<AdminOptions> {
@@ -353,6 +358,38 @@ export class Admin extends Base<AdminOptions> {
353358
return callback[kCallbackPromise]
354359
}
355360

361+
describeLogDirs (options: DescribeLogDirsOptions, callback: CallbackWithPromise<BrokerLogDirDescription[]>): void
362+
describeLogDirs (options: DescribeLogDirsOptions): Promise<BrokerLogDirDescription[]>
363+
describeLogDirs (
364+
options: DescribeLogDirsOptions,
365+
callback?: CallbackWithPromise<BrokerLogDirDescription[]>
366+
): void | Promise<BrokerLogDirDescription[]> {
367+
if (!callback) {
368+
callback = createPromisifiedCallback()
369+
}
370+
371+
if (this[kCheckNotClosed](callback)) {
372+
return callback[kCallbackPromise]
373+
}
374+
375+
const validationError = this[kValidateOptions](options, describeLogDirsOptionsValidator, '/options', false)
376+
if (validationError) {
377+
callback(validationError, undefined as unknown as BrokerLogDirDescription[])
378+
return callback[kCallbackPromise]
379+
}
380+
381+
adminLogDirsChannel.traceCallback(
382+
this.#describeLogDirs,
383+
1,
384+
createDiagnosticContext({ client: this, operation: 'describeLogDirs', options }),
385+
this,
386+
options,
387+
callback
388+
)
389+
390+
return callback[kCallbackPromise]
391+
}
392+
356393
#listTopics (options: ListTopicsOptions, callback: CallbackWithPromise<string[]>): void {
357394
const includeInternals = options.includeInternals ?? false
358395

@@ -887,4 +924,59 @@ export class Admin extends Base<AdminOptions> {
887924
0
888925
)
889926
}
927+
928+
#describeLogDirs (options: DescribeLogDirsOptions, callback: CallbackWithPromise<BrokerLogDirDescription[]>): void {
929+
this[kMetadata]({ topics: [] }, (error, metadata) => {
930+
if (error) {
931+
callback(error, undefined as unknown as BrokerLogDirDescription[])
932+
return
933+
}
934+
935+
runConcurrentCallbacks<BrokerLogDirDescription>(
936+
'Describing log dirs failed.',
937+
metadata.brokers,
938+
([id, broker], concurrentCallback) => {
939+
this[kGetConnection](broker, (error, connection) => {
940+
if (error) {
941+
concurrentCallback(error, undefined as unknown as BrokerLogDirDescription)
942+
return
943+
}
944+
945+
this[kPerformWithRetry]<DescribeLogDirsResponse>(
946+
'describeLogDirs',
947+
retryCallback => {
948+
this[kGetApi]<DescribeLogDirsRequest, DescribeLogDirsResponse>('DescribeLogDirs', (error, api) => {
949+
if (error) {
950+
retryCallback(error, undefined as unknown as DescribeLogDirsResponse)
951+
return
952+
}
953+
954+
api(connection, options.topics, retryCallback as unknown as Callback<DescribeLogDirsResponse>)
955+
})
956+
},
957+
(error, response) => {
958+
if (error) {
959+
concurrentCallback(error, undefined as unknown as BrokerLogDirDescription)
960+
return
961+
}
962+
963+
concurrentCallback(null, {
964+
broker: id,
965+
throttleTimeMs: response.throttleTimeMs,
966+
results: response.results.map(result => ({
967+
logDir: result.logDir,
968+
topics: result.topics,
969+
totalBytes: result.totalBytes,
970+
usableBytes: result.usableBytes
971+
}))
972+
})
973+
},
974+
0
975+
)
976+
})
977+
},
978+
callback
979+
)
980+
})
981+
}
890982
}

src/clients/admin/options.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,31 @@ export const alterClientQuotasOptionsSchema = {
174174
additionalProperties: false
175175
}
176176

177+
export const describeLogDirsOptionsSchema = {
178+
type: 'object',
179+
properties: {
180+
topics: {
181+
type: 'array',
182+
items: {
183+
type: 'object',
184+
properties: {
185+
name: { type: 'string', minLength: 1 },
186+
partitions: {
187+
type: 'array',
188+
items: { type: 'number', minimum: 0 },
189+
minItems: 1
190+
}
191+
},
192+
required: ['name', 'partitions'],
193+
additionalProperties: false
194+
},
195+
minItems: 1
196+
}
197+
},
198+
required: ['topics'],
199+
additionalProperties: false
200+
}
201+
177202
export const createTopicsOptionsValidator = ajv.compile(createTopicOptionsSchema)
178203
export const listTopicsOptionsValidator = ajv.compile(listTopicOptionsSchema)
179204
export const deleteTopicsOptionsValidator = ajv.compile(deleteTopicOptionsSchema)
@@ -182,3 +207,4 @@ export const describeGroupsOptionsValidator = ajv.compile(describeGroupsOptionsS
182207
export const deleteGroupsOptionsValidator = ajv.compile(deleteGroupsOptionsSchema)
183208
export const describeClientQuotasOptionsValidator = ajv.compile(describeClientQuotasOptionsSchema)
184209
export const alterClientQuotasOptionsValidator = ajv.compile(alterClientQuotasOptionsSchema)
210+
export const describeLogDirsOptionsValidator = ajv.compile(describeLogDirsOptionsSchema)

src/clients/admin/types.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import { type AlterClientQuotasRequestEntry } from '../../apis/admin/alter-client-quotas-v1.ts'
22
import { type DescribeClientQuotasRequestComponent } from '../../apis/admin/describe-client-quotas-v0.ts'
3+
import {
4+
type DescribeLogDirsResponse,
5+
type DescribeLogDirsResponseResult,
6+
type DescribeLogDirsRequestTopic
7+
} from '../../apis/admin/describe-log-dirs-v4.ts'
38
import { type CreateTopicsRequestTopicConfig } from '../../apis/admin/create-topics-v7.ts'
49
import { type ConsumerGroupState } from '../../apis/enumerations.ts'
510
import { type NullableString } from '../../protocol/definitions.ts'
@@ -83,3 +88,13 @@ export interface AlterClientQuotasOptions {
8388
entries: AlterClientQuotasRequestEntry[]
8489
validateOnly?: boolean
8590
}
91+
92+
export interface DescribeLogDirsOptions {
93+
topics: DescribeLogDirsRequestTopic[]
94+
}
95+
96+
export interface BrokerLogDirDescription {
97+
broker: number
98+
throttleTimeMs: DescribeLogDirsResponse['throttleTimeMs']
99+
results: Omit<DescribeLogDirsResponseResult, 'errorCode'>[]
100+
}

src/diagnostic.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export const baseMetadataChannel = createTracingChannel<ClientDiagnosticEvent>('
6868
export const adminTopicsChannel = createTracingChannel<ClientDiagnosticEvent>('admin:topics')
6969
export const adminGroupsChannel = createTracingChannel<ClientDiagnosticEvent>('admin:groups')
7070
export const adminClientQuotasChannel = createTracingChannel<ClientDiagnosticEvent>('admin:clientQuotas')
71+
export const adminLogDirsChannel = createTracingChannel<ClientDiagnosticEvent>('admin:logDirs')
7172

7273
// Producer channels
7374
export const producerInitIdempotentChannel = createTracingChannel<ClientDiagnosticEvent>('producer:initIdempotent')

0 commit comments

Comments
 (0)