Skip to content

Commit e9adb35

Browse files
authored
feat: Added consumer lagging API and metrics. (#147)
Signed-off-by: Paolo Insogna <[email protected]>
1 parent 628dd81 commit e9adb35

File tree

14 files changed

+1695
-1064
lines changed

14 files changed

+1695
-1064
lines changed

docs/consumer.md

Lines changed: 65 additions & 34 deletions
Large diffs are not rendered by default.

docs/diagnostic.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Support for diagnostic and instrumentation is provided via [Node.js Diagnostic C
77
The presence of the `result` and `error` properties and the order of tracing channels events follows the current pseudo-code flow:
88

99
```javascript
10-
function operation(options, callback) {
10+
function operation (options, callback) {
1111
const channel = tracingChannel('plt:kafka:example')
1212
const context = { operationId: 0n, options }
1313

@@ -55,9 +55,10 @@ Each tracing channel publishes events with the following common properties:
5555

5656
## Published channels
5757

58-
| Name | Description |
59-
| --------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
60-
| `plt:kafka:instances` | Notifies any creation of a `Connection`, `ConnectionPool`, `Base`, `Admin`, `Producer`, `Consumer` or `MessagesStream`. This channel will publish objects with the `type` and `instance` property. |
58+
| Name | Description |
59+
| ------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
60+
| `plt:kafka:instances` | Notifies any creation of a `Connection`, `ConnectionPool`, `Base`, `Admin`, `Producer`, `Consumer` or `MessagesStream`. This channel will publish objects with the `type` and `instance` property. |
61+
| `plt:kafka:consumer:lag` | Notifies any `Consumer` lag obtained via `Consumer.getLag` (including the one triggered via `Consumer.startLagMonitoring`). |
6162

6263
## Published tracing channels
6364

docs/metrics.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ To ensure maximum compatibility, no `prom-client` version is shipped with the pa
66

77
If you provide both the `metrics.client` and `metrics.registry` (an instance of `Registry`) options, then `@platformatic/kafka` will register and provide the following metrics:
88

9-
| Name | Type | Description |
10-
| ------------------------- | --------- | ----------------------------------------- |
11-
| `kafka_producers` | `Gauge` | Number of active Kafka producers. |
12-
| `kafka_produced_messages` | `Counter` | Number of produced Kafka messages. |
13-
| `kafka_consumers` | `Gauge` | Number of active Kafka consumers. |
14-
| `kafka_consumers_streams` | `Gauge` | Number of active Kafka consumers streams. |
15-
| `kafka_consumers_topics` | `Gauge` | Number of topics being consumed. |
16-
| `kafka_consumed_messages` | `Counter` | Number of consumed Kafka messages. |
9+
| Name | Type | Description |
10+
| ------------------------- | ----------- | ----------------------------------------- |
11+
| `kafka_producers` | `Gauge` | Number of active Kafka producers. |
12+
| `kafka_produced_messages` | `Counter` | Number of produced Kafka messages. |
13+
| `kafka_consumers` | `Gauge` | Number of active Kafka consumers. |
14+
| `kafka_consumers_streams` | `Gauge` | Number of active Kafka consumers streams. |
15+
| `kafka_consumers_topics` | `Gauge` | Number of topics being consumed. |
16+
| `kafka_consumers_lags` | `Histogram` | Lag of active Kafka consumers |
17+
| `kafka_consumed_messages` | `Counter` | Number of consumed Kafka messages. |
1718

1819
Optionally, you can provide labels with the `metrics.label` option.
1920

src/clients/admin/admin.ts

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,8 @@
1-
import { type DescribeLogDirsRequest, type DescribeLogDirsResponse } from '../../apis/admin/describe-log-dirs-v4.ts'
21
import {
32
type AlterClientQuotasRequest,
43
type AlterClientQuotasResponse,
54
type AlterClientQuotasResponseEntries
65
} from '../../apis/admin/alter-client-quotas-v1.ts'
7-
import {
8-
type DescribeClientQuotasRequest,
9-
type DescribeClientQuotasResponse,
10-
type DescribeClientQuotasResponseEntry
11-
} from '../../apis/admin/describe-client-quotas-v0.ts'
126
import {
137
type CreateTopicsRequest,
148
type CreateTopicsRequestTopic,
@@ -21,7 +15,13 @@ import {
2115
type DeleteTopicsRequestTopic,
2216
type DeleteTopicsResponse
2317
} from '../../apis/admin/delete-topics-v6.ts'
18+
import {
19+
type DescribeClientQuotasRequest,
20+
type DescribeClientQuotasResponse,
21+
type DescribeClientQuotasResponseEntry
22+
} from '../../apis/admin/describe-client-quotas-v0.ts'
2423
import { type DescribeGroupsRequest, type DescribeGroupsResponse } from '../../apis/admin/describe-groups-v5.ts'
24+
import { type DescribeLogDirsRequest, type DescribeLogDirsResponse } from '../../apis/admin/describe-log-dirs-v4.ts'
2525
import { type ListGroupsRequest as ListGroupsRequestV4 } from '../../apis/admin/list-groups-v4.ts'
2626
import {
2727
type ListGroupsRequest as ListGroupsRequestV5,
@@ -37,14 +37,14 @@ import { type Callback } from '../../apis/definitions.ts'
3737
import { FindCoordinatorKeyTypes, type ConsumerGroupState } from '../../apis/enumerations.ts'
3838
import { type FindCoordinatorRequest, type FindCoordinatorResponse } from '../../apis/metadata/find-coordinator-v6.ts'
3939
import { type MetadataRequest, type MetadataResponse } from '../../apis/metadata/metadata-v12.ts'
40-
import { MultipleErrors } from '../../errors.ts'
4140
import {
4241
adminClientQuotasChannel,
4342
adminGroupsChannel,
4443
adminLogDirsChannel,
4544
adminTopicsChannel,
4645
createDiagnosticContext
4746
} from '../../diagnostic.ts'
47+
import { MultipleErrors } from '../../errors.ts'
4848
import { Reader } from '../../protocol/reader.ts'
4949
import {
5050
Base,
@@ -62,31 +62,31 @@ import {
6262
import { type BaseOptions } from '../base/types.ts'
6363
import { type GroupAssignment } from '../consumer/types.ts'
6464
import {
65+
alterClientQuotasOptionsValidator,
6566
createTopicsOptionsValidator,
6667
deleteGroupsOptionsValidator,
6768
deleteTopicsOptionsValidator,
69+
describeClientQuotasOptionsValidator,
6870
describeGroupsOptionsValidator,
71+
describeLogDirsOptionsValidator,
6972
listGroupsOptionsValidator,
70-
listTopicsOptionsValidator,
71-
describeClientQuotasOptionsValidator,
72-
alterClientQuotasOptionsValidator,
73-
describeLogDirsOptionsValidator
73+
listTopicsOptionsValidator
7474
} from './options.ts'
7575
import {
7676
type AdminOptions,
77+
type AlterClientQuotasOptions,
78+
type BrokerLogDirDescription,
7779
type CreatedTopic,
7880
type CreateTopicsOptions,
7981
type DeleteGroupsOptions,
8082
type DeleteTopicsOptions,
8183
type DescribeClientQuotasOptions,
82-
type AlterClientQuotasOptions,
83-
type DescribeLogDirsOptions,
8484
type DescribeGroupsOptions,
85+
type DescribeLogDirsOptions,
8586
type Group,
8687
type GroupBase,
8788
type ListGroupsOptions,
88-
type ListTopicsOptions,
89-
type BrokerLogDirDescription
89+
type ListTopicsOptions
9090
} from './types.ts'
9191

9292
export class Admin extends Base<AdminOptions> {
@@ -368,6 +368,7 @@ export class Admin extends Base<AdminOptions> {
368368
callback = createPromisifiedCallback()
369369
}
370370

371+
/* c8 ignore next 3 - Hard to test */
371372
if (this[kCheckNotClosed](callback)) {
372373
return callback[kCallbackPromise]
373374
}
@@ -927,6 +928,7 @@ export class Admin extends Base<AdminOptions> {
927928

928929
#describeLogDirs (options: DescribeLogDirsOptions, callback: CallbackWithPromise<BrokerLogDirDescription[]>): void {
929930
this[kMetadata]({ topics: [] }, (error, metadata) => {
931+
/* c8 ignore next 4 - Hard to test */
930932
if (error) {
931933
callback(error, undefined as unknown as BrokerLogDirDescription[])
932934
return

0 commit comments

Comments
 (0)