Skip to content

Commit 2285c64

Browse files
authored
feat: Improved enumeration handling. (#119)
* feat: Improved enumeration handling. Signed-off-by: Paolo Insogna <[email protected]>
1 parent 2558291 commit 2285c64

File tree

11 files changed

+113
-54
lines changed

11 files changed

+113
-54
lines changed

src/apis/enumerations.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
// SASL Authentication
2-
export const SASLMechanisms = ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512', 'OAUTHBEARER'] as const
3-
export type SASLMechanism = (typeof SASLMechanisms)[number]
2+
export const SASLMechanisms = {
3+
PLAIN: 'PLAIN',
4+
SCRAM_SHA_256: 'SCRAM-SHA-256',
5+
SCRAM_SHA_512: 'SCRAM-SHA-512',
6+
OAUTHBEARER: 'OAUTHBEARER'
7+
} as const
8+
9+
export const allowedSASLMechanisms = Object.values(SASLMechanisms) as SASLMechanismValue[]
10+
11+
export type SASLMechanism = keyof typeof SASLMechanisms
12+
export type SASLMechanismValue = (typeof SASLMechanisms)[keyof typeof SASLMechanisms]
413

514
// Metadata API
615
// ./metadata/find-coordinator.ts
@@ -13,11 +22,13 @@ export const ProduceAcks = {
1322
NO_RESPONSE: 0,
1423
LEADER: 1
1524
} as const
25+
export const allowedProduceAcks = Object.values(ProduceAcks) as number[]
1626
export type ProduceAck = keyof typeof ProduceAcks
1727

1828
// Consumer API
1929
// ./consumer/fetch.ts
2030
export const FetchIsolationLevels = { READ_UNCOMMITTED: 0, READ_COMMITTED: 1 }
31+
export const allowedFetchIsolationLevels = Object.values(FetchIsolationLevels) as number[]
2132
export type FetchIsolationLevel = keyof typeof FetchIsolationLevels
2233

2334
export const ListOffsetTimestamps = { LATEST: -1n, EARLIEST: -2n }

src/clients/base/options.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { SASLMechanisms } from '../../apis/enumerations.ts'
1+
import { allowedSASLMechanisms } from '../../apis/enumerations.ts'
22
import { ajv } from '../../utils.ts'
33
import { version } from '../../version.ts'
44
import { type BaseOptions } from './types.ts'
@@ -41,7 +41,7 @@ export const baseOptionsSchema = {
4141
sasl: {
4242
type: 'object',
4343
properties: {
44-
mechanism: { type: 'string', enum: SASLMechanisms },
44+
mechanism: { type: 'string', enum: allowedSASLMechanisms },
4545
username: { oneOf: [{ type: 'string' }, { function: true }] },
4646
password: { oneOf: [{ type: 'string' }, { function: true }] },
4747
token: { oneOf: [{ type: 'string' }, { function: true }] },

src/clients/consumer/options.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import { FetchIsolationLevels } from '../../apis/enumerations.ts'
1+
import { allowedFetchIsolationLevels } from '../../apis/enumerations.ts'
22
import { ajv } from '../../utils.ts'
33
import { idProperty, topicWithPartitionAndOffsetProperties } from '../base/options.ts'
44
import { serdeProperties } from '../serde.ts'
5-
import { MessagesStreamFallbackModes, MessagesStreamModes, type ConsumerOptions } from './types.ts'
5+
import { allowedMessagesStreamFallbackModes, allowedMessagesStreamModes, type ConsumerOptions } from './types.ts'
66

77
export const groupOptionsProperties = {
88
sessionTimeout: { type: 'number', minimum: 0 },
@@ -59,7 +59,7 @@ export const consumeOptionsProperties = {
5959
minBytes: { type: 'number', minimum: 0 },
6060
maxBytes: { type: 'number', minimum: 0 },
6161
maxWaitTime: { type: 'number', minimum: 0 },
62-
isolationLevel: { type: 'string', enum: Object.keys(FetchIsolationLevels) },
62+
isolationLevel: { type: 'string', enum: allowedFetchIsolationLevels },
6363
deserializers: serdeProperties,
6464
highWaterMark: { type: 'number', minimum: 1 }
6565
}
@@ -74,8 +74,8 @@ export const consumeOptionsSchema = {
7474
type: 'object',
7575
properties: {
7676
topics: { type: 'array', items: idProperty },
77-
mode: { type: 'string', enum: Object.values(MessagesStreamModes) },
78-
fallbackMode: { type: 'string', enum: Object.values(MessagesStreamFallbackModes) },
77+
mode: { type: 'string', enum: allowedMessagesStreamModes },
78+
fallbackMode: { type: 'string', enum: allowedMessagesStreamFallbackModes },
7979
maxFetches: { type: 'number', minimum: 0, default: 0 },
8080
offsets: {
8181
type: 'array',
@@ -197,7 +197,7 @@ export const listOffsetsOptionsSchema = {
197197
items: { type: 'number', minimum: 0 }
198198
}
199199
},
200-
isolationLevel: { type: 'string', enum: Object.keys(FetchIsolationLevels) },
200+
isolationLevel: { type: 'string', enum: allowedFetchIsolationLevels },
201201
timestamp: { bigint: true }
202202
},
203203
required: ['topics'],

src/clients/consumer/types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ export const MessagesStreamModes = {
5151
COMMITTED: 'committed',
5252
MANUAL: 'manual'
5353
} as const
54+
export const allowedMessagesStreamModes = Object.values(MessagesStreamModes) as MessagesStreamModeValue[]
55+
5456
export type MessagesStreamMode = keyof typeof MessagesStreamModes
5557
export type MessagesStreamModeValue = (typeof MessagesStreamModes)[keyof typeof MessagesStreamModes]
5658

@@ -59,9 +61,14 @@ export const MessagesStreamFallbackModes = {
5961
EARLIEST: 'earliest',
6062
FAIL: 'fail'
6163
} as const
64+
export const allowedMessagesStreamFallbackModes = Object.values(
65+
MessagesStreamFallbackModes
66+
) as MessagesStreamFallbackModeValue[]
67+
6268
export type MessagesStreamFallbackMode = keyof typeof MessagesStreamFallbackModes
6369
export type MessagesStreamFallbackModeValue =
6470
(typeof MessagesStreamFallbackModes)[keyof typeof MessagesStreamFallbackModes]
71+
6572
export interface GroupOptions {
6673
sessionTimeout?: number
6774
rebalanceTimeout?: number

src/clients/producer/options.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { ProduceAcks } from '../../apis/enumerations.ts'
2-
import { compressionsAlgorithms } from '../../protocol/compression.ts'
1+
import { allowedProduceAcks, ProduceAcks } from '../../apis/enumerations.ts'
2+
import { allowedCompressionsAlgorithms, compressionsAlgorithms } from '../../protocol/compression.ts'
33
import { messageSchema } from '../../protocol/records.ts'
44
import { ajv, enumErrorMessage } from '../../utils.ts'
55
import { serdeProperties } from '../serde.ts'
@@ -11,15 +11,15 @@ export const produceOptionsProperties = {
1111
acks: {
1212
type: 'number',
1313
enumeration: {
14-
allowed: Object.values(ProduceAcks),
14+
allowed: allowedProduceAcks,
1515
errorMessage: enumErrorMessage(ProduceAcks)
1616
}
1717
},
1818
compression: {
1919
type: 'string',
2020
enumeration: {
21-
allowed: Object.keys(compressionsAlgorithms),
22-
errorMessage: enumErrorMessage(compressionsAlgorithms, true)
21+
allowed: allowedCompressionsAlgorithms,
22+
errorMessage: enumErrorMessage(compressionsAlgorithms)
2323
}
2424
},
2525
partitioner: { function: true },

src/network/connection.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { createConnection, type NetConnectOpts, type Socket } from 'node:net'
44
import { connect as createTLSConnection, type ConnectionOptions as TLSConnectionOptions } from 'node:tls'
55
import { type CallbackWithPromise, createPromisifiedCallback, kCallbackPromise } from '../apis/callbacks.ts'
66
import { type Callback, type ResponseParser } from '../apis/definitions.ts'
7-
import { type SASLMechanism, SASLMechanisms } from '../apis/enumerations.ts'
7+
import { allowedSASLMechanisms, SASLMechanisms, type SASLMechanismValue } from '../apis/enumerations.ts'
88
import { saslAuthenticateV2, saslHandshakeV1 } from '../apis/index.ts'
99
import { type SaslAuthenticateResponse } from '../apis/security/sasl-authenticate-v2.ts'
1010
import {
@@ -39,7 +39,7 @@ export interface Broker {
3939
}
4040

4141
export interface SASLOptions {
42-
mechanism: SASLMechanism
42+
mechanism: SASLMechanismValue
4343
username?: string | SASLCredentialProvider
4444
password?: string | SASLCredentialProvider
4545
token?: string | SASLCredentialProvider
@@ -359,7 +359,7 @@ export class Connection extends EventEmitter {
359359

360360
const { mechanism, username, password, token } = this.#options.sasl!
361361

362-
if (!SASLMechanisms.includes(mechanism)) {
362+
if (!allowedSASLMechanisms.includes(mechanism)) {
363363
this.#onConnectionError(
364364
host,
365365
port,
@@ -384,9 +384,9 @@ export class Connection extends EventEmitter {
384384
this.emit('sasl:handshake', response.mechanisms)
385385
const callback = this.#onSaslAuthenticate.bind(this, host, port, diagnosticContext)
386386

387-
if (mechanism === 'PLAIN') {
387+
if (mechanism === SASLMechanisms.PLAIN) {
388388
saslPlain.authenticate(saslAuthenticateV2.api, this, username!, password!, callback)
389-
} else if (mechanism === 'OAUTHBEARER') {
389+
} else if (mechanism === SASLMechanisms.OAUTHBEARER) {
390390
saslOAuthBearer.authenticate(saslAuthenticateV2.api, this, token!, callback)
391391
} else {
392392
saslScramSha.authenticate(
@@ -524,7 +524,12 @@ export class Connection extends EventEmitter {
524524
authBytes?: Buffer
525525
): void {
526526
if (error) {
527-
this.#onConnectionError(host, port, diagnosticContext, error)
527+
this.#onConnectionError(
528+
host,
529+
port,
530+
diagnosticContext,
531+
new AuthenticationError('SASL authentication failed.', { cause: error })
532+
)
528533
return
529534
}
530535

src/protocol/compression.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ export const CompressionAlgorithms = {
2727
LZ4: 'lz4',
2828
ZSTD: 'zstd'
2929
} as const
30+
31+
export const allowedCompressionsAlgorithms = Object.values(CompressionAlgorithms) as CompressionAlgorithmValue[]
3032
export type CompressionAlgorithm = keyof typeof CompressionAlgorithms
3133
export type CompressionAlgorithmValue = (typeof CompressionAlgorithms)[keyof typeof CompressionAlgorithms]
3234

src/protocol/sasl/oauth-bearer.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@ import { type Connection, type SASLCredentialProvider } from '../../network/conn
55
import { getCredential } from './credential-provider.ts'
66

77
export function jwtValidateAuthenticationBytes (authBytes: Buffer, callback: CallbackWithPromise<Buffer>): void {
8-
let authData: Record<string, string>
8+
let authData: Record<string, string> = {}
9+
910
try {
10-
authData = authBytes.length > 0 ? JSON.parse(authBytes.toString('utf-8')) : {}
11+
if (authBytes.length > 0) {
12+
authData = JSON.parse(authBytes.toString('utf-8'))
13+
}
14+
15+
/* c8 ignore next 8 - Hard to test */
1116
} catch (e) {
1217
callback(
1318
new AuthenticationError('Invalid authBytes in SASL/OAUTHBEARER response', { authBytes }),

test/clients/base/sasl-oauthbearer.test.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
11
import { createSigner } from 'fast-jwt'
2-
import { deepStrictEqual, rejects } from 'node:assert'
2+
import { deepStrictEqual, ok, rejects } from 'node:assert'
33
import { once } from 'node:events'
44
import { test } from 'node:test'
5-
import { AuthenticationError, Base, NetworkError, parseBroker, sleep } from '../../../src/index.ts'
5+
import {
6+
AuthenticationError,
7+
Base,
8+
MultipleErrors,
9+
NetworkError,
10+
parseBroker,
11+
SASLMechanisms,
12+
saslOAuthBearer,
13+
sleep
14+
} from '../../../src/index.ts'
615
import { kafkaSaslBootstrapServers } from '../../helpers.ts'
716

817
const saslBroker = parseBroker(kafkaSaslBootstrapServers[0])
@@ -34,7 +43,7 @@ test('should connect to SASL protected broker using SASL/OAUTHBEARER', async t =
3443
bootstrapBrokers: kafkaSaslBootstrapServers,
3544
strict: true,
3645
retries: 0,
37-
sasl: { mechanism: 'OAUTHBEARER', token }
46+
sasl: { mechanism: SASLMechanisms.OAUTHBEARER, token }
3847
})
3948

4049
t.after(() => base.close())
@@ -44,7 +53,28 @@ test('should connect to SASL protected broker using SASL/OAUTHBEARER', async t =
4453
deepStrictEqual(metadata.brokers.get(1), saslBroker)
4554
})
4655

47-
// The 'should handle authentication errors' is not possible here as the Kafka unsecured validator accepts any token
56+
test('should handle authentication errors', async t => {
57+
const base = new Base({
58+
clientId: 'clientId',
59+
bootstrapBrokers: kafkaSaslBootstrapServers,
60+
retries: 0,
61+
sasl: {
62+
mechanism: 'OAUTHBEARER',
63+
token: 'invalid',
64+
authBytesValidator: saslOAuthBearer.jwtValidateAuthenticationBytes
65+
}
66+
})
67+
68+
t.after(() => base.close())
69+
70+
try {
71+
await base.metadata({ topics: [] })
72+
throw new Error('Expected error not thrown')
73+
} catch (error) {
74+
ok(error instanceof MultipleErrors)
75+
deepStrictEqual(error.errors[0].cause.message, 'SASL authentication failed.')
76+
}
77+
})
4878

4979
test('should accept a function as credential provider', async t => {
5080
const signSync = createSigner({

test/clients/base/sasl.test.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ import { deepStrictEqual, ok, rejects } from 'node:assert'
22
import { once } from 'node:events'
33
import { before, test } from 'node:test'
44
import {
5+
allowedSASLMechanisms,
56
AuthenticationError,
67
Base,
78
MultipleErrors,
89
NetworkError,
910
parseBroker,
10-
sleep,
11-
type SASLMechanism
11+
sleep
1212
} from '../../../src/index.ts'
1313
import { createScramUsers } from '../../fixtures/create-users.ts'
1414
import { kafkaSaslBootstrapServers } from '../../helpers.ts'
@@ -29,14 +29,20 @@ test('UNAUTHENTICATED - should not connect to SASL protected broker by default',
2929
await rejects(() => base.metadata({ topics: [] }))
3030
})
3131

32-
for (const mechanism of ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512']) {
32+
for (const mechanism of allowedSASLMechanisms) {
33+
if (mechanism === 'OAUTHBEARER') {
34+
// GSSAPI requires a properly configured Kerberos environment
35+
// which is out of scope for these tests
36+
continue
37+
}
38+
3339
test(`${mechanism} - should connect to SASL protected broker`, async t => {
3440
const base = new Base({
3541
clientId: 'clientId',
3642
bootstrapBrokers: kafkaSaslBootstrapServers,
3743
strict: true,
3844
retries: 0,
39-
sasl: { mechanism: mechanism as SASLMechanism, username: 'admin', password: 'admin' }
45+
sasl: { mechanism, username: 'admin', password: 'admin' }
4046
})
4147

4248
t.after(() => base.close())
@@ -51,7 +57,7 @@ for (const mechanism of ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512']) {
5157
clientId: 'clientId',
5258
bootstrapBrokers: kafkaSaslBootstrapServers,
5359
retries: 0,
54-
sasl: { mechanism: mechanism as SASLMechanism, username: 'admin', password: 'invalid' }
60+
sasl: { mechanism, username: 'admin', password: 'invalid' }
5561
})
5662

5763
t.after(() => base.close())
@@ -72,7 +78,7 @@ for (const mechanism of ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512']) {
7278
strict: true,
7379
retries: 0,
7480
sasl: {
75-
mechanism: mechanism as SASLMechanism,
81+
mechanism,
7682
username () {
7783
return 'admin'
7884
},
@@ -94,7 +100,7 @@ for (const mechanism of ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512']) {
94100
strict: true,
95101
retries: 0,
96102
sasl: {
97-
mechanism: mechanism as SASLMechanism,
103+
mechanism,
98104
username: 'admin',
99105
async password () {
100106
await sleep(1000)
@@ -117,7 +123,7 @@ for (const mechanism of ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512']) {
117123
strict: true,
118124
retries: 0,
119125
sasl: {
120-
mechanism: mechanism as SASLMechanism,
126+
mechanism,
121127
username () {
122128
throw new Error('Kaboom!')
123129
}
@@ -150,7 +156,7 @@ for (const mechanism of ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512']) {
150156
strict: true,
151157
retries: 0,
152158
sasl: {
153-
mechanism: mechanism as SASLMechanism,
159+
mechanism,
154160
username: 'admin',
155161
async password () {
156162
throw new Error('Kaboom!')
@@ -183,7 +189,7 @@ for (const mechanism of ['PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512']) {
183189
bootstrapBrokers: kafkaSaslBootstrapServers,
184190
strict: true,
185191
retries: 0,
186-
sasl: { mechanism: mechanism as SASLMechanism, username: 'admin', password: 'admin' }
192+
sasl: { mechanism, username: 'admin', password: 'admin' }
187193
})
188194

189195
t.after(() => base.close())

0 commit comments

Comments
 (0)