Skip to content

Commit 0815837

Browse files
committed
#523: add commit.last_poll_timestamp(partition)
1 parent 61fa44e commit 0815837

File tree

6 files changed

+38
-2
lines changed

6 files changed

+38
-2
lines changed

CHANGES.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
CHANGES
22
-------
33

4+
523.feature
5+
^^^^^^^^^^^
6+
7+
Add `consumer.last_poll_timestamp(partition)` which gives the ms timestamp of the last update of `highwater` and `lso`.
8+
9+
410
0.5.2 (2019-03-10)
511
^^^^^^^^^^^^^^^^^^
612

aiokafka/consumer/consumer.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,25 @@ def last_stable_offset(self, partition):
683683
assignment = self._subscription.subscription.assignment
684684
return assignment.state_value(partition).lso
685685

686+
def last_poll_timestamp(self, partition):
687+
""" Returns the timestamp of the last poll of this partition (in ms).
688+
It is the last time `highwater` and `last_stable_offset` were
689+
udpated. However it does not mean that new messages were received.
690+
691+
As with ``highwater()`` will not be available until some messages are
692+
consumed.
693+
694+
Arguments:
695+
partition (TopicPartition): partition to check
696+
697+
Returns:
698+
int or None: timestamp if available
699+
"""
700+
assert self._subscription.is_assigned(partition), \
701+
'Partition is not assigned'
702+
assignment = self._subscription.subscription.assignment
703+
return assignment.state_value(partition).timestamp
704+
686705
def seek(self, partition, offset):
687706
""" Manually specify the fetch offset for a TopicPartition.
688707

aiokafka/consumer/fetcher.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import collections
33
import logging
44
import random
5+
import time
56
from itertools import chain
67

78
from kafka.protocol.offset import OffsetRequest
@@ -673,6 +674,7 @@ async def _proc_fetch_request(self, assignment, node_id, request):
673674
for partition, offset, _ in partitions:
674675
fetch_offsets[TopicPartition(topic, partition)] = offset
675676

677+
now_ms = int(1000 * time.time())
676678
for topic, partitions in response.topics:
677679
for partition, error_code, highwater, *part_data in partitions:
678680
tp = TopicPartition(topic, partition)
@@ -696,6 +698,7 @@ async def _proc_fetch_request(self, assignment, node_id, request):
696698
lso = None
697699
tp_state.highwater = highwater
698700
tp_state.lso = lso
701+
tp_state.timestamp = now_ms
699702

700703
# part_data also contains lso, aborted_transactions.
701704
# message_set is last

aiokafka/consumer/subscription_state.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,7 @@ def __init__(self, assignment, *, loop):
463463

464464
self.highwater = None # Last fetched highwater mark
465465
self.lso = None # Last fetched stable offset mark
466+
self.timestamp = None # timestamp of last poll
466467
self._position = None # The current position of the topic
467468
self._position_fut = create_future(loop=loop)
468469

docs/consumer.rst

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,10 @@ catch up). For example::
444444
for partition in consumer.assignment():
445445
highwater = consumer.highwater(partition)
446446
position = await consumer.position(partition)
447-
lag = highwater - position
448-
if lag > LAG_THRESHOLD:
447+
position_lag = highwater - position
448+
timestamp = consumer.last_poll_timestamp(partition)
449+
time_lag = time.time() * 1000 - timestamp
450+
if position_lag > POSITION_THRESHOLD or time_lag > TIME_THRESHOLD:
449451
partitions.append(partition)
450452

451453
.. note:: This interface differs from `pause()`/`resume()` interface of

tests/test_consumer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ async def test_simple_consumer(self):
6969
# check unsupported version
7070
consumer = await self.consumer_factory(api_version="0.8")
7171

72+
now = time.time()
7273
await self.send_messages(0, list(range(0, 100)))
7374
await self.send_messages(1, list(range(100, 200)))
7475
# Start a consumer_factory
@@ -101,6 +102,10 @@ async def test_simple_consumer(self):
101102

102103
h = consumer.highwater(p0)
103104
self.assertEqual(h, 100)
105+
t = consumer.last_poll_timestamp(p0)
106+
self.assertGreaterEqual(t, int(now * 1000))
107+
now = time.time()
108+
self.assertLessEqual(t, int(now * 1000))
104109

105110
consumer.seek(p0, offset + 90)
106111
for i in range(10):

0 commit comments

Comments
 (0)