Commit a1582bc

Producer: Enforce guaranteed message order when idempotence_enabled (#2937)

dpkp and claude authored
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent c019dff

3 files changed: 202 additions & 6 deletions

kafka/producer/kafka.py

Lines changed: 3 additions & 1 deletion
@@ -554,7 +554,9 @@ def __init__(self, **configs):
             transaction_manager=self._transaction_manager,
             message_version=message_version,
             **self.config)
-        guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
+        guarantee_message_order = False
+        if self.config['enable_idempotence'] or self.config['max_in_flight_requests_per_connection'] == 1:
+            guarantee_message_order = True
         self._sender = Sender(client, self._metadata,
                               self._accumulator,
                               metrics=self._metrics,
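
The new rule is small enough to state as a pure function. This restates the branch above for illustration only; it is not code from the library:

def guarantee_message_order(enable_idempotence, max_in_flight):
    # Idempotence now forces ordering; otherwise the old rule applies.
    return enable_idempotence or max_in_flight == 1

assert guarantee_message_order(True, 5)        # forced by idempotence
assert guarantee_message_order(False, 1)       # original max_in_flight == 1 rule
assert not guarantee_message_order(False, 5)   # ordering not guaranteed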

test/producer/test_sender.py

Lines changed: 24 additions & 4 deletions
@@ -744,18 +744,38 @@ def test_idempotent_default_max_in_flight(self):
         assert p.config['max_in_flight_requests_per_connection'] == 5
         p.close(timeout=0)

-    def test_guarantee_message_order_only_when_max_in_flight_1(self):
-        """guarantee_message_order is True only when max_in_flight == 1."""
+    def test_idempotent_producer_forces_guarantee_message_order(self):
+        """guarantee_message_order is forced True when idempotence is enabled,
+        regardless of max_in_flight. Without partition muting, a transient
+        retryable error (e.g. NotLeader) triggers reenqueue via appendleft
+        which reverses concurrently-failed batches; the retried sends arrive
+        out of sequence and the broker rejects with OutOfOrderSequenceNumber.
+        Java's producer enforces this for the same reason.
+        """
+        for max_in_flight in (1, 2, 3, 4, 5):
+            p = KafkaProducer(
+                enable_idempotence=True,
+                max_in_flight_requests_per_connection=max_in_flight,
+                api_version=(0, 11),
+            )
+            assert p._sender.config['guarantee_message_order'] is True, (
+                'idempotence should force guarantee_message_order=True (max_in_flight=%d)'
+                % max_in_flight)
+            p.close(timeout=0)
+
+    def test_non_idempotent_guarantee_message_order_only_when_max_in_flight_1(self):
+        """For non-idempotent producers, guarantee_message_order is only True
+        when max_in_flight == 1 (the original Java behavior)."""
         p1 = KafkaProducer(
-            enable_idempotence=True,
+            enable_idempotence=False,
             max_in_flight_requests_per_connection=1,
             api_version=(0, 11),
         )
         assert p1._sender.config['guarantee_message_order'] is True
         p1.close(timeout=0)

         p5 = KafkaProducer(
-            enable_idempotence=True,
+            enable_idempotence=False,
             max_in_flight_requests_per_connection=5,
             api_version=(0, 11),
         )
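
The appendleft hazard named in the docstring is easy to see in isolation: re-adding two concurrently failed batches to the front of a deque reverses their relative order. A minimal standalone illustration (plain Python, not library code):

from collections import deque

queue = deque(['batch-seq-0', 'batch-seq-1'])
in_flight = [queue.popleft(), queue.popleft()]  # both drained, both fail

for batch in in_flight:                         # reenqueued in failure order
    queue.appendleft(batch)

assert list(queue) == ['batch-seq-1', 'batch-seq-0']  # relative order reversed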

test/producer/test_transaction_manager_mock_broker.py

Lines changed: 175 additions & 1 deletion
@@ -14,6 +14,7 @@
 import pytest

 import kafka.errors as Errors
+from kafka import KafkaProducer
 from kafka.net.compat import KafkaNetClient
 from kafka.producer.transaction_manager import (
     AddOffsetsToTxnHandler,
@@ -26,14 +27,17 @@
     TransactionState,
     TxnOffsetCommitHandler,
 )
-from kafka.protocol.metadata import FindCoordinatorResponse
+from kafka.protocol.metadata import FindCoordinatorResponse, MetadataResponse
 from kafka.protocol.producer import (
     AddOffsetsToTxnResponse,
     AddPartitionsToTxnResponse,
     EndTxnResponse,
     InitProducerIdResponse,
+    ProduceRequest,
+    ProduceResponse,
     TxnOffsetCommitResponse,
 )
+from kafka.record import MemoryRecords
 from kafka.structs import OffsetAndMetadata, TopicPartition

 from test.mock_broker import MockBroker
@@ -1033,3 +1037,173 @@ def test_partial_retriable_retries_only_failed(self, broker, client):
         assert tp_retry in tm._pending_txn_offset_commits
         # Result not yet done--the retry has to complete first.
         assert not result.is_done
+
+
+# ---------------------------------------------------------------------------
+# Idempotent producer ordering on retry
+# ---------------------------------------------------------------------------
+
+
+def _decode_produce_base_sequences(request_bytes, api_version):
+    """Decode a ProduceRequest and return [(topic, partition, base_sequence)] in order."""
+    request = ProduceRequest.decode(request_bytes, version=api_version, header=True)
+    out = []
+    for topic_data in request.topic_data:
+        for partition_data in topic_data.partition_data:
+            records = MemoryRecords(bytes(partition_data.records))
+            batch = records.next_batch()
+            out.append((topic_data.name, partition_data.index, batch.base_sequence))
+    return out
+
+
+class TestIdempotentProducerOrderingMockBroker:
+    """Regression test: idempotent producer must mute the partition after a
+    retryable error so retried batches are not reordered against later batches.
+
+    Without partition muting, when the first ProduceRequest gets a transient
+    NotLeaderForPartition error, the sender re-enqueues that batch via
+    deque.appendleft. Meanwhile, additional batches (queued by send() while
+    the first was in flight) drain and go out with sequences ahead of the
+    retried base_sequence. The broker rejects them with
+    OutOfOrderSequenceNumberError. With muting, no further batch for the
+    partition drains until the in-flight batch's response is processed, so
+    the retry is sent before the next batch.
+    """
+
+    _TOPIC = 'tx-order'
+
+    def _make_metadata_topic(self, version):
+        Topic = MetadataResponse.MetadataResponseTopic
+        Partition = Topic.MetadataResponsePartition
+        return Topic(version=version, error_code=0, name=self._TOPIC,
+                     is_internal=False,
+                     partitions=[
+                         Partition(version=version, error_code=0,
+                                   partition_index=0, leader_id=0,
+                                   leader_epoch=0,
+                                   replica_nodes=[0], isr_nodes=[0],
+                                   offline_replicas=[]),
+                     ])
+
+    def _produce_response(self, version, error_code, base_offset):
+        Topic = ProduceResponse.TopicProduceResponse
+        Partition = Topic.PartitionProduceResponse
+        return ProduceResponse(
+            throttle_time_ms=0,
+            responses=[
+                Topic(name=self._TOPIC, partition_responses=[
+                    Partition(index=0, error_code=error_code,
+                              base_offset=base_offset, log_append_time_ms=-1,
+                              log_start_offset=0, record_errors=[],
+                              error_message=None, current_leader=None),
+                ]),
+            ],
+        )
+
+    def test_retry_does_not_reorder_against_later_batches(self):
+        """First ProduceRequest fails with NotLeader; assert that the very
+        next ProduceRequest the broker sees is the retry (same base_sequence),
+        not a later batch that drained while the first was in flight."""
+        broker = MockBroker(broker_version=(2, 5))
+        broker.set_metadata(topics=[self._make_metadata_topic(version=8)])
+        # Producer's auto-version negotiation will land on (2, 5).
+        broker.respond(InitProducerIdResponse, InitProducerIdResponse(
+            throttle_time_ms=0, error_code=0,
+            producer_id=42, producer_epoch=0,
+        ))
+
+        # Capture each ProduceRequest's first-partition base_sequence as it
+        # arrives. First call: hold the response (via an awaitable Future)
+        # until enough subsequent requests have arrived to demonstrate the
+        # in-flight window contains multiple batches; then return NotLeader
+        # (transient retryable). All subsequent calls: success.
+        #
+        # The hold is what surfaces the bug -- without partition muting, the
+        # sender drains additional batches while the first is still in flight,
+        # and the first batch's retry (reenqueued via appendleft) is
+        # sequenced *after* them.
+        from kafka.future import Future
+        received_sequences = []
+        call_count = [0]
+        release_first = Future()
+
+        async def _held_notleader_response(api_version):
+            # Awaiting a kafka.future.Future yields until success/failure is
+            # set. While we're parked here, the broker's IO loop is free to
+            # process other queued ProduceRequests (each write() schedules its
+            # own _process_requests).
+            await release_first
+            return self._produce_response(
+                version=api_version,
+                error_code=Errors.NotLeaderForPartitionError.errno,
+                base_offset=-1)
+
+        def produce_response(api_key, api_version, correlation_id, request_bytes):
+            seqs = _decode_produce_base_sequences(request_bytes, api_version)
+            assert seqs, 'ProduceRequest had no partition data'
+            received_sequences.append(seqs[0][2])
+            call_count[0] += 1
+            if call_count[0] == 1:
+                # Return a coroutine; handle_request will await it.
+                return _held_notleader_response(api_version)
+            # Once a couple more requests have arrived, release the held one
+            # so the producer sees the NotLeader and reenqueues. With muting,
+            # call_count won't reach 3 (only one batch in flight at a time)
+            # and we'd hit the safety release in the test body below.
+            if call_count[0] >= 3 and not release_first.is_done:
+                release_first.success(None)
+            return self._produce_response(
+                version=api_version, error_code=0, base_offset=0)

+        # Register a respond_fn that handles every ProduceRequest the test
+        # sends. The MockBroker pops one queue entry per request, so we need
+        # one respond_fn per expected request. We don't know ahead of time
+        # how many will be sent, so register a generous batch.
+        for _ in range(64):
+            broker.respond_fn(ProduceRequest, produce_response)
+
+        producer = KafkaProducer(
+            kafka_client=broker.client_factory(),
+            bootstrap_servers=['%s:%d' % (broker.host, broker.port)],
+            api_version=(2, 5),
+            enable_idempotence=True,
+            max_in_flight_requests_per_connection=5,
+            batch_size=64,  # tiny so multiple batches form quickly
+            linger_ms=5,
+            retry_backoff_ms=10,
+            request_timeout_ms=5000,
+        )
+        # Safety release: if muting *is* working, call_count never reaches 3
+        # (only one ProduceRequest in flight at a time), so the held NotLeader
+        # response would never fire on its own. Time-bound release after a
+        # short delay to keep the test fast in either case.
+        import threading
+        threading.Timer(0.1, lambda: (
+            release_first.success(None) if not release_first.is_done else None
+        )).start()
+        try:
+            futures = [
+                producer.send(self._TOPIC, value=('msg-%02d' % i).encode(),
+                              partition=0)
+                for i in range(20)
+            ]
+            for f in futures:
+                f.get(timeout=10)
+        finally:
+            producer.close(timeout=2)
+
+        # The first ProduceRequest had base_sequence 0 and was rejected with
+        # NotLeader. With partition muting (the fix), the second
+        # ProduceRequest the broker sees must be the *retry* of that batch
+        # -- same base_sequence. Without muting, a later batch with a higher
+        # base_sequence would have drained while the first was in flight,
+        # arriving here ahead of the retry.
+        assert len(received_sequences) >= 2, (
+            'expected at least 2 ProduceRequests, got %r' % received_sequences)
+        assert received_sequences[0] == 0, (
+            'first ProduceRequest should carry base_sequence 0; got %r'
+            % received_sequences)
+        assert received_sequences[1] == 0, (
+            'second ProduceRequest must be the retry of base_sequence 0; '
+            'got %r -- partition was not muted, later batch drained ahead of '
+            'retry' % received_sequences)
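
For reference, the muting discipline the class docstring relies on can be sketched as follows. This is a rough illustration of the idea only; Batch, drain, and on_response are hypothetical stand-ins, not the Sender's actual API:

from collections import deque, namedtuple

Batch = namedtuple('Batch', ['tp', 'base_sequence'])  # hypothetical stand-in

muted = set()    # partitions with a batch currently in flight
ready = deque()  # ready batches, in per-partition sequence order

def drain():
    """Select sendable batches, skipping muted partitions so at most one
    batch per partition is in flight while ordering is guaranteed."""
    sendable = []
    for batch in list(ready):
        if batch.tp in muted:
            continue              # hold back: preserve per-partition order
        ready.remove(batch)
        muted.add(batch.tp)
        sendable.append(batch)
    return sendable

def on_response(batch, retriable_error):
    muted.discard(batch.tp)       # unmute: the partition may drain again
    if retriable_error:
        ready.appendleft(batch)   # the retry drains before any later batch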
