
Commit 8fbc981

KIP-320: Allow fetchers to detect and handle log truncation. Python changes (#1540)
1 parent 26d40be commit 8fbc981
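
This commit wires KIP-320 offset leader epochs through the Python binding: Message objects gain a leader_epoch() accessor and TopicPartition gains a leader_epoch constructor argument and read-only attribute. A minimal usage sketch of that new surface follows; the broker address, group id and topic name are placeholders, not part of this commit:

    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',  # placeholder broker
        'group.id': 'kip320-demo',              # placeholder group id
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['mytopic'])             # placeholder topic

    msg = consumer.poll(10.0)
    if msg is not None and msg.error() is None:
        # New accessor: an int, or None when the broker did not report
        # a leader epoch for this offset.
        print('offset', msg.offset(), 'leader epoch', msg.leader_epoch())

        # TopicPartition can now carry the epoch alongside the offset.
        epoch = msg.leader_epoch()
        if epoch is not None:
            tp = TopicPartition(msg.topic(), msg.partition(), msg.offset(),
                                leader_epoch=epoch)
            print(tp)  # repr now includes leader_epoch
    consumer.close()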

File tree

5 files changed: +128 -44 lines changed
CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -12,6 +12,8 @@ v2.1.0 is a feature release with the following features, fixes and enhancements:
 - Add reference support in Schema Registry client. (@RickTalken, #1304)
 - Migrated travis jobs to Semaphore CI (#1503)
 - Added support for schema references. (#1514 and @slominskir #1088)
+- [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation):
+  add offset leader epoch methods to the TopicPartition and Message classes (#1540).
 
 confluent-kafka-python is based on librdkafka v2.1.0, see the
 [librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v2.1.0)

src/confluent_kafka/src/Consumer.c

Lines changed: 33 additions & 18 deletions
@@ -486,6 +486,7 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
         } else if (msg) {
                 Message *m;
                 PyObject *uo8;
+                rd_kafka_topic_partition_t *rktpar;
 
                 if (PyObject_Type((PyObject *)msg) !=
                     (PyObject *)&MessageType) {
@@ -497,9 +498,12 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
                 m = (Message *)msg;
 
                 c_offsets = rd_kafka_topic_partition_list_new(1);
-                rd_kafka_topic_partition_list_add(
-                        c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
-                        m->partition)->offset =m->offset + 1;
+                rktpar = rd_kafka_topic_partition_list_add(
+                        c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
+                        m->partition);
+                rktpar->offset =m->offset + 1;
+                rd_kafka_topic_partition_set_leader_epoch(rktpar,
+                                                          m->leader_epoch);
                 Py_XDECREF(uo8);
 
         } else {
@@ -612,6 +616,7 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
         } else {
                 Message *m;
                 PyObject *uo8;
+                rd_kafka_topic_partition_t *rktpar;
 
                 if (PyObject_Type((PyObject *)msg) !=
                     (PyObject *)&MessageType) {
@@ -623,9 +628,12 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
                 m = (Message *)msg;
 
                 c_offsets = rd_kafka_topic_partition_list_new(1);
-                rd_kafka_topic_partition_list_add(
+                rktpar = rd_kafka_topic_partition_list_add(
                         c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
-                        m->partition)->offset = m->offset + 1;
+                        m->partition);
+                rktpar->offset = m->offset + 1;
+                rd_kafka_topic_partition_set_leader_epoch(rktpar,
+                                                          m->leader_epoch);
                 Py_XDECREF(uo8);
         }
 
@@ -783,9 +791,11 @@ static PyObject *Consumer_resume (Handle *self, PyObject *args,
 static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs) {
 
         TopicPartition *tp;
-        rd_kafka_resp_err_t err;
+        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
         static char *kws[] = { "partition", NULL };
-        rd_kafka_topic_t *rkt;
+        rd_kafka_topic_partition_list_t *seek_partitions;
+        rd_kafka_topic_partition_t *rktpar;
+        rd_kafka_error_t *error;
 
         if (!self->rk) {
                 PyErr_SetString(PyExc_RuntimeError, "Consumer closed");
@@ -803,21 +813,26 @@ static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs)
                 return NULL;
         }
 
-        rkt = rd_kafka_topic_new(self->rk, tp->topic, NULL);
-        if (!rkt) {
-                cfl_PyErr_Format(rd_kafka_last_error(),
-                                 "Failed to get topic object for "
-                                 "topic \"%s\": %s",
-                                 tp->topic,
-                                 rd_kafka_err2str(rd_kafka_last_error()));
-                return NULL;
-        }
+        seek_partitions = rd_kafka_topic_partition_list_new(1);
+        rktpar = rd_kafka_topic_partition_list_add(seek_partitions,
+                                                   tp->topic, tp->partition);
+        rktpar->offset = tp->offset;
+        rd_kafka_topic_partition_set_leader_epoch(rktpar, tp->leader_epoch);
 
         Py_BEGIN_ALLOW_THREADS;
-        err = rd_kafka_seek(rkt, tp->partition, tp->offset, -1);
+        error = rd_kafka_seek_partitions(self->rk, seek_partitions, -1);
         Py_END_ALLOW_THREADS;
 
-        rd_kafka_topic_destroy(rkt);
+        if (error) {
+                err = rd_kafka_error_code(error);
+                rd_kafka_error_destroy(error);
+        }
+
+        if (!err && seek_partitions->elems[0].err) {
+                err = seek_partitions->elems[0].err;
+        }
+
+        rd_kafka_topic_partition_list_destroy(seek_partitions);
 
         if (err) {
                 cfl_PyErr_Format(err,
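
From the Python API, these Consumer.c changes mean that commit(), store_offsets() and seek() now propagate the leader epoch to librdkafka, with seek() switching from rd_kafka_seek() to rd_kafka_seek_partitions(). A hedged sketch of those calls; the configuration values and topic name are placeholders:

    from confluent_kafka import Consumer, TopicPartition

    c = Consumer({
        'bootstrap.servers': 'localhost:9092',   # placeholder broker
        'group.id': 'seek-demo',                 # placeholder group id
        'enable.auto.commit': False,
        'enable.auto.offset.store': False,
        'auto.offset.reset': 'earliest',
    })
    c.subscribe(['mytopic'])                     # placeholder topic

    msg = c.poll(10.0)
    if msg is not None and msg.error() is None:
        # Both calls now attach msg.leader_epoch() to the stored/committed
        # offset (via rd_kafka_topic_partition_set_leader_epoch above).
        c.store_offsets(message=msg)
        c.commit(message=msg, asynchronous=False)

        # seek() forwards the TopicPartition's leader_epoch through
        # rd_kafka_seek_partitions(), which enables truncation detection.
        epoch = msg.leader_epoch()
        if epoch is not None:
            tp = TopicPartition(msg.topic(), msg.partition(), msg.offset(),
                                leader_epoch=epoch)
        else:
            tp = TopicPartition(msg.topic(), msg.partition(), msg.offset())
        c.seek(tp)
    c.close()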

src/confluent_kafka/src/confluent_kafka.c

Lines changed: 81 additions & 22 deletions
@@ -476,6 +476,13 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) {
         Py_RETURN_NONE;
 }
 
+static PyObject *Message_leader_epoch (Message *self, PyObject *ignore) {
+        if (self->leader_epoch >= 0)
+                return cfl_PyInt_FromInt(self->leader_epoch);
+        else
+                Py_RETURN_NONE;
+}
+
 
 static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
         return Py_BuildValue("iL",
@@ -571,6 +578,11 @@ static PyMethodDef Message_methods[] = {
           "  :rtype: int or None\n"
          "\n"
         },
+        { "leader_epoch", (PyCFunction)Message_leader_epoch, METH_NOARGS,
+          "  :returns: message offset leader epoch or None if not available.\n"
+          "  :rtype: int or None\n"
+          "\n"
+        },
         { "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS,
           "Retrieve timestamp type and timestamp from message.\n"
           "The timestamp type is one of:\n\n"
@@ -743,7 +755,7 @@ PyTypeObject MessageType = {
         0, /* tp_weaklistoffset */
         0, /* tp_iter */
         0, /* tp_iternext */
-        Message_methods, /* tp_methods */
+        Message_methods,        /* tp_methods */
         0, /* tp_members */
         0, /* tp_getset */
         0, /* tp_base */
@@ -784,6 +796,7 @@ PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm) {
 
         self->partition = rkm->partition;
         self->offset = rkm->offset;
+        self->leader_epoch = rd_kafka_message_leader_epoch(rkm);
 
         self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype);
 
@@ -825,12 +838,17 @@ static int TopicPartition_clear (TopicPartition *self) {
 
 static void TopicPartition_setup (TopicPartition *self, const char *topic,
                                   int partition, long long offset,
+                                  int32_t leader_epoch,
                                   const char *metadata,
                                   rd_kafka_resp_err_t err) {
         self->topic = strdup(topic);
         self->partition = partition;
         self->offset = offset;
 
+        if (leader_epoch < 0)
+                leader_epoch = -1;
+        self->leader_epoch = leader_epoch;
+
         if (metadata != NULL) {
                 self->metadata = strdup(metadata);
         } else {
@@ -854,23 +872,27 @@ static int TopicPartition_init (PyObject *self, PyObject *args,
                                 PyObject *kwargs) {
         const char *topic;
         int partition = RD_KAFKA_PARTITION_UA;
+        int32_t leader_epoch = -1;
         long long offset = RD_KAFKA_OFFSET_INVALID;
         const char *metadata = NULL;
 
         static char *kws[] = { "topic",
                                "partition",
                                "offset",
                                "metadata",
+                               "leader_epoch",
                                NULL };
 
-        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLs", kws,
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLsi", kws,
                                          &topic, &partition, &offset,
-                                         &metadata)) {
+                                         &metadata,
+                                         &leader_epoch)) {
                 return -1;
         }
 
         TopicPartition_setup((TopicPartition *)self,
-                             topic, partition, offset, metadata, 0);
+                             topic, partition, offset,
+                             leader_epoch, metadata, 0);
         return 0;
 }
 
@@ -890,6 +912,13 @@ static int TopicPartition_traverse (TopicPartition *self,
         return 0;
 }
 
+static PyObject *TopicPartition_get_leader_epoch (TopicPartition *tp, void *closure) {
+        if (tp->leader_epoch >= 0) {
+                return cfl_PyInt_FromInt(tp->leader_epoch);
+        }
+        Py_RETURN_NONE;
+}
+
 
 static PyMemberDef TopicPartition_members[] = {
         { "topic", T_STRING, offsetof(TopicPartition, topic), READONLY,
@@ -913,15 +942,36 @@ static PyMemberDef TopicPartition_members[] = {
         { NULL }
 };
 
+static PyGetSetDef TopicPartition_getters_and_setters[] = {
+        {
+                /* name */
+                "leader_epoch",
+                (getter) TopicPartition_get_leader_epoch,
+                NULL,
+                /* doc */
+                ":attribute leader_epoch: Offset leader epoch (int), or None",
+                /* closure */
+                NULL
+        },
+        { NULL }
+};
+
 
 static PyObject *TopicPartition_str0 (TopicPartition *self) {
         PyObject *errstr = NULL;
         PyObject *errstr8 = NULL;
         const char *c_errstr = NULL;
         PyObject *ret;
         char offset_str[40];
+        char leader_epoch_str[12];
 
         snprintf(offset_str, sizeof(offset_str), "%"CFL_PRId64"", self->offset);
+        if (self->leader_epoch >= 0)
+                snprintf(leader_epoch_str, sizeof(leader_epoch_str),
+                         "%"CFL_PRId32"", self->leader_epoch);
+        else
+                snprintf(leader_epoch_str, sizeof(leader_epoch_str),
+                         "None");
 
         if (self->error != Py_None) {
                 errstr = cfl_PyObject_Unistr(self->error);
@@ -930,9 +980,10 @@ static PyObject *TopicPartition_str0 (TopicPartition *self) {
 
         ret = cfl_PyUnistr(
                 _FromFormat("TopicPartition{topic=%s,partition=%"CFL_PRId32
-                            ",offset=%s,error=%s}",
+                            ",offset=%s,leader_epoch=%s,error=%s}",
                             self->topic, self->partition,
                             offset_str,
+                            leader_epoch_str,
                             c_errstr ? c_errstr : "None"));
         Py_XDECREF(errstr8);
         Py_XDECREF(errstr);
@@ -1024,48 +1075,53 @@ PyTypeObject TopicPartitionType = {
         "It is typically used to provide a list of topics or partitions for "
         "various operations, such as :py:func:`Consumer.assign()`.\n"
         "\n"
-        ".. py:function:: TopicPartition(topic, [partition], [offset])\n"
+        ".. py:function:: TopicPartition(topic, [partition], [offset],"
+        " [metadata], [leader_epoch])\n"
         "\n"
         "  Instantiate a TopicPartition object.\n"
         "\n"
         "  :param string topic: Topic name\n"
         "  :param int partition: Partition id\n"
         "  :param int offset: Initial partition offset\n"
+        "  :param string metadata: Offset metadata\n"
+        "  :param int leader_epoch: Offset leader epoch\n"
        "  :rtype: TopicPartition\n"
         "\n"
         "\n", /*tp_doc*/
         (traverseproc)TopicPartition_traverse, /* tp_traverse */
         (inquiry)TopicPartition_clear, /* tp_clear */
         (richcmpfunc)TopicPartition_richcompare, /* tp_richcompare */
-        0, /* tp_weaklistoffset */
-        0, /* tp_iter */
-        0, /* tp_iternext */
-        0, /* tp_methods */
-        TopicPartition_members,/* tp_members */
-        0, /* tp_getset */
-        0, /* tp_base */
-        0, /* tp_dict */
-        0, /* tp_descr_get */
-        0, /* tp_descr_set */
-        0, /* tp_dictoffset */
-        TopicPartition_init, /* tp_init */
-        0, /* tp_alloc */
-        TopicPartition_new /* tp_new */
+        0,                                  /* tp_weaklistoffset */
+        0,                                  /* tp_iter */
+        0,                                  /* tp_iternext */
+        0,                                  /* tp_methods */
+        TopicPartition_members,             /* tp_members */
+        TopicPartition_getters_and_setters, /* tp_getset */
+        0,                                  /* tp_base */
+        0,                                  /* tp_dict */
+        0,                                  /* tp_descr_get */
+        0,                                  /* tp_descr_set */
+        0,                                  /* tp_dictoffset */
+        TopicPartition_init,                /* tp_init */
+        0,                                  /* tp_alloc */
+        TopicPartition_new                  /* tp_new */
 };
 
 /**
  * @brief Internal factory to create a TopicPartition object.
  */
 static PyObject *TopicPartition_new0 (const char *topic, int partition,
-                                      long long offset, const char *metadata,
+                                      long long offset, int32_t leader_epoch,
+                                      const char *metadata,
                                       rd_kafka_resp_err_t err) {
         TopicPartition *self;
 
         self = (TopicPartition *)TopicPartitionType.tp_new(
                 &TopicPartitionType, NULL, NULL);
 
         TopicPartition_setup(self, topic, partition,
-                             offset, metadata, err);
+                             offset, leader_epoch,
+                             metadata, err);
 
         return (PyObject *)self;
 }
@@ -1090,6 +1146,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
                 TopicPartition_new0(
                         rktpar->topic, rktpar->partition,
                         rktpar->offset,
+                        rd_kafka_topic_partition_get_leader_epoch(rktpar),
                         rktpar->metadata,
                         rktpar->err));
         }
@@ -1133,6 +1190,8 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
                         tp->topic,
                         tp->partition);
                 rktpar->offset = tp->offset;
+                rd_kafka_topic_partition_set_leader_epoch(rktpar,
+                                                          tp->leader_epoch);
                 if (tp->metadata != NULL) {
                         rktpar->metadata_size = strlen(tp->metadata) + 1;
                         rktpar->metadata = strdup(tp->metadata);
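
Taken together, these confluent_kafka.c changes give TopicPartition a leader_epoch keyword argument, a read-only leader_epoch attribute and a str() form that includes the epoch, alongside the new Message.leader_epoch() method. A small sketch of the TopicPartition side; the topic name is a placeholder and no broker is needed:

    from confluent_kafka import TopicPartition

    # leader_epoch defaults to unset and reads back as None.
    tp = TopicPartition('mytopic', 0, 42)        # 'mytopic' is a placeholder
    assert tp.leader_epoch is None

    # Passing an epoch exposes it as a read-only attribute and in the repr.
    tp = TopicPartition('mytopic', 0, 42, leader_epoch=7)
    assert tp.leader_epoch == 7
    print(tp)
    # expected to look roughly like:
    # TopicPartition{topic=mytopic,partition=0,offset=42,leader_epoch=7,error=None}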

src/confluent_kafka/src/confluent_kafka.h

Lines changed: 2 additions & 0 deletions
@@ -356,6 +356,7 @@ typedef struct {
         char *topic;
         int partition;
         int64_t offset;
+        int32_t leader_epoch;
         char *metadata;
         PyObject *error;
 } TopicPartition;
@@ -428,6 +429,7 @@ typedef struct {
         PyObject *error;
         int32_t partition;
         int64_t offset;
+        int32_t leader_epoch;
         int64_t timestamp;
         rd_kafka_timestamp_type_t tstype;
         int64_t latency; /**< Producer: time it took to produce message */

tests/integration/integration_test.py

Lines changed: 10 additions & 4 deletions
@@ -569,7 +569,8 @@ def verify_consumer_seek(c, seek_to_msg):
 
     tp = confluent_kafka.TopicPartition(seek_to_msg.topic(),
                                         seek_to_msg.partition(),
-                                        seek_to_msg.offset())
+                                        seek_to_msg.offset(),
+                                        leader_epoch=seek_to_msg.leader_epoch())
     print('seek: Seeking to %s' % tp)
     c.seek(tp)
 
@@ -583,9 +584,14 @@ def verify_consumer_seek(c, seek_to_msg):
         if msg.topic() != seek_to_msg.topic() or msg.partition() != seek_to_msg.partition():
             continue
 
-        print('seek: message at offset %d' % msg.offset())
-        assert msg.offset() == seek_to_msg.offset(), \
-            'expected message at offset %d, not %d' % (seek_to_msg.offset(), msg.offset())
+        print('seek: message at offset %d (epoch %d)' %
+              (msg.offset(), msg.leader_epoch()))
+        assert msg.offset() == seek_to_msg.offset() and \
+            msg.leader_epoch() == seek_to_msg.leader_epoch(), \
+            ('expected message at offset %d (epoch %d), ' % (seek_to_msg.offset(),
+                                                             seek_to_msg.leader_epoch())) + \
+            ('not %d (epoch %d)' % (msg.offset(),
+                                    msg.leader_epoch()))
         break
 
 
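The updated test drives seek() with the leader epoch of a previously consumed message and then checks both offset and epoch on redelivery. A standalone sketch of the same round trip; the consumer `c` and `first_msg` are assumed to come from earlier test setup, as in verify_consumer_seek above:

    import confluent_kafka

    def seek_back_and_check(c, first_msg, timeout=30.0):
        """Seek back to first_msg and verify the redelivered offset/epoch."""
        epoch = first_msg.leader_epoch()
        kwargs = {'leader_epoch': epoch} if epoch is not None else {}
        tp = confluent_kafka.TopicPartition(first_msg.topic(),
                                            first_msg.partition(),
                                            first_msg.offset(), **kwargs)
        c.seek(tp)
        while True:
            msg = c.poll(timeout)
            assert msg is not None and msg.error() is None
            if (msg.topic(), msg.partition()) != (first_msg.topic(),
                                                  first_msg.partition()):
                continue
            assert msg.offset() == first_msg.offset()
            # On a KIP-320-capable broker the epochs should match as well;
            # older brokers may report None here.
            assert msg.leader_epoch() == first_msg.leader_epoch()
            break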