125 changes: 83 additions & 42 deletions kafka/consumer/fetcher.py
@@ -43,6 +43,10 @@
"partition_data", "metric_aggregator"])


ExceptionMetadata = collections.namedtuple("ExceptionMetadata",
["partition", "fetched_offset", "exception"])


class NoOffsetForPartitionError(Errors.KafkaError):
pass

@@ -131,6 +135,7 @@ def __init__(self, client, subscriptions, **configs):
self._isolation_level = ISOLATION_LEVEL_CONFIG[self.config['isolation_level']]
self._session_handlers = {}
self._nodes_with_pending_fetch_requests = set()
self._next_in_line_exception_metadata = None

def send_fetches(self):
"""Send FetchRequests for all assigned partitions that do not already have
@@ -161,11 +166,13 @@ def reset_offsets_if_needed(self, partitions, timeout_ms=None):
Raises:
KafkaTimeoutError: if timeout_ms is provided and expires before offsets are reset
"""
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout resetting offsets')
needs_offset_reset = set()
for tp in partitions:
# TODO: If there are several offsets to reset, we could submit offset requests in parallel
if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp, timeout_ms=inner_timeout_ms())
needs_offset_reset.add(tp)

if needs_offset_reset:
self._reset_offsets(needs_offset_reset, timeout_ms=timeout_ms)

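The loop above now only collects partitions; the actual reset happens in a single batched _reset_offsets call afterwards, so one offset-lookup round trip can serve every partition instead of one blocking request each. A minimal sketch of the collect-then-batch shape — needs_reset and batch_reset are hypothetical stand-ins, not kafka-python APIs:

def reset_if_needed(partitions, needs_reset, batch_reset, timeout_ms=None):
    # Collect first ...
    pending = {tp for tp in partitions if needs_reset(tp)}
    if pending:
        # ... then reset everything in one batched call.
        batch_reset(pending, timeout_ms=timeout_ms)

# Usage with trivial stand-ins:
reset_if_needed(
    ["events-0", "events-1"],
    needs_reset=lambda tp: True,
    batch_reset=lambda tps, timeout_ms=None: print("resetting", sorted(tps)))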
def _clean_done_fetch_futures(self):
while True:
@@ -191,31 +198,28 @@ def update_fetch_positions(self, partitions, timeout_ms=None):
partition and no reset policy is available
KafkaTimeoutError: if timeout_ms is provided and expires before positions are updated.
"""
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
needs_offset_reset = set()
# reset the fetch position to the committed position
for tp in partitions:
if not self._subscriptions.is_assigned(tp):
log.warning("partition %s is not assigned - skipping offset"
" update", tp)
continue
elif self._subscriptions.is_fetchable(tp):
log.warning("partition %s is still fetchable -- skipping offset"
" update", tp)
if not self._subscriptions.is_assigned(tp) or self._subscriptions.has_valid_position(tp):
continue

if self._subscriptions.is_offset_reset_needed(tp):
self._reset_offset(tp, timeout_ms=inner_timeout_ms())
needs_offset_reset.add(tp)
elif self._subscriptions.assignment[tp].committed is None:
# there's no committed position, so we need to reset with the
# default strategy
self._subscriptions.need_offset_reset(tp)
self._reset_offset(tp, timeout_ms=inner_timeout_ms())
needs_offset_reset.add(tp)
else:
committed = self._subscriptions.assignment[tp].committed.offset
log.debug("Resetting offset for partition %s to the committed"
" offset %s", tp, committed)
self._subscriptions.seek(tp, committed)

if needs_offset_reset:
self._reset_offsets(needs_offset_reset, timeout_ms=timeout_ms)

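Per partition, update_fetch_positions now makes a three-way choice: partitions awaiting reset and partitions with no committed offset are funneled into the same batched reset, and only partitions with a committed offset are seeked directly. A toy sketch of that decision, using a plain dict in place of the internal TopicPartitionState:

def choose_action(state):
    # state is an illustrative stand-in for TopicPartitionState
    if state["awaiting_reset"]:
        return "batch reset (explicit strategy)"
    elif state["committed"] is None:
        return "batch reset (default strategy)"
    else:
        return "seek to committed offset %d" % state["committed"]

print(choose_action({"awaiting_reset": True,  "committed": None}))  # explicit reset
print(choose_action({"awaiting_reset": False, "committed": None}))  # default reset
print(choose_action({"awaiting_reset": False, "committed": 42}))    # seek to 42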
def get_offsets_by_times(self, timestamps, timeout_ms):
offsets = self._retrieve_offsets(timestamps, timeout_ms)
for tp in timestamps:
@@ -238,37 +242,36 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
offsets[tp] = offsets[tp].offset
return offsets

def _reset_offset(self, partition, timeout_ms=None):
"""Reset offsets for the given partition using the offset reset strategy.
def _reset_offsets(self, partitions, timeout_ms=None):
"""Reset offsets for the given partitions using the offset reset strategy.

Arguments:
partition (TopicPartition): the partition that needs reset offset
partitions ([TopicPartition]): the partitions that need offsets reset

Raises:
NoOffsetForPartitionError: if no offset reset strategy is defined
KafkaTimeoutError: if timeout_ms is provided and expires before offsets are reset
"""
timestamp = self._subscriptions.assignment[partition].reset_strategy
if timestamp is OffsetResetStrategy.EARLIEST:
strategy = 'earliest'
elif timestamp is OffsetResetStrategy.LATEST:
strategy = 'latest'
else:
raise NoOffsetForPartitionError(partition)
offset_resets = dict()
for tp in partitions:
ts = self._subscriptions.assignment[tp].reset_strategy
if not ts:
raise NoOffsetForPartitionError(tp)
offset_resets[tp] = ts

log.debug("Resetting offset for partition %s to offset %s.",
partition, strategy)
offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms=timeout_ms)
offsets = self._retrieve_offsets(offset_resets, timeout_ms=timeout_ms)

if partition in offsets:
offset = offsets[partition].offset
for partition in partitions:
if partition not in offsets:
raise NoOffsetForPartitionError(partition)

# we might lose the assignment while fetching the offset,
# so check it is still active
if self._subscriptions.is_assigned(partition):
offset = offsets[partition].offset
log.debug("Resetting offset for partition %s to offset %s.",
partition, offset)
self._subscriptions.seek(partition, offset)
else:
log.debug("Could not find offset for partition %s since it is probably deleted" % (partition,))

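_retrieve_offsets consumes a dict mapping each partition to a timestamp — here the OffsetResetStrategy sentinels (EARLIEST == -2, LATEST == -1) — and returns a dict mapping each resolved partition to an object with an .offset attribute. A hedged sketch of those shapes with a stubbed lookup; the stub is illustrative only, where the real method issues offset requests to brokers:

import collections

OffsetAndTimestamp = collections.namedtuple(
    "OffsetAndTimestamp", ["offset", "timestamp"])
EARLIEST, LATEST = -2, -1  # OffsetResetStrategy sentinel values

def retrieve_offsets_stub(timestamps):
    # Pretend earliest resolves to 0 and latest to 100 on every partition.
    return {tp: OffsetAndTimestamp(0 if ts == EARLIEST else 100, None)
            for tp, ts in timestamps.items()}

offsets = retrieve_offsets_stub({"events-0": EARLIEST, "events-1": LATEST})
for tp, oat in sorted(offsets.items()):
    print("seek", tp, "to offset", oat.offset)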
def _retrieve_offsets(self, timestamps, timeout_ms=None):
"""Fetch offset for each partition passed in ``timestamps`` map.
@@ -358,20 +361,40 @@ def fetched_records(self, max_records=None, update_offsets=True):
max_records = self.config['max_poll_records']
assert max_records > 0

if self._next_in_line_exception_metadata is not None:
exc_meta = self._next_in_line_exception_metadata
self._next_in_line_exception_metadata = None
tp = exc_meta.partition
if self._subscriptions.is_fetchable(tp) and self._subscriptions.position(tp).offset == exc_meta.fetched_offset:
raise exc_meta.exception

drained = collections.defaultdict(list)
records_remaining = max_records
# Needed to construct ExceptionMetadata if an exception is raised while processing a completed fetch
fetched_partition = None
fetched_offset = -1

while records_remaining > 0:
if not self._next_partition_records:
if not self._completed_fetches:
break
completion = self._completed_fetches.popleft()
self._next_partition_records = self._parse_fetched_data(completion)
else:
records_remaining -= self._append(drained,
self._next_partition_records,
records_remaining,
update_offsets)
try:
while records_remaining > 0:
if not self._next_partition_records:
if not self._completed_fetches:
break
completion = self._completed_fetches.popleft()
fetched_partition = completion.topic_partition
fetched_offset = completion.fetched_offset
self._next_partition_records = self._parse_fetched_data(completion)
else:
fetched_partition = self._next_partition_records.topic_partition
fetched_offset = self._next_partition_records.next_fetch_offset
records_remaining -= self._append(drained,
self._next_partition_records,
records_remaining,
update_offsets)
except Exception as e:
if not drained:
raise e
# To be raised on the next call of this method
self._next_in_line_exception_metadata = ExceptionMetadata(fetched_partition, fetched_offset, e)
return dict(drained), bool(self._completed_fetches)

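The try/except wrapper implements a defer-don't-drop policy: if some records were already drained when an exception surfaces, the exception is parked in ExceptionMetadata and re-raised on the next call — and even then only if the partition is still fetchable and its position still equals the fetched offset, since a seek or rebalance in between makes the error stale. A compact, self-contained sketch of that guard; DeferredRaiser is a hypothetical helper, not the Fetcher's internals:

import collections

ExceptionMetadata = collections.namedtuple(
    "ExceptionMetadata", ["partition", "fetched_offset", "exception"])

class DeferredRaiser(object):
    def __init__(self):
        self._pending = None

    def defer(self, partition, fetched_offset, exc):
        self._pending = ExceptionMetadata(partition, fetched_offset, exc)

    def maybe_raise(self, position_of):
        # Re-raise only if the partition has not moved since the failure.
        pending, self._pending = self._pending, None
        if pending is not None and position_of(pending.partition) == pending.fetched_offset:
            raise pending.exception

d = DeferredRaiser()
d.defer("events-0", 5, ValueError("bad record batch at offset 5"))
d.maybe_raise(lambda tp: 7)  # position moved (e.g. user seeked): error dropped
d.defer("events-0", 5, ValueError("bad record batch at offset 5"))
# d.maybe_raise(lambda tp: 5)  # position unchanged: would raise ValueError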
def _append(self, drained, part, max_records, update_offsets):
@@ -862,6 +885,7 @@ def _on_partition_records_drain(self, partition_records):
def close(self):
if self._next_partition_records is not None:
self._next_partition_records.drain()
self._next_in_line_exception_metadata = None

class PartitionRecords(object):
def __init__(self, fetch_offset, tp, records,
@@ -887,6 +911,7 @@ def __init__(self, fetch_offset, tp, records,
self._maybe_skip_record,
self._unpack_records(tp, records, key_deserializer, value_deserializer))
self.on_drain = on_drain
self._next_inline_exception = None

def _maybe_skip_record(self, record):
# When fetching an offset that is in the middle of a
@@ -910,12 +935,28 @@ def __bool__(self):
def drain(self):
if self.record_iterator is not None:
self.record_iterator = None
self._next_inline_exception = None
if self.metric_aggregator:
self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read)
self.on_drain(self)

def _maybe_raise_next_inline_exception(self):
if self._next_inline_exception:
exc, self._next_inline_exception = self._next_inline_exception, None
raise exc

def take(self, n=None):
return list(itertools.islice(self.record_iterator, 0, n))
self._maybe_raise_next_inline_exception()
records = []
try:
# Note that records.extend(iterator) extends partially when an exception is raised mid-stream
records.extend(itertools.islice(self.record_iterator, 0, n))
except Exception as e:
if not records:
raise e
# To be raised on the next call of this method
self._next_inline_exception = e
return records

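Because list.extend consumes the iterator item by item, records keeps everything decoded before the failure; returning that partial batch and parking the exception for the next take() means already-decoded messages are still delivered. A self-contained sketch of the behaviour — TakeWithDeferredError is an illustrative mini version of PartitionRecords.take, not library code:

import itertools

class TakeWithDeferredError(object):
    def __init__(self, iterator):
        self._it = iterator
        self._pending = None

    def take(self, n=None):
        if self._pending is not None:
            exc, self._pending = self._pending, None
            raise exc
        records = []
        try:
            # extend() keeps the items yielded before the exception.
            records.extend(itertools.islice(self._it, 0, n))
        except Exception as e:
            if not records:
                raise
            self._pending = e  # raised on the next take()
        return records

def flaky():
    yield "a"
    yield "b"
    raise ValueError("corrupt record batch")

t = TakeWithDeferredError(flaky())
print(t.take())   # ['a', 'b'] -- the records decoded before the failure
# t.take()        # now raises ValueError("corrupt record batch")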
def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
try:
9 changes: 5 additions & 4 deletions kafka/consumer/group.py
@@ -760,7 +760,8 @@ def position(self, partition, timeout_ms=None):
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
position = self._subscription.assignment[partition].position
if position is None:
self._update_fetch_positions([partition], timeout_ms=timeout_ms)
# batch update fetch positions for any partitions without a valid position
self._update_fetch_positions(self._subscription.assigned_partitions(), timeout_ms=timeout_ms)
position = self._subscription.assignment[partition].position
return position.offset if position else None

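From the caller's side the behaviour of position() is unchanged; the difference is that one call now updates positions for the whole assignment, so subsequent position() calls on sibling partitions are answered from already-populated state. A hedged usage sketch against the public consumer API — the broker address and topic name are illustrative assumptions:

from kafka import KafkaConsumer, TopicPartition

# Illustrative only: assumes a broker at localhost:9092 and a topic 'events'.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                         group_id="demo-group",
                         enable_auto_commit=False)
tp0, tp1 = TopicPartition("events", 0), TopicPartition("events", 1)
consumer.assign([tp0, tp1])

# The first position() call may fetch committed offsets / reset offsets
# for *all* assigned partitions in one batch ...
print(consumer.position(tp0))
# ... so this second call is served from the cached assignment state.
print(consumer.position(tp1))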
@@ -1144,9 +1145,9 @@ def _update_fetch_positions(self, partitions, timeout_ms=None):
# their own offsets).
self._fetcher.reset_offsets_if_needed(partitions, timeout_ms=inner_timeout_ms())

if not self._subscription.has_all_fetch_positions():
# if we still don't have offsets for all partitions, then we should either seek
# to the last committed position or reset using the auto reset policy
if not self._subscription.has_all_fetch_positions(partitions):
# if we still don't have offsets for the given partitions, then we should either
# seek to the last committed position or reset using the auto reset policy
if (self.config['api_version'] >= (0, 8, 1) and
self.config['group_id'] is not None):
# first refresh commits for all assigned partitions
16 changes: 12 additions & 4 deletions kafka/consumer/subscription_state.py
@@ -351,9 +351,11 @@ def has_default_offset_reset_policy(self):
def is_offset_reset_needed(self, partition):
return self.assignment[partition].awaiting_reset

def has_all_fetch_positions(self):
for state in self.assignment.values():
if not state.has_valid_position:
def has_all_fetch_positions(self, partitions=None):
if partitions is None:
partitions = self.assigned_partitions()
for tp in partitions:
if not self.has_valid_position(tp):
return False
return True

@@ -364,6 +366,9 @@ def missing_fetch_positions(self):
missing.add(partition)
return missing

def has_valid_position(self, partition):
return partition in self.assignment and self.assignment[partition].has_valid_position

def is_assigned(self, partition):
return partition in self.assignment

@@ -387,14 +392,17 @@ def move_partition_to_end(self, partition):
state = self.assignment.pop(partition)
self.assignment[partition] = state

def position(self, partition):
return self.assignment[partition].position

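has_all_fetch_positions now accepts an optional partition subset (defaulting to the full assignment) and delegates per-partition checks to the new has_valid_position helper. A trivial sketch of the delegation, with a plain dict standing in for SubscriptionState:

def has_valid_position(assignment, partition):
    return partition in assignment and assignment[partition]["has_valid_position"]

def has_all_fetch_positions(assignment, partitions=None):
    if partitions is None:
        partitions = list(assignment)  # default: check the full assignment
    return all(has_valid_position(assignment, tp) for tp in partitions)

assignment = {"t-0": {"has_valid_position": True},
              "t-1": {"has_valid_position": False}}
print(has_all_fetch_positions(assignment, ["t-0"]))  # True
print(has_all_fetch_positions(assignment))           # False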

class TopicPartitionState(object):
def __init__(self):
self.committed = None # last committed OffsetAndMetadata
self.has_valid_position = False # whether we have valid position
self.paused = False # whether this partition has been paused by the user
self.awaiting_reset = False # whether we are awaiting reset
self.reset_strategy = None # the reset strategy if awaitingReset is set
self.reset_strategy = None # the reset strategy if awaiting_reset is set
self._position = None # OffsetAndMetadata exposed to the user
self.highwater = None
self.drop_pending_record_batch = False