Skip to content

Commit 61fee0e

Browse files
committed
rgw/multisite/datalog: Decrement with grace period
This guards against an excess decrement in the following sequence:

1. RGW_a: Fetches the sem_set (sees key 'foo')
2. RGW_b: Runs renew_entry ('foo' is no longer in `cur_cycle`)
3. RGW_a: Sends notify (does not see 'foo' in the response)
4. RGW_a: Decrements 'foo'

Signed-off-by: Adam Emerson <[email protected]>
1 parent b228673 commit 61fee0e

File tree

3 files changed

+19
-3
lines changed

3 files changed

+19
-3
lines changed

src/cls/sem_set/DESIGN.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ after a write, the data log entry might never be made.
5757
respond with bs1 in their `cur_cycle`, bs1 will be decremented
5858
thrice)
5959
6. For each entry in the unordered map, decrement on the semaphore
60-
object only if the object's count is greater than 0.
60+
object only if the object's count is greater than 0. Send a
61+
grace period equal to the time elapsed since the fetch,
62+
multiplied by a fudge factor.
6163
7. If the `notify` operation errors, don't decrement anything.
6264
* Have some task call `compress` on a regular basis (Daily? Hourly?),
6365
to keep seldom used or deleted bucket shards from slowing down

src/rgw/driver/rados/rgw_datalog.cc

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1466,6 +1466,11 @@ asio::awaitable<void> RGWDataChangesLog::renew_run(decltype(renew_signal)) {
14661466
++run;
14671467
}
14681468

1469+
if (ceph::mono_clock::now() - last_recovery < 6h) {
1470+
co_await recover(&dp, recovery_signal);
1471+
};
1472+
1473+
14691474
int interval = cct->_conf->rgw_data_log_window * 3 / 4;
14701475
renew_timer->expires_after(std::chrono::seconds(interval));
14711476
co_await renew_timer->async_wait(asio::use_awaitable);
@@ -1676,6 +1681,7 @@ RGWDataChangesLog::gather_working_sets(
16761681
asio::awaitable<void>
16771682
RGWDataChangesLog::decrement_sems(
16781683
int index,
1684+
ceph::mono_time fetch_time,
16791685
bc::flat_map<std::string, uint64_t>&& semcount)
16801686
{
16811687
namespace sem_set = neorados::cls::sem_set;
@@ -1686,9 +1692,10 @@ RGWDataChangesLog::decrement_sems(
16861692
batch.insert(iter->first);
16871693
semcount.erase(std::move(iter));
16881694
}
1695+
auto grace = ((ceph::mono_clock::now() - fetch_time) * 4) / 3;
16891696
co_await rados->execute(
16901697
get_sem_set_oid(index), loc, neorados::WriteOp{}.exec(
1691-
sem_set::decrement(std::move(batch))),
1698+
sem_set::decrement(std::move(batch), grace)),
16921699
asio::use_awaitable);
16931700
}
16941701
}
@@ -1700,6 +1707,7 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
17001707
do {
17011708
bc::flat_map<std::string, uint64_t> semcount;
17021709

1710+
auto fetch_time = ceph::mono_clock::now();
17031711
// Gather entries in the shard
17041712
std::tie(semcount, cursor) = co_await read_sems(index, std::move(cursor));
17051713
// If we have none, no point doing the rest
@@ -1727,7 +1735,7 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
17271735
<< "failed, skipping decrement" << dendl;
17281736
continue;
17291737
}
1730-
co_await decrement_sems(index, std::move(semcount));
1738+
co_await decrement_sems(index, fetch_time, std::move(semcount));
17311739
} while (!cursor.empty());
17321740
co_return;
17331741
}
@@ -1747,6 +1755,10 @@ asio::awaitable<void> RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
17471755
co_await group.wait();
17481756
}(dpp),
17491757
asio::use_awaitable);
1758+
1759+
std::unique_lock l(lock);
1760+
last_recovery = ceph::mono_clock::now();
1761+
l.unlock();
17501762
}
17511763

17521764
void RGWDataChangesLogInfo::dump(Formatter *f) const

src/rgw/driver/rados/rgw_datalog.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ class RGWDataChangesLog {
367367
std::make_shared<asio::cancellation_signal>();
368368
std::shared_ptr<asio::cancellation_signal> recovery_signal =
369369
std::make_shared<asio::cancellation_signal>();
370+
ceph::mono_time last_recovery = ceph::mono_clock::zero();
370371

371372
const int num_shards;
372373
std::string get_prefix() { return "data_log"; }
@@ -524,6 +525,7 @@ class RGWDataChangesLog {
524525
bc::flat_map<std::string, uint64_t>& semcount);
525526
asio::awaitable<void>
526527
decrement_sems(int index,
528+
ceph::mono_time fetch_time,
527529
bc::flat_map<std::string, uint64_t>&& semcount);
528530
asio::awaitable<void> recover_shard(const DoutPrefixProvider* dpp, int index);
529531
asio::awaitable<void> recover(const DoutPrefixProvider* dpp,

0 commit comments

Comments
 (0)