Skip to content

Commit fb1734f

Browse files
authored
consumer: guard manager.run calls with client._lock (#2940)
1 parent 96275d2 commit fb1734f

1 file changed

Lines changed: 4 additions & 2 deletions

File tree

kafka/consumer/fetcher.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,8 @@ def offsets_by_times(self, timestamps, timeout_ms=None):
219219
Raises:
220220
KafkaTimeoutError if timeout_ms provided
221221
"""
222-
offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
222+
with self._client._lock:
223+
offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
223224
for tp in timestamps:
224225
if tp not in offsets:
225226
offsets[tp] = None
@@ -344,7 +345,8 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms=None):
344345
KafkaTimeoutError if timeout_ms provided.
345346
"""
346347
timestamps = dict([(tp, timestamp) for tp in partitions])
347-
offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
348+
with self._client._lock:
349+
offsets = self._manager.run(self._fetch_offsets_by_times_async, timestamps, timeout_ms)
348350
for tp in timestamps:
349351
offsets[tp] = offsets[tp].offset
350352
return offsets

0 commit comments

Comments
 (0)