
Commit dc3bd4f

remove each_batch method due to data consistency issues (#541)
* remove each_batch method due to data consistency issues
* fix spec
1 parent 43773a9 commit dc3bd4f

3 files changed: +22 -312 lines


CHANGELOG.md

Lines changed: 2 additions & 1 deletion
@@ -1,7 +1,8 @@
 # Rdkafka Changelog
 
 ## 0.20.0 (Unreleased)
-- [Enhancement] Bump librdkafka to 2.6.1
+- **[Breaking]** Deprecate and remove `#each_batch` due to data consistency concerns.
+- [Enhancement] Bump librdkafka to `2.6.1`
 - [Enhancement] Expose `rd_kafka_global_init` to mitigate macos forking issues.
 - [Enhancement] Avoid clobbering LDFLAGS and CPPFLAGS if in a nix prepared environment (secobarbital).
 - [Patch] Retire no longer needed cooperative-sticky patch.

lib/rdkafka/consumer.rb

Lines changed: 16 additions & 80 deletions
@@ -609,87 +609,23 @@ def each
       end
     end
 
-    # Poll for new messages and yield them in batches that may contain
-    # messages from more than one partition.
-    #
-    # Rather than yield each message immediately as soon as it is received,
-    # each_batch will attempt to wait for as long as `timeout_ms` in order
-    # to create a batch of up to but no more than `max_items` in size.
-    #
-    # Said differently, if more than `max_items` are available within
-    # `timeout_ms`, then `each_batch` will yield early with `max_items` in the
-    # array, but if `timeout_ms` passes by with fewer messages arriving, it
-    # will yield an array of fewer messages, quite possibly zero.
-    #
-    # In order to prevent wrongly auto committing many messages at once across
-    # possibly many partitions, callers must explicitly indicate which messages
-    # have been successfully processed as some consumed messages may not have
-    # been yielded yet. To do this, the caller should set
-    # `enable.auto.offset.store` to false and pass processed messages to
-    # {store_offset}. It is also possible, though more complex, to set
-    # 'enable.auto.commit' to false and then pass a manually assembled
-    # TopicPartitionList to {commit}.
-    #
-    # As with `each`, iteration will end when the consumer is closed.
-    #
-    # Exception behavior is more complicated than with `each`, in that if
-    # :yield_on_error is true, and an exception is raised during the
-    # poll, and messages have already been received, they will be yielded to
-    # the caller before the exception is allowed to propagate.
-    #
-    # If you are setting either auto.commit or auto.offset.store to false in
-    # the consumer configuration, then you should let yield_on_error keep its
-    # default value of false because you are guaranteed to see these messages
-    # again. However, if both auto.commit and auto.offset.store are set to
-    # true, you should set yield_on_error to true so you can process messages
-    # that you may or may not see again.
-    #
-    # @param max_items [Integer] Maximum size of the yielded array of messages
-    # @param bytes_threshold [Integer] Threshold number of total message bytes in the yielded array of messages
-    # @param timeout_ms [Integer] max time to wait for up to max_items
-    #
-    # @yieldparam messages [Array] An array of received Message
-    # @yieldparam pending_exception [Exception] normally nil, or an exception
-    #
-    # @yield [messages, pending_exception]
-    #   which will be propagated after processing of the partial batch is complete.
-    #
-    # @return [nil]
-    #
-    # @raise [RdkafkaError] When polling fails
+    # Deprecated. Please read the error message for more details.
     def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
-      closed_consumer_check(__method__)
-      slice = []
-      bytes = 0
-      end_time = monotonic_now + timeout_ms / 1000.0
-      loop do
-        break if closed?
-        max_wait = end_time - monotonic_now
-        max_wait_ms = if max_wait <= 0
-                        0 # should not block, but may retrieve a message
-                      else
-                        (max_wait * 1000).floor
-                      end
-        message = nil
-        begin
-          message = poll max_wait_ms
-        rescue Rdkafka::RdkafkaError => error
-          raise unless yield_on_error
-          raise if slice.empty?
-          yield slice.dup, error
-          raise
-        end
-        if message
-          slice << message
-          bytes += message.payload.bytesize if message.payload
-        end
-        if slice.size == max_items || bytes >= bytes_threshold || monotonic_now >= end_time - 0.001
-          yield slice.dup, nil
-          slice.clear
-          bytes = 0
-          end_time = monotonic_now + timeout_ms / 1000.0
-        end
-      end
+      raise NotImplementedError, <<~ERROR
+        `each_batch` has been removed due to data consistency concerns.
+
+        This method was removed because it did not properly handle partition reassignments,
+        which could lead to processing messages from partitions that were no longer owned
+        by this consumer, resulting in duplicate message processing and data inconsistencies.
+
+        Recommended alternatives:
+
+        1. Implement your own batching logic using rebalance callbacks to properly handle
+           partition revocations and ensure message processing correctness.
+
+        2. Use a high-level batching library that supports proper partition reassignment
+           handling out of the box (such as the Karafka framework).
+      ERROR
     end
 
     # Returns pointer to the consumer group metadata. It is used only in the context of
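The first alternative named in the new error message is hand-rolled batching driven by rebalance callbacks. The sketch below illustrates that idea; it is not an API shipped by this commit. It assumes rdkafka-ruby's `consumer_rebalance_listener` config hook with single-argument `on_partitions_assigned`/`on_partitions_revoked` listener methods (the interface referenced in the spec file below); the broker address, group id, topic, `RevocationAwareBuffer` class, and processing step are all hypothetical.

```ruby
require "rdkafka"

# Hypothetical helper: buffers polled messages and discards them when
# partitions are revoked, so a batch never includes messages from partitions
# this consumer no longer owns.
class RevocationAwareBuffer
  attr_reader :messages

  def initialize
    @messages = []
  end

  def on_partitions_assigned(_list); end

  def on_partitions_revoked(_list)
    # A rebalance may hand these partitions to another consumer; drop the
    # buffered messages instead of risking duplicate processing.
    @messages.clear
  end
end

buffer = RevocationAwareBuffer.new

config = Rdkafka::Config.new(
  "bootstrap.servers"        => "localhost:9092", # hypothetical broker
  "group.id"                 => "batch-example",  # hypothetical group
  "enable.auto.offset.store" => false             # store offsets only after processing
)
config.consumer_rebalance_listener = buffer

consumer = config.consumer
consumer.subscribe("example_topic") # hypothetical topic

MAX_ITEMS = 100
TIMEOUT_S = 0.25
deadline  = Time.now + TIMEOUT_S

loop do
  message = consumer.poll(100) # the rebalance listener fires from inside poll
  buffer.messages << message if message

  next unless buffer.messages.size >= MAX_ITEMS || Time.now >= deadline

  buffer.messages.each do |msg|
    # process(msg) ... then mark it done; auto-commit (on by default) will
    # periodically commit the stored offsets.
    consumer.store_offset(msg)
  end
  buffer.messages.clear
  deadline = Time.now + TIMEOUT_S
end
```

Clearing the buffer on revocation means a batch never contains messages from partitions the consumer has lost, which is exactly the guarantee the removed `each_batch` could not provide.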

spec/rdkafka/consumer_spec.rb

Lines changed: 4 additions & 231 deletions
@@ -921,236 +921,10 @@ def send_one_message(val)
     end
   end
 
   describe "#each_batch" do
-    let(:message_payload) { 'a' * 10 }
-
-    before do
-      @topic = SecureRandom.base64(10).tr('+=/', '')
-    end
-
-    after do
-      @topic = nil
-    end
-
-    def topic_name
-      @topic
-    end
-
-    def produce_n(n)
-      handles = []
-      n.times do |i|
-        handles << producer.produce(
-          topic: topic_name,
-          payload: i % 10 == 0 ? nil : Time.new.to_f.to_s,
-          key: i.to_s,
-          partition: 0
-        )
-      end
-      handles.each(&:wait)
-    end
-
-    def new_message
-      instance_double("Rdkafka::Consumer::Message").tap do |message|
-        allow(message).to receive(:payload).and_return(message_payload)
-      end
-    end
-
-    it "retrieves messages produced into a topic" do
-      # This is the only each_batch test that actually produces real messages
-      # into a topic in the real kafka of the container.
-      #
-      # The other tests stub 'poll' which makes them faster and more reliable,
-      # but it makes sense to keep a single test with a fully integrated flow.
-      # This will help to catch breaking changes in the behavior of 'poll',
-      # librdkafka, or Kafka.
-      #
-      # This is, in effect, an integration test and the subsequent specs are
-      # unit tests.
-      admin = rdkafka_config.admin
-      create_topic_handle = admin.create_topic(topic_name, 1, 1)
-      create_topic_handle.wait(max_wait_timeout: 15.0)
-      consumer.subscribe(topic_name)
-      produce_n 42
-      all_yields = []
-      consumer.each_batch(max_items: 10) do |batch|
-        all_yields << batch
-        break if all_yields.flatten.size >= 42
-      end
-      expect(all_yields.flatten.first).to be_a Rdkafka::Consumer::Message
-      expect(all_yields.flatten.size).to eq 42
-      expect(all_yields.size).to be > 4
-      expect(all_yields.flatten.map(&:key)).to eq (0..41).map { |x| x.to_s }
-      admin.close
-    end
-
-    it "should batch poll results and yield arrays of messages" do
-      consumer.subscribe(topic_name)
-      all_yields = []
-      expect(consumer)
-        .to receive(:poll)
-        .exactly(10).times
-        .and_return(new_message)
-      consumer.each_batch(max_items: 10) do |batch|
-        all_yields << batch
-        break if all_yields.flatten.size >= 10
-      end
-      expect(all_yields.first).to be_instance_of(Array)
-      expect(all_yields.flatten.size).to eq 10
-      non_empty_yields = all_yields.reject { |batch| batch.empty? }
-      expect(non_empty_yields.size).to be < 10
-    end
-
-    it "should yield a partial batch if the timeout is hit with some messages" do
-      consumer.subscribe(topic_name)
-      poll_count = 0
-      expect(consumer)
-        .to receive(:poll)
-        .at_least(3).times do
-          poll_count = poll_count + 1
-          if poll_count > 2
-            sleep 0.1
-            nil
-          else
-            new_message
-          end
-        end
-      all_yields = []
-      consumer.each_batch(max_items: 10) do |batch|
-        all_yields << batch
-        break if all_yields.flatten.size >= 2
-      end
-      expect(all_yields.flatten.size).to eq 2
-    end
-
-    it "should yield [] if nothing is received before the timeout" do
-      admin = rdkafka_config.admin
-      create_topic_handle = admin.create_topic(topic_name, 1, 1)
-      create_topic_handle.wait(max_wait_timeout: 15.0)
-      consumer.subscribe(topic_name)
-      consumer.each_batch do |batch|
-        expect(batch).to eq([])
-        break
-      end
-      admin.close
-    end
-
-    it "should yield batches of max_items in size if messages are already fetched" do
-      yielded_batches = []
-      expect(consumer)
-        .to receive(:poll)
-        .with(anything)
-        .exactly(20).times
-        .and_return(new_message)
-
-      consumer.each_batch(max_items: 10, timeout_ms: 500) do |batch|
-        yielded_batches << batch
-        break if yielded_batches.flatten.size >= 20
-        break if yielded_batches.size >= 20 # so failure doesn't hang
-      end
-      expect(yielded_batches.size).to eq 2
-      expect(yielded_batches.map(&:size)).to eq 2.times.map { 10 }
-    end
-
-    it "should yield batches as soon as bytes_threshold is hit" do
-      yielded_batches = []
-      expect(consumer)
-        .to receive(:poll)
-        .with(anything)
-        .exactly(20).times
-        .and_return(new_message)
-
-      consumer.each_batch(bytes_threshold: message_payload.size * 4, timeout_ms: 500) do |batch|
-        yielded_batches << batch
-        break if yielded_batches.flatten.size >= 20
-        break if yielded_batches.size >= 20 # so failure doesn't hang
-      end
-      expect(yielded_batches.size).to eq 5
-      expect(yielded_batches.map(&:size)).to eq 5.times.map { 4 }
-    end
-
-    context "error raised from poll and yield_on_error is true" do
-      it "should yield buffered exceptions on rebalance, then break" do
-        config = rdkafka_consumer_config(
-          {
-            :"enable.auto.commit" => false,
-            :"enable.auto.offset.store" => false
-          }
-        )
-        consumer = config.consumer
-        consumer.subscribe(topic_name)
-        batches_yielded = []
-        exceptions_yielded = []
-        each_batch_iterations = 0
-        poll_count = 0
-        expect(consumer)
-          .to receive(:poll)
-          .with(anything)
-          .exactly(3).times
-          .and_wrap_original do |method, *args|
-            poll_count = poll_count + 1
-            if poll_count == 3
-              raise Rdkafka::RdkafkaError.new(27,
-                "partitions ... too ... heavy ... must ... rebalance")
-            else
-              new_message
-            end
-          end
-        expect {
-          consumer.each_batch(max_items: 30, yield_on_error: true) do |batch, pending_error|
-            batches_yielded << batch
-            exceptions_yielded << pending_error
-            each_batch_iterations = each_batch_iterations + 1
-          end
-        }.to raise_error(Rdkafka::RdkafkaError)
-        expect(poll_count).to eq 3
-        expect(each_batch_iterations).to eq 1
-        expect(batches_yielded.size).to eq 1
-        expect(batches_yielded.first.size).to eq 2
-        expect(exceptions_yielded.flatten.size).to eq 1
-        expect(exceptions_yielded.flatten.first).to be_instance_of(Rdkafka::RdkafkaError)
-        consumer.close
-      end
-    end
-
-    context "error raised from poll and yield_on_error is false" do
-      it "should yield buffered exceptions on rebalance, then break" do
-        config = rdkafka_consumer_config(
-          {
-            :"enable.auto.commit" => false,
-            :"enable.auto.offset.store" => false
-          }
-        )
-        consumer = config.consumer
-        consumer.subscribe(topic_name)
-        batches_yielded = []
-        exceptions_yielded = []
-        each_batch_iterations = 0
-        poll_count = 0
-        expect(consumer)
-          .to receive(:poll)
-          .with(anything)
-          .exactly(3).times
-          .and_wrap_original do |method, *args|
-            poll_count = poll_count + 1
-            if poll_count == 3
-              raise Rdkafka::RdkafkaError.new(27,
-                "partitions ... too ... heavy ... must ... rebalance")
-            else
-              new_message
-            end
-          end
-        expect {
-          consumer.each_batch(max_items: 30, yield_on_error: false) do |batch, pending_error|
-            batches_yielded << batch
-            exceptions_yielded << pending_error
-            each_batch_iterations = each_batch_iterations + 1
-          end
-        }.to raise_error(Rdkafka::RdkafkaError)
-        expect(poll_count).to eq 3
-        expect(each_batch_iterations).to eq 0
-        expect(batches_yielded.size).to eq 0
-        expect(exceptions_yielded.size).to eq 0
-        consumer.close
-      end
+    it 'expect to raise an error' do
+      expect do
+        consumer.each_batch {}
+      end.to raise_error(NotImplementedError)
     end
   end
 
@@ -1317,7 +1091,6 @@ def on_partitions_revoked(list)
       {
        :subscribe => [ nil ],
        :unsubscribe => nil,
-       :each_batch => nil,
        :pause => [ nil ],
        :resume => [ nil ],
        :subscription => nil,
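For upgraders, any remaining `each_batch` call now fails fast at the call site, as the new spec above asserts. A hypothetical call site after this commit:

```ruby
consumer.each_batch(max_items: 50) do |messages|
  # never reached: the method raises before yielding
end
# => NotImplementedError: `each_batch` has been removed due to data consistency concerns. ...
```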
