Skip to content

Commit cd7a62e

Browse files
Fix memory leak in admin client (#862)
1 parent 07d969b commit cd7a62e

File tree

1 file changed

+25
-9
lines changed

1 file changed

+25
-9
lines changed

src/admin.cc

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,12 @@ rd_kafka_event_t* PollForEvent(
156156
// Establish what attempt we are on
157157
int attempt = 0;
158158

159-
rd_kafka_event_t * event_response;
159+
rd_kafka_event_t * event_response = nullptr;
160160

161161
// Poll the event queue until we get it
162162
do {
163+
// free previously fetched event
164+
rd_kafka_event_destroy(event_response);
163165
// Increment attempt counter
164166
attempt = attempt + 1;
165167
event_response = rd_kafka_queue_poll(topic_rkqu, timeout_ms);
@@ -171,6 +173,7 @@ rd_kafka_event_t* PollForEvent(
171173
// type, bail out with a null
172174
if (event_response == NULL ||
173175
rd_kafka_event_type(event_response) != event_type) {
176+
rd_kafka_event_destroy(event_response);
174177
return NULL;
175178
}
176179

@@ -219,8 +222,9 @@ Baton AdminClient::CreateTopic(rd_kafka_NewTopic_t* topic, int timeout_ms) {
219222
// Now we can get the error code from the event
220223
if (rd_kafka_event_error(event_response)) {
221224
// If we had a special error code, get out of here with it
222-
return Baton(static_cast<RdKafka::ErrorCode>(
223-
rd_kafka_event_error(event_response)));
225+
const rd_kafka_resp_err_t errcode = rd_kafka_event_error(event_response);
226+
rd_kafka_event_destroy(event_response);
227+
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
224228
}
225229

226230
// get the created results
@@ -239,13 +243,17 @@ Baton AdminClient::CreateTopic(rd_kafka_NewTopic_t* topic, int timeout_ms) {
239243

240244
if (errcode != RD_KAFKA_RESP_ERR_NO_ERROR) {
241245
if (errmsg) {
242-
return Baton(static_cast<RdKafka::ErrorCode>(errcode), std::string(errmsg)); // NOLINT
246+
const std::string errormsg = std::string(errmsg);
247+
rd_kafka_event_destroy(event_response);
248+
return Baton(static_cast<RdKafka::ErrorCode>(errcode), errormsg); // NOLINT
243249
} else {
250+
rd_kafka_event_destroy(event_response);
244251
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
245252
}
246253
}
247254
}
248255

256+
rd_kafka_event_destroy(event_response);
249257
return Baton(RdKafka::ERR_NO_ERROR);
250258
}
251259
}
@@ -294,8 +302,9 @@ Baton AdminClient::DeleteTopic(rd_kafka_DeleteTopic_t* topic, int timeout_ms) {
294302
// Now we can get the error code from the event
295303
if (rd_kafka_event_error(event_response)) {
296304
// If we had a special error code, get out of here with it
297-
return Baton(static_cast<RdKafka::ErrorCode>(
298-
rd_kafka_event_error(event_response)));
305+
const rd_kafka_resp_err_t errcode = rd_kafka_event_error(event_response);
306+
rd_kafka_event_destroy(event_response);
307+
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
299308
}
300309

301310
// get the created results
@@ -312,10 +321,12 @@ Baton AdminClient::DeleteTopic(rd_kafka_DeleteTopic_t* topic, int timeout_ms) {
312321
const rd_kafka_resp_err_t errcode = rd_kafka_topic_result_error(terr);
313322

314323
if (errcode != RD_KAFKA_RESP_ERR_NO_ERROR) {
324+
rd_kafka_event_destroy(event_response);
315325
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
316326
}
317327
}
318328

329+
rd_kafka_event_destroy(event_response);
319330
return Baton(RdKafka::ERR_NO_ERROR);
320331
}
321332
}
@@ -367,8 +378,9 @@ Baton AdminClient::CreatePartitions(
367378
// Now we can get the error code from the event
368379
if (rd_kafka_event_error(event_response)) {
369380
// If we had a special error code, get out of here with it
370-
return Baton(static_cast<RdKafka::ErrorCode>(
371-
rd_kafka_event_error(event_response)));
381+
const rd_kafka_resp_err_t errcode = rd_kafka_event_error(event_response);
382+
rd_kafka_event_destroy(event_response);
383+
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
372384
}
373385

374386
// get the created results
@@ -387,13 +399,17 @@ Baton AdminClient::CreatePartitions(
387399

388400
if (errcode != RD_KAFKA_RESP_ERR_NO_ERROR) {
389401
if (errmsg) {
390-
return Baton(static_cast<RdKafka::ErrorCode>(errcode), std::string(errmsg)); // NOLINT
402+
const std::string errormsg = std::string(errmsg);
403+
rd_kafka_event_destroy(event_response);
404+
return Baton(static_cast<RdKafka::ErrorCode>(errcode), errormsg); // NOLINT
391405
} else {
406+
rd_kafka_event_destroy(event_response);
392407
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
393408
}
394409
}
395410
}
396411

412+
rd_kafka_event_destroy(event_response);
397413
return Baton(RdKafka::ERR_NO_ERROR);
398414
}
399415
}

0 commit comments

Comments
 (0)