diff --git a/kafka/producer/future.py b/kafka/producer/future.py index 07fa4adb4..f67db0979 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -38,7 +38,7 @@ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, seri produce_future.add_errback(self.failure) def _produce_success(self, offset_and_timestamp): - offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp + offset, produce_timestamp_ms = offset_and_timestamp # Unpacking from args tuple is minor speed optimization (relative_offset, timestamp_ms, checksum, @@ -51,7 +51,7 @@ def _produce_success(self, offset_and_timestamp): if offset != -1 and relative_offset is not None: offset += relative_offset tp = self._produce_future.topic_partition - metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, log_start_offset, + metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size) self.success(metadata) @@ -67,5 +67,5 @@ def get(self, timeout=None): RecordMetadata = collections.namedtuple( - 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset', + 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size']) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 9bb958138..6861ec93a 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -37,8 +37,8 @@ class KafkaProducer(object): The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances. - The producer consists of a pool of buffer space that holds records that - haven't yet been transmitted to the server as well as a background I/O + The producer consists of a RecordAccumulator which holds records that + haven't yet been transmitted to the server, and a Sender background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. @@ -77,6 +77,47 @@ class KafkaProducer(object): The key_serializer and value_serializer instruct how to turn the key and value objects the user provides into bytes. + From Kafka 0.11, the KafkaProducer supports two additional modes: + the idempotent producer and the transactional producer. + The idempotent producer strengthens Kafka's delivery semantics from + at least once to exactly once delivery. In particular, producer retries + will no longer introduce duplicates. The transactional producer allows an + application to send messages to multiple partitions (and topics!) + atomically. + + To enable idempotence, the `enable_idempotence` configuration must be set + to True. If set, the `retries` config will default to `float('inf')` and + the `acks` config will default to 'all'. There are no API changes for the + idempotent producer, so existing applications will not need to be modified + to take advantage of this feature. + + To take advantage of the idempotent producer, it is imperative to avoid + application level re-sends since these cannot be de-duplicated. As such, if + an application enables idempotence, it is recommended to leave the + `retries` config unset, as it will be defaulted to `float('inf')`. 
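+
+    A minimal idempotent-producer sketch (the bootstrap address and topic
+    name below are illustrative only)::
+
+        from kafka import KafkaProducer
+
+        producer = KafkaProducer(
+            bootstrap_servers='localhost:9092',
+            enable_idempotence=True)
+        # retries defaults to float('inf') and acks to 'all' in this mode
+        producer.send('my-topic', b'value').get(timeout=60)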
+ Additionally, if a :meth:`~kafka.KafkaProducer.send` returns an error even + with infinite retries (for instance if the message expires in the buffer + before being sent), then it is recommended to shut down the producer and + check the contents of the last produced message to ensure that it is not + duplicated. Finally, the producer can only guarantee idempotence for + messages sent within a single session. + + To use the transactional producer and the attendant APIs, you must set the + `transactional_id` configuration property. If the `transactional_id` is + set, idempotence is automatically enabled along with the producer configs + which idempotence depends on. Further, topics which are included in + transactions should be configured for durability. In particular, the + `replication.factor` should be at least `3`, and the `min.insync.replicas` + for these topics should be set to 2. Finally, in order for transactional + guarantees to be realized from end-to-end, the consumers must be + configured to read only committed messages as well. + + The purpose of the `transactional_id` is to enable transaction recovery + across multiple sessions of a single producer instance. It would typically + be derived from the shard identifier in a partitioned, stateful, + application. As such, it should be unique to each producer instance running + within a partitioned application. + Keyword Arguments: bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' strings) that the producer should contact to bootstrap initial @@ -106,7 +147,16 @@ class KafkaProducer(object): defaults to be suitable. If the values are set to something incompatible with the idempotent producer, a KafkaConfigurationError will be raised. - + delivery_timeout_ms (float): An upper bound on the time to report success + or failure after producer.send() returns. This limits the total time + that a record will be delayed prior to sending, the time to await + acknowledgement from the broker (if expected), and the time allowed + for retriable send failures. The producer may report failure to send + a record earlier than this config if either an unrecoverable error is + encountered, the retries have been exhausted, or the record is added + to a batch which reached an earlier delivery expiration deadline. + The value of this config should be greater than or equal to the + sum of (request_timeout_ms + linger_ms). Default: 120000. acks (0, 1, 'all'): The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The @@ -134,7 +184,7 @@ class KafkaProducer(object): Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). Default: None. - retries (int): Setting a value greater than zero will cause the client + retries (numeric): Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries @@ -142,8 +192,12 @@ class KafkaProducer(object): potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may - appear first. - Default: 0. + appear first. 
Note additionally that produce requests will be + failed before the number of retries has been exhausted if the timeout + configured by delivery_timeout_ms expires first before successful + acknowledgement. Users should generally prefer to leave this config + unset and instead use delivery_timeout_ms to control retry behavior. + Default: float('inf') (infinite) batch_size (int): Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size will make batching less common and may reduce @@ -320,10 +374,11 @@ class KafkaProducer(object): 'enable_idempotence': False, 'transactional_id': None, 'transaction_timeout_ms': 60000, + 'delivery_timeout_ms': 120000, 'acks': 1, 'bootstrap_topics_filter': set(), 'compression_type': None, - 'retries': 0, + 'retries': float('inf'), 'batch_size': 16384, 'linger_ms': 0, 'partitioner': DefaultPartitioner(), @@ -471,10 +526,7 @@ def __init__(self, **configs): else: log.info("%s: Instantiated an idempotent producer.", str(self)) - if 'retries' not in user_provided_configs: - log.info("%s: Overriding the default 'retries' config to 3 since the idempotent producer is enabled.", str(self)) - self.config['retries'] = 3 - elif self.config['retries'] == 0: + if self.config['retries'] == 0: raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.") if 'max_in_flight_requests_per_connection' not in user_provided_configs: diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 83802ef96..1c250ee40 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division import collections import copy @@ -6,6 +6,13 @@ import threading import time +try: + # enum in stdlib as of py3.4 + from enum import IntEnum # pylint: disable=import-error +except ImportError: + # vendored backport module + from kafka.vendor.enum34 import IntEnum + import kafka.errors as Errors from kafka.producer.future import FutureRecordMetadata, FutureProduceResult from kafka.record.memory_records import MemoryRecordsBuilder @@ -34,10 +41,16 @@ def get(self): return self._val +class FinalState(IntEnum): + ABORTED = 0 + FAILED = 1 + SUCCEEDED = 2 + + class ProducerBatch(object): def __init__(self, tp, records, now=None): - self.max_record_size = 0 now = time.time() if now is None else now + self.max_record_size = 0 self.created = now self.drained = None self.attempts = 0 @@ -47,6 +60,11 @@ def __init__(self, tp, records, now=None): self.topic_partition = tp self.produce_future = FutureProduceResult(tp) self._retry = False + self._final_state = None + + @property + def final_state(self): + return self._final_state @property def record_count(self): @@ -72,67 +90,86 @@ def try_append(self, timestamp_ms, key, value, headers, now=None): now = time.time() if now is None else now self.max_record_size = max(self.max_record_size, metadata.size) self.last_append = now - future = FutureRecordMetadata(self.produce_future, metadata.offset, - metadata.timestamp, metadata.crc, - len(key) if key is not None else -1, - len(value) if value is not None else -1, - sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1) + future = FutureRecordMetadata( + self.produce_future, + metadata.offset, + metadata.timestamp, + metadata.crc, + len(key) if key is not None else -1, + len(value) if value is not None else -1, 
+            sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future
 
-    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
+    def abort(self, exception):
+        """Abort the batch and complete the future and callbacks."""
+        if self._final_state is not None:
+            raise Errors.IllegalStateError("Batch has already been completed in final state: %s" % self._final_state)
+        self._final_state = FinalState.ABORTED
+
+        log.debug("Aborting batch for partition %s: %s", self.topic_partition, exception)
+        self._complete_future(-1, -1, exception)
+
+    def done(self, base_offset=None, timestamp_ms=None, exception=None):
+        """
+        Finalize the state of a batch. Final state, once set, is immutable. This function may be called
+        once or twice on a batch. It may be called twice if
+        1. An in-flight batch expires before a response from the broker is received. The batch's final
+        state is set to FAILED, but the produce may still succeed on the broker, in which case a second
+        call to batch.done() will try to set the SUCCEEDED final state.
+
+        2. A transaction is aborted, or the producer is closed forcefully; the final state is set to
+        ABORTED, but again the produce may still succeed on the broker.
+
+        Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged.
+        Attempted transitions from one failure state to the same or a different failed state are ignored.
+        Attempted transitions from SUCCEEDED to the same or a failed state throw an exception.
+        """
+        final_state = FinalState.SUCCEEDED if exception is None else FinalState.FAILED
+        if self._final_state is None:
+            self._final_state = final_state
+            if final_state is FinalState.SUCCEEDED:
+                log.debug("Successfully produced messages to %s with base offset %s", self.topic_partition, base_offset)
+            else:
+                log.warning("Failed to produce messages to topic-partition %s with base offset %s: %s",
+                            self.topic_partition, base_offset, exception)
+            self._complete_future(base_offset, timestamp_ms, exception)
+            return True
+
+        elif self._final_state is not FinalState.SUCCEEDED:
+            if final_state is FinalState.SUCCEEDED:
+                # Log if a previously unsuccessful batch succeeded later on.
+                log.debug("ProduceResponse returned %s for %s after batch with base offset %s had already been %s.",
+                          final_state, self.topic_partition, base_offset, self._final_state)
+            else:
+                # FAILED --> FAILED and ABORTED --> FAILED transitions are ignored.
+                log.debug("Ignored state transition %s -> %s for %s batch with base offset %s",
+                          self._final_state, final_state, self.topic_partition, base_offset)
+        else:
+            # A SUCCEEDED batch must not attempt another state change.
+ raise Errors.IllegalStateError("A %s batch must not attempt another state change to %s" % (self._final_state, final_state)) + return False + + def _complete_future(self, base_offset, timestamp_ms, exception): if self.produce_future.is_done: - log.warning('Batch is already closed -- ignoring batch.done()') - return + raise Errors.IllegalStateError('Batch is already closed!') elif exception is None: - log.debug("Produced messages to topic-partition %s with base offset" - " %s log start offset %s.", self.topic_partition, base_offset, - log_start_offset) # trace - self.produce_future.success((base_offset, timestamp_ms, log_start_offset)) + self.produce_future.success((base_offset, timestamp_ms)) else: - log.warning("Failed to produce messages to topic-partition %s with base offset" - " %s log start offset %s and error %s.", self.topic_partition, base_offset, - log_start_offset, exception) # trace self.produce_future.failure(exception) - def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full, now=None): - """Expire batches if metadata is not available - - A batch whose metadata is not available should be expired if one - of the following is true: - - * the batch is not in retry AND request timeout has elapsed after - it is ready (full or linger.ms has reached). - - * the batch is in retry AND request timeout has elapsed after the - backoff period ended. - """ + def has_reached_delivery_timeout(self, delivery_timeout_ms, now=None): now = time.time() if now is None else now - since_append = now - self.last_append - since_ready = now - (self.created + linger_ms / 1000.0) - since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0) - timeout = request_timeout_ms / 1000.0 - - error = None - if not self.in_retry() and is_full and timeout < since_append: - error = "%d seconds have passed since last append" % (since_append,) - elif not self.in_retry() and timeout < since_ready: - error = "%d seconds have passed since batch creation plus linger time" % (since_ready,) - elif self.in_retry() and timeout < since_backoff: - error = "%d seconds have passed since last attempt plus backoff time" % (since_backoff,) - - if error: - self.records.close() - self.done(base_offset=-1, exception=Errors.KafkaTimeoutError( - "Batch for %s containing %s record(s) expired: %s" % ( - self.topic_partition, self.records.next_offset(), error))) - return True - return False + return delivery_timeout_ms / 1000 <= now - self.created def in_retry(self): return self._retry - def set_retry(self): + def retry(self, now=None): + now = time.time() if now is None else now self._retry = True + self.attempts += 1 + self.last_attempt = now + self.last_append = now @property def is_done(self): @@ -177,9 +214,11 @@ class RecordAccumulator(object): 'batch_size': 16384, 'compression_attrs': 0, 'linger_ms': 0, + 'request_timeout_ms': 30000, + 'delivery_timeout_ms': 120000, 'retry_backoff_ms': 100, 'transaction_manager': None, - 'message_version': 0, + 'message_version': 2, } def __init__(self, **configs): @@ -199,8 +238,27 @@ def __init__(self, **configs): # so we don't need to protect them w/ locking. 
self.muted = set() self._drain_index = 0 + self._next_batch_expiry_time_ms = float('inf') - def append(self, tp, timestamp_ms, key, value, headers): + if self.config['delivery_timeout_ms'] < self.config['linger_ms'] + self.config['request_timeout_ms']: + raise Errors.KafkaConfigurationError("Must set delivery_timeout_ms higher than linger_ms + request_timeout_ms") + + @property + def delivery_timeout_ms(self): + return self.config['delivery_timeout_ms'] + + @property + def next_expiry_time_ms(self): + return self._next_batch_expiry_time_ms + + def _tp_lock(self, tp): + if tp not in self._tp_locks: + with self._tp_locks[None]: + if tp not in self._tp_locks: + self._tp_locks[tp] = threading.Lock() + return self._tp_locks[tp] + + def append(self, tp, timestamp_ms, key, value, headers, now=None): """Add a record to the accumulator, return the append result. The append result will contain the future metadata, and flag for @@ -219,33 +277,29 @@ def append(self, tp, timestamp_ms, key, value, headers): """ assert isinstance(tp, TopicPartition), 'not TopicPartition' assert not self._closed, 'RecordAccumulator is closed' + now = time.time() if now is None else now # We keep track of the number of appending thread to make sure we do # not miss batches in abortIncompleteBatches(). self._appends_in_progress.increment() try: - if tp not in self._tp_locks: - with self._tp_locks[None]: - if tp not in self._tp_locks: - self._tp_locks[tp] = threading.Lock() - - with self._tp_locks[tp]: + with self._tp_lock(tp): # check if we have an in-progress batch dq = self._batches[tp] if dq: last = dq[-1] - future = last.try_append(timestamp_ms, key, value, headers) + future = last.try_append(timestamp_ms, key, value, headers, now=now) if future is not None: batch_is_full = len(dq) > 1 or last.records.is_full() return future, batch_is_full, False - with self._tp_locks[tp]: + with self._tp_lock(tp): # Need to check if producer is closed again after grabbing the # dequeue lock. assert not self._closed, 'RecordAccumulator is closed' if dq: last = dq[-1] - future = last.try_append(timestamp_ms, key, value, headers) + future = last.try_append(timestamp_ms, key, value, headers, now=now) if future is not None: # Somebody else found us a batch, return the one we # waited for! Hopefully this doesn't happen often... @@ -262,8 +316,8 @@ def append(self, tp, timestamp_ms, key, value, headers): self.config['batch_size'] ) - batch = ProducerBatch(tp, records) - future = batch.try_append(timestamp_ms, key, value, headers) + batch = ProducerBatch(tp, records, now=now) + future = batch.try_append(timestamp_ms, key, value, headers, now=now) if not future: raise Exception() @@ -274,76 +328,37 @@ def append(self, tp, timestamp_ms, key, value, headers): finally: self._appends_in_progress.decrement() - def abort_expired_batches(self, request_timeout_ms, cluster): - """Abort the batches that have been sitting in RecordAccumulator for - more than the configured request_timeout due to metadata being - unavailable. 
-
-        Arguments:
-            request_timeout_ms (int): milliseconds to timeout
-            cluster (ClusterMetadata): current metadata for kafka cluster
+    def maybe_update_next_batch_expiry_time(self, batch):
+        self._next_batch_expiry_time_ms = min(self._next_batch_expiry_time_ms, batch.created * 1000 + self.delivery_timeout_ms)
 
-        Returns:
-            list of ProducerBatch that were expired
-        """
+    def expired_batches(self, now=None):
+        """Get a list of batches which have been sitting in the accumulator too long and need to be expired."""
         expired_batches = []
-        to_remove = []
-        count = 0
         for tp in list(self._batches.keys()):
-            assert tp in self._tp_locks, 'TopicPartition not in locks dict'
-
-            # We only check if the batch should be expired if the partition
-            # does not have a batch in flight. This is to avoid the later
-            # batches get expired when an earlier batch is still in progress.
-            # This protection only takes effect when user sets
-            # max.in.flight.request.per.connection=1. Otherwise the expiration
-            # order is not guranteed.
-            if tp in self.muted:
-                continue
-
-            with self._tp_locks[tp]:
-                # iterate over the batches and expire them if they have stayed
-                # in accumulator for more than request_timeout_ms
+            with self._tp_lock(tp):
+                # Iterate over the batches and expire those that have stayed
+                # in the accumulator for longer than delivery_timeout_ms
                 dq = self._batches[tp]
-                for batch in dq:
-                    is_full = bool(bool(batch != dq[-1]) or batch.records.is_full())
-                    # check if the batch is expired
-                    if batch.maybe_expire(request_timeout_ms,
-                                          self.config['retry_backoff_ms'],
-                                          self.config['linger_ms'],
-                                          is_full):
+                while dq:
+                    batch = dq[0]
+                    if batch.has_reached_delivery_timeout(self.delivery_timeout_ms, now=now):
+                        dq.popleft()
+                        batch.records.close()
                         expired_batches.append(batch)
-                        to_remove.append(batch)
-                        count += 1
-                        self.deallocate(batch)
                     else:
                         # Stop at the first batch that has not expired.
+                        self.maybe_update_next_batch_expiry_time(batch)
                         break
-
-            # Python does not allow us to mutate the dq during iteration
-            # Assuming expired batches are infrequent, this is better than
-            # creating a new copy of the deque for iteration on every loop
-            if to_remove:
-                for batch in to_remove:
-                    dq.remove(batch)
-                to_remove = []
-
-        if expired_batches:
-            log.warning("Expired %d batches in accumulator", count)  # trace
-
         return expired_batches
 
     def reenqueue(self, batch, now=None):
-        """Re-enqueue the given record batch in the accumulator to retry."""
-        now = time.time() if now is None else now
-        batch.attempts += 1
-        batch.last_attempt = now
-        batch.last_append = now
-        batch.set_retry()
-        assert batch.topic_partition in self._tp_locks, 'TopicPartition not in locks dict'
-        assert batch.topic_partition in self._batches, 'TopicPartition not in batches'
-        dq = self._batches[batch.topic_partition]
-        with self._tp_locks[batch.topic_partition]:
+        """
+        Re-enqueue the given record batch in the accumulator. The delivery timeout check is
+        done in Sender._complete_batch, so it is not repeated here.
+ """ + batch.retry(now=now) + with self._tp_lock(batch.topic_partition): + dq = self._batches[batch.topic_partition] dq.appendleft(batch) def ready(self, cluster, now=None): @@ -396,15 +411,15 @@ def ready(self, cluster, now=None): elif tp in self.muted: continue - with self._tp_locks[tp]: + with self._tp_lock(tp): dq = self._batches[tp] if not dq: continue batch = dq[0] - retry_backoff = self.config['retry_backoff_ms'] / 1000.0 - linger = self.config['linger_ms'] / 1000.0 - backing_off = bool(batch.attempts > 0 and - batch.last_attempt + retry_backoff > now) + retry_backoff = self.config['retry_backoff_ms'] / 1000 + linger = self.config['linger_ms'] / 1000 + backing_off = bool(batch.attempts > 0 + and (batch.last_attempt + retry_backoff) > now) waited_time = now - batch.last_attempt time_to_wait = retry_backoff if backing_off else linger time_left = max(time_to_wait - waited_time, 0) @@ -429,12 +444,94 @@ def ready(self, cluster, now=None): def has_undrained(self): """Check whether there are any batches which haven't been drained""" for tp in list(self._batches.keys()): - with self._tp_locks[tp]: + with self._tp_lock(tp): dq = self._batches[tp] if len(dq): return True return False + def _should_stop_drain_batches_for_partition(self, first, tp): + if self._transaction_manager: + if not self._transaction_manager.is_send_to_partition_allowed(tp): + return True + if not self._transaction_manager.producer_id_and_epoch.is_valid: + # we cannot send the batch until we have refreshed the PID + log.debug("Waiting to send ready batches because transaction producer id is not valid") + return True + return False + + def drain_batches_for_one_node(self, cluster, node_id, max_size, now=None): + now = time.time() if now is None else now + size = 0 + ready = [] + partitions = list(cluster.partitions_for_broker(node_id)) + if not partitions: + return ready + # to make starvation less likely this loop doesn't start at 0 + self._drain_index %= len(partitions) + start = None + while start != self._drain_index: + tp = partitions[self._drain_index] + if start is None: + start = self._drain_index + self._drain_index += 1 + self._drain_index %= len(partitions) + + # Only proceed if the partition has no in-flight batches. + if tp in self.muted: + continue + + if tp not in self._batches: + continue + + with self._tp_lock(tp): + dq = self._batches[tp] + if len(dq) == 0: + continue + first = dq[0] + backoff = bool(first.attempts > 0 and + first.last_attempt + self.config['retry_backoff_ms'] / 1000 > now) + # Only drain the batch if it is not during backoff + if backoff: + continue + + if (size + first.records.size_in_bytes() > max_size + and len(ready) > 0): + # there is a rare case that a single batch + # size is larger than the request size due + # to compression; in this case we will + # still eventually send this batch in a + # single request + break + else: + if self._should_stop_drain_batches_for_partition(first, tp): + break + + batch = dq.popleft() + if self._transaction_manager and not batch.in_retry(): + # If the batch is in retry, then we should not change the pid and + # sequence number, since this may introduce duplicates. In particular, + # the previous attempt may actually have been accepted, and if we change + # the pid and sequence here, this attempt will also be accepted, causing + # a duplicate. 
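+                        # (A retried batch keeps its original base sequence; the
+                        # broker de-duplicates on producer_id, epoch and sequence,
+                        # which is what makes retries safe under idempotence.)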
+ sequence_number = self._transaction_manager.sequence_number(batch.topic_partition) + log.debug("Dest: %s: %s producer_id=%s epoch=%s sequence=%s", + node_id, batch.topic_partition, + self._transaction_manager.producer_id_and_epoch.producer_id, + self._transaction_manager.producer_id_and_epoch.epoch, + sequence_number) + batch.records.set_producer_state( + self._transaction_manager.producer_id_and_epoch.producer_id, + self._transaction_manager.producer_id_and_epoch.epoch, + sequence_number, + self._transaction_manager.is_transactional() + ) + batch.records.close() + size += batch.records.size_in_bytes() + ready.append(batch) + batch.drained = now + return ready + def drain(self, cluster, nodes, max_size, now=None): """ Drain all the data for the given nodes and collate them into a list of @@ -456,74 +553,7 @@ def drain(self, cluster, nodes, max_size, now=None): now = time.time() if now is None else now batches = {} for node_id in nodes: - size = 0 - partitions = list(cluster.partitions_for_broker(node_id)) - ready = [] - # to make starvation less likely this loop doesn't start at 0 - self._drain_index %= len(partitions) - start = self._drain_index - while True: - tp = partitions[self._drain_index] - if tp in self._batches and tp not in self.muted: - with self._tp_locks[tp]: - dq = self._batches[tp] - if dq: - first = dq[0] - backoff = ( - bool(first.attempts > 0) and - bool(first.last_attempt + - self.config['retry_backoff_ms'] / 1000.0 - > now) - ) - # Only drain the batch if it is not during backoff - if not backoff: - if (size + first.records.size_in_bytes() > max_size - and len(ready) > 0): - # there is a rare case that a single batch - # size is larger than the request size due - # to compression; in this case we will - # still eventually send this batch in a - # single request - break - else: - producer_id_and_epoch = None - if self._transaction_manager: - if not self._transaction_manager.is_send_to_partition_allowed(tp): - break - producer_id_and_epoch = self._transaction_manager.producer_id_and_epoch - if not producer_id_and_epoch.is_valid: - # we cannot send the batch until we have refreshed the PID - log.debug("Waiting to send ready batches because transaction producer id is not valid") - break - - batch = dq.popleft() - if producer_id_and_epoch and not batch.in_retry(): - # If the batch is in retry, then we should not change the pid and - # sequence number, since this may introduce duplicates. In particular, - # the previous attempt may actually have been accepted, and if we change - # the pid and sequence here, this attempt will also be accepted, causing - # a duplicate. 
- sequence_number = self._transaction_manager.sequence_number(batch.topic_partition) - log.debug("Dest: %s: %s producer_id=%s epoch=%s sequence=%s", - node_id, batch.topic_partition, producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, - sequence_number) - batch.records.set_producer_state( - producer_id_and_epoch.producer_id, - producer_id_and_epoch.epoch, - sequence_number, - self._transaction_manager.is_transactional() - ) - batch.records.close() - size += batch.records.size_in_bytes() - ready.append(batch) - batch.drained = now - - self._drain_index += 1 - self._drain_index %= len(partitions) - if start == self._drain_index: - break - - batches[node_id] = ready + batches[node_id] = self.drain_batches_for_one_node(cluster, node_id, max_size, now=now) return batches def deallocate(self, batch): @@ -588,23 +618,23 @@ def _abort_batches(self, error): for batch in self._incomplete.all(): tp = batch.topic_partition # Close the batch before aborting - with self._tp_locks[tp]: + with self._tp_lock(tp): batch.records.close() self._batches[tp].remove(batch) - batch.done(exception=error) + batch.abort(error) self.deallocate(batch) def abort_undrained_batches(self, error): for batch in self._incomplete.all(): tp = batch.topic_partition - with self._tp_locks[tp]: + with self._tp_lock(tp): aborted = False if not batch.is_done: aborted = True batch.records.close() self._batches[tp].remove(batch) if aborted: - batch.done(exception=error) + batch.abort(error) self.deallocate(batch) def close(self): diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 0e3806175..dcb3ecbdc 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -2,6 +2,7 @@ import collections import copy +import heapq import logging import threading import time @@ -29,7 +30,7 @@ class Sender(threading.Thread): DEFAULT_CONFIG = { 'max_request_size': 1048576, 'acks': 1, - 'retries': 0, + 'retries': float('inf'), 'request_timeout_ms': 30000, 'retry_backoff_ms': 100, 'metrics': None, @@ -59,6 +60,45 @@ def __init__(self, client, metadata, accumulator, **configs): else: self._sensors = None self._transaction_manager = self.config['transaction_manager'] + # A per-partition queue of batches ordered by creation time for tracking the in-flight batches + self._in_flight_batches = collections.defaultdict(list) + + def _maybe_remove_from_inflight_batches(self, batch): + try: + queue = self._in_flight_batches[batch.topic_partition] + except KeyError: + return + try: + idx = queue.index((batch.created, batch)) + except ValueError: + return + # https://stackoverflow.com/questions/10162679/python-delete-element-from-heap + queue[idx] = queue[-1] + queue.pop() + heapq.heapify(queue) + + def _get_expired_inflight_batches(self): + """Get the in-flight batches that has reached delivery timeout.""" + expired_batches = [] + to_remove = [] + for tp, queue in six.iteritems(self._in_flight_batches): + while queue: + _created_at, batch = queue[0] + if batch.has_reached_delivery_timeout(self._accumulator.delivery_timeout_ms): + heapq.heappop(queue) + if batch.final_state is None: + expired_batches.append(batch) + else: + raise Errors.IllegalStateError("%s batch created at %s gets unexpected final state %s" % (batch.topic_partition, batch.created, batch.final_state)) + else: + self._accumulator.maybe_update_next_batch_expiry_time(batch) + break + else: + # Avoid mutating in_flight_batches during iteration + to_remove.append(tp) + for tp in to_remove: + del self._in_flight_batches[tp] + return expired_batches def 
run(self): """The main run loop for the sender thread.""" @@ -131,7 +171,8 @@ def run_once(self): poll_timeout_ms = self._send_producer_data() self._client.poll(timeout_ms=poll_timeout_ms) - def _send_producer_data(self): + def _send_producer_data(self, now=None): + now = time.time() if now is None else now # get the list of partitions with data ready to send result = self._accumulator.ready(self._metadata) ready_nodes, next_ready_check_delay, unknown_leaders_exist = result @@ -156,14 +197,20 @@ def _send_producer_data(self): batches_by_node = self._accumulator.drain( self._metadata, ready_nodes, self.config['max_request_size']) + for batch_list in six.itervalues(batches_by_node): + for batch in batch_list: + item = (batch.created, batch) + queue = self._in_flight_batches[batch.topic_partition] + heapq.heappush(queue, item) + if self.config['guarantee_message_order']: # Mute all the partitions drained for batch_list in six.itervalues(batches_by_node): for batch in batch_list: self._accumulator.muted.add(batch.topic_partition) - expired_batches = self._accumulator.abort_expired_batches( - self.config['request_timeout_ms'], self._metadata) + expired_batches = self._accumulator.expired_batches() + expired_batches.extend(self._get_expired_inflight_batches()) if expired_batches: log.debug("%s: Expired %s batches in accumulator", str(self), len(expired_batches)) @@ -193,12 +240,18 @@ def _send_producer_data(self): requests = self._create_produce_requests(batches_by_node) # If we have any nodes that are ready to send + have sendable data, # poll with 0 timeout so this can immediately loop and try sending more - # data. Otherwise, the timeout is determined by nodes that have - # partitions with data that isn't yet sendable (e.g. lingering, backing - # off). Note that this specifically does not include nodes with + # data. Otherwise, the timeout will be the smaller value between next + # batch expiry time, and the delay time for checking data availability. + # Note that the nodes may have data that isn't yet sendable due to + # lingering, backing off, etc. This specifically does not include nodes with # sendable data that aren't ready to send since they would cause busy # looping. 
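+        # For example (illustrative values): next_ready_check_delay=0.005,
+        # not_ready_timeout_ms=300, and a next batch expiry 50ms away yield
+        # poll_timeout_ms = min(5, 300, 50) = 5.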
- poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout_ms) + poll_timeout_ms = min(next_ready_check_delay * 1000, + not_ready_timeout_ms, + self._accumulator.next_expiry_time_ms - now * 1000) + if poll_timeout_ms < 0: + poll_timeout_ms = 0 + if ready_nodes: log.debug("%s: Nodes with data ready to send: %s", str(self), ready_nodes) # trace log.debug("%s: Created %d produce requests: %s", str(self), len(requests), requests) # trace @@ -349,28 +402,27 @@ def _handle_produce_response(self, node_id, send_time, batches, response): for topic, partitions in response.topics: for partition_info in partitions: - log_start_offset = None if response.API_VERSION < 2: partition, error_code, offset = partition_info ts = None elif 2 <= response.API_VERSION <= 4: partition, error_code, offset, ts = partition_info elif 5 <= response.API_VERSION <= 7: - partition, error_code, offset, ts, log_start_offset = partition_info + partition, error_code, offset, ts, _log_start_offset = partition_info else: # Currently unused / TODO: KIP-467 - partition, error_code, offset, ts, log_start_offset, _record_errors, _global_error = partition_info + partition, error_code, offset, ts, _log_start_offset, _record_errors, _global_error = partition_info tp = TopicPartition(topic, partition) error = Errors.for_code(error_code) batch = batches_by_partition[tp] - self._complete_batch(batch, error, offset, timestamp_ms=ts, log_start_offset=log_start_offset) + self._complete_batch(batch, error, offset, timestamp_ms=ts) else: # this is the acks = 0 case, just complete all requests for batch in batches: self._complete_batch(batch, None, -1) - def _fail_batch(self, batch, exception, base_offset=None, timestamp_ms=None, log_start_offset=None): + def _fail_batch(self, batch, exception, base_offset=None, timestamp_ms=None): exception = exception if type(exception) is not type else exception() if self._transaction_manager: if isinstance(exception, Errors.OutOfOrderSequenceNumberError) and \ @@ -392,12 +444,14 @@ def _fail_batch(self, batch, exception, base_offset=None, timestamp_ms=None, log elif self._transaction_manager.is_transactional(): self._transaction_manager.transition_to_abortable_error(exception) - batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms, exception=exception, log_start_offset=log_start_offset) - self._accumulator.deallocate(batch) if self._sensors: self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) - def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None): + if batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms, exception=exception): + self._maybe_remove_from_inflight_batches(batch) + self._accumulator.deallocate(batch) + + def _complete_batch(self, batch, error, base_offset, timestamp_ms=None): """Complete or retry the given batch of records. 
Arguments: @@ -405,7 +459,6 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star error (Exception): The error (or None if none) base_offset (int): The base offset assigned to the records if successful timestamp_ms (int, optional): The timestamp returned by the broker for this batch - log_start_offset (int, optional): The start offset of the log at the time this produce response was created """ # Standardize no-error to None if error is Errors.NoError: @@ -415,7 +468,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star if self._can_retry(batch, error): # retry log.warning("%s: Got error produce response on topic-partition %s," - " retrying (%d attempts left). Error: %s", + " retrying (%s attempts left). Error: %s", str(self), batch.topic_partition, self.config['retries'] - batch.attempts - 1, error) @@ -426,6 +479,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star str(self), batch.topic_partition, self._transaction_manager.sequence_number(batch.topic_partition) if self._transaction_manager else None) self._accumulator.reenqueue(batch) + self._maybe_remove_from_inflight_batches(batch) if self._sensors: self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) else: @@ -433,13 +487,13 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star str(self), batch.producer_id, batch.producer_epoch, self._transaction_manager.producer_id_and_epoch.producer_id, self._transaction_manager.producer_id_and_epoch.epoch) - self._fail_batch(batch, error, base_offset=base_offset, timestamp_ms=timestamp_ms, log_start_offset=log_start_offset) + self._fail_batch(batch, error, base_offset=base_offset, timestamp_ms=timestamp_ms) else: if error is Errors.TopicAuthorizationFailedError: error = error(batch.topic_partition.topic) # tell the user the result of their request - self._fail_batch(batch, error, base_offset=base_offset, timestamp_ms=timestamp_ms, log_start_offset=log_start_offset) + self._fail_batch(batch, error, base_offset=base_offset, timestamp_ms=timestamp_ms) if error is Errors.UnknownTopicOrPartitionError: log.warning("%s: Received unknown topic or partition error in produce request on partition %s." 
@@ -450,8 +504,9 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star self._metadata.request_update() else: - batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms, log_start_offset=log_start_offset) - self._accumulator.deallocate(batch) + if batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms): + self._maybe_remove_from_inflight_batches(batch) + self._accumulator.deallocate(batch) if self._transaction_manager and self._transaction_manager.producer_id_and_epoch.match(batch): self._transaction_manager.increment_sequence_number(batch.topic_partition, batch.record_count) @@ -467,8 +522,10 @@ def _can_retry(self, batch, error): We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed """ - return (batch.attempts < self.config['retries'] - and getattr(error, 'retriable', False)) + return (not batch.has_reached_delivery_timeout(self._accumulator.delivery_timeout_ms) and + batch.attempts < self.config['retries'] and + batch.final_state is None and + getattr(error, 'retriable', False)) def _create_produce_requests(self, collated): """ diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 91d4a9d62..b495c76fe 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -679,14 +679,15 @@ def size(self): """ return len(self._buffer) - def size_in_bytes(self, offset, timestamp, key, value, headers): - if self._first_timestamp is not None: - timestamp_delta = timestamp - self._first_timestamp - else: - timestamp_delta = 0 + @classmethod + def header_size_in_bytes(self): + return self.HEADER_STRUCT.size + + @classmethod + def size_in_bytes(self, offset_delta, timestamp_delta, key, value, headers): size_of_body = ( 1 + # Attrs - size_of_varint(offset) + + size_of_varint(offset_delta) + size_of_varint(timestamp_delta) + self.size_of(key, value, headers) ) diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index 4bf3115c8..9df733059 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -109,6 +109,16 @@ def next_batch(self, _min_slice=MIN_SLICE, else: return DefaultRecordBatch(next_slice) + def __iter__(self): + return self + + def __next__(self): + if not self.has_next(): + raise StopIteration + return self.next_batch() + + next = __next__ + class MemoryRecordsBuilder(object): @@ -186,6 +196,10 @@ def producer_id(self): def producer_epoch(self): return self._producer_epoch + def records(self): + assert self._closed + return MemoryRecords(self._buffer) + def close(self): # This method may be called multiple times on the same batch # i.e., on retries diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py index 79d3975a5..540705d50 100644 --- a/test/record/test_default_records.py +++ b/test/record/test_default_records.py @@ -57,8 +57,8 @@ def test_written_bytes_equals_size_in_bytes_v2(): producer_id=-1, producer_epoch=-1, base_sequence=-1, batch_size=999999) - size_in_bytes = builder.size_in_bytes( - 0, timestamp=9999999, key=key, value=value, headers=headers) + size_in_bytes = DefaultRecordBatchBuilder.size_in_bytes( + offset_delta=0, timestamp_delta=0, key=key, value=value, headers=headers) pos = builder.size() meta = builder.append( diff --git a/test/test_record_accumulator.py b/test/test_record_accumulator.py index 42f980712..5c7134e5c 100644 --- a/test/test_record_accumulator.py +++ b/test/test_record_accumulator.py @@ -1,16 +1,28 @@ # pylint: skip-file 
-from __future__ import absolute_import +from __future__ import absolute_import, division import pytest -import io -from kafka.errors import KafkaTimeoutError +from kafka.cluster import ClusterMetadata +from kafka.errors import IllegalStateError, KafkaError from kafka.producer.future import FutureRecordMetadata, RecordMetadata from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch +from kafka.record.default_records import DefaultRecordBatchBuilder from kafka.record.memory_records import MemoryRecordsBuilder from kafka.structs import TopicPartition +@pytest.fixture +def tp(): + return TopicPartition('foo', 0) + +@pytest.fixture +def cluster(tp, mocker): + metadata = ClusterMetadata() + mocker.patch.object(metadata, 'leader_for_partition', return_value=0) + mocker.patch.object(metadata, 'partitions_for_broker', return_value=[tp]) + return metadata + def test_producer_batch_producer_id(): tp = TopicPartition('foo', 0) records = MemoryRecordsBuilder( @@ -32,7 +44,7 @@ def test_producer_batch_try_append(magic): future = batch.try_append(0, b'key', b'value', []) assert isinstance(future, FutureRecordMetadata) assert not future.is_done - batch.done(base_offset=123, timestamp_ms=456, log_start_offset=0) + batch.done(base_offset=123, timestamp_ms=456) assert future.is_done # record-level checksum only provided in v0/v1 formats; payload includes magic-byte if magic == 0: @@ -44,8 +56,7 @@ def test_producer_batch_try_append(magic): expected_metadata = RecordMetadata( topic=tp[0], partition=tp[1], topic_partition=tp, - offset=123, timestamp=456, log_start_offset=0, - checksum=checksum, + offset=123, timestamp=456, checksum=checksum, serialized_key_size=3, serialized_value_size=5, serialized_header_size=-1) assert future.value == expected_metadata @@ -55,21 +66,201 @@ def test_producer_batch_retry(): magic=2, compression_type=0, batch_size=100000) batch = ProducerBatch(tp, records) assert not batch.in_retry() - batch.set_retry() + batch.retry() assert batch.in_retry() -def test_producer_batch_maybe_expire(): +def test_batch_abort(): tp = TopicPartition('foo', 0) records = MemoryRecordsBuilder( magic=2, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records, now=1) - future = batch.try_append(0, b'key', b'value', [], now=2) - request_timeout_ms = 5000 - retry_backoff_ms = 200 - linger_ms = 1000 - is_full = True - batch.maybe_expire(request_timeout_ms, retry_backoff_ms, linger_ms, is_full, now=20) - assert batch.is_done + batch = ProducerBatch(tp, records) + future = batch.try_append(123, None, b'msg', []) + + batch.abort(KafkaError()) + assert future.is_done + + # subsequent completion should be ignored + batch.done(500, 2342342341) + batch.done(exception=KafkaError()) + assert future.is_done - assert future.failed() - assert isinstance(future.exception, KafkaTimeoutError) + with pytest.raises(KafkaError): + future.get() + +def test_batch_cannot_abort_twice(): + tp = TopicPartition('foo', 0) + records = MemoryRecordsBuilder( + magic=2, compression_type=0, batch_size=100000) + batch = ProducerBatch(tp, records) + future = batch.try_append(123, None, b'msg', []) + + batch.abort(KafkaError()) + + with pytest.raises(IllegalStateError): + batch.abort(KafkaError()) + + assert future.is_done + with pytest.raises(KafkaError): + future.get() + +def test_batch_cannot_complete_twice(): + tp = TopicPartition('foo', 0) + records = MemoryRecordsBuilder( + magic=2, compression_type=0, batch_size=100000) + batch = ProducerBatch(tp, records) + future = batch.try_append(123, 
None, b'msg', []) + + batch.done(500, 10, None) + + with pytest.raises(IllegalStateError): + batch.done(1000, 20, None) + + record_metadata = future.get() + + assert record_metadata.offset == 500 + assert record_metadata.timestamp == 10 + +def test_linger(tp, cluster): + now = 0 + accum = RecordAccumulator(linger_ms=10) + accum.append(tp, 0, b'key', b'value', [], now=now) + ready, next_ready_check, _unknown_leaders_exist = accum.ready(cluster, now=now) + assert len(ready) == 0, 'No partitions should be ready' + assert next_ready_check == .01 # linger_ms in secs + now += .01 + ready, _next_ready_check, _unknown_leaders_exist = accum.ready(cluster, now=now) + assert ready == set([0]), "Our partitions leader should be ready" + batches = accum.drain(cluster, ready, 0, 2147483647)[0] + assert len(batches) == 1 + batch = batches[0] + assert batch.records.is_full() + + parsed = list(batch.records.records()) + assert len(parsed) == 1 + records = list(parsed[0]) + assert len(records) == 1 + assert records[0].key == b'key', 'Keys should match' + assert records[0].value == b'value', 'Values should match' + +def _advance_now_ms(now, ms): + return now + ms / 1000 + 1/10000 # add extra .1 ms to each advance to avoid rounding issues when converting back to seconds + +def _do_expire_batch_single(cluster, tp, delivery_timeout_ms): + now = 0 + linger_ms = 300 + accum = RecordAccumulator(linger_ms=linger_ms, delivery_timeout_ms=delivery_timeout_ms, request_timeout_ms=(delivery_timeout_ms-linger_ms-100)) + + # Make the batches ready due to linger. These batches are not in retry + for mute in [False, True]: + accum.append(tp, 0, b'key', b'value', [], now=now) + ready, next_ready_check, _unknown_leaders_exist = accum.ready(cluster, now=now) + assert len(ready) == 0, 'No partitions should be ready' + assert next_ready_check == linger_ms / 1000 + + now = _advance_now_ms(now, linger_ms) + ready, _next_ready_check, _unknown_leaders_exist = accum.ready(cluster, now=now) + assert ready == set([0]), "Our partitions leader should be ready" + + expired_batches = accum.expired_batches(now=now) + assert len(expired_batches) == 0, "The batch should not expire when just linger has passed" + + if mute: + accum.muted.add(tp) + else: + try: + accum.muted.remove(tp) + except KeyError: + pass + + # Advance the clock to expire the batch. + now = _advance_now_ms(now, delivery_timeout_ms - linger_ms) + expired_batches = accum.expired_batches(now=now) + assert len(expired_batches) == 1, "The batch may expire when the partition is muted" + ready, _next_ready_check, _unknown_leaders_exist = accum.ready(cluster, now=now) + assert len(ready) == 0, "No partitions should be ready." 
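+
+# A note on the expiry arithmetic exercised above: expiry is measured against
+# ProducerBatch.created, so has_reached_delivery_timeout(d, now=t) is True once
+# (t - created) >= d / 1000. E.g. (illustrative numbers) a batch created at
+# t=0 with delivery_timeout_ms=3200 is expired for any now >= 3.2.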
+ +def test_expired_batch_single(cluster, tp): + _do_expire_batch_single(cluster, tp, 3200) + +def test_expired_batch_single_max_value(cluster, tp): + _do_expire_batch_single(cluster, tp, 2147483647) + +def _expected_num_appends(batch_size): + size = DefaultRecordBatchBuilder.header_size_in_bytes() + offset_delta = 0 + while True: + record_size = DefaultRecordBatchBuilder.size_in_bytes(offset_delta, 0, b'key', b'value', []) + if size + record_size > batch_size: + return offset_delta + offset_delta += 1 + size += record_size + +def test_expired_batches(cluster, tp): + now = 0 + retry_backoff_ms = 100 + linger_ms = 30 + request_timeout_ms = 60 + delivery_timeout_ms = 3200 + batch_size = 1024 + accum = RecordAccumulator(linger_ms=linger_ms, delivery_timeout_ms=delivery_timeout_ms, request_timeout_ms=request_timeout_ms, retry_backoff_ms=retry_backoff_ms, batch_size=batch_size) + appends = _expected_num_appends(batch_size) + + # Test batches not in retry + for i in range(appends): + accum.append(tp, 0, b'key', b'value', [], now=now) + ready, next_ready_check, _unknown_leaders_exist = accum.ready(cluster, now=now) + assert len(ready) == 0, 'No partitions should be ready' + assert next_ready_check == linger_ms / 1000 + + # Make the batches ready due to batch full + accum.append(tp, 0, b'key', b'value', [], now=now) + ready, _next_ready_check, _unknown_leaders_exist = accum.ready(cluster, now=now) + assert ready == set([0]), "Our partitions leader should be ready" + + # Advance the clock to expire the batch. + now = _advance_now_ms(now, delivery_timeout_ms + 1) + accum.muted.add(tp) + expired_batches = accum.expired_batches(now=now) + assert len(expired_batches) == 2, "The batches will be expired no matter if the partition is muted or not" + + accum.muted.remove(tp) + expired_batches = accum.expired_batches(now=now) + assert len(expired_batches) == 0, "All batches should have been expired earlier" + ready, _next_ready_check, _unknown_leaders_exist = accum.ready(cluster, now=now) + assert len(ready) == 0, "No partitions should be ready." + + # Test batches in retry. + # Create a retried batch + accum.append(tp, 0, b'key', b'value', [], now=now) + now = _advance_now_ms(now, linger_ms) + ready, _next_ready_check, _unknown_leaders_exist = accum.ready(cluster, now=now) + assert ready == set([0]), "Our partitions leader should be ready" + + drained = accum.drain(cluster, ready, 2147483647, now=now) + assert len(drained[0]) == 1, "There should be only one batch." + now = _advance_now_ms(now, 1000) + accum.reenqueue(drained[0][0], now=now) + + # test expiration. + now = _advance_now_ms(now, request_timeout_ms + retry_backoff_ms) + expired_batches = accum.expired_batches(now=now) + assert len(expired_batches) == 0, "The batch should not be expired." + now = _advance_now_ms(now, 1) + + accum.muted.add(tp) + expired_batches = accum.expired_batches(now=now) + assert len(expired_batches) == 0, "The batch should not be expired when the partition is muted" + + accum.muted.remove(tp) + expired_batches = accum.expired_batches(now=now) + assert len(expired_batches) == 0, "The batch should not be expired when the partition is unmuted" + + now = _advance_now_ms(now, linger_ms) + ready, _next_ready_check, _unknown_leaders_exist = accum.ready(cluster, now=now) + assert ready == set([0]), "Our partitions leader should be ready" + + # Advance the clock to expire the batch. 
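+    # Expiry is measured from the original creation time: retry()/reenqueue()
+    # reset last_attempt and last_append but not created, so the batch still
+    # expires once delivery_timeout_ms has elapsed since it was first created.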
+    now = _advance_now_ms(now, delivery_timeout_ms + 1)
+    accum.muted.add(tp)
+    expired_batches = accum.expired_batches(now=now)
+    assert len(expired_batches) == 1, "The batch should expire even when the partition is muted"
diff --git a/test/test_sender.py b/test/test_sender.py
index ee057ff3a..0731454df 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -92,11 +92,11 @@ def test_complete_batch_success(sender):
     assert not batch.produce_future.is_done
 
     # No error, base_offset 0
-    sender._complete_batch(batch, None, 0, timestamp_ms=123, log_start_offset=456)
+    sender._complete_batch(batch, None, 0, timestamp_ms=123)
     assert batch.is_done
     assert batch.produce_future.is_done
     assert batch.produce_future.succeeded()
-    assert batch.produce_future.value == (0, 123, 456)
+    assert batch.produce_future.value == (0, 123)
 
 
 def test_complete_batch_transaction(sender, transaction_manager):
@@ -131,6 +131,7 @@ def test_complete_batch_transaction(sender, transaction_manager):
 def test_complete_batch_error(sender, error, refresh_metadata):
     sender._client.cluster._last_successful_refresh_ms = (time.time() - 10) * 1000
     sender._client.cluster._need_update = False
+    sender.config['retries'] = 0
     assert sender._client.cluster.ttl() > 0
     batch = producer_batch()
     sender._complete_batch(batch, error, -1)
@@ -201,8 +202,8 @@ def test_fail_batch(sender, accumulator, transaction_manager, mocker):
     mocker.patch.object(batch, 'done')
     assert sender._transaction_manager.producer_id_and_epoch.producer_id == batch.producer_id
     error = Exception('error')
-    sender._fail_batch(batch, base_offset=0, timestamp_ms=None, exception=error, log_start_offset=None)
-    batch.done.assert_called_with(base_offset=0, timestamp_ms=None, exception=error, log_start_offset=None)
+    sender._fail_batch(batch, base_offset=0, timestamp_ms=None, exception=error)
+    batch.done.assert_called_with(base_offset=0, timestamp_ms=None, exception=error)
 
 
 def test_out_of_order_sequence_number_reset_producer_id(sender, accumulator, transaction_manager, mocker):
@@ -213,9 +214,9 @@ def test_out_of_order_sequence_number_reset_producer_id(sender, accumulator, tra
     mocker.patch.object(batch, 'done')
     assert sender._transaction_manager.producer_id_and_epoch.producer_id == batch.producer_id
     error = Errors.OutOfOrderSequenceNumberError()
-    sender._fail_batch(batch, base_offset=0, timestamp_ms=None, exception=error, log_start_offset=None)
+    sender._fail_batch(batch, base_offset=0, timestamp_ms=None, exception=error)
     sender._transaction_manager.reset_producer_id.assert_called_once()
-    batch.done.assert_called_with(base_offset=0, timestamp_ms=None, exception=error, log_start_offset=None)
+    batch.done.assert_called_with(base_offset=0, timestamp_ms=None, exception=error)
 
 
 def test_handle_produce_response():
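
Taken together, these changes replace per-request batch expiry with a single
end-to-end delivery deadline. A minimal sketch of the resulting producer-side
usage (bootstrap address, topic name, and timings are illustrative, and a
reachable broker is assumed):

    from kafka import KafkaProducer

    # RecordAccumulator raises KafkaConfigurationError when
    # delivery_timeout_ms < linger_ms + request_timeout_ms, so keep the
    # deadline at least that large.
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        linger_ms=5,
        request_timeout_ms=30000,
        delivery_timeout_ms=120000)  # default: 120s bound on send() outcome

    future = producer.send('my-topic', key=b'k', value=b'v')
    # RecordMetadata no longer carries log_start_offset after this change.
    metadata = future.get(timeout=130)
    print(metadata.topic, metadata.partition, metadata.offset, metadata.timestamp)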