diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 36df78326..49fa3e261 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -779,13 +779,12 @@ def position(self, partition, timeout_ms=None): timer = Timer(timeout_ms) position = self._subscription.assignment[partition].position - while position is None: + while position is None and not timer.expired: # batch update fetch positions for any partitions without a valid position - if self._update_fetch_positions(timeout_ms=timer.timeout_ms): - position = self._subscription.assignment[partition].position - if timer.expired: - return None - else: + self._update_fetch_positions(timeout_ms=timer.timeout_ms) + self._client.poll(timeout_ms=timer.timeout_ms) + position = self._subscription.assignment[partition].position + if position is not None: return position.offset def highwater(self, partition): diff --git a/test/integration/test_consumer_integration.py b/test/integration/test_consumer_integration.py index 71cf2642d..6060dc830 100644 --- a/test/integration/test_consumer_integration.py +++ b/test/integration/test_consumer_integration.py @@ -302,3 +302,25 @@ def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic): with pytest.raises(KafkaTimeoutError): consumer.offsets_for_times({bad_tp: 0}) + + +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +def test_kafka_consumer_position_after_seek_to_end(kafka_consumer_factory, topic, send_messages): + send_messages(range(0, 10), partition=0) + + # Start a consumer with manual partition assignment. + consumer = kafka_consumer_factory( + topics=(), + group_id=None, + enable_auto_commit=False, + ) + tp = TopicPartition(topic, 0) + consumer.assign([tp]) + + # Seek to the end of the partition, and call position() to synchronize the + # partition's offset without calling poll(). + consumer.seek_to_end(tp) + position = consumer.position(tp, timeout_ms=1000) + + # Verify we got the expected position + assert position == 10, f"Expected position 10, got {position}"