diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 96bf3b79b..14dc8a30d 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -43,6 +43,10 @@
      "partition_data", "metric_aggregator"])
 
 
+ExceptionMetadata = collections.namedtuple("ExceptionMetadata",
+    ["partition", "fetched_offset", "exception"])
+
+
 class NoOffsetForPartitionError(Errors.KafkaError):
     pass
 
@@ -131,6 +135,7 @@ def __init__(self, client, subscriptions, **configs):
         self._isolation_level = ISOLATION_LEVEL_CONFIG[self.config['isolation_level']]
         self._session_handlers = {}
         self._nodes_with_pending_fetch_requests = set()
+        self._next_in_line_exception_metadata = None
 
     def send_fetches(self):
         """Send FetchRequests for all assigned partitions that do not already have
@@ -161,11 +166,13 @@ def reset_offsets_if_needed(self, partitions, timeout_ms=None):
         Raises:
             KafkaTimeoutError if timeout_ms provided
         """
-        inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout resetting offsets')
+        needs_offset_reset = set()
         for tp in partitions:
-            # TODO: If there are several offsets to reset, we could submit offset requests in parallel
             if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp, timeout_ms=inner_timeout_ms())
+                needs_offset_reset.add(tp)
+
+        if needs_offset_reset:
+            self._reset_offsets(needs_offset_reset, timeout_ms=timeout_ms)
 
     def _clean_done_fetch_futures(self):
         while True:
@@ -191,31 +198,28 @@ def update_fetch_positions(self, partitions, timeout_ms=None):
                 partition and no reset policy is available
             KafkaTimeoutError if timeout_ms provided.
         """
-        inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
+        needs_offset_reset = set()
         # reset the fetch position to the committed position
         for tp in partitions:
-            if not self._subscriptions.is_assigned(tp):
-                log.warning("partition %s is not assigned - skipping offset"
-                            " update", tp)
-                continue
-            elif self._subscriptions.is_fetchable(tp):
-                log.warning("partition %s is still fetchable -- skipping offset"
-                            " update", tp)
+            if not self._subscriptions.is_assigned(tp) or self._subscriptions.has_valid_position(tp):
                 continue
 
             if self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp, timeout_ms=inner_timeout_ms())
+                needs_offset_reset.add(tp)
             elif self._subscriptions.assignment[tp].committed is None:
                 # there's no committed position, so we need to reset with the
                 # default strategy
                 self._subscriptions.need_offset_reset(tp)
-                self._reset_offset(tp, timeout_ms=inner_timeout_ms())
+                needs_offset_reset.add(tp)
             else:
                 committed = self._subscriptions.assignment[tp].committed.offset
                 log.debug("Resetting offset for partition %s to the committed"
                           " offset %s", tp, committed)
                 self._subscriptions.seek(tp, committed)
 
+        if needs_offset_reset:
+            self._reset_offsets(needs_offset_reset, timeout_ms=timeout_ms)
+
     def get_offsets_by_times(self, timestamps, timeout_ms):
         offsets = self._retrieve_offsets(timestamps, timeout_ms)
         for tp in timestamps:
@@ -238,37 +242,36 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
             offsets[tp] = offsets[tp].offset
         return offsets
 
-    def _reset_offset(self, partition, timeout_ms=None):
-        """Reset offsets for the given partition using the offset reset strategy.
+    def _reset_offsets(self, partitions, timeout_ms=None):
+        """Reset offsets for the given partitions using the offset reset strategy.
 
         Arguments:
-            partition (TopicPartition): the partition that needs reset offset
+            partitions ([TopicPartition]): the partitions that need offsets reset
 
         Raises:
             NoOffsetForPartitionError: if no offset reset strategy is defined
             KafkaTimeoutError if timeout_ms provided
         """
-        timestamp = self._subscriptions.assignment[partition].reset_strategy
-        if timestamp is OffsetResetStrategy.EARLIEST:
-            strategy = 'earliest'
-        elif timestamp is OffsetResetStrategy.LATEST:
-            strategy = 'latest'
-        else:
-            raise NoOffsetForPartitionError(partition)
+        offset_resets = dict()
+        for tp in partitions:
+            ts = self._subscriptions.assignment[tp].reset_strategy
+            if not ts:
+                raise NoOffsetForPartitionError(tp)
+            offset_resets[tp] = ts
 
-        log.debug("Resetting offset for partition %s to offset %s.",
-                  partition, strategy)
-        offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms=timeout_ms)
+        offsets = self._retrieve_offsets(offset_resets, timeout_ms=timeout_ms)
 
-        if partition in offsets:
-            offset = offsets[partition].offset
+        for partition in partitions:
+            if partition not in offsets:
+                raise NoOffsetForPartitionError(partition)
 
             # we might lose the assignment while fetching the offset,
             # so check it is still active
             if self._subscriptions.is_assigned(partition):
+                offset = offsets[partition].offset
+                log.debug("Resetting offset for partition %s to offset %s.",
+                          partition, offset)
                 self._subscriptions.seek(partition, offset)
-        else:
-            log.debug("Could not find offset for partition %s since it is probably deleted" % (partition,))
 
     def _retrieve_offsets(self, timestamps, timeout_ms=None):
         """Fetch offset for each partition passed in ``timestamps`` map.
@@ -358,20 +361,40 @@ def fetched_records(self, max_records=None, update_offsets=True):
             max_records = self.config['max_poll_records']
         assert max_records > 0
 
+        if self._next_in_line_exception_metadata is not None:
+            exc_meta = self._next_in_line_exception_metadata
+            self._next_in_line_exception_metadata = None
+            tp = exc_meta.partition
+            if self._subscriptions.is_fetchable(tp) and self._subscriptions.position(tp).offset == exc_meta.fetched_offset:
+                raise exc_meta.exception
+
         drained = collections.defaultdict(list)
         records_remaining = max_records
+        # Needed to construct ExceptionMetadata if an exception is raised while processing a completed fetch
+        fetched_partition = None
+        fetched_offset = -1
 
-        while records_remaining > 0:
-            if not self._next_partition_records:
-                if not self._completed_fetches:
-                    break
-                completion = self._completed_fetches.popleft()
-                self._next_partition_records = self._parse_fetched_data(completion)
-            else:
-                records_remaining -= self._append(drained,
-                                                  self._next_partition_records,
-                                                  records_remaining,
-                                                  update_offsets)
+        try:
+            while records_remaining > 0:
+                if not self._next_partition_records:
+                    if not self._completed_fetches:
+                        break
+                    completion = self._completed_fetches.popleft()
+                    fetched_partition = completion.topic_partition
+                    fetched_offset = completion.fetched_offset
+                    self._next_partition_records = self._parse_fetched_data(completion)
+                else:
+                    fetched_partition = self._next_partition_records.topic_partition
+                    fetched_offset = self._next_partition_records.next_fetch_offset
+                    records_remaining -= self._append(drained,
+                                                      self._next_partition_records,
+                                                      records_remaining,
+                                                      update_offsets)
+        except Exception as e:
+            if not drained:
+                raise e
+            # To be raised on the next call to this method
+            self._next_in_line_exception_metadata = ExceptionMetadata(fetched_partition, fetched_offset, e)
         return dict(drained), bool(self._completed_fetches)
 
     def _append(self, drained, part, max_records, update_offsets):
@@ -862,6 +885,7 @@ def _on_partition_records_drain(self, partition_records):
     def close(self):
         if self._next_partition_records is not None:
             self._next_partition_records.drain()
+        self._next_in_line_exception_metadata = None
 
     class PartitionRecords(object):
         def __init__(self, fetch_offset, tp, records,
@@ -887,6 +911,7 @@ def __init__(self, fetch_offset, tp, records,
                 self._maybe_skip_record,
                 self._unpack_records(tp, records, key_deserializer, value_deserializer))
             self.on_drain = on_drain
+            self._next_inline_exception = None
 
         def _maybe_skip_record(self, record):
             # When fetching an offset that is in the middle of a
@@ -910,12 +935,28 @@ def __bool__(self):
         def drain(self):
             if self.record_iterator is not None:
                 self.record_iterator = None
+                self._next_inline_exception = None
                 if self.metric_aggregator:
                     self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read)
                 self.on_drain(self)
 
+        def _maybe_raise_next_inline_exception(self):
+            if self._next_inline_exception:
+                exc, self._next_inline_exception = self._next_inline_exception, None
+                raise exc
+
         def take(self, n=None):
-            return list(itertools.islice(self.record_iterator, 0, n))
+            self._maybe_raise_next_inline_exception()
+            records = []
+            try:
+                # Note: records.extend(iterator) keeps any records yielded before the iterator raises mid-stream
+                records.extend(itertools.islice(self.record_iterator, 0, n))
+            except Exception as e:
+                if not records:
+                    raise e
+                # To be raised on the next call to this method
+                self._next_inline_exception = e
+            return records
 
         def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
             try:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 7fff6e795..078f49c39 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -760,7 +760,8 @@ def position(self, partition, timeout_ms=None):
         assert self._subscription.is_assigned(partition), 'Partition is not assigned'
         position = self._subscription.assignment[partition].position
         if position is None:
-            self._update_fetch_positions([partition], timeout_ms=timeout_ms)
+            # batch update fetch positions for any partitions without a valid position
+            self._update_fetch_positions(self._subscription.assigned_partitions(), timeout_ms=timeout_ms)
             position = self._subscription.assignment[partition].position
         return position.offset if position else None
 
@@ -1144,9 +1145,9 @@ def _update_fetch_positions(self, partitions, timeout_ms=None):
             # their own offsets).
             self._fetcher.reset_offsets_if_needed(partitions, timeout_ms=inner_timeout_ms())
 
-        if not self._subscription.has_all_fetch_positions():
-            # if we still don't have offsets for all partitions, then we should either seek
-            # to the last committed position or reset using the auto reset policy
+        if not self._subscription.has_all_fetch_positions(partitions):
+            # if we still don't have offsets for the given partitions, then we should either
+            # seek to the last committed position or reset using the auto reset policy
             if (self.config['api_version'] >= (0, 8, 1) and
                     self.config['group_id'] is not None):
                 # first refresh commits for all assigned partitions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 0ff2ae91b..0f479a55b 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -351,9 +351,11 @@ def has_default_offset_reset_policy(self):
     def is_offset_reset_needed(self, partition):
         return self.assignment[partition].awaiting_reset
 
-    def has_all_fetch_positions(self):
-        for state in self.assignment.values():
-            if not state.has_valid_position:
+    def has_all_fetch_positions(self, partitions=None):
+        if partitions is None:
+            partitions = self.assigned_partitions()
+        for tp in partitions:
+            if not self.has_valid_position(tp):
                 return False
         return True
 
@@ -364,6 +366,9 @@ def missing_fetch_positions(self):
             missing.add(partition)
         return missing
 
+    def has_valid_position(self, partition):
+        return partition in self.assignment and self.assignment[partition].has_valid_position
+
     def is_assigned(self, partition):
         return partition in self.assignment
 
@@ -387,6 +392,9 @@ def move_partition_to_end(self, partition):
         state = self.assignment.pop(partition)
         self.assignment[partition] = state
 
+    def position(self, partition):
+        return self.assignment[partition].position
+
 
 class TopicPartitionState(object):
     def __init__(self):
@@ -394,7 +402,7 @@ def __init__(self):
         self.has_valid_position = False # whether we have valid position
         self.paused = False # whether this partition has been paused by the user
         self.awaiting_reset = False # whether we are awaiting reset
-        self.reset_strategy = None # the reset strategy if awaitingReset is set
+        self.reset_strategy = None # the reset strategy if awaiting_reset is set
         self._position = None # OffsetAndMetadata exposed to the user
         self.highwater = None
         self.drop_pending_record_batch = False
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 80bd0e42d..cc4789e6d 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -8,7 +8,6 @@
 import itertools
 import time
 
-from kafka.client_async import KafkaClient
 from kafka.consumer.fetcher import (
     CompletedFetch, ConsumerRecord, Fetcher
 )
@@ -17,7 +16,7 @@
 from kafka.future import Future
 from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
 from kafka.protocol.fetch import FetchRequest, FetchResponse
-from kafka.protocol.list_offsets import ListOffsetsResponse
+from kafka.protocol.list_offsets import ListOffsetsResponse, OffsetResetStrategy
 from kafka.errors import (
     StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
     UnknownTopicOrPartitionError, OffsetOutOfRangeError
@@ -109,38 +108,38 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
 
 
 def test_update_fetch_positions(fetcher, topic, mocker):
-    mocker.patch.object(fetcher, '_reset_offset')
+    mocker.patch.object(fetcher, '_reset_offsets')
     partition = TopicPartition(topic, 0)
 
     # unassigned partition
     fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)])
-    assert fetcher._reset_offset.call_count == 0
+    assert fetcher._reset_offsets.call_count == 0
 
     # fetchable partition (has offset, not paused)
     fetcher.update_fetch_positions([partition])
-    assert fetcher._reset_offset.call_count == 0
+    assert fetcher._reset_offsets.call_count == 0
 
     # partition needs reset, no committed offset
     fetcher._subscriptions.need_offset_reset(partition)
     fetcher._subscriptions.assignment[partition].awaiting_reset = False
    fetcher.update_fetch_positions([partition])
-    fetcher._reset_offset.assert_called_with(partition, timeout_ms=None)
+    fetcher._reset_offsets.assert_called_with(set([partition]), timeout_ms=None)
     assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
     fetcher.update_fetch_positions([partition])
-    fetcher._reset_offset.assert_called_with(partition, timeout_ms=None)
+    fetcher._reset_offsets.assert_called_with(set([partition]), timeout_ms=None)
 
     # partition needs reset, has committed offset
-    fetcher._reset_offset.reset_mock()
+    fetcher._reset_offsets.reset_mock()
     fetcher._subscriptions.need_offset_reset(partition)
     fetcher._subscriptions.assignment[partition].awaiting_reset = False
     fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, '', -1)
     mocker.patch.object(fetcher._subscriptions, 'seek')
     fetcher.update_fetch_positions([partition])
-    assert fetcher._reset_offset.call_count == 0
+    assert fetcher._reset_offsets.call_count == 0
     fetcher._subscriptions.seek.assert_called_with(partition, 123)
 
 
-def test__reset_offset(fetcher, mocker):
+def test__reset_offsets(fetcher, mocker):
     tp = TopicPartition("topic", 0)
     fetcher._subscriptions.subscribe(topics=["topic"])
     fetcher._subscriptions.assign_from_subscribed([tp])
@@ -148,7 +147,7 @@ def test__reset_offset(fetcher, mocker):
 
     mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
     mocked.return_value = {tp: OffsetAndTimestamp(1001, None, -1)}
-    fetcher._reset_offset(tp)
+    fetcher._reset_offsets([tp])
     assert not fetcher._subscriptions.assignment[tp].awaiting_reset
     assert fetcher._subscriptions.assignment[tp].position.offset == 1001
 
@@ -459,6 +458,23 @@ def test__unpack_records(mocker):
     assert records[2].offset == 2
 
 
+def test__unpack_records_corrupted(mocker):
+    tp = TopicPartition('foo', 0)
+    messages = [
+        (None, b"a", None),
+        (None, b"b", None),
+        (None, b"c", None),
+    ]
+    memory_records = MemoryRecords(_build_record_batch(messages))
+    from kafka.record.default_records import DefaultRecord
+    mocker.patch.object(DefaultRecord, 'validate_crc', side_effect=[True, True, False])
+    part_records = Fetcher.PartitionRecords(0, tp, memory_records)
+    records = part_records.take(10)
+    assert len(records) == 2
+    with pytest.raises(Errors.CorruptRecordError):
+        part_records.take(10)
+
+
 def test__parse_fetched_data(fetcher, topic, mocker):
     fetcher.config['check_crcs'] = False
     tp = TopicPartition(topic, 0)
@@ -610,3 +626,132 @@ def test_partition_records_compacted_offset(mocker):
     msgs = records.take()
     assert len(msgs) == batch_end - fetch_offset - 1
     assert msgs[0].offset == fetch_offset + 1
+
+
+def test_update_fetch_positions_paused(subscription_state, client, mocker):
+    fetcher = Fetcher(client, subscription_state)
+    tp = TopicPartition('foo', 0)
+    subscription_state.assign_from_user([tp])
+    subscription_state.pause(tp) # paused partition does not have a valid position
+    subscription_state.need_offset_reset(tp, OffsetResetStrategy.LATEST)
+
+    mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(10, 1, -1)})
+    fetcher.update_fetch_positions([tp])
+
+    assert not subscription_state.is_offset_reset_needed(tp)
+    assert not subscription_state.is_fetchable(tp) # because tp is paused
+    assert subscription_state.has_valid_position(tp)
+    assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1)
+
+
+def test_update_fetch_positions_paused_without_valid(subscription_state, client, mocker):
+    fetcher = Fetcher(client, subscription_state)
+    tp = TopicPartition('foo', 0)
+    subscription_state.assign_from_user([tp])
+    subscription_state.pause(tp) # paused partition does not have a valid position
+
+    mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(0, 1, -1)})
+    fetcher.update_fetch_positions([tp])
+
+    assert not subscription_state.is_offset_reset_needed(tp)
+    assert not subscription_state.is_fetchable(tp) # because tp is paused
+    assert subscription_state.has_valid_position(tp)
+    assert subscription_state.position(tp) == OffsetAndMetadata(0, '', -1)
+
+
+def test_update_fetch_positions_paused_with_valid(subscription_state, client, mocker):
+    fetcher = Fetcher(client, subscription_state)
+    tp = TopicPartition('foo', 0)
+    subscription_state.assign_from_user([tp])
+    subscription_state.assignment[tp].committed = OffsetAndMetadata(0, '', -1)
+    subscription_state.seek(tp, 10)
+    subscription_state.pause(tp) # paused partition already has a valid position
+
+    mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(0, 1, -1)})
+    fetcher.update_fetch_positions([tp])
+
+    assert not subscription_state.is_offset_reset_needed(tp)
+    assert not subscription_state.is_fetchable(tp) # because tp is paused
+    assert subscription_state.has_valid_position(tp)
+    assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1)
+
+
+def test_fetch_position_after_exception(client, mocker):
+    subscription_state = SubscriptionState(offset_reset_strategy='NONE')
+    fetcher = Fetcher(client, subscription_state)
+
+    tp0 = TopicPartition('foo', 0)
+    tp1 = TopicPartition('foo', 1)
+    # verify that the next fetch offset advances by exactly the number of fetched records when
+    # some fetched partitions raise an exception. This ensures the consumer won't lose records on exception.
+    subscription_state.assign_from_user([tp0, tp1])
+    subscription_state.seek(tp0, 1)
+    subscription_state.seek(tp1, 1)
+
+    assert len(fetcher._fetchable_partitions()) == 2
+
+    empty_records = _build_record_batch([], offset=1)
+    three_records = _build_record_batch([(None, b'msg', None) for _ in range(3)], offset=1)
+    fetcher._completed_fetches.append(
+        CompletedFetch(tp1, 1, 0, [0, 100, three_records], mocker.MagicMock()))
+    fetcher._completed_fetches.append(
+        CompletedFetch(tp0, 1, 0, [1, 100, empty_records], mocker.MagicMock()))
+    records, partial = fetcher.fetched_records()
+
+    assert len(records) == 1
+    assert tp1 in records
+    assert tp0 not in records
+    assert len(records[tp1]) == 3
+    assert subscription_state.position(tp1).offset == 4
+
+    exceptions = []
+    try:
+        records, partial = fetcher.fetched_records()
+    except Errors.OffsetOutOfRangeError as e:
+        exceptions.append(e)
+
+    assert len(exceptions) == 1
+    assert isinstance(exceptions[0], Errors.OffsetOutOfRangeError)
+    assert exceptions[0].args == ({tp0: 1},)
+
+
+def test_seek_before_exception(client, mocker):
+    subscription_state = SubscriptionState(offset_reset_strategy='NONE')
+    fetcher = Fetcher(client, subscription_state, max_poll_records=2)
+
+    tp0 = TopicPartition('foo', 0)
+    tp1 = TopicPartition('foo', 1)
+    subscription_state.assign_from_user([tp0])
+    subscription_state.seek(tp0, 1)
+
+    assert len(fetcher._fetchable_partitions()) == 1
+
+    three_records = _build_record_batch([(None, b'msg', None) for _ in range(3)], offset=1)
+    fetcher._completed_fetches.append(
+        CompletedFetch(tp0, 1, 0, [0, 100, three_records], mocker.MagicMock()))
+    records, partial = fetcher.fetched_records()
+
+    assert len(records) == 1
+    assert tp0 in records
+    assert len(records[tp0]) == 2
+    assert subscription_state.position(tp0).offset == 3
+
+    subscription_state.assign_from_user([tp0, tp1])
+    subscription_state.seek(tp1, 1)
+
+    assert len(fetcher._fetchable_partitions()) == 1
+
+    empty_records = _build_record_batch([], offset=1)
+    fetcher._completed_fetches.append(
+        CompletedFetch(tp1, 1, 0, [1, 100, empty_records], mocker.MagicMock()))
+    records, partial = fetcher.fetched_records()
+
+    assert len(records) == 1
+    assert tp0 in records
+    assert len(records[tp0]) == 1
+    assert subscription_state.position(tp0).offset == 4
+
+    subscription_state.seek(tp1, 10)
+    # Should not throw OffsetOutOfRangeError after the seek
+    records, partial = fetcher.fetched_records()
+    assert len(records) == 0
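
Note (editor's illustration, not part of the patch): the core pattern introduced above in Fetcher.fetched_records() and PartitionRecords.take() is "return what was already drained, stash the exception, re-raise it on the next call" so that successfully decoded records are never dropped when a later record or partition fails. A minimal standalone sketch of that deferred-exception pattern, assuming nothing about the kafka-python API (DeferredErrorIterator and the helper generator are hypothetical names used only here):

    import itertools


    class DeferredErrorIterator(object):
        """Sketch of the take()/raise-later pattern; names are illustrative, not kafka-python API."""

        def __init__(self, iterator):
            self._iterator = iterator
            self._deferred_exc = None  # exception saved from a previous, partially successful take()

        def take(self, n=None):
            # Surface an error deferred by the previous call before yielding anything new
            if self._deferred_exc is not None:
                exc, self._deferred_exc = self._deferred_exc, None
                raise exc
            records = []
            try:
                # list.extend() keeps whatever the iterator yielded before it raised mid-stream
                records.extend(itertools.islice(self._iterator, 0, n))
            except Exception as e:
                if not records:
                    raise  # nothing was drained, so raise immediately
                self._deferred_exc = e  # hand back the good records now, raise on the next call
            return records


    def _records_then_error():
        yield 'a'
        yield 'b'
        raise ValueError('corrupt record')


    taker = DeferredErrorIterator(_records_then_error())
    assert taker.take(10) == ['a', 'b']  # already-decoded records are not lost
    try:
        taker.take(10)  # the stored error surfaces on the next call
    except ValueError as e:
        print('raised later:', e)

In the patch itself, fetched_records() additionally records the partition and fetched offset in ExceptionMetadata and only re-raises while the partition is still fetchable at that same position, so an intervening seek() discards the stale error; that is the behaviour exercised by test_seek_before_exception.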