Skip to content

Commit a945782

Browse files
authored
Added topic id to describe group response (#1645)
Added topic id to describe group response
1 parent d69e310 commit a945782

File tree

7 files changed

+232
-2
lines changed

7 files changed

+232
-2
lines changed

docs/index.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ Supporting classes
3030
- :ref:`Node <pythonclient_node>`
3131
- :ref:`ConsumerGroupTopicPartitions <pythonclient_consumer_group_topic_partition>`
3232
- :ref:`ConsumerGroupState <pythonclient_consumer_group_state>`
33+
- :ref:`Uuid <pythonclient_uuid>`
3334

3435
- Errors:
3536
- :ref:`KafkaError <pythonclient_kafkaerror>`
@@ -660,6 +661,15 @@ ConsumerGroupState
660661
.. autoclass:: confluent_kafka.ConsumerGroupState
661662
:members:
662663

664+
.. _pythonclient_uuid:
665+
666+
****
667+
Uuid
668+
****
669+
670+
.. autoclass:: confluent_kafka.Uuid
671+
:members:
672+
663673
.. _serde_field:
664674

665675
************

examples/adminapi.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ def example_describe_topics(a, args):
540540
try:
541541
t = future.result()
542542
print("Topic name : {}".format(t.name))
543+
print("Topic id : {}".format(t.topic_id))
543544
if (t.is_internal):
544545
print("Topic is Internal")
545546

src/confluent_kafka/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
Consumer,
3030
Message,
3131
TopicPartition,
32+
Uuid,
3233
libversion,
3334
version,
3435
TIMESTAMP_NOT_AVAILABLE,
@@ -46,7 +47,7 @@
4647
'Producer', 'DeserializingConsumer',
4748
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
4849
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
49-
'ConsumerGroupTopicPartitions', 'ConsumerGroupState']
50+
'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid']
5051

5152
__version__ = version()[0]
5253

src/confluent_kafka/admin/_topic.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,19 @@ class TopicDescription:
2626
----------
2727
name : str
2828
The topic name.
29+
topic_id: Uuid
30+
The topic id of the topic
31+
is_internal:
32+
Whether the topic is internal or not
2933
partitions : list(TopicPartitionInfo)
3034
Partition information.
3135
authorized_operations: list(AclOperation)
3236
AclOperations allowed for the topic.
3337
"""
3438

35-
def __init__(self, name, is_internal, partitions, authorized_operations=None):
39+
def __init__(self, name, topic_id, is_internal, partitions, authorized_operations=None):
3640
self.name = name
41+
self.topic_id = topic_id
3742
self.is_internal = is_internal
3843
self.partitions = partitions
3944
self.authorized_operations = None

src/confluent_kafka/src/Admin.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3819,6 +3819,7 @@ static PyObject *Admin_c_TopicDescription_to_py(
38193819
size_t c_authorized_operations_cnt = 0;
38203820
size_t i = 0;
38213821
const rd_kafka_AclOperation_t *c_authorized_operations = NULL;
3822+
const rd_kafka_Uuid_t *c_topic_id = NULL;
38223823

38233824
TopicDescription_type = cfl_PyObject_lookup("confluent_kafka.admin",
38243825
"TopicDescription");
@@ -3833,6 +3834,11 @@ static PyObject *Admin_c_TopicDescription_to_py(
38333834
"name",
38343835
rd_kafka_TopicDescription_name(c_topic_description));
38353836

3837+
c_topic_id = rd_kafka_TopicDescription_topic_id(c_topic_description);
3838+
PyDict_SetItemString(kwargs,
3839+
"topic_id",
3840+
c_Uuid_to_py(c_topic_id));
3841+
38363842
is_internal = PyBool_FromLong(rd_kafka_TopicDescription_is_internal(c_topic_description));
38373843
if(PyDict_SetItemString(kwargs, "is_internal", is_internal) == -1) {
38383844
goto err;

src/confluent_kafka/src/confluent_kafka.c

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,158 @@ PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm) {
810810

811811

812812

813+
/****************************************************************************
814+
*
815+
*
816+
* Uuid
817+
*
818+
*
819+
*
820+
*
821+
****************************************************************************/
822+
static PyObject *Uuid_most_significant_bits (Uuid *self, PyObject *ignore) {
823+
if(self->cUuid) {
824+
return cfl_PyLong_FromLong(rd_kafka_Uuid_most_significant_bits(self->cUuid));
825+
}
826+
Py_RETURN_NONE;
827+
}
828+
829+
static PyObject *Uuid_least_significant_bits (Uuid *self, PyObject *ignore) {
830+
if(self->cUuid) {
831+
return cfl_PyLong_FromLong(rd_kafka_Uuid_least_significant_bits(self->cUuid));
832+
}
833+
Py_RETURN_NONE;
834+
}
835+
836+
static PyMethodDef Uuid_methods[] = {
837+
{ "get_most_significant_bits", (PyCFunction)Uuid_most_significant_bits, METH_NOARGS,
838+
" :returns: Most significant 64 bits of the 128 bits Uuid\n"
839+
" :rtype: int\n"
840+
"\n"
841+
},
842+
{ "get_least_significant_bits", (PyCFunction)Uuid_least_significant_bits, METH_NOARGS,
843+
" :returns: Least significant 64 bits of the 128 bits Uuid\n"
844+
" :rtype: int\n"
845+
"\n"
846+
},
847+
{ NULL }
848+
};
849+
850+
851+
static PyObject *Uuid_str0 (Uuid *self) {
852+
if(self->cUuid) {
853+
const char *base64str = rd_kafka_Uuid_base64str(self->cUuid);
854+
if(base64str)
855+
return cfl_PyUnistr(_FromString(base64str));
856+
}
857+
Py_RETURN_NONE;
858+
}
859+
860+
static long Uuid_hash (Uuid *self) {
861+
862+
return rd_kafka_Uuid_most_significant_bits(self->cUuid) ^ rd_kafka_Uuid_least_significant_bits(self->cUuid);
863+
}
864+
865+
866+
static PyObject *Uuid_new (PyTypeObject *type, PyObject *args,
867+
PyObject *kwargs) {
868+
PyObject *self = type->tp_alloc(type, 1);
869+
return self;
870+
}
871+
872+
873+
static int Uuid_init (PyObject *self0, PyObject *args,
874+
PyObject *kwargs) {
875+
Uuid *self = (Uuid *)self0;
876+
static char *kws[] = { "most_significant_bits",
877+
"least_significant_bits",
878+
NULL };
879+
int64_t most_significant_bits;
880+
int64_t least_significant_bits;
881+
882+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "LL", kws,
883+
&most_significant_bits,
884+
&least_significant_bits))
885+
return -1;
886+
887+
self->cUuid = rd_kafka_Uuid_new(most_significant_bits, least_significant_bits);
888+
889+
return 0;
890+
}
891+
892+
static int Uuid_clear (Uuid *self) {
893+
if (self->cUuid) {
894+
rd_kafka_Uuid_destroy(self->cUuid);
895+
self->cUuid = NULL;
896+
}
897+
return 0;
898+
}
899+
900+
static int Uuid_traverse (Uuid *self,
901+
visitproc visit, void *arg) {
902+
return 0;
903+
}
904+
905+
static void Uuid_dealloc (Uuid *self) {
906+
PyObject_GC_UnTrack(self);
907+
Uuid_clear(self);
908+
909+
Py_TYPE(self)->tp_free((PyObject *)self);
910+
}
911+
912+
PyTypeObject UuidType = {
913+
PyVarObject_HEAD_INIT(NULL, 0)
914+
"cimpl.Uuid", /* tp_name */
915+
sizeof(Uuid), /* tp_basicsize */
916+
0, /* tp_itemsize */
917+
(destructor)Uuid_dealloc, /* tp_dealloc */
918+
0, /* tp_print */
919+
0, /* tp_getattr */
920+
0, /* tp_setattr */
921+
0, /* tp_compare */
922+
(reprfunc)Uuid_str0, /* tp_repr */
923+
0, /* tp_as_number */
924+
0, /* tp_as_sequence */
925+
0, /* tp_as_mapping */
926+
(hashfunc)Uuid_hash, /* tp_hash */
927+
0, /* tp_call */
928+
0, /* tp_str */
929+
PyObject_GenericGetAttr, /* tp_getattro */
930+
0, /* tp_setattro */
931+
0, /* tp_as_buffer */
932+
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
933+
Py_TPFLAGS_HAVE_GC, /* tp_flags */
934+
"Generic Uuid. Being used in various identifiers including topic_id.\n"
935+
"\n"
936+
".. py:function:: Uuid(most_significant_bits, least_significant_bits)\n"
937+
"\n"
938+
" Instantiate a Uuid object.\n"
939+
"\n"
940+
" :param long most_significant_bits: Most significant 64 bits of the 128 bits Uuid.\n"
941+
" :param long least_significant_bits: Least significant 64 bits of the 128 bits Uuid.\n"
942+
" :rtype: Uuid\n"
943+
"\n"
944+
"\n", /* tp_doc */
945+
(traverseproc)Uuid_traverse, /* tp_traverse */
946+
(inquiry)Uuid_clear, /* tp_clear */
947+
0, /* tp_richcompare */
948+
0, /* tp_weaklistoffset */
949+
0, /* tp_iter */
950+
0, /* tp_iternext */
951+
Uuid_methods, /* tp_methods */
952+
0, /* tp_members */
953+
0, /* tp_getset */
954+
0, /* tp_base */
955+
0, /* tp_dict */
956+
0, /* tp_descr_get */
957+
0, /* tp_descr_set */
958+
0, /* tp_dictoffset */
959+
Uuid_init, /* tp_init */
960+
0, /* tp_alloc */
961+
Uuid_new /* tp_new */
962+
};
963+
964+
813965

814966
/****************************************************************************
815967
*
@@ -1488,6 +1640,47 @@ PyObject *c_Node_to_py(const rd_kafka_Node_t *c_node) {
14881640
return NULL;
14891641
}
14901642

1643+
PyObject *c_Uuid_to_py(const rd_kafka_Uuid_t *c_uuid) {
1644+
PyObject *uuid = NULL;
1645+
PyObject *Uuid_type = NULL;
1646+
PyObject *args = NULL;
1647+
PyObject *kwargs = NULL;
1648+
1649+
if(!c_uuid)
1650+
Py_RETURN_NONE;
1651+
1652+
Uuid_type = cfl_PyObject_lookup("confluent_kafka",
1653+
"Uuid");
1654+
if (!Uuid_type) {
1655+
goto err;
1656+
}
1657+
1658+
1659+
kwargs = PyDict_New();
1660+
1661+
cfl_PyDict_SetLong(kwargs, "most_significant_bits", rd_kafka_Uuid_most_significant_bits(c_uuid));
1662+
cfl_PyDict_SetLong(kwargs, "least_significant_bits", rd_kafka_Uuid_least_significant_bits(c_uuid));
1663+
1664+
args = PyTuple_New(0);
1665+
1666+
uuid = PyObject_Call(Uuid_type, args, kwargs);
1667+
1668+
Py_DECREF(Uuid_type);
1669+
Py_DECREF(args);
1670+
Py_DECREF(kwargs);
1671+
1672+
return uuid;
1673+
1674+
err:
1675+
Py_XDECREF(Uuid_type);
1676+
Py_XDECREF(args);
1677+
Py_XDECREF(kwargs);
1678+
Py_XDECREF(uuid);
1679+
1680+
return NULL;
1681+
1682+
}
1683+
14911684

14921685
/****************************************************************************
14931686
*
@@ -2834,6 +3027,8 @@ static PyObject *_init_cimpl (void) {
28343027
return NULL;
28353028
if (PyType_Ready(&MessageType) < 0)
28363029
return NULL;
3030+
if (PyType_Ready(&UuidType) < 0)
3031+
return NULL;
28373032
if (PyType_Ready(&TopicPartitionType) < 0)
28383033
return NULL;
28393034
if (PyType_Ready(&ProducerType) < 0)
@@ -2863,6 +3058,9 @@ static PyObject *_init_cimpl (void) {
28633058
Py_INCREF(&MessageType);
28643059
PyModule_AddObject(m, "Message", (PyObject *)&MessageType);
28653060

3061+
Py_INCREF(&UuidType);
3062+
PyModule_AddObject(m, "Uuid", (PyObject *)&UuidType);
3063+
28663064
Py_INCREF(&TopicPartitionType);
28673065
PyModule_AddObject(m, "TopicPartition",
28683066
(PyObject *)&TopicPartitionType);

src/confluent_kafka/src/confluent_kafka.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
384384
PyObject *kwargs);
385385
PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts);
386386
PyObject *c_Node_to_py(const rd_kafka_Node_t *c_node);
387+
PyObject *c_Uuid_to_py(const rd_kafka_Uuid_t *c_uuid);
387388
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);
388389
PyObject *list_topics (Handle *self, PyObject *args, PyObject *kwargs);
389390
PyObject *list_groups (Handle *self, PyObject *args, PyObject *kwargs);
@@ -493,6 +494,14 @@ typedef struct {
493494
extern PyTypeObject NewTopicType;
494495

495496

497+
typedef struct {
498+
PyObject_HEAD
499+
rd_kafka_Uuid_t *cUuid;
500+
} Uuid;
501+
502+
extern PyTypeObject UuidType;
503+
504+
496505
typedef struct {
497506
PyObject_HEAD
498507
char *topic;

0 commit comments

Comments
 (0)