Skip to content

Commit 63f8bcc

Browse files
committed
Use callback for disconnect
1 parent b23bc96 commit 63f8bcc

File tree

1 file changed

+8
-6
lines changed

1 file changed

+8
-6
lines changed

lib/kafkajs/_kafka.js

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,6 @@ class Producer {
115115
this.#internalClient.on('event.error', this.#errorCb.bind(this));
116116
this.#internalClient.on('event.log', console.log);
117117

118-
this.#internalClient.on('disconnected', (arg) => {
119-
this.#state = ProducerState.DISCONNECTED;
120-
});
121-
122118
return new Promise((resolve, reject) => {
123119
this.#connectPromiseFunc = {resolve, reject};
124120
console.log("Connecting....");
@@ -127,12 +123,18 @@ class Producer {
127123
});
128124
}
129125

130-
disconnect() {
126+
async disconnect() {
131127
if (this.#state >= ProducerState.DISCONNECTING) {
132128
return;
133129
}
134130
this.#state = ProducerState.DISCONNECTING;
135-
this.#internalClient.disconnect();
131+
await new Promise((resolve, reject) => {
132+
const cb = (err) => {
133+
err ? reject(err) : resolve();
134+
this.#state = ProducerState.DISCONNECTED;
135+
}
136+
this.#internalClient.disconnect(5000, cb);
137+
});
136138
}
137139

138140
async send(sendOptions) {

0 commit comments

Comments
 (0)