Skip to content

Commit b23bc96

Browse files
committed
TypeScript types,
configuration mapping, return RecordMetadata array
1 parent 78c5e9c commit b23bc96

File tree

8 files changed

+243
-21
lines changed

8 files changed

+243
-21
lines changed

index.d.ts

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ import {
77
ConsumerTopicConfig,
88
ProducerGlobalConfig,
99
ProducerTopicConfig,
10-
} from './config';
10+
} from './types/config';
1111

12-
export * from './config';
13-
export * from './errors';
12+
export * from './types/config';
13+
export * from './types/errors';
14+
import { Kafka } from './types/kafkajs';
15+
import * as errors from './types/errors';
1416

1517
export interface LibrdKafkaError {
1618
message: string;
@@ -345,3 +347,21 @@ export interface IAdminClient {
345347
export abstract class AdminClient {
346348
static create(conf: GlobalConfig): IAdminClient;
347349
}
350+
351+
export type RdKafka = {
352+
Consumer: KafkaConsumer,
353+
Producer: Producer,
354+
HighLevelProducer: HighLevelProducer,
355+
AdminClient: AdminClient,
356+
KafkaConsumer: KafkaConsumer,
357+
createReadStream: typeof KafkaConsumer.createReadStream,
358+
createWriteStream: typeof Producer.createWriteStream,
359+
CODES: typeof errors.CODES,
360+
Topic: (name: string) => string,
361+
features: typeof features,
362+
librdkafkaVersion: typeof librdkafkaVersion,
363+
}
364+
365+
export type KafkaJS = {
366+
Kafka: Kafka
367+
}

lib/kafkajs/_kafka.js

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,37 @@ class Producer {
1515
#state = ProducerState.INIT;
1616

1717
constructor(config) {
18-
this.#config = config;
18+
this.#config = this.#kafkaJSToRdKafkaConfig(config);
19+
}
20+
21+
#kafkaJSToRdKafkaConfig(config) {
22+
const ret = {
23+
'dr_cb': 'true',
24+
'allow.auto.create.topics': 'false'
25+
}
26+
ret['bootstrap.servers'] = config['brokers'].join(',');
27+
28+
let withSASL = false;
29+
30+
if (config.sasl) {
31+
const sasl = config.sasl;
32+
if (sasl.mechanism === 'plain' &&
33+
typeof sasl.username === 'string' &&
34+
typeof sasl.password === 'string') {
35+
ret['sasl.mechanism'] = 'PLAIN';
36+
ret['sasl.username'] = sasl.username;
37+
ret['sasl.password'] = sasl.password;
38+
withSASL = true;
39+
}
40+
}
41+
42+
if (config.ssl === true && withSASL) {
43+
ret['security.protocol'] = 'sasl_ssl';
44+
} else if (withSASL) {
45+
ret['security.protocol'] = 'sasl_plaintext';
46+
}
47+
48+
return ret;
1949
}
2050

2151
#readyCb(arg) {
@@ -48,7 +78,17 @@ class Producer {
4878
}
4979
//console.log('delivery-report: ' + JSON.stringify(report));
5080
delete report['opaque'];
51-
opaque.resolve(report);
81+
82+
const recordMetadata = {
83+
topicName: report.topic,
84+
partition: report.partition,
85+
errorCode: 0,
86+
baseOffset: report.offset,
87+
logAppendTime: null,
88+
logStartOffset: null,
89+
}
90+
91+
opaque.resolve(recordMetadata);
5292
});
5393

5494
// Resolve the promise.
@@ -77,7 +117,6 @@ class Producer {
77117

78118
this.#internalClient.on('disconnected', (arg) => {
79119
this.#state = ProducerState.DISCONNECTED;
80-
console.log('producer disconnected. ' + JSON.stringify(arg));
81120
});
82121

83122
return new Promise((resolve, reject) => {
@@ -96,15 +135,7 @@ class Producer {
96135
this.#internalClient.disconnect();
97136
}
98137

99-
// producer.send({
100-
// topic: <String>,
101-
// messages: <Message[]>,
102-
// acks: <Number>,
103-
// timeout: <Number>,
104-
// compression: <CompressionTypes>,
105-
// })
106-
107-
send(sendOptions) {
138+
async send(sendOptions) {
108139
if (this.#state !== ProducerState.CONNECTED) {
109140
return Promise.reject("Cannot send message without awaiting connect()");
110141
}
@@ -138,7 +169,34 @@ class Producer {
138169
}));
139170

140171
}
141-
return Promise.all(msgPromises);
172+
const recordMetadataArr = await Promise.all(msgPromises);
173+
174+
const topicPartitionRecordMetadata = new Map();
175+
for (const recordMetadata of recordMetadataArr) {
176+
const key = `${recordMetadata.topicName},${recordMetadata.partition}`;
177+
if (recordMetadata.baseOffset == null || !topicPartitionRecordMetadata.has(key)) {
178+
topicPartitionRecordMetadata.set(key, recordMetadata);
179+
continue;
180+
}
181+
182+
const currentRecordMetadata = topicPartitionRecordMetadata.get(key);
183+
184+
// Don't overwrite a null baseOffset
185+
if (currentRecordMetadata.baseOffset == null) {
186+
continue;
187+
}
188+
189+
if (currentRecordMetadata.baseOffset > recordMetadata.baseOffset) {
190+
topicPartitionRecordMetadata.set(key, recordMetadata);
191+
}
192+
}
193+
194+
const ret = [];
195+
for (const [key, value] of topicPartitionRecordMetadata.entries()) {
196+
value.baseOffset = value.baseOffset?.toString();
197+
ret.push(value);
198+
}
199+
return ret;
142200
}
143201
}
144202

package-lock.json

Lines changed: 23 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
"description": "Node.js bindings for librdkafka",
55
"librdkafka": "2.2.0",
66
"main": "lib/index.js",
7+
"types": "types/index.d.ts",
78
"scripts": {
89
"configure": "node-gyp configure",
910
"build": "node-gyp build",
1011
"test": "make test",
1112
"install": "node-pre-gyp install --fallback-to-build",
12-
"prepack": "node ./ci/prepublish.js"
13+
"prepack": "node ./ci/prepublish.js",
14+
"test:types": "tsc -p ."
1315
},
1416
"binary": {
1517
"module_name": "node-librdkafka",
@@ -38,19 +40,21 @@
3840
],
3941
"license": "MIT",
4042
"devDependencies": {
43+
"@types/node": "^20.4.5",
4144
"bluebird": "^3.5.3",
4245
"jsdoc": "^3.4.0",
4346
"jshint": "^2.10.1",
4447
"mocha": "^10.2.0",
4548
"node-gyp": "^9.3.1",
46-
"toolkit-jsdoc": "^1.0.0"
49+
"toolkit-jsdoc": "^1.0.0",
50+
"typescript": "^5.1.6"
4751
},
4852
"dependencies": {
4953
"@mapbox/node-pre-gyp": "^1.0.11",
5054
"bindings": "^1.3.1",
5155
"nan": "^2.17.0"
5256
},
5357
"engines": {
54-
"node": ">=6.0.0"
58+
"node": ">=14.0.0"
5559
}
5660
}

tsconfig.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"compilerOptions": {
3+
"module": "commonjs",
4+
"lib": ["es6"],
5+
"noImplicitAny": true,
6+
"noImplicitThis": true,
7+
"strictNullChecks": true,
8+
"baseUrl": ".",
9+
"types": ["node_modules/@types/node"],
10+
"typeRoots": ["."],
11+
"noEmit": true,
12+
"forceConsistentCasingInFileNames": true,
13+
"strictFunctionTypes": true
14+
},
15+
"files": [
16+
"index.d.ts"
17+
]
18+
}
File renamed without changes.
File renamed without changes.

types/kafkajs.d.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import * as tls from 'tls'
2+
3+
export type BrokersFunction = () => string[] | Promise<string[]>
4+
5+
export type Mechanism = {
6+
mechanism: string
7+
}
8+
9+
type SASLMechanismOptionsMap = {
10+
plain: { username: string; password: string }
11+
'scram-sha-256': { username: string; password: string }
12+
'scram-sha-512': { username: string; password: string }
13+
}
14+
15+
export type SASLMechanism = keyof SASLMechanismOptionsMap
16+
type SASLMechanismOptions<T> = T extends SASLMechanism
17+
? { mechanism: T } & SASLMechanismOptionsMap[T]
18+
: never
19+
export type SASLOptions = SASLMechanismOptions<SASLMechanism>
20+
21+
export interface KafkaConfig {
22+
brokers: string[] | BrokersFunction
23+
ssl?: boolean
24+
sasl?: SASLOptions | Mechanism
25+
clientId?: string
26+
connectionTimeout?: number
27+
authenticationTimeout?: number
28+
reauthenticationThreshold?: number
29+
requestTimeout?: number
30+
enforceRequestTimeout?: boolean
31+
}
32+
33+
export interface ProducerConfig {
34+
metadataMaxAge?: number
35+
allowAutoTopicCreation?: boolean
36+
idempotent?: boolean
37+
transactionalId?: string
38+
transactionTimeout?: number
39+
maxInFlightRequests?: number
40+
}
41+
42+
export interface IHeaders {
43+
[key: string]: Buffer | string | (Buffer | string)[] | undefined
44+
}
45+
46+
export interface Message {
47+
key?: Buffer | string | null
48+
value: Buffer | string | null
49+
partition?: number
50+
headers?: IHeaders
51+
timestamp?: string
52+
}
53+
54+
export enum CompressionTypes {
55+
None = 0,
56+
GZIP = 1,
57+
Snappy = 2,
58+
LZ4 = 3,
59+
ZSTD = 4,
60+
}
61+
62+
export var CompressionCodecs: {
63+
[CompressionTypes.GZIP]: () => any
64+
[CompressionTypes.Snappy]: () => any
65+
[CompressionTypes.LZ4]: () => any
66+
[CompressionTypes.ZSTD]: () => any
67+
}
68+
69+
export interface ProducerRecord {
70+
topic: string
71+
messages: Message[]
72+
acks?: number
73+
timeout?: number
74+
compression?: CompressionTypes
75+
}
76+
77+
export type RecordMetadata = {
78+
topicName: string
79+
partition: number
80+
errorCode: number
81+
offset?: string
82+
timestamp?: string
83+
baseOffset?: string
84+
logAppendTime?: string
85+
logStartOffset?: string
86+
}
87+
88+
export class Kafka {
89+
constructor(config: KafkaConfig)
90+
producer(config?: ProducerConfig): Producer
91+
}
92+
93+
type Sender = {
94+
send(record: ProducerRecord): Promise<RecordMetadata[]>
95+
}
96+
97+
export type Producer = Sender & {
98+
connect(): Promise<void>
99+
disconnect(): Promise<void>
100+
}
101+

0 commit comments

Comments
 (0)