Skip to content

Commit 5c19cef

Browse files
Merge pull request #469 from jalaziz/fix-topicmgr-memory-leak
fix: Kafka client resource leak
2 parents 183abf8 + 66439a0 commit 5c19cef

File tree

2 files changed

+11
-6
lines changed

2 files changed

+11
-6
lines changed

processor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,11 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
288288
if err != nil {
289289
return fmt.Errorf("Error creating topic manager for brokers [%s]: %v", strings.Join(g.brokers, ","), err)
290290
}
291+
defer func() {
292+
if err := g.tmgr.Close(); err != nil {
293+
errs = multierror.Append(errs, fmt.Errorf("error closing topic manager: %w", err))
294+
}
295+
}()
291296

292297
// create kafka producer
293298
g.log.Debugf("creating producer")

processor_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func TestProcessor_Run(t *testing.T) {
220220
ctrl, bm := createMockBuilder(t)
221221
defer ctrl.Finish()
222222

223-
bm.tmgr.EXPECT().Close().Times(1)
223+
bm.tmgr.EXPECT().Close().Times(2)
224224
bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1)
225225
bm.producer.EXPECT().Close().Times(1)
226226

@@ -263,7 +263,7 @@ func TestProcessor_Run(t *testing.T) {
263263
ctrl, bm := createMockBuilder(t)
264264
defer ctrl.Finish()
265265

266-
bm.tmgr.EXPECT().Close().Times(1)
266+
bm.tmgr.EXPECT().Close().Times(2)
267267
bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1)
268268
bm.producer.EXPECT().Close().Times(1)
269269

@@ -300,7 +300,7 @@ func TestProcessor_Run(t *testing.T) {
300300
ctrl, bm := createMockBuilder(t)
301301
defer ctrl.Finish()
302302

303-
bm.tmgr.EXPECT().Close().Times(1)
303+
bm.tmgr.EXPECT().Close().Times(2)
304304
bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1)
305305
bm.producer.EXPECT().Close().Times(1)
306306

@@ -343,7 +343,7 @@ func TestProcessor_Run(t *testing.T) {
343343
ctrl, bm := createMockBuilder(t)
344344
defer ctrl.Finish()
345345

346-
bm.tmgr.EXPECT().Close().Times(1)
346+
bm.tmgr.EXPECT().Close().Times(2)
347347
bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1)
348348
bm.producer.EXPECT().Close().Times(1)
349349

@@ -394,7 +394,7 @@ func TestProcessor_Stop(t *testing.T) {
394394
ctrl, bm := createMockBuilder(t)
395395
defer ctrl.Finish()
396396

397-
bm.tmgr.EXPECT().Close().Times(1)
397+
bm.tmgr.EXPECT().Close().Times(2)
398398
bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1)
399399
bm.producer.EXPECT().Close().Times(1)
400400

@@ -448,7 +448,7 @@ func TestProcessor_Stop(t *testing.T) {
448448
ctrl, bm := createMockBuilder(t)
449449
defer ctrl.Finish()
450450

451-
bm.tmgr.EXPECT().Close().Times(1)
451+
bm.tmgr.EXPECT().Close().Times(2)
452452
bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1)
453453
bm.producer.EXPECT().Close().Times(1)
454454

0 commit comments

Comments
 (0)