Skip to content

Commit 17029ad

Browse files
Made num_partitions optional (#1515)
* Default New Topic Params -> num_partitions optional * Added to Changelog * Keeping one source of truth * Made unit tests and Integration bounded to scope of python client * Changelog * Unit tests removed from Integration test * Flake8 * Review Comments Addressed * flake8 and whitespace changes * Whitespace Changes * flake8 * Whitespaces * Whitespace * PR comment addressal * PR comment addressal * PR comment addressal * PR comment Addressal
1 parent 6a3f438 commit 17029ad

File tree

4 files changed

+32
-7
lines changed

4 files changed

+32
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
- Added `set_sasl_credentials`. This new method (on the Producer, Consumer, and AdminClient) allows modifying the stored
66
SASL PLAIN/SCRAM credentials that will be used for subsequent (new) connections to a broker (#1511).
77
- Wheels for Linux / arm64 (#1496).
8-
8+
- Added support for Default num_partitions in CreateTopics Admin API.
99

1010
## v2.0.2
1111

src/confluent_kafka/src/Admin.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ static PyObject *Admin_create_topics (Handle *self, PyObject *args,
418418
rd_kafka_AdminOptions_t *c_options = NULL;
419419
int tcnt;
420420
int i;
421+
int topic_partition_count;
421422
rd_kafka_NewTopic_t **c_objs;
422423
rd_kafka_queue_t *rkqu;
423424
CallState cs;
@@ -492,10 +493,16 @@ static PyObject *Admin_create_topics (Handle *self, PyObject *args,
492493
goto err;
493494
}
494495

496+
if (newt->num_partitions == -1) {
497+
topic_partition_count = PyList_Size(newt->replica_assignment);
498+
} else {
499+
topic_partition_count = newt->num_partitions;
500+
}
495501
if (!Admin_set_replica_assignment(
496502
"CreateTopics", (void *)c_objs[i],
497503
newt->replica_assignment,
498-
newt->num_partitions, newt->num_partitions,
504+
topic_partition_count,
505+
topic_partition_count,
499506
"num_partitions")) {
500507
i++;
501508
goto err;

src/confluent_kafka/src/AdminTypes.c

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,20 @@ static int NewTopic_init (PyObject *self0, PyObject *args,
7474
"config",
7575
NULL };
7676

77+
self->num_partitions = -1;
7778
self->replication_factor = -1;
7879
self->replica_assignment = NULL;
7980
self->config = NULL;
8081

81-
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "si|iOO", kws,
82+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iiOO", kws,
8283
&topic, &self->num_partitions,
8384
&self->replication_factor,
8485
&self->replica_assignment,
8586
&self->config))
8687
return -1;
8788

8889

90+
8991
if (self->config) {
9092
if (!PyDict_Check(self->config)) {
9193
PyErr_SetString(PyExc_TypeError,
@@ -125,7 +127,8 @@ static PyMemberDef NewTopic_members[] = {
125127
{ "topic", T_STRING, offsetof(NewTopic, topic), READONLY,
126128
":py:attribute:topic - Topic name (string)" },
127129
{ "num_partitions", T_INT, offsetof(NewTopic, num_partitions), 0,
128-
":py:attribute: Number of partitions (int)" },
130+
":py:attribute: Number of partitions (int).\n"
131+
"Or -1 if a replica_assignment is specified" },
129132
{ "replication_factor", T_INT, offsetof(NewTopic, replication_factor),
130133
0,
131134
" :py:attribute: Replication factor (int).\n"
@@ -147,6 +150,11 @@ static PyMemberDef NewTopic_members[] = {
147150

148151

149152
static PyObject *NewTopic_str0 (NewTopic *self) {
153+
if (self->num_partitions == -1) {
154+
return cfl_PyUnistr(
155+
_FromFormat("NewTopic(topic=%s)",
156+
self->topic));
157+
}
150158
return cfl_PyUnistr(
151159
_FromFormat("NewTopic(topic=%s,num_partitions=%d)",
152160
self->topic, self->num_partitions));
@@ -202,7 +210,12 @@ NewTopic_richcompare (NewTopic *self, PyObject *o2, int op) {
202210

203211
static long NewTopic_hash (NewTopic *self) {
204212
PyObject *topic = cfl_PyUnistr(_FromString(self->topic));
205-
long r = PyObject_Hash(topic) ^ self->num_partitions;
213+
long r;
214+
if (self->num_partitions == -1) {
215+
r = PyObject_Hash(topic);
216+
} else {
217+
r = PyObject_Hash(topic) ^ self->num_partitions;
218+
}
206219
Py_DECREF(topic);
207220
return r;
208221
}
@@ -233,12 +246,12 @@ PyTypeObject NewTopicType = {
233246
"NewTopic specifies per-topic settings for passing to "
234247
"AdminClient.create_topics().\n"
235248
"\n"
236-
".. py:function:: NewTopic(topic, num_partitions, [replication_factor], [replica_assignment], [config])\n"
249+
".. py:function:: NewTopic(topic, [num_partitions], [replication_factor], [replica_assignment], [config])\n"
237250
"\n"
238251
" Instantiate a NewTopic object.\n"
239252
"\n"
240253
" :param string topic: Topic name\n"
241-
" :param int num_partitions: Number of partitions to create\n"
254+
" :param int num_partitions: Number of partitions to create, or -1 if replica_assignment is used.\n"
242255
" :param int replication_factor: Replication factor of partitions, or -1 if replica_assignment is used.\n"
243256
" :param list replica_assignment: List of lists with the replication assignment for each new partition.\n"
244257
" :param dict config: Dict (str:str) of topic configuration. See http://kafka.apache.org/documentation.html#topicconfigs\n"

tests/test_Admin.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ def test_create_topics_api():
122122
with pytest.raises(Exception):
123123
a.create_topics([None, NewTopic("mytopic", 1, 2)])
124124

125+
try:
126+
a.create_topics([NewTopic("mytopic")])
127+
except Exception as err:
128+
assert False, f"When none of the partitions, \
129+
replication and assignment is present, the request should not fail, but it does with error {err}"
125130
fs = a.create_topics([NewTopic("mytopic", 3, 2)])
126131
with pytest.raises(KafkaException):
127132
for f in concurrent.futures.as_completed(iter(fs.values())):

0 commit comments

Comments
 (0)