Skip to content

Commit 4885043

Browse files
authored
Fix bug where the timeout in the admin APIs wasn't taken into account. (#653)
1 parent c515b29 commit 4885043

File tree

2 files changed

+31
-23
lines changed

2 files changed

+31
-23
lines changed

lib/admin.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,11 @@ AdminClient.prototype.createTopic = function(topic, timeout, cb) {
119119

120120
if (typeof timeout === 'function') {
121121
cb = timeout;
122-
timeout = 1000;
122+
timeout = 5000;
123123
}
124124

125125
if (!timeout) {
126-
timeout = 1000;
126+
timeout = 5000;
127127
}
128128

129129
this._client.createTopic(topic, timeout, function(err) {
@@ -154,11 +154,11 @@ AdminClient.prototype.deleteTopic = function(topic, timeout, cb) {
154154

155155
if (typeof timeout === 'function') {
156156
cb = timeout;
157-
timeout = 1000;
157+
timeout = 5000;
158158
}
159159

160160
if (!timeout) {
161-
timeout = 1000;
161+
timeout = 5000;
162162
}
163163

164164
this._client.deleteTopic(topic, timeout, function(err) {
@@ -181,7 +181,7 @@ AdminClient.prototype.deleteTopic = function(topic, timeout, cb) {
181181
* @param {string} topic - The topic to add partitions to, by name.
182182
* @param {number} totalPartitions - The total number of partitions the topic should have
183183
* after the request
184-
* @param {number} timeout - Number of milliseconds to wait while trying to delete the topic.
184+
* @param {number} timeout - Number of milliseconds to wait while trying to create the partitions.
185185
* @param {function} cb - The callback to be executed when finished
186186
*/
187187
AdminClient.prototype.createPartitions = function(topic, totalPartitions, timeout, cb) {
@@ -191,11 +191,11 @@ AdminClient.prototype.createPartitions = function(topic, totalPartitions, timeou
191191

192192
if (typeof timeout === 'function') {
193193
cb = timeout;
194-
timeout = 1000;
194+
timeout = 5000;
195195
}
196196

197197
if (!timeout) {
198-
timeout = 1000;
198+
timeout = 5000;
199199
}
200200

201201
this._client.createPartitions(topic, totalPartitions, timeout, function(err) {

src/admin.cc

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <string>
1111
#include <vector>
12+
#include <math.h>
1213

1314
#include "src/workers.h"
1415
#include "src/admin.h"
@@ -151,23 +152,30 @@ v8::Local<v8::Object> AdminClient::NewInstance(v8::Local<v8::Value> arg) {
151152
rd_kafka_event_t* PollForEvent(
152153
rd_kafka_queue_t * topic_rkqu,
153154
rd_kafka_event_type_t event_type,
154-
int max_tries,
155155
int timeout_ms) {
156-
// Establish what attempt we are on
157-
int attempt = 0;
156+
// Initiate exponential timeout
157+
int attempts = 1;
158+
int exp_timeout_ms = timeout_ms;
159+
if (timeout_ms > 2000) {
160+
// measure optimal number of attempts
161+
attempts = log10(timeout_ms / 1000) / log10(2) + 1;
162+
// measure initial exponential timeout based on attempts
163+
exp_timeout_ms = timeout_ms / (pow(2, attempts) - 1);
164+
}
158165

159166
rd_kafka_event_t * event_response = nullptr;
160167

161168
// Poll the event queue until we get it
162169
do {
163170
// free previously fetched event
164171
rd_kafka_event_destroy(event_response);
165-
// Increment attempt counter
166-
attempt = attempt + 1;
167-
event_response = rd_kafka_queue_poll(topic_rkqu, timeout_ms);
172+
// poll and update attempts and exponential timeout
173+
event_response = rd_kafka_queue_poll(topic_rkqu, exp_timeout_ms);
174+
attempts = attempts - 1;
175+
exp_timeout_ms = 2 * exp_timeout_ms;
168176
} while (
169177
rd_kafka_event_type(event_response) != event_type &&
170-
attempt < max_tries);
178+
attempts > 0);
171179

172180
// If this isn't the type of response we want, or if we do not have a response
173181
// type, bail out with a null
@@ -204,8 +212,7 @@ Baton AdminClient::CreateTopic(rd_kafka_NewTopic_t* topic, int timeout_ms) {
204212
rd_kafka_event_t * event_response = PollForEvent(
205213
topic_rkqu,
206214
RD_KAFKA_EVENT_CREATETOPICS_RESULT,
207-
5,
208-
1000);
215+
timeout_ms);
209216

210217
// Destroy the queue since we are done with it.
211218
rd_kafka_queue_destroy(topic_rkqu);
@@ -284,8 +291,7 @@ Baton AdminClient::DeleteTopic(rd_kafka_DeleteTopic_t* topic, int timeout_ms) {
284291
rd_kafka_event_t * event_response = PollForEvent(
285292
topic_rkqu,
286293
RD_KAFKA_EVENT_DELETETOPICS_RESULT,
287-
5,
288-
1000);
294+
timeout_ms);
289295

290296
// Destroy the queue since we are done with it.
291297
rd_kafka_queue_destroy(topic_rkqu);
@@ -360,8 +366,7 @@ Baton AdminClient::CreatePartitions(
360366
rd_kafka_event_t * event_response = PollForEvent(
361367
topic_rkqu,
362368
RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT,
363-
5,
364-
1000);
369+
timeout_ms);
365370

366371
// Destroy the queue since we are done with it.
367372
rd_kafka_queue_destroy(topic_rkqu);
@@ -480,7 +485,7 @@ NAN_METHOD(AdminClient::NodeCreateTopic) {
480485
AdminClient* client = ObjectWrap::Unwrap<AdminClient>(info.This());
481486

482487
// Get the timeout
483-
int timeout = Nan::To<int32_t>(info[2]).FromJust();
488+
int timeout = Nan::To<int32_t>(info[1]).FromJust();
484489

485490
std::string errstr;
486491
// Get that topic we want to create
@@ -524,7 +529,7 @@ NAN_METHOD(AdminClient::NodeDeleteTopic) {
524529
Nan::To<v8::String>(info[0]).ToLocalChecked());
525530

526531
// Get the timeout
527-
int timeout = Nan::To<int32_t>(info[2]).FromJust();
532+
int timeout = Nan::To<int32_t>(info[1]).FromJust();
528533

529534
// Get that topic we want to create
530535
rd_kafka_DeleteTopic_t* topic = rd_kafka_DeleteTopic_new(
@@ -563,6 +568,9 @@ NAN_METHOD(AdminClient::NodeCreatePartitions) {
563568
Nan::Callback *callback = new Nan::Callback(cb);
564569
AdminClient* client = ObjectWrap::Unwrap<AdminClient>(info.This());
565570

571+
// Get the timeout
572+
int timeout = Nan::To<int32_t>(info[2]).FromJust();
573+
566574
// Get the total number of desired partitions
567575
int partition_total_count = Nan::To<int32_t>(info[1]).FromJust();
568576

@@ -585,7 +593,7 @@ NAN_METHOD(AdminClient::NodeCreatePartitions) {
585593

586594
// Queue up dat work
587595
Nan::AsyncQueueWorker(new Workers::AdminClientCreatePartitions(
588-
callback, client, new_partitions, 1000));
596+
callback, client, new_partitions, timeout));
589597

590598
return info.GetReturnValue().Set(Nan::Null());
591599
}

0 commit comments

Comments
 (0)