Skip to content

Commit 514c460

Browse files
author
jdillinger
committed
feat kafka: support group.instance.id option
<section id="quibbler-autodescription"> #### Поддержка статических членов группы потребителей Kafka 📝 - 🔄 Добавлена возможность использовать `{pod_name}` в `group_instance_id` для динамической подстановки имени пода, что позволяет корректно настраивать статические члены группы потребителей. - 🛠 Обновлена логика обработки `group_id` и `group_instance_id`, чтобы поддерживать подстановку имени пода из переменной окружения `YA_K8S_POD_NAME`. - 🧪 Обновлены конфигурации и тесты для корректной работы с новой функциональностью, включая добавление фикстур для эмуляции переменных окружения в тестах. - 📦 Расширена схема конфигурации Kafka, чтобы поддерживать новое поле `group_instance_id` и обновленное описание для `env_pod_name`. <a href="https://nda.ya.ru/t/qa0kX64r7DqvtN"><font size="2">Autodescription by Yandex Code Assistant</font></a> </section> commit_hash:cb19e17260acf07bdd50e89c05195a24e25fab64
1 parent 1cf12c0 commit 514c460

File tree

4 files changed

+50
-12
lines changed

4 files changed

+50
-12
lines changed

kafka/include/userver/kafka/impl/configuration.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ struct ConsumerConfiguration final {
5252
std::string group_id;
5353
std::string auto_offset_reset{"smallest"};
5454
std::optional<std::string> env_pod_name{};
55+
std::optional<std::string> group_instance_id{};
5556
std::chrono::milliseconds max_callback_duration{300000};
5657

5758
RdKafkaOptions rd_kafka_options;

kafka/src/kafka/consumer_component.yaml

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ properties:
1818
with the client instance which made it,
1919
which can be helpful in debugging and troubleshooting scenarios.
2020
default: userver
21+
group_instance_id:
22+
type: string
23+
description: |
24+
Enable static group membership. Static group members are able
25+
to leave and rejoin a group within the configured session.timeout.ms
26+
without prompting a group rebalance. This should be used in combination
27+
with a larger session.timeout.ms to avoid group rebalances caused by
28+
transient unavailability (e.g. process restarts)
29+
default: none
2130
topics:
2231
type: array
2332
description: list of topics consumer subscribes
@@ -26,13 +35,11 @@ properties:
2635
description: topic name
2736
max_batch_size:
2837
type: integer
29-
description: maximum number of messages consumer waits for new messages before
30-
calling a callback
38+
description: maximum number of messages consumer waits for new messages before calling a callback
3139
default: 1
3240
poll_timeout:
3341
type: string
34-
description: maximum amount of time consumer waits for new messages before
35-
calling a callback
42+
description: maximum amount of time consumer waits for new messages before calling a callback
3643
default: 1s
3744
message_key_log_format:
3845
type: string
@@ -78,10 +85,9 @@ properties:
7885
env_pod_name:
7986
type: string
8087
description: |
81-
if defined and `group_id` value contains
82-
`{pod_name}` substring, the substring
83-
is replaced with the value of the environment
84-
variable `env_pod_name`
88+
if defined and `group_id` or `group_instance_id` value contains
89+
`{pod_name}` substring, the substring is replaced with
90+
the value of the environment variable `env_pod_name`
8591
default: none
8692
security_protocol:
8793
type: string

kafka/src/kafka/impl/configuration.cpp

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ std::string ResolveGroupId(const ConsumerConfiguration& configuration) {
8484
}
8585

8686
const auto pos = group_id.find(kPodNameSubstr);
87-
if (group_id.find(kPodNameSubstr) != std::string::npos) {
87+
if (pos != std::string::npos) {
8888
const auto pod_name = ResolvePodName(*configuration.env_pod_name);
8989

9090
return group_id.replace(pos, kPodNameSubstr.size(), pod_name);
@@ -93,6 +93,28 @@ std::string ResolveGroupId(const ConsumerConfiguration& configuration) {
9393
return group_id;
9494
}
9595

96+
std::optional<std::string> ResolveGroupInstanceId(const ConsumerConfiguration& configuration) {
97+
static constexpr std::string_view kPodNameSubstr{"{pod_name}"};
98+
99+
if (!configuration.group_instance_id.has_value()) {
100+
return std::nullopt;
101+
}
102+
auto group_instance_id{configuration.group_instance_id.value()};
103+
104+
if (!configuration.env_pod_name.has_value()) {
105+
return std::nullopt;
106+
}
107+
108+
const auto pos = group_instance_id.find(kPodNameSubstr);
109+
if (pos != std::string::npos) {
110+
const auto pod_name = ResolvePodName(*configuration.env_pod_name);
111+
112+
return group_instance_id.replace(pos, kPodNameSubstr.size(), pod_name);
113+
}
114+
115+
return group_instance_id;
116+
}
117+
96118
} // namespace
97119

98120
CommonConfiguration Parse(const yaml_config::YamlConfig& config, formats::parse::To<CommonConfiguration>) {
@@ -169,6 +191,7 @@ ConsumerConfiguration Parse(const yaml_config::YamlConfig& config, formats::pars
169191
if (config.HasMember(kEnvPodNameField)) {
170192
consumer.env_pod_name = config[kEnvPodNameField].As<std::string>();
171193
}
194+
consumer.group_instance_id = config["group_instance_id"].As<std::optional<std::string>>(std::nullopt);
172195

173196
return consumer;
174197
}
@@ -327,10 +350,19 @@ void Configuration::SetRdKafka(const RdKafkaOptions& rd_kafka_options) {
327350
void Configuration::SetConsumer(const ConsumerConfiguration& configuration) {
328351
const auto group_id = ResolveGroupId(configuration);
329352
UINVARIANT(!group_id.empty(), "Consumer group_id must not be empty");
353+
const auto group_instance_id = ResolveGroupInstanceId(configuration);
330354

331-
LOG_INFO("Consumer '{}' is going to join group '{}'", name_, group_id);
355+
LOG_INFO(
356+
"Consumer '{}' is going to join group '{}' as instance '{}'",
357+
name_,
358+
group_id,
359+
group_instance_id.value_or("<chosen by broker>")
360+
);
332361

333362
SetOption("group.id", group_id);
363+
if (group_instance_id.has_value()) {
364+
SetOption("group.instance.id", group_instance_id.value());
365+
}
334366
SetOption("enable.auto.commit", "false");
335367
SetOption("auto.offset.reset", configuration.auto_offset_reset);
336368
SetOption("max.poll.interval.ms", configuration.max_callback_duration);

kafka/src/kafka/producer_component.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ properties:
4747
default: 1000000
4848
message_send_max_retries:
4949
type: integer
50-
description: maximum number of send request retries until `delivery_timeout`
51-
reached
50+
description: maximum number of send request retries until `delivery_timeout` reached
5251
default: 2147483647
5352
retry_backoff:
5453
type: string

0 commit comments

Comments
 (0)