6 changes: 3 additions & 3 deletions kafka/producer/future.py
@@ -38,7 +38,7 @@ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, seri
produce_future.add_errback(self.failure)

def _produce_success(self, offset_and_timestamp):
-offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp
+offset, produce_timestamp_ms = offset_and_timestamp

# Unpacking from args tuple is minor speed optimization
(relative_offset, timestamp_ms, checksum,
@@ -51,7 +51,7 @@ def _produce_success(self, offset_and_timestamp):
if offset != -1 and relative_offset is not None:
offset += relative_offset
tp = self._produce_future.topic_partition
-metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, log_start_offset,
+metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
checksum, serialized_key_size,
serialized_value_size, serialized_header_size)
self.success(metadata)
@@ -67,5 +67,5 @@ def get(self, timeout=None):


RecordMetadata = collections.namedtuple(
-'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset',
+'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
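
Note on the future.py hunks: assuming the first line of each changed pair is the removal (as in a standard unified diff), `RecordMetadata` no longer carries `log_start_offset`, and `_produce_success` unpacks a two-element tuple. The following is a minimal standalone sketch of the resulting shape, not the library's code. `build_metadata` is a hypothetical helper written only for illustration, the topic-partition is modelled as a plain tuple, and the lines elided from the hunk are not reproduced.

```python
import collections

# Field list as it appears after the change (no 'log_start_offset').
RecordMetadata = collections.namedtuple(
    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
                       'checksum', 'serialized_key_size', 'serialized_value_size',
                       'serialized_header_size'])


def build_metadata(tp, offset_and_timestamp, relative_offset, timestamp_ms,
                   checksum, key_size, value_size, header_size):
    # Hypothetical helper mirroring the offset arithmetic visible in _produce_success.
    offset, produce_timestamp_ms = offset_and_timestamp  # two-element tuple
    if offset != -1 and relative_offset is not None:
        offset += relative_offset  # base offset of the batch + position in the batch
    return RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
                          checksum, key_size, value_size, header_size)


meta = build_metadata(('events', 0), (100, 1700000000000), 3, 1700000000000,
                      None, 4, 16, 0)
print(meta.offset)  # 103
```

Any caller that unpacked `RecordMetadata` positionally, or read `.log_start_offset`, would need updating after this change.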
74 changes: 63 additions & 11 deletions kafka/producer/kafka.py
@@ -37,8 +37,8 @@ class KafkaProducer(object):
The producer is thread safe and sharing a single producer instance across
threads will generally be faster than having multiple instances.

-The producer consists of a pool of buffer space that holds records that
-haven't yet been transmitted to the server as well as a background I/O
+The producer consists of a RecordAccumulator which holds records that
+haven't yet been transmitted to the server, and a Sender background I/O
thread that is responsible for turning these records into requests and
transmitting them to the cluster.

@@ -77,6 +77,47 @@ class KafkaProducer(object):
The key_serializer and value_serializer instruct how to turn the key and
value objects the user provides into bytes.

+From Kafka 0.11, the KafkaProducer supports two additional modes:
+the idempotent producer and the transactional producer.
+The idempotent producer strengthens Kafka's delivery semantics from
+at least once to exactly once delivery. In particular, producer retries
+will no longer introduce duplicates. The transactional producer allows an
+application to send messages to multiple partitions (and topics!)
+atomically.
+
+To enable idempotence, the `enable_idempotence` configuration must be set
+to True. If set, the `retries` config will default to `float('inf')` and
+the `acks` config will default to 'all'. There are no API changes for the
+idempotent producer, so existing applications will not need to be modified
+to take advantage of this feature.
+
+To take advantage of the idempotent producer, it is imperative to avoid
+application level re-sends since these cannot be de-duplicated. As such, if
+an application enables idempotence, it is recommended to leave the
+`retries` config unset, as it will be defaulted to `float('inf')`.
+Additionally, if a :meth:`~kafka.KafkaProducer.send` returns an error even
+with infinite retries (for instance if the message expires in the buffer
+before being sent), then it is recommended to shut down the producer and
+check the contents of the last produced message to ensure that it is not
+duplicated. Finally, the producer can only guarantee idempotence for
+messages sent within a single session.
+
+To use the transactional producer and the attendant APIs, you must set the
+`transactional_id` configuration property. If the `transactional_id` is
+set, idempotence is automatically enabled along with the producer configs
+which idempotence depends on. Further, topics which are included in
+transactions should be configured for durability. In particular, the
+`replication.factor` should be at least `3`, and the `min.insync.replicas`
+for these topics should be set to 2. Finally, in order for transactional
+guarantees to be realized from end-to-end, the consumers must be
+configured to read only committed messages as well.
+
+The purpose of the `transactional_id` is to enable transaction recovery
+across multiple sessions of a single producer instance. It would typically
+be derived from the shard identifier in a partitioned, stateful,
+application. As such, it should be unique to each producer instance running
+within a partitioned application.
+
Keyword Arguments:
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
strings) that the producer should contact to bootstrap initial
@@ -106,7 +147,16 @@ class KafkaProducer(object):
defaults to be suitable. If the values are set to something
incompatible with the idempotent producer, a KafkaConfigurationError
will be raised.
-
+delivery_timeout_ms (float): An upper bound on the time to report success
+or failure after producer.send() returns. This limits the total time
+that a record will be delayed prior to sending, the time to await
+acknowledgement from the broker (if expected), and the time allowed
+for retriable send failures. The producer may report failure to send
+a record earlier than this config if either an unrecoverable error is
+encountered, the retries have been exhausted, or the record is added
+to a batch which reached an earlier delivery expiration deadline.
+The value of this config should be greater than or equal to the
+sum of (request_timeout_ms + linger_ms). Default: 120000.
acks (0, 1, 'all'): The number of acknowledgments the producer requires
the leader to have received before considering a request complete.
This controls the durability of records that are sent. The
@@ -134,16 +184,20 @@ class KafkaProducer(object):
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression). Default: None.
-retries (int): Setting a value greater than zero will cause the client
+retries (numeric): Setting a value greater than zero will cause the client
to resend any record whose send fails with a potentially transient
error. Note that this retry is no different than if the client
resent the record upon receiving the error. Allowing retries
without setting max_in_flight_requests_per_connection to 1 will
potentially change the ordering of records because if two batches
are sent to a single partition, and the first fails and is retried
but the second succeeds, then the records in the second batch may
-appear first.
-Default: 0.
+appear first. Note additionally that produce requests will be
+failed before the number of retries has been exhausted if the timeout
+configured by delivery_timeout_ms expires first before successful
+acknowledgement. Users should generally prefer to leave this config
+unset and instead use delivery_timeout_ms to control retry behavior.
+Default: float('inf') (infinite)
batch_size (int): Requests sent to brokers will contain multiple
batches, one for each partition with data available to be sent.
A small batch size will make batching less common and may reduce
@@ -320,10 +374,11 @@ class KafkaProducer(object):
'enable_idempotence': False,
'transactional_id': None,
'transaction_timeout_ms': 60000,
+'delivery_timeout_ms': 120000,
'acks': 1,
'bootstrap_topics_filter': set(),
'compression_type': None,
-'retries': 0,
+'retries': float('inf'),
'batch_size': 16384,
'linger_ms': 0,
'partitioner': DefaultPartitioner(),
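
Note on the new default: the docstring says `delivery_timeout_ms` should be greater than or equal to `request_timeout_ms + linger_ms`. A small sketch of that arithmetic follows. `check_delivery_timeout` is a hypothetical helper, not a function shown in this diff, and the `request_timeout_ms` value of 30000 is an assumed figure for illustration only; `delivery_timeout_ms` and `linger_ms` use the defaults visible above.

```python
def check_delivery_timeout(config):
    """Return True when delivery_timeout_ms >= request_timeout_ms + linger_ms."""
    # Hypothetical helper: the minimum budget is one full request plus the linger delay.
    floor = config['request_timeout_ms'] + config['linger_ms']
    return config['delivery_timeout_ms'] >= floor


defaults = {
    'delivery_timeout_ms': 120000,  # default added in this diff
    'linger_ms': 0,                 # default shown in this diff
    'request_timeout_ms': 30000,    # assumed value, not shown in this diff
}

print(check_delivery_timeout(defaults))                            # True
print(check_delivery_timeout(dict(defaults, linger_ms=100000)))    # False
```

With these numbers the floor is 30000 ms, so the 120000 ms default leaves room for several retried requests before the record is failed.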
@@ -471,10 +526,7 @@ def __init__(self, **configs):
else:
log.info("%s: Instantiated an idempotent producer.", str(self))

-if 'retries' not in user_provided_configs:
-log.info("%s: Overriding the default 'retries' config to 3 since the idempotent producer is enabled.", str(self))
-self.config['retries'] = 3
-elif self.config['retries'] == 0:
+if self.config['retries'] == 0:
raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.")

if 'max_in_flight_requests_per_connection' not in user_provided_configs:
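
Note on the `retries` check in this hunk: because the default becomes `float('inf')`, the idempotent producer no longer needs to override an unset `retries` to 3, and only an explicit `retries=0` is rejected. A minimal sketch of that behavior, under stated assumptions: `resolve_idempotent_retries` and `ConfigurationError` are hypothetical stand-ins written for this note (the diff itself raises `Errors.KafkaConfigurationError` inside `__init__`).

```python
class ConfigurationError(Exception):
    """Local stand-in for the library's KafkaConfigurationError (illustration only)."""


DEFAULT_RETRIES = float('inf')  # new default shown in this diff


def resolve_idempotent_retries(user_provided_configs):
    # Fall back to the (now infinite) default when the user did not set 'retries'.
    retries = user_provided_configs.get('retries', DEFAULT_RETRIES)
    if retries == 0:
        raise ConfigurationError(
            "Must set 'retries' to non-zero when using the idempotent producer.")
    return retries


print(resolve_idempotent_retries({}))              # inf
print(resolve_idempotent_retries({'retries': 5}))  # 5
```

Retry duration is then bounded by `delivery_timeout_ms` rather than by a retry count, which is what the updated `retries` docstring recommends.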