Skip to content

Commit 1f2a5e3

Browse files
committed
rgw/kafka: make the connection idle and sleep timeouts and configurable
Fixes: https://tracker.ceph.com/issues/63901 Signed-off-by: Yuval Lifshitz <[email protected]>
1 parent 281de0a commit 1f2a5e3

File tree

2 files changed

+31
-11
lines changed

2 files changed

+31
-11
lines changed

src/common/options/rgw.yaml.in

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3965,3 +3965,27 @@ options:
39653965
see_also:
39663966
- rgw_bucket_counters_cache
39673967
with_legacy: true
3968+
- name: rgw_kafka_connection_idle
3969+
type: uint
3970+
level: advanced
3971+
desc: Time in seconds to delete idle kafka connections
3972+
long_desc: A conection will be considered "idle" if no messages
3973+
are sent to it for more than the time defined.
3974+
Note that the connection will not be considered idle, even if it is down,
3975+
as long as there are attempts to send messages to it.
3976+
default: 30
3977+
services:
3978+
- rgw
3979+
with_legacy: true
3980+
- name: rgw_kafka_sleep_timeout
3981+
type: uint
3982+
level: advanced
3983+
desc: Time in milliseconds to sleep while polling for kafka replies
3984+
long_desc: This will be used to prevent busy waiting for the kafka replies
3985+
As well as for the cases where the broker is down and we try to reconnect.
3986+
The same values times 3 will be used to sleep if there were no messages
3987+
sent or received across all kafka connections
3988+
default: 10
3989+
services:
3990+
- rgw
3991+
with_legacy: true

src/rgw/rgw_kafka.cc

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ class Manager {
326326
const size_t max_connections;
327327
const size_t max_inflight;
328328
const size_t max_queue;
329-
const size_t max_idle_time;
330329
private:
331330
std::atomic<size_t> connection_count;
332331
bool stopped;
@@ -457,13 +456,15 @@ class Manager {
457456
conn_it = connections.begin();
458457
end_it = connections.end();
459458
}
459+
460+
const auto read_timeout = cct->_conf->rgw_kafka_sleep_timeout;
460461
// loop over all connections to read acks
461462
for (;conn_it != end_it;) {
462463

463464
auto& conn = conn_it->second;
464465

465466
// Checking the connection idleness
466-
if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
467+
if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
467468
ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
468469
std::lock_guard lock(connections_lock);
469470
conn->status = STATUS_CONNECTION_IDLE;
@@ -488,15 +489,14 @@ class Manager {
488489
continue;
489490
}
490491

491-
reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);
492+
reply_count += rd_kafka_poll(conn->producer, read_timeout);
492493

493494
// just increment the iterator
494495
++conn_it;
495496
}
496-
// if no messages were received or published
497-
// across all connection, sleep for 100ms
497+
// sleep if no messages were received or published across all connection
498498
if (send_count == 0 && reply_count == 0) {
499-
std::this_thread::sleep_for(std::chrono::milliseconds(100));
499+
std::this_thread::sleep_for(std::chrono::milliseconds(read_timeout*3));
500500
}
501501
}
502502
}
@@ -510,15 +510,12 @@ class Manager {
510510
Manager(size_t _max_connections,
511511
size_t _max_inflight,
512512
size_t _max_queue,
513-
int _read_timeout_ms,
514513
CephContext* _cct) :
515514
max_connections(_max_connections),
516515
max_inflight(_max_inflight),
517516
max_queue(_max_queue),
518-
max_idle_time(30),
519517
connection_count(0),
520518
stopped(false),
521-
read_timeout_ms(_read_timeout_ms),
522519
connections(_max_connections),
523520
messages(max_queue),
524521
queued(0),
@@ -673,14 +670,13 @@ static Manager* s_manager = nullptr;
673670
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
674671
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
675672
static const size_t MAX_QUEUE_DEFAULT = 8192;
676-
static const int READ_TIMEOUT_MS_DEFAULT = 500;
677673

678674
bool init(CephContext* cct) {
679675
if (s_manager) {
680676
return false;
681677
}
682678
// TODO: take conf from CephContext
683-
s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, READ_TIMEOUT_MS_DEFAULT, cct);
679+
s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, cct);
684680
return true;
685681
}
686682

0 commit comments

Comments
 (0)