Skip to content

Commit 34de686

Browse files
committed
Add Consumer.seek()
1 parent 3456ded commit 34de686

File tree

3 files changed

+110
-1
lines changed

3 files changed

+110
-1
lines changed

confluent_kafka/src/Consumer.c

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,55 @@ static PyObject *Consumer_resume (Handle *self, PyObject *args,
699699
Py_RETURN_NONE;
700700
}
701701

702+
703+
static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs) {
704+
705+
TopicPartition *tp;
706+
rd_kafka_resp_err_t err;
707+
static char *kws[] = { "partition", NULL };
708+
rd_kafka_topic_t *rkt;
709+
710+
if (!self->rk) {
711+
PyErr_SetString(PyExc_RuntimeError, "Consumer closed");
712+
return NULL;
713+
}
714+
715+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws,
716+
(PyObject **)&tp))
717+
return NULL;
718+
719+
720+
if (PyObject_Type((PyObject *)tp) != (PyObject *)&TopicPartitionType) {
721+
PyErr_Format(PyExc_TypeError,
722+
"expected %s", TopicPartitionType.tp_name);
723+
return NULL;
724+
}
725+
726+
rkt = rd_kafka_topic_new(self->rk, tp->topic, NULL);
727+
if (!rkt) {
728+
cfl_PyErr_Format(rd_kafka_last_error(),
729+
"Failed to get topic object for "
730+
"topic \"%s\": %s",
731+
tp->topic,
732+
rd_kafka_err2str(rd_kafka_last_error()));
733+
return NULL;
734+
}
735+
736+
err = rd_kafka_seek(rkt, tp->partition, tp->offset, -1);
737+
738+
rd_kafka_topic_destroy(rkt);
739+
740+
if (err) {
741+
cfl_PyErr_Format(err,
742+
"Failed to seek to offset %"PRId64": %s",
743+
tp->offset, rd_kafka_err2str(err));
744+
return NULL;
745+
}
746+
747+
Py_RETURN_NONE;
748+
}
749+
750+
702751
static PyObject *Consumer_get_watermark_offsets (Handle *self, PyObject *args,
703752
PyObject *kwargs) {
704753

@@ -1136,6 +1185,24 @@ static PyMethodDef Consumer_methods[] = {
11361185
" :raises: KafkaException\n"
11371186
"\n"
11381187
},
1188+
{ "seek", (PyCFunction)Consumer_seek,
1189+
METH_VARARGS|METH_KEYWORDS,
1190+
".. py:function:: seek(partition)\n"
1191+
"\n"
1192+
" Set consume position for partition to offset.\n"
1193+
" The offset may be an absolute (>=0) or a\n"
1194+
" logical offset (:py:const:`OFFSET_BEGINNING` et.al).\n"
1195+
"\n"
1196+
" seek() may only be used to update the consume offset of an\n"
1197+
" actively consumed partition (i.e., after :py:const:`assign()`),\n"
1198+
" to set the starting offset of partition not being consumed instead\n"
1199+
" pass the offset in an `assign()` call.\n"
1200+
"\n"
1201+
" :param TopicPartition partition: Topic+partition+offset to seek to.\n"
1202+
"\n"
1203+
" :raises: KafkaException\n"
1204+
"\n"
1205+
},
11391206
{ "get_watermark_offsets", (PyCFunction)Consumer_get_watermark_offsets,
11401207
METH_VARARGS|METH_KEYWORDS,
11411208
".. py:function:: get_watermark_offsets(partition, [timeout=None], [cached=False])\n"

examples/integration_test.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,32 @@ def print_commit_result(err, partitions):
402402
print('# Committed offsets for: %s' % partitions)
403403

404404

405+
def verify_consumer_seek(c, seek_to_msg):
406+
""" Seek to message and verify the next consumed message matches.
407+
Must only be performed on an actively consuming consumer. """
408+
409+
tp = confluent_kafka.TopicPartition(seek_to_msg.topic(),
410+
seek_to_msg.partition(),
411+
seek_to_msg.offset())
412+
print('seek: Seeking to %s' % tp)
413+
c.seek(tp)
414+
415+
while True:
416+
msg = c.poll()
417+
assert msg is not None
418+
if msg.error():
419+
print('seek: Ignoring non-message: %s' % msg)
420+
continue
421+
422+
if msg.topic() != seek_to_msg.topic() or msg.partition() != seek_to_msg.partition():
423+
continue
424+
425+
print('seek: message at offset %d' % msg.offset())
426+
assert msg.offset() == seek_to_msg.offset(), \
427+
'expected message at offset %d, not %d' % (seek_to_msg.offset(), msg.offset())
428+
break
429+
430+
405431
def verify_consumer():
406432
""" Verify basic Consumer functionality """
407433

@@ -433,6 +459,8 @@ def print_wmark(consumer, parts):
433459
max_msgcnt = 100
434460
msgcnt = 0
435461

462+
first_msg = None
463+
436464
while True:
437465
# Consume until EOF or error
438466

@@ -455,6 +483,9 @@ def print_wmark(consumer, parts):
455483
(msg.topic(), msg.partition(), msg.offset(),
456484
msg.key(), msg.value(), tstype, timestamp))
457485

486+
if first_msg is None:
487+
first_msg = msg
488+
458489
if (msgcnt == 11):
459490
parts = c.assignment()
460491
print('Pausing partitions briefly')
@@ -499,6 +530,8 @@ def print_wmark(consumer, parts):
499530
offsets = c.offsets_for_times(topic_partions_to_search, timeout=1.0)
500531
print("offsets_for_times results: %s" % offsets)
501532

533+
verify_consumer_seek(c, first_msg)
534+
502535
# Close consumer
503536
c.close()
504537

tests/test_Consumer.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/usr/bin/env python
22

3-
from confluent_kafka import (Consumer, TopicPartition, KafkaError, KafkaException, TIMESTAMP_NOT_AVAILABLE,
3+
from confluent_kafka import (Consumer, TopicPartition, KafkaError,
4+
KafkaException, TIMESTAMP_NOT_AVAILABLE,
45
OFFSET_INVALID, libversion)
56
import pytest
67

@@ -55,6 +56,10 @@ def dummy_assign_revoke(consumer, partitions):
5556
partitions = list(map(lambda part: TopicPartition("test", part), range(0, 100, 3)))
5657
kc.assign(partitions)
5758

59+
with pytest.raises(KafkaException) as ex:
60+
kc.seek(TopicPartition("test", 0, 123))
61+
assert 'Erroneous state' in str(ex.value)
62+
5863
# Verify assignment
5964
assignment = kc.assignment()
6065
assert partitions == assignment
@@ -267,6 +272,10 @@ def test_any_method_after_close_throws_exception():
267272
c.position([TopicPartition("test", 0)])
268273
assert 'Consumer closed' == str(ex.value)
269274

275+
with pytest.raises(RuntimeError) as ex:
276+
c.seek([TopicPartition("test", 0, 0)])
277+
assert 'Consumer closed' == str(ex.value)
278+
270279
with pytest.raises(RuntimeError) as ex:
271280
lo, hi = c.get_watermark_offsets(TopicPartition("test", 0))
272281
assert 'Consumer closed' == str(ex.value)

0 commit comments

Comments
 (0)