Skip to content

Commit 7ec8cda

Browse files
authored
Move activation of dispatchers into event loop thread (#190)
* Reduce test flakiness
* Move activation of dispatchers to event loop thread
* Enable more tests on semaphore CI
* Update changelog and code comments
* Reject connection only after we're done with the entire process
1 parent 34cfb67 commit 7ec8cda

File tree

14 files changed

+142
-48
lines changed

14 files changed

+142
-48
lines changed

.semaphore/semaphore.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ blocks:
106106
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
107107
- docker compose up -d && sleep 30
108108
- export NODE_OPTIONS='--max-old-space-size=1536'
109-
- npx jest --forceExit --no-colors --ci test/promisified/admin/delete_groups.spec.js test/promisified/consumer/pause.spec.js
109+
- npx jest --no-colors --ci test/promisified/
110110
- name: "ESLint"
111111
commands:
112112
- npx eslint lib/kafkajs

CHANGELOG.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@ v1.0.0 is a feature release. It is supported for all usage.
44

55
## Enhancements
66

7-
1. Add support for an Admin API to fetch topic offsets(#156).
7+
1. Add support for an Admin API to fetch topic offsets (#156).
8+
9+
## Fixes
10+
11+
1. Fixes an issue where `uv_async_init` was being called off the event loop thread,
12+
causing the node process to hang (#190).
813

914

1015
# confluent-kafka-javascript v0.6.1
@@ -29,7 +34,7 @@ v0.6.0 is a limited availability maintenance release. It is supported for all us
2934

3035
### Schema Registry
3136

32-
1. Add AWS AssumeRole support to AWS KMS. You can now specify a role arn, and optional
37+
1. Add AWS AssumeRole support to AWS KMS. You can now specify a role arn, and optional
3338
role session name and optional role external id.
3439

3540
2. Ensure different key ids use different client instances.

lib/kafkajs/_admin.js

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ class Admin {
6262
*/
6363
#connectPromiseFunc = null;
6464

65+
/**
66+
* Stores the first error encountered while connecting (if any). This is what we
67+
* want to reject with.
68+
* @type {Error|null}
69+
*/
70+
#connectionError = null;
71+
6572
/**
6673
* The client name used by the admin client for logging - determined by librdkafka
6774
* using a combination of clientId and an integer.
@@ -169,8 +176,10 @@ class Admin {
169176
* @param {Error} err
170177
*/
171178
#errorCb(err) {
172-
if (this.#state === AdminState.CONNECTING) {
173-
this.#connectPromiseFunc['reject'](err);
179+
/* If we get an error in the middle of connecting, reject the promise later with this error. */
180+
if (this.#state < AdminState.CONNECTED) {
181+
if (!this.#connectionError)
182+
this.#connectionError = err;
174183
} else {
175184
this.#logger.error(`Error: ${err.message}`, this.#createAdminBindingMessageMetadata());
176185
}
@@ -212,7 +221,9 @@ class Admin {
212221
this.#clientName = this.#internalClient.name;
213222
this.#logger.info("Admin client connected", this.#createAdminBindingMessageMetadata());
214223
} catch (err) {
215-
reject(createKafkaJsErrorFromLibRdKafkaError(err));
224+
this.#state = AdminState.DISCONNECTED;
225+
const rejectionError = this.#connectionError ? this.#connectionError : err;
226+
reject(createKafkaJsErrorFromLibRdKafkaError(rejectionError));
216227
}
217228
});
218229
}

lib/kafkajs/_consumer.js

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ class Consumer {
6565
*/
6666
#connectPromiseFunc = {};
6767

68+
/**
69+
* Stores the first error encountered while connecting (if any). This is what we
70+
* want to reject with.
71+
* @type {Error|null}
72+
*/
73+
#connectionError = null;
74+
6875
/**
6976
* state is the current state of the consumer.
7077
* @type {ConsumerState}
@@ -668,8 +675,10 @@ class Consumer {
668675
* @param {Error} err
669676
*/
670677
#errorCb(err) {
671-
if (this.#state === ConsumerState.CONNECTING) {
672-
this.#connectPromiseFunc['reject'](err);
678+
/* If we get an error in the middle of connecting, reject the promise later with this error. */
679+
if (this.#state < ConsumerState.CONNECTED) {
680+
if (!this.#connectionError)
681+
this.#connectionError = err;
673682
} else {
674683
this.#logger.error(err, this.#createConsumerBindingMessageMetadata());
675684
}
@@ -1027,8 +1036,11 @@ class Consumer {
10271036
return new Promise((resolve, reject) => {
10281037
this.#connectPromiseFunc = { resolve, reject };
10291038
this.#internalClient.connect(null, (err) => {
1030-
if (err)
1031-
reject(createKafkaJsErrorFromLibRdKafkaError(err));
1039+
if (err) {
1040+
this.#state = ConsumerState.DISCONNECTED;
1041+
const rejectionError = this.#connectionError ? this.#connectionError : err;
1042+
reject(createKafkaJsErrorFromLibRdKafkaError(rejectionError));
1043+
}
10321044
});
10331045
});
10341046
}

lib/kafkajs/_producer.js

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ class Producer {
5858
*/
5959
#connectPromiseFunc = {};
6060

61+
/**
62+
* Stores the first error encountered while connecting (if any). This is what we
63+
* want to reject with.
64+
* @type {Error|null}
65+
*/
66+
#connectionError = null;
67+
6168
/**
6269
* state is the current state of the producer.
6370
* @type {ProducerState}
@@ -330,11 +337,12 @@ class Producer {
330337
* @param {Error} err
331338
*/
332339
#errorCb(err) {
333-
if (this.#state === ProducerState.CONNECTING) {
334-
this.#connectPromiseFunc["reject"](err);
335-
} else {
336-
this.#logger.error(err, this.#createProducerBindingMessageMetadata());
340+
/* If we get an error in the middle of connecting, reject the promise later with this error. */
341+
if (this.#state < ProducerState.CONNECTED) {
342+
if (!this.#connectionError)
343+
this.#connectionError = err;
337344
}
345+
this.#logger.error(err, this.#createProducerBindingMessageMetadata());
338346
}
339347

340348
/**
@@ -359,8 +367,11 @@ class Producer {
359367
return new Promise((resolve, reject) => {
360368
this.#connectPromiseFunc = { resolve, reject };
361369
this.#internalClient.connect(null, (err) => {
362-
if (err)
363-
reject(createKafkaJsErrorFromLibRdKafkaError(err));
370+
if (err) {
371+
this.#state = ProducerState.DISCONNECTED;
372+
const rejectionError = this.#connectionError ? this.#connectionError : err;
373+
reject(createKafkaJsErrorFromLibRdKafkaError(rejectionError));
374+
}
364375
});
365376
});
366377
}

src/admin.cc

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,6 @@ Baton AdminClient::Connect() {
5656
return baton;
5757
}
5858

59-
// Activate the dispatchers before the connection, as some callbacks may run
60-
// on the background thread.
61-
// We will deactivate them if the connection fails.
62-
ActivateDispatchers();
63-
6459
std::string errstr;
6560
{
6661
scoped_shared_write_lock lock(m_connection_lock);
@@ -999,6 +994,14 @@ NAN_METHOD(AdminClient::NodeConnect) {
999994

1000995
AdminClient* client = ObjectWrap::Unwrap<AdminClient>(info.This());
1001996

997+
// Activate the dispatchers before the connection, as some callbacks may run
998+
// on the background thread.
999+
// We will deactivate them if the connection fails.
1000+
// Because the Admin Client connect is synchronous, we can do this within
1001+
// AdminClient::Connect as well, but we do it here to keep the code similar to
1002+
// the Producer and Consumer.
1003+
client->ActivateDispatchers();
1004+
10021005
Baton b = client->Connect();
10031006
// Let the JS library throw if we need to so the error can be more rich
10041007
int error_code = static_cast<int>(b.err());

src/kafka-consumer.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,6 +1450,11 @@ NAN_METHOD(KafkaConsumer::NodeConnect) {
14501450

14511451
KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
14521452

1453+
// Activate the dispatchers before the connection, as some callbacks may run
1454+
// on the background thread.
1455+
// We will deactivate them if the connection fails.
1456+
consumer->ActivateDispatchers();
1457+
14531458
Nan::Callback *callback = new Nan::Callback(info[0].As<v8::Function>());
14541459
Nan::AsyncQueueWorker(new Workers::KafkaConsumerConnect(callback, consumer));
14551460

src/producer.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,12 @@ NAN_METHOD(Producer::NodeConnect) {
729729
Nan::Callback *callback = new Nan::Callback(cb);
730730

731731
Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());
732+
733+
// Activate the dispatchers before the connection, as some callbacks may run
734+
// on the background thread.
735+
// We will deactivate them if the connection fails.
736+
producer->ActivateDispatchers();
737+
732738
Nan::AsyncQueueWorker(new Workers::ProducerConnect(callback, producer));
733739

734740
info.GetReturnValue().Set(Nan::Null());

src/workers.cc

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,6 @@ ProducerConnect::ProducerConnect(Nan::Callback *callback, Producer* producer):
199199
ProducerConnect::~ProducerConnect() {}
200200

201201
void ProducerConnect::Execute() {
202-
// Activate the dispatchers before the connection, as some callbacks may run
203-
// on the background thread.
204-
// We will deactivate them if the connection fails.
205-
producer->ActivateDispatchers();
206-
207202
Baton b = producer->Connect();
208203

209204
if (b.err() != RdKafka::ERR_NO_ERROR) {
@@ -558,11 +553,6 @@ KafkaConsumerConnect::KafkaConsumerConnect(Nan::Callback *callback,
558553
KafkaConsumerConnect::~KafkaConsumerConnect() {}
559554

560555
void KafkaConsumerConnect::Execute() {
561-
// Activate the dispatchers before the connection, as some callbacks may run
562-
// on the background thread.
563-
// We will deactivate them if the connection fails.
564-
consumer->ActivateDispatchers();
565-
566556
Baton b = consumer->Connect();
567557
// consumer->Wait();
568558

test/promisified/admin/describe_groups.spec.js

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ jest.setTimeout(30000);
22

33
const {
44
createConsumer,
5+
createProducer,
56
secureRandom,
67
createTopic,
78
waitFor,
@@ -11,18 +12,21 @@ const {
1112
const { ConsumerGroupStates, ErrorCodes, AclOperationTypes } = require('../../../lib').KafkaJS;
1213

1314
describe('Admin > describeGroups', () => {
14-
let topicName, groupId, consumer, admin;
15+
let topicName, groupId, consumer, admin, groupInstanceId, producer;
1516

1617
beforeEach(async () => {
1718
topicName = `test-topic-${secureRandom()}`;
1819
groupId = `consumer-group-id-${secureRandom()}`;
20+
groupInstanceId = `consumer-group-instance-id-${secureRandom()}`;
21+
22+
producer = createProducer({});
1923

2024
consumer = createConsumer({
2125
groupId,
2226
fromBeginning: true,
2327
clientId: 'test-client-id',
2428
}, {
25-
'group.instance.id': 'test-instance-id',
29+
'group.instance.id': groupInstanceId,
2630
'session.timeout.ms': 10000,
2731
'partition.assignment.strategy': 'roundrobin',
2832
});
@@ -33,6 +37,7 @@ describe('Admin > describeGroups', () => {
3337
});
3438

3539
afterEach(async () => {
40+
producer && (await producer.disconnect());
3641
consumer && (await consumer.disconnect());
3742
admin && (await admin.disconnect());
3843
});
@@ -61,9 +66,11 @@ describe('Admin > describeGroups', () => {
6166
});
6267

6368
it('should describe consumer groups', async () => {
69+
let messagesConsumed = 0;
70+
6471
await consumer.connect();
6572
await consumer.subscribe({ topic: topicName });
66-
await consumer.run({ eachMessage: async () => { } });
73+
await consumer.run({ eachMessage: async () => { messagesConsumed++; } });
6774

6875
await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
6976

@@ -92,7 +99,7 @@ describe('Admin > describeGroups', () => {
9299
memberId: expect.any(String),
93100
memberAssignment: null,
94101
memberMetadata: null,
95-
groupInstanceId: 'test-instance-id',
102+
groupInstanceId: groupInstanceId,
96103
assignment: {
97104
topicPartitions:[
98105
expect.objectContaining({ topic: topicName, partition: 0 }),
@@ -104,12 +111,22 @@ describe('Admin > describeGroups', () => {
104111
})
105112
);
106113

107-
// Disconnect the consumer to make the group EMPTY.
114+
// Produce some messages so that the consumer can commit them, and hence
115+
// the group doesn't become DEAD.
116+
await producer.connect();
117+
await producer.send({
118+
topic: topicName,
119+
messages: [{ key: 'key', value: 'value' }],
120+
});
121+
122+
await waitFor(() => messagesConsumed > 0, () => null, 1000);
123+
124+
// Disconnect the consumer to make the group EMPTY and commit offsets.
108125
await consumer.disconnect();
109126
consumer = null;
110127

111128
// Wait so that session.timeout.ms expires and the group becomes EMPTY.
112-
await sleep(12000);
129+
await sleep(10500);
113130

114131
// Don't include authorized operations this time.
115132
describeGroupsResult = await admin.describeGroups([groupId]);
@@ -119,9 +136,9 @@ describe('Admin > describeGroups', () => {
119136
groupId,
120137
protocol: '',
121138
partitionAssignor: '',
122-
isSimpleConsumerGroup: false,
123-
protocolType: 'consumer',
124139
state: ConsumerGroupStates.EMPTY,
140+
protocolType: 'consumer',
141+
isSimpleConsumerGroup: false,
125142
coordinator: expect.objectContaining({
126143
id: expect.any(Number),
127144
host: expect.any(String),
@@ -130,6 +147,7 @@ describe('Admin > describeGroups', () => {
130147
members: [],
131148
})
132149
);
150+
133151
expect(describeGroupsResult.groups[0].authorizedOperations).toBeUndefined();
134152
});
135153
});

0 commit comments

Comments
 (0)