Skip to content

Commit 8f56cee

Browse files
authored
feat: implement KIP-848 new consumer protocol (#133)
* feat: implement KIP-848 new consumer protocol
1 parent 97e1c36 commit 8f56cee

File tree

16 files changed

+655
-78
lines changed

16 files changed

+655
-78
lines changed

docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ services:
3939
# Consumer group options
4040
KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer"
4141
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
42+
KAFKA_GROUP_CONSUMER_HEARTBEAT_INTERVAL_MS: "200"
43+
KAFKA_GROUP_CONSUMER_MIN_HEARTBEAT_INTERVAL_MS: "100"
4244

4345
broker-sasl:
4446
image: *image

docs/consumer.md

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ Creates a new consumer with type `Consumer<Key, Value, HeaderKey, HeaderValue>`.
2626
Options:
2727

2828
| Property | Type | Default | Description |
29-
| ----------------- | --------------------------------------------------- | ------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
29+
| ----------------- | --------------------------------------------------- | ------------------------- | - |
3030
| groupId | `string` | | Consumer group ID. |
3131
| autocommit | `boolean \| number` | `true` | Whether to autocommit consumed messages.<br/><br/> If it is `true`, then messages are committed immediately.<br/><br/> If it is a number, it specifies how often offsets will be committed. Only the last offset for a topic-partition is committed.<br/><br/>If set to `false`, then each message read from the stream will have a `commit` method which should be used to manually commit offsets. |
3232
| minBytes | `number` | `1` | Minimum amount of data the brokers should return. The value might not be respected by Kafka. |
@@ -35,10 +35,13 @@ Options:
3535
| isolationLevel | string | `READ_COMMITTED` | Kind of isolation applied to fetch requests. It can be used to only read producers-committed messages.<br/><br/> The valid values are defined in the `FetchIsolationLevels` enumeration. |
3636
| deserializers | `Deserializers<Key, Value, HeaderKey, HeaderValue>` | | Object that specifies which deserialisers to use.<br/><br/>The object should only contain one or more of the `key`, `value`, `headerKey` and `headerValue` properties. |
3737
| highWaterMark | `number` | `1024` | The maximum amount of messages to store in memory before delaying fetch requests. Note that this severely impacts both performance at the cost of memory use. |
38-
| sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. |
39-
| rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. |
40-
| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats. |
41-
| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.<br/><br/> Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. |
38+
| sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.<br/><br/> This is only relevant when Kafka creates a new group.<br/><br/> Not supported for `groupProtocol=consumer`, instead it is set with broker configuration property `group.consumer.session.timeout.ms`. |
39+
| rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. |
40+
| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats.<br/><br/> Not supported for `groupProtocol=consumer`, instead it is set with the broker configuration property `group.consumer.heartbeat.interval`. |
41+
| groupProtocol | `'classic' \| 'consumer'` | `'classic'` | Group protocol to use. Use `'classic'` for the original consumer group protocol and `'consumer'` for the new protocol introduced in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol).<br/><br/> The `'consumer'` protocol provides server-side partition assignment and incremental rebalancing behavior. |
42+
| groupRemoteAssignor | `string` | `null` | Server-side assignor to use for `groupProtocol=consumer`. Keep it unset to let the server select a suitable assignor for the group. Available assignors: `'uniform'` or `'range'`. |
43+
| protocols | `GroupProtocolSubscription[]` | `roundrobin`, version `1` | Protocols used by this consumer group.<br/><br/> Each protocol must be an object specifying the `name`, `version` and optionally `metadata` properties. <br/><br/> Not supported for `groupProtocol=consumer`. |
44+
| partitionAssigner | `GroupPartitionsAssigner` | | Client-side partition assignment strategy.<br/><br/> Not supported for `groupProtocol=consumer`, use `groupRemoteAssignor` instead. |
4245

4346
It also supports all the constructor options of `Base`.
4447

@@ -170,6 +173,8 @@ Joins (and creates if necessary) a consumer group.
170173

171174
It returns the group member ID for this consumer.
172175

176+
This method is no-op for `groupProtocol=consumer`.
177+
173178
Options:
174179

175180
| Property | Type | Default | Description |
@@ -187,6 +192,8 @@ If `force` is not `true`, then the method will throw an error if any `MessagesSt
187192

188193
The return value is `void`.
189194

195+
This method is no-op for `groupProtocol=consumer`.
196+
190197
## FAQs
191198

192199
### My consumer is not receiving any message when the application restarts

src/apis/callbacks.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { MultipleErrors } from '../errors.ts'
1+
import { MultipleErrors, TimeoutError } from '../errors.ts'
22
import { PromiseWithResolvers } from '../utils.ts'
33
import { type Callback } from './definitions.ts'
44

@@ -65,3 +65,23 @@ export function runConcurrentCallbacks<ReturnType> (
6565
operation(item, operationCallback.bind(null, i++))
6666
}
6767
}
68+
69+
export function createTimeoutCallback<ReturnType> (
70+
callback: Callback<ReturnType>,
71+
timeout: number,
72+
errorMessage: string
73+
): Callback<ReturnType> {
74+
let timeoutFired = false
75+
const timeoutHandle = setTimeout(() => {
76+
timeoutFired = true
77+
callback(new TimeoutError(errorMessage), undefined as unknown as ReturnType)
78+
}, timeout)
79+
80+
return (error: Error | null, result: ReturnType) => {
81+
if (timeoutFired) {
82+
return
83+
}
84+
clearTimeout(timeoutHandle)
85+
callback(error, result)
86+
}
87+
}

src/apis/consumer/consumer-group-heartbeat-v0.ts

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export interface ConsumerGroupHeartbeatResponse {
3434
memberId: NullableString
3535
memberEpoch: number
3636
heartbeatIntervalMs: number
37-
assignment: ConsumerGroupHeartbeatResponseAssignment[]
37+
assignment: ConsumerGroupHeartbeatResponseAssignment | null
3838
}
3939

4040
/*
@@ -103,16 +103,14 @@ export function parseResponse (
103103
memberId: reader.readNullableString(),
104104
memberEpoch: reader.readInt32(),
105105
heartbeatIntervalMs: reader.readInt32(),
106-
assignment: reader.readArray(r => {
107-
return {
108-
topicPartitions: r.readArray(r => {
109-
return {
110-
topicId: r.readUUID(),
111-
partitions: r.readArray(r => r.readInt32(), true, false)
112-
}
113-
})
114-
} as ConsumerGroupHeartbeatResponseAssignment
115-
})
106+
assignment: reader.readNullableStruct(() => ({
107+
topicPartitions: reader.readArray(r => {
108+
return {
109+
topicId: r.readUUID(),
110+
partitions: r.readArray(r => r.readInt32(), true, false)
111+
}
112+
})
113+
}))
116114
}
117115

118116
if (response.errorCode !== 0) {

src/apis/enumerations.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ export const allowedProduceAcks = Object.values(ProduceAcks) as number[]
2626
export type ProduceAck = keyof typeof ProduceAcks
2727

2828
// Consumer API
29+
30+
export const GroupProtocols = { CLASSIC: 'classic', CONSUMER: 'consumer' } as const
31+
export const allowedGroupProtocols = Object.values(GroupProtocols)
32+
export type GroupProtocol = keyof typeof GroupProtocols
33+
2934
// ./consumer/fetch.ts
3035
export const FetchIsolationLevels = { READ_UNCOMMITTED: 0, READ_COMMITTED: 1 }
3136
export const allowedFetchIsolationLevels = Object.values(FetchIsolationLevels) as number[]

0 commit comments

Comments
 (0)