Skip to content

Commit 6f039ad

Browse files
author
jdillinger
committed
feat kafka: reuse consumer after user exception
commit_hash:aa170d722e6d81556d581b37da4e1afc33297b2c
1 parent 0a3291e commit 6f039ad

File tree

7 files changed

+146
-14
lines changed

7 files changed

+146
-14
lines changed

kafka/functional_tests/balanced_consumer_groups/static_config.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ components_manager:
2929
- test-topic
3030
auto_offset_reset: smallest
3131
max_batch_size: 1
32-
restart_after_failure_delay: 1s
32+
poll_timeout: 200ms
33+
restart_after_failure_delay: 100ms
3334
security_protocol: PLAINTEXT
3435

3536
kafka-consumer-second: *kafka_consumer
@@ -40,7 +41,8 @@ components_manager:
4041
- test-cooperative-topic
4142
auto_offset_reset: smallest
4243
max_batch_size: 1
43-
restart_after_failure_delay: 1s
44+
poll_timeout: 200ms
45+
restart_after_failure_delay: 100ms
4446
security_protocol: PLAINTEXT
4547
rd_kafka_custom_options:
4648
partition.assignment.strategy: cooperative-sticky

kafka/functional_tests/balanced_consumer_groups/tests/test_faulty_consumer.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ def second_consumer_received(_data):
3636
def second_consumer_subscribed(_data):
3737
logging.info('Second consumer subscribed')
3838

39+
@testpoint('tp_kafka-consumer-second_revoked')
40+
def second_consumer_revoked(_data):
41+
logging.info('Second consumer revoked')
42+
3943
@testpoint('tp_kafka-consumer-second_polled')
4044
def second_consumer_polled(_data):
4145
logging.info('Second consumer polled')
@@ -66,7 +70,11 @@ def second_consumer_failed(_data):
6670
CONSUMERS[1],
6771
)
6872
assert len(first_consumer_messages) == 1 and len(second_consumer_messages) == 1
73+
first_consumer_messages = first_consumer_messages[0]['partition']
6974
second_consumer_partition = second_consumer_messages[0]['partition']
75+
logging.info(
76+
f'First consumer was subscribed to {first_consumer_messages} partition',
77+
)
7078
logging.info(
7179
f'Second consumer was subscribed to {second_consumer_partition} partition',
7280
)
@@ -77,12 +85,17 @@ def second_consumer_failed(_data):
7785
MESSAGE_TO_FAIL,
7886
second_consumer_partition,
7987
)
80-
await second_consumer_polled.wait_call()
81-
await second_consumer_failed.wait_call()
88+
for _ in range(0, 2):
89+
await second_consumer_polled.wait_call()
90+
await second_consumer_failed.wait_call()
91+
92+
assert not second_consumer_subscribed.has_calls
93+
assert not second_consumer_revoked.has_calls
8294

8395
await stop_consumers(service_client, [CONSUMERS[1]])
8496

8597
await first_consumer_revoked.wait_call()
98+
await second_consumer_revoked.wait_call()
8699
await first_consumer_subscribed.wait_call()
87100
await first_consumer_subscribed.wait_call()
88101

@@ -143,10 +156,12 @@ def second_consumer_failed(_data):
143156
CONSUMERS[1],
144157
)
145158
assert len(second_consumer_messages) == 2
159+
assert {'key-1', 'key-2'} == set(parse_message_keys(second_consumer_messages))
146160

147161
await kafka_producer.send(TOPIC, 'key-3', MESSAGE_TO_FAIL, 0)
148-
await second_consumer_polled.wait_call()
149-
await second_consumer_failed.wait_call()
162+
for _ in range(0, 2):
163+
await second_consumer_polled.wait_call()
164+
await second_consumer_failed.wait_call()
150165

151166
await make_non_faulty(service_client, CONSUMERS[1])
152167

kafka/include/userver/kafka/impl/consumer.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <userver/kafka/impl/consumer_params.hpp>
1111
#include <userver/kafka/impl/holders.hpp>
1212
#include <userver/kafka/impl/stats.hpp>
13+
#include <userver/utils/span.hpp>
1314
#include <userver/utils/statistics/writer.hpp>
1415
#include <userver/utils/zstring_view.hpp>
1516

@@ -85,6 +86,11 @@ class Consumer final {
8586
std::chrono::milliseconds timeout
8687
) const;
8788

89+
/// @brief Seeks multiple topic partitions to the given offsets in one call.
90+
/// @param params Topic, partition_id and offset for each seek.
91+
/// @param timeout Timeout for the operation.
92+
void MultiSeek(utils::span<const SeekParams> params, std::chrono::milliseconds timeout) const;
93+
8894
/// @brief Seeks the \b partition_id for the specified \b topic to the begginning offset .
8995
/// @see ConsumerScope::SeekToBeginning for better commitment process
9096
void SeekToBeginning(utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout)

kafka/include/userver/kafka/impl/consumer_params.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,20 @@
33
#include <chrono>
44

55
#include <userver/logging/level.hpp>
6+
#include <userver/utils/zstring_view.hpp>
67
#include <userver/yaml_config/fwd.hpp>
78

89
USERVER_NAMESPACE_BEGIN
910

1011
namespace kafka::impl {
1112

13+
/// @brief Topic, partition and offset for a single seek (e.g. in MultiSeek).
14+
struct SeekParams {
15+
utils::zstring_view topic;
16+
std::uint32_t partition_id;
17+
std::uint64_t offset;
18+
};
19+
1220
/// @brief Specifies the logging format for the message key.
1321
enum class MessageKeyLogFormat {
1422
kPlainText, ///< Log message key as is

kafka/src/kafka/impl/consumer.cpp

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include <userver/kafka/impl/consumer.hpp>
22

3+
#include <map>
34
#include <string_view>
5+
#include <vector>
46

57
#include <userver/engine/sleep.hpp>
68
#include <userver/formats/json/value_builder.hpp>
@@ -129,8 +131,34 @@ void Consumer::RunConsuming(ConsumerScope::Callback callback) {
129131
consumer_->AccountMessageBatchProcessingSucceeded(polled_messages);
130132
TESTPOINT(fmt::format("tp_{}", name_), {});
131133
} catch (const std::exception& e) {
134+
LOG_ERROR("Messages processing failed in consumer into client callback: {}", e.what());
132135
consumer_->AccountMessageBatchProcessingFailed(polled_messages);
133-
throw;
136+
137+
// Seek back to the start of the failed batch so the same messages
138+
// are redelivered on the next poll (no consumer recreation).
139+
// Seek with numeric offset is local-only in librdkafka: it sets
140+
// next_fetch_position; no broker request until the next Fetch (poll).
141+
using Key = std::pair<utils::zstring_view, std::uint32_t>;
142+
std::map<Key, std::uint64_t> min_offset_by_partition;
143+
for (const auto& msg : polled_messages) {
144+
Key key{msg.GetTopic(), static_cast<std::uint32_t>(msg.GetPartition())};
145+
const auto offset = static_cast<std::uint64_t>(msg.GetOffset());
146+
auto it = min_offset_by_partition.find(key);
147+
if (it == min_offset_by_partition.end()) {
148+
min_offset_by_partition[key] = offset;
149+
} else {
150+
it->second = std::min(it->second, offset);
151+
}
152+
}
153+
std::vector<SeekParams> seek_params;
154+
seek_params.reserve(min_offset_by_partition.size());
155+
for (const auto& [key, offset] : min_offset_by_partition) {
156+
seek_params.push_back({key.first, key.second, offset});
157+
}
158+
constexpr auto kSeekAfterFailureTimeout = std::chrono::seconds(20);
159+
MultiSeek(seek_params, kSeekAfterFailureTimeout);
160+
LOG_WARNING("Seeked back to reprocess failed batch");
161+
CallErrorTestpoint(fmt::format("tp_error_{}", name_), e.what());
134162
}
135163
}
136164
}
@@ -219,6 +247,16 @@ void Consumer::Seek(
219247
}).Get();
220248
}
221249

250+
void Consumer::MultiSeek(utils::span<const SeekParams> params, std::chrono::milliseconds timeout) const {
251+
UINVARIANT(processing_.load(), "Message processing is not currently started");
252+
253+
utils::Async(consumer_blocking_task_processor_, "consumer_multi_seek", [this, params, timeout] {
254+
ExtendCurrentSpan();
255+
256+
consumer_->MultiSeek(params, timeout);
257+
}).Get();
258+
}
259+
222260
void Consumer::SeekToBeginning(utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout)
223261
const {
224262
UINVARIANT(processing_.load(), "Message processing is not currently started");

kafka/src/kafka/impl/consumer_impl.cpp

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <userver/tracing/span.hpp>
1515
#include <userver/utils/encoding/hex.hpp>
1616
#include <userver/utils/fast_scope_guard.hpp>
17+
#include <userver/utils/numeric_cast.hpp>
1718
#include <userver/utils/span.hpp>
1819

1920
#include <kafka/impl/error_buffer.hpp>
@@ -310,6 +311,7 @@ void ConsumerImpl::LogCallback(const char* facility, const char* message, int lo
310311
void ConsumerImpl::RebalanceCallback(rd_kafka_resp_err_t err, const rd_kafka_topic_partition_list_t* partitions) {
311312
tracing::Span span{"rebalance_callback"};
312313
span.AddTag("kafka_callback", "rebalance_callback");
314+
span.AddTag("kafka_consumer", name_);
313315

314316
LOG(execution_params_.operation_log_level,
315317
"Consumer group rebalanced ('{}' protocol)",
@@ -729,6 +731,20 @@ void ConsumerImpl::Seek(
729731
SeekToOffset(topic, partition_id, static_cast<std::int64_t>(offset), timeout);
730732
}
731733

734+
void ConsumerImpl::MultiSeek(utils::span<const SeekParams> params, std::chrono::milliseconds timeout) const {
735+
if (params.empty()) {
736+
return;
737+
}
738+
for (const auto& p : params) {
739+
if (p.offset > static_cast<std::uint64_t>(std::numeric_limits<std::int64_t>::max())) {
740+
throw SeekInvalidArgumentException(
741+
fmt::format("Offset value have to be <= std::int64_t::max(). offset: {}", p.offset)
742+
);
743+
}
744+
}
745+
SeekToOffsets(params, timeout);
746+
}
747+
732748
void ConsumerImpl::SeekToOffset(
733749
utils::zstring_view topic,
734750
std::uint32_t partition_id,
@@ -768,13 +784,7 @@ void ConsumerImpl::SeekToOffset(
768784
const auto* err =
769785
rd_kafka_seek_partitions(consumer_.GetHandle(), topic_partitions_list.GetHandle(), ToRdKafkaTimeout(deadline));
770786
if (err == nullptr) {
771-
LOG_INFO(
772-
"Seeked to offset: {}"
773-
" for partition: {} topic: {} successfully",
774-
offset,
775-
partition_id,
776-
topic
777-
);
787+
LOG_INFO("Seeked to offset: {} for partition: {} topic: {} successfully", offset, partition_id, topic);
778788
return;
779789
}
780790

@@ -786,6 +796,50 @@ void ConsumerImpl::SeekToOffset(
786796
));
787797
}
788798

799+
void ConsumerImpl::SeekToOffsets(utils::span<const SeekParams> params, std::chrono::milliseconds timeout) const {
800+
if (params.empty()) {
801+
return;
802+
}
803+
if (timeout.count() <= 0) {
804+
throw SeekInvalidArgumentException(fmt::format("Timeout value have to be > 0. value: {}ms", timeout.count()));
805+
}
806+
807+
const auto deadline = engine::Deadline::FromDuration(timeout);
808+
809+
// Seek is local in librdkafka (next_fetch_position). No poll needed; next Fetch uses new position.
810+
TopicPartitionsListHolder topic_partitions_list{
811+
rd_kafka_topic_partition_list_new(utils::numeric_cast<int>(params.size()))
812+
};
813+
for (const auto& p : params) {
814+
rd_kafka_topic_partition_t* part = rd_kafka_topic_partition_list_add(
815+
topic_partitions_list.GetHandle(),
816+
p.topic.c_str(),
817+
static_cast<std::int32_t>(p.partition_id)
818+
);
819+
part->offset = static_cast<std::int64_t>(p.offset);
820+
}
821+
822+
PrintTopicPartitionsList(topic_partitions_list.GetHandle(), [](const rd_kafka_topic_partition_t& partition) {
823+
return fmt::format(
824+
"Partition {} for topic '{}' seeking to offset: {}",
825+
partition.partition,
826+
partition.topic,
827+
partition.offset
828+
);
829+
});
830+
831+
const auto* err =
832+
rd_kafka_seek_partitions(consumer_.GetHandle(), topic_partitions_list.GetHandle(), ToRdKafkaTimeout(deadline));
833+
if (err == nullptr) {
834+
LOG_INFO("MultiSeek: seeked {} partition(s) successfully", params.size());
835+
return;
836+
}
837+
838+
throw SeekException(
839+
fmt::format("Failed to multi-seek {} partition(s). err: {}", params.size(), rd_kafka_error_string(err))
840+
);
841+
}
842+
789843
void ConsumerImpl::SeekToEnd(utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout)
790844
const {
791845
SeekToOffset(topic, partition_id, RD_KAFKA_OFFSET_END, timeout);

kafka/src/kafka/impl/consumer_impl.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <userver/kafka/message.hpp>
1313
#include <userver/kafka/offset_range.hpp>
1414
#include <userver/kafka/rebalance_types.hpp>
15+
#include <userver/utils/span.hpp>
1516
#include <userver/utils/zstring_view.hpp>
1617

1718
#include <kafka/impl/holders_aliases.hpp>
@@ -76,6 +77,11 @@ class ConsumerImpl final {
7677
/// @brief Seeks the partition ID for the specified \b topic to the end offset .
7778
void SeekToEnd(utils::zstring_view topic, std::uint32_t partition_id, std::chrono::milliseconds timeout) const;
7879

80+
/// @brief Seeks multiple topic partitions to the given offsets in one call.
81+
/// @param params Topic, partition_id and offset for each seek.
82+
/// @param timeout Timeout for the operation.
83+
void MultiSeek(utils::span<const SeekParams> params, std::chrono::milliseconds timeout) const;
84+
7985
/// @brief Effectively calls `PollMessage` until `deadline` is reached
8086
/// and no more than `max_batch_size` messages polled.
8187
MessageBatch PollBatch(std::size_t max_batch_size, engine::Deadline deadline);
@@ -146,6 +152,9 @@ class ConsumerImpl final {
146152
std::chrono::milliseconds timeout
147153
) const;
148154

155+
/// @brief Seeks multiple topic partitions to the given offsets (internal, expects validated inputs).
156+
void SeekToOffsets(utils::span<const SeekParams> params, std::chrono::milliseconds timeout) const;
157+
149158
/// @brief Callback which is called after succeeded/failed commit.
150159
/// Currently, used for logging purposes.
151160
void OffsetCommitCallback(rd_kafka_resp_err_t err, const rd_kafka_topic_partition_list_s* committed_offsets) const;

0 commit comments

Comments
 (0)