Skip to content

Commit e08c812

Browse files
authored
Merge pull request ceph#55952 from yuvalif/wip-yuval-64710
rgw/kafka: set message timeout to 5 seconds reviewed-by: kchheda3
2 parents 36caa82 + 1c13850 commit e08c812

File tree

2 files changed

+19
-2
lines changed

2 files changed

+19
-2
lines changed

src/common/options/rgw.yaml.in

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3983,7 +3983,7 @@ options:
39833983
are sent to it for more than the time defined.
39843984
Note that the connection will not be considered idle, even if it is down,
39853985
as long as there are attempts to send messages to it.
3986-
default: 30
3986+
default: 300
39873987
services:
39883988
- rgw
39893989
with_legacy: true
@@ -3999,3 +3999,13 @@ options:
39993999
services:
40004000
- rgw
40014001
with_legacy: true
4002+
- name: rgw_kafka_message_timeout
4003+
type: uint
4004+
level: advanced
4005+
desc: This is the maximum time in milliseconds to deliver a message (including retries)
4006+
long_desc: Delivery error occurs when the message timeout is exceeded.
4007+
Value must be greater than zero, if set to zero, a value of 1 millisecond will be used.
4008+
default: 5000
4009+
services:
4010+
- rgw
4011+
with_legacy: true

src/rgw/rgw_kafka.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,16 @@ bool new_producer(connection_t* conn) {
210210
return false;
211211
}
212212

213+
// set message timeout
214+
// according to documentation, value of zero will expire the message based on retries.
215+
// however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
216+
constexpr std::uint64_t min_message_timeout = 1;
217+
const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
218+
if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms",
219+
std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
213220
// get list of brokers based on the bootstrap broker
214221
if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
215-
222+
216223
if (conn->use_ssl) {
217224
if (!conn->user.empty()) {
218225
// use SSL+SASL

0 commit comments

Comments
 (0)