Skip to content

Commit fdaa0d4

Browse files
committed
Refactor Consumer and Producer into common Handle for code reuse.
1 parent 8a7b837 commit fdaa0d4

File tree

4 files changed

+172
-139
lines changed

4 files changed

+172
-139
lines changed

confluent_kafka/src/Consumer.c

Lines changed: 60 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,26 @@
2828
****************************************************************************/
2929

3030

31-
static int Consumer_clear (Consumer *self) {
32-
if (self->on_assign) {
33-
Py_DECREF(self->on_assign);
34-
self->on_assign = NULL;
31+
static int Consumer_clear (Handle *self) {
32+
if (self->u.Consumer.on_assign) {
33+
Py_DECREF(self->u.Consumer.on_assign);
34+
self->u.Consumer.on_assign = NULL;
3535
}
36-
if (self->on_revoke) {
37-
Py_DECREF(self->on_revoke);
38-
self->on_revoke = NULL;
36+
if (self->u.Consumer.on_revoke) {
37+
Py_DECREF(self->u.Consumer.on_revoke);
38+
self->u.Consumer.on_revoke = NULL;
3939
}
40-
if (self->on_commit) {
41-
Py_DECREF(self->on_commit);
42-
self->on_commit = NULL;
40+
if (self->u.Consumer.on_commit) {
41+
Py_DECREF(self->u.Consumer.on_commit);
42+
self->u.Consumer.on_commit = NULL;
4343
}
44+
45+
Handle_clear(self);
46+
4447
return 0;
4548
}
4649

47-
static void Consumer_dealloc (Consumer *self) {
50+
static void Consumer_dealloc (Handle *self) {
4851
PyObject_GC_UnTrack(self);
4952

5053
Consumer_clear(self);
@@ -55,12 +58,15 @@ static void Consumer_dealloc (Consumer *self) {
5558
Py_TYPE(self)->tp_free((PyObject *)self);
5659
}
5760

58-
static int Consumer_traverse (Consumer *self,
59-
visitproc visit, void *arg) {
60-
if (self->on_assign)
61-
Py_VISIT(self->on_assign);
62-
if (self->on_revoke)
63-
Py_VISIT(self->on_revoke);
61+
static int Consumer_traverse (Handle *self,
62+
visitproc visit, void *arg) {
63+
if (self->u.Consumer.on_assign)
64+
Py_VISIT(self->u.Consumer.on_assign);
65+
if (self->u.Consumer.on_revoke)
66+
Py_VISIT(self->u.Consumer.on_revoke);
67+
68+
Handle_traverse(self, visit, arg);
69+
6470
return 0;
6571
}
6672

@@ -69,7 +75,7 @@ static int Consumer_traverse (Consumer *self,
6975

7076

7177

72-
static PyObject *Consumer_subscribe (Consumer *self, PyObject *args,
78+
static PyObject *Consumer_subscribe (Handle *self, PyObject *args,
7379
PyObject *kwargs) {
7480

7581
rd_kafka_topic_partition_list_t *topics;
@@ -130,29 +136,29 @@ static PyObject *Consumer_subscribe (Consumer *self, PyObject *args,
130136
/*
131137
* Update rebalance callbacks
132138
*/
133-
if (self->on_assign) {
134-
Py_DECREF(self->on_assign);
135-
self->on_assign = NULL;
139+
if (self->u.Consumer.on_assign) {
140+
Py_DECREF(self->u.Consumer.on_assign);
141+
self->u.Consumer.on_assign = NULL;
136142
}
137143
if (on_assign) {
138-
self->on_assign = on_assign;
139-
Py_INCREF(self->on_assign);
144+
self->u.Consumer.on_assign = on_assign;
145+
Py_INCREF(self->u.Consumer.on_assign);
140146
}
141147

142-
if (self->on_revoke) {
143-
Py_DECREF(self->on_revoke);
144-
self->on_revoke = NULL;
148+
if (self->u.Consumer.on_revoke) {
149+
Py_DECREF(self->u.Consumer.on_revoke);
150+
self->u.Consumer.on_revoke = NULL;
145151
}
146152
if (on_revoke) {
147-
self->on_revoke = on_revoke;
148-
Py_INCREF(self->on_revoke);
153+
self->u.Consumer.on_revoke = on_revoke;
154+
Py_INCREF(self->u.Consumer.on_revoke);
149155
}
150156

151157
Py_RETURN_NONE;
152158
}
153159

154160

155-
static PyObject *Consumer_unsubscribe (Consumer *self,
161+
static PyObject *Consumer_unsubscribe (Handle *self,
156162
PyObject *ignore) {
157163

158164
rd_kafka_resp_err_t err;
@@ -169,15 +175,15 @@ static PyObject *Consumer_unsubscribe (Consumer *self,
169175
}
170176

171177

172-
static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
178+
static PyObject *Consumer_assign (Handle *self, PyObject *tlist) {
173179

174180
rd_kafka_topic_partition_list_t *c_parts;
175181
rd_kafka_resp_err_t err;
176182

177183
if (!(c_parts = py_to_c_parts(tlist)))
178184
return NULL;
179185

180-
self->rebalance_assigned++;
186+
self->u.Consumer.rebalance_assigned++;
181187

182188
err = rd_kafka_assign(self->rk, c_parts);
183189

@@ -194,11 +200,11 @@ static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
194200
}
195201

196202

197-
static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) {
203+
static PyObject *Consumer_unassign (Handle *self, PyObject *ignore) {
198204

199205
rd_kafka_resp_err_t err;
200206

201-
self->rebalance_assigned++;
207+
self->u.Consumer.rebalance_assigned++;
202208

203209
err = rd_kafka_assign(self->rk, NULL);
204210
if (err) {
@@ -213,7 +219,7 @@ static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) {
213219

214220

215221

216-
static PyObject *Consumer_commit (Consumer *self, PyObject *args,
222+
static PyObject *Consumer_commit (Handle *self, PyObject *args,
217223
PyObject *kwargs) {
218224

219225
rd_kafka_resp_err_t err;
@@ -281,7 +287,7 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args,
281287

282288

283289

284-
static PyObject *Consumer_committed (Consumer *self, PyObject *args,
290+
static PyObject *Consumer_committed (Handle *self, PyObject *args,
285291
PyObject *kwargs) {
286292

287293
PyObject *plist;
@@ -317,7 +323,7 @@ static PyObject *Consumer_committed (Consumer *self, PyObject *args,
317323
}
318324

319325

320-
static PyObject *Consumer_position (Consumer *self, PyObject *args,
326+
static PyObject *Consumer_position (Handle *self, PyObject *args,
321327
PyObject *kwargs) {
322328

323329
PyObject *plist;
@@ -352,7 +358,7 @@ static PyObject *Consumer_position (Consumer *self, PyObject *args,
352358

353359

354360

355-
static PyObject *Consumer_poll (Consumer *self, PyObject *args,
361+
static PyObject *Consumer_poll (Handle *self, PyObject *args,
356362
PyObject *kwargs) {
357363
double tmout = -1.0f;
358364
static char *kws[] = { "timeout", NULL };
@@ -384,7 +390,7 @@ static PyObject *Consumer_poll (Consumer *self, PyObject *args,
384390
}
385391

386392

387-
static PyObject *Consumer_close (Consumer *self, PyObject *ignore) {
393+
static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
388394
self->thread_state = PyEval_SaveThread();
389395
rd_kafka_consumer_close(self->rk);
390396
PyEval_RestoreThread(self->thread_state);
@@ -523,7 +529,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
523529
PyTypeObject ConsumerType = {
524530
PyVarObject_HEAD_INIT(NULL, 0)
525531
"cimpl.Consumer", /*tp_name*/
526-
sizeof(Consumer), /*tp_basicsize*/
532+
sizeof(Handle), /*tp_basicsize*/
527533
0, /*tp_itemsize*/
528534
(destructor)Consumer_dealloc, /*tp_dealloc*/
529535
0, /*tp_print*/
@@ -584,14 +590,16 @@ PyTypeObject ConsumerType = {
584590
static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
585591
rd_kafka_topic_partition_list_t *c_parts,
586592
void *opaque) {
587-
Consumer *self = opaque;
593+
Handle *self = opaque;
588594

589595
PyEval_RestoreThread(self->thread_state);
590596

591-
self->rebalance_assigned = 0;
597+
self->u.Consumer.rebalance_assigned = 0;
592598

593-
if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && self->on_assign) ||
594-
(err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS && self->on_revoke)) {
599+
if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS &&
600+
self->u.Consumer.on_assign) ||
601+
(err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS &&
602+
self->u.Consumer.on_revoke)) {
595603
PyObject *parts;
596604
PyObject *args, *result;
597605

@@ -612,7 +620,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
612620

613621
result = PyObject_CallObject(
614622
err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
615-
self->on_assign : self->on_revoke, args);
623+
self->u.Consumer.on_assign :
624+
self->u.Consumer.on_revoke, args);
616625

617626
Py_DECREF(args);
618627

@@ -628,7 +637,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
628637
* to synchronize state, if the user did not do this from callback,
629638
* or there was no callback, or the callback failed, then we perform
630639
* that assign() call here instead. */
631-
if (!self->rebalance_assigned) {
640+
if (!self->u.Consumer.rebalance_assigned) {
632641
if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
633642
rd_kafka_assign(rk, c_parts);
634643
else
@@ -642,10 +651,10 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
642651
static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
643652
rd_kafka_topic_partition_list_t *c_parts,
644653
void *opaque) {
645-
Consumer *self = opaque;
654+
Handle *self = opaque;
646655
PyObject *parts, *k_err, *args, *result;
647656

648-
if (!self->on_commit)
657+
if (!self->u.Consumer.on_commit)
649658
return;
650659

651660
PyEval_RestoreThread(self->thread_state);
@@ -669,7 +678,7 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
669678
return;
670679
}
671680

672-
result = PyObject_CallObject(self->on_commit, args);
681+
result = PyObject_CallObject(self->u.Consumer.on_commit, args);
673682

674683
Py_DECREF(args);
675684

@@ -687,16 +696,16 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
687696

688697
static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
689698
PyObject *kwargs) {
690-
Consumer *self;
699+
Handle *self;
691700
char errstr[256];
692701
rd_kafka_conf_t *conf;
693702

694-
self = (Consumer *)ConsumerType.tp_alloc(&ConsumerType, 0);
703+
self = (Handle *)ConsumerType.tp_alloc(&ConsumerType, 0);
695704
if (!self)
696705
return NULL;
697706

698707
if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
699-
args, kwargs))) {
708+
args, kwargs))) {
700709
Py_DECREF(self);
701710
return NULL;
702711
}

0 commit comments

Comments
 (0)