Commit 695cbaf
Update helio to pull in epoll socket fix (#5974)

* Update helio to pull in epoll socket fix
* server/core: remove queue task index

  The queue task index added in #5716 has since been removed from helio in
  romange/helio#476. The call sites are adjusted back to the previous,
  no-argument callback signature in this changeset.

Signed-off-by: Abhijat Malviya <[email protected]>

1 parent f0bc353  commit 695cbaf
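The gist of the call-site changes below: helio's queue callbacks previously received an unsigned task index, and after romange/helio#476 they take no arguments. A minimal sketch of the signature change (the TaskQueue here is a simplified illustration, not helio's implementation):

    #include <functional>
    #include <queue>

    // Before: callbacks received the index of the task within the queue.
    // using Task = std::function<void(unsigned task_index)>;

    // After: callbacks take no arguments.
    using Task = std::function<void()>;

    class TaskQueue {
     public:
      void Add(Task task) { tasks_.push(std::move(task)); }

      void RunAll() {
        while (!tasks_.empty()) {
          tasks_.front()();  // previously: tasks_.front()(index++);
          tasks_.pop();
        }
      }

     private:
      std::queue<Task> tasks_;
    };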

File tree: 8 files changed (+19 −21 lines)


src/core/task_queue.h

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ class TaskQueue {
     util::detail::ResultMover<ResultType> mover;

     ++blocked_submitters_;
-    Add([&mover, f = std::forward<F>(f), done](unsigned) mutable {
+    Add([&mover, f = std::forward<F>(f), done]() mutable {
       mover.Apply(f);
       done.Notify();
     });
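For context, Await submits a callback to the queue and blocks the caller until it has run; the diff only switches the lambda to the new no-argument form. A rough sketch of that submit-and-wait shape in standard C++, with std::promise standing in for helio's ResultMover/Done pair (assumes f returns a non-void value):

    #include <future>
    #include <type_traits>
    #include <utility>

    // Run f on the queue's consumer thread and hand its result back to the
    // caller, roughly the shape of TaskQueue::Await.
    template <typename F, typename Q>
    auto AwaitSketch(Q& queue, F&& f) {
      using ResultType = std::invoke_result_t<F>;
      std::promise<ResultType> result;  // stands in for ResultMover + Done
      auto fut = result.get_future();
      queue.Add([&result, f = std::forward<F>(f)]() mutable {
        result.set_value(f());  // corresponds to mover.Apply(f); done.Notify();
      });
      return fut.get();  // the caller blocks here until the task has run
    }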

src/server/debugcmd.cc

Lines changed: 1 addition & 1 deletion
@@ -972,7 +972,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,
     ++index;

     if (shard_batch.sz == 32) {
-      ess.Add(sid, [this, index, options, shard_batch](unsigned) {
+      ess.Add(sid, [this, index, options, shard_batch]() {
         DoPopulateBatch(options, shard_batch);
         if (index % 50 == 0) {
           ThisFiber::Yield();
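The surrounding loop batches keys in groups of 32, submits each full batch to the shard queue, and yields periodically so the fiber does not monopolize its thread. A simplified sketch of that pattern (Queue and yield are hypothetical stand-ins for EngineShardSet and ThisFiber::Yield):

    #include <cstdint>
    #include <utility>
    #include <vector>

    // Accumulate keys into batches of 32; each full batch becomes a no-arg
    // callback on the shard queue, with an occasional cooperative yield.
    template <typename Queue, typename YieldFn>
    void PopulateSketch(uint64_t from, uint64_t num_keys, Queue& queue,
                        YieldFn yield) {
      std::vector<uint64_t> shard_batch;
      uint64_t index = 0;
      for (uint64_t key = from; key < from + num_keys; ++key) {
        shard_batch.push_back(key);
        ++index;
        if (shard_batch.size() == 32) {
          queue.Add([batch = std::move(shard_batch), index, yield]() mutable {
            // DoPopulateBatch(options, batch) would insert the keys here.
            if (index % 50 == 0)
              yield();  // cooperative yield, as in ThisFiber::Yield()
          });
          shard_batch.clear();  // moved-from vector is reset for the next batch
        }
      }
    }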

src/server/engine_shard_set.h

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ class EngineShardSet {
   template <typename U> void AwaitRunningOnShardQueue(U&& func) {
     util::fb2::BlockingCounter bc(size_);
     for (size_t i = 0; i < size_; ++i) {
-      Add(i, [&func, bc](unsigned) mutable {
+      Add(i, [&func, bc]() mutable {
         func(EngineShard::tlocal());
         bc->Dec();
       });
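AwaitRunningOnShardQueue fans func out to every shard and blocks until all shards have executed it. A sketch of the same fan-out/join shape, with C++20 std::latch standing in for util::fb2::BlockingCounter and a hypothetical ShardQueue type:

    #include <cstddef>
    #include <functional>
    #include <latch>
    #include <vector>

    // Submit func to every shard's queue and block the caller until all
    // shards have run it.
    template <typename ShardQueue>
    void AwaitOnAllShards(std::vector<ShardQueue>& shards,
                          const std::function<void(size_t)>& func) {
      std::latch done(static_cast<std::ptrdiff_t>(shards.size()));
      for (size_t i = 0; i < shards.size(); ++i) {
        shards[i].Add([&func, &done, i] {  // the new no-argument callback form
          func(i);            // runs on shard i's thread
          done.count_down();  // bc->Dec() in the original
        });
      }
      done.wait();  // released once every shard has executed func
    }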

src/server/multi_command_squasher.cc

Lines changed: 4 additions & 5 deletions
@@ -256,7 +256,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
   atomic_uint64_t max_sched_cycles{0}, max_exec_cycles{0};
   base::SpinLock lock;
   uint64_t fiber_running_cycles{0}, proactor_running_cycles{0};
-  uint32_t max_sched_thread_id{0}, max_sched_seq_num{0}, max_task_index{0};
+  uint32_t max_sched_thread_id{0}, max_sched_seq_num{0};
   vector<string> past_fibers;

   // Atomic transactions (that have all keys locked) perform hops and run squashed commands via
@@ -273,7 +273,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {

   // Saves work in case logging is disable (i.e. log_squash_threshold_cached is high).
   const uint64_t min_threshold_cycles = CycleClock::FromUsec(log_squash_threshold_cached / 5);
-  auto cb = [&, bc, rb](unsigned task_index) mutable {
+  auto cb = [&, bc, rb]() mutable {
     uint64_t sched_time = CycleClock::Now() - start;

     // Update max_sched_cycles in lock-free fashion, to avoid contention
@@ -291,7 +291,6 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
         max_sched_thread_id = ProactorBase::me()->GetPoolIndex();
         max_sched_seq_num = fb2::GetFiberRunSeq();
         past_fibers = fb2::GetPastFiberNames();
-        max_task_index = task_index;
       }
       break;
     }
@@ -356,10 +355,10 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {

   LOG_EVERY_T(INFO, 0.1)
       << "Squashed " << order_.size() << " commands. "
-      << "Total/Fanout/MaxSchedTime/ThreadCbTime/ThreadId/FiberCbTime/FiberSeq/TaskSeq/"
+      << "Total/Fanout/MaxSchedTime/ThreadCbTime/ThreadId/FiberCbTime/FiberSeq/"
       << "MaxExecTime: " << total_usec << "/" << num_shards_ << "/" << max_sched_usec << "/"
       << proactor_running_usec << "/" << max_sched_thread_id << "/" << fiber_running_usec << "/"
-      << "/" << max_sched_seq_num << "/" << max_task_index << "/" << max_exec_usec
+      << "/" << max_sched_seq_num << "/" << max_exec_usec
       << "\n past fibers: " << absl::StrJoin(past_fibers, ", ")
       << "\ncoordinator thread running time: "
       << CycleClock::ToUsec(ProactorBase::me()->GetCurrentBusyCycles());
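The callback's comment mentions updating max_sched_cycles in lock-free fashion to avoid contention. One common way to do that is a compare-and-swap loop, sketched here as a generic illustration rather than the exact Dragonfly code:

    #include <atomic>
    #include <cstdint>

    // Raise max_cycles to `sample` without taking a lock, retrying only
    // while no other thread has already stored a larger value.
    void UpdateMax(std::atomic<uint64_t>& max_cycles, uint64_t sample) {
      uint64_t current = max_cycles.load(std::memory_order_relaxed);
      while (sample > current &&
             !max_cycles.compare_exchange_weak(current, sample,
                                               std::memory_order_relaxed)) {
        // compare_exchange_weak reloads `current` on failure; the loop exits
        // as soon as the stored value is already >= sample.
      }
    }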

src/server/rdb_load.cc

Lines changed: 4 additions & 4 deletions
@@ -2107,7 +2107,7 @@ error_code RdbLoader::Load(io::Source* src) {
     FlushShardAsync(i);

     // Active database if not existed before.
-    shard_set->Add(i, [dbid](unsigned) { GetCurrentDbSlice().ActivateDb(dbid); });
+    shard_set->Add(i, [dbid] { GetCurrentDbSlice().ActivateDb(dbid); });
   }

   cur_db_index_ = dbid;
@@ -2202,7 +2202,7 @@ void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) {
     FlushShardAsync(i);

     // Send sentinel callbacks to ensure that all previous messages have been processed.
-    shard_set->Add(i, [bc](unsigned) mutable { bc->Dec(); });
+    shard_set->Add(i, [bc]() mutable { bc->Dec(); });
   }
   bc->Wait();  // wait for sentinels to report.
   // Decrement local one if it exists
@@ -2523,7 +2523,7 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
   if (out_buf.empty())
     return;

-  auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)](unsigned) {
+  auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] {
     auto& db_slice = GetCurrentDbSlice();

     // Before we start loading, increment LoadInProgress.
@@ -2793,7 +2793,7 @@ bool RdbLoader::ShouldDiscardKey(std::string_view key, const ObjSettings& settin
 void RdbLoader::LoadScriptFromAux(string&& body) {
   ServerState* ss = ServerState::tlocal();
   auto interpreter = ss->BorrowInterpreter();
-  absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
+  absl::Cleanup clean = [ss, interpreter] { ss->ReturnInterpreter(interpreter); };

   if (script_mgr_) {
     auto res = script_mgr_->Insert(body, interpreter);
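The sentinel callbacks in FinishLoad rely on each shard queue being FIFO: once the sentinel posted after the flush work runs, everything queued before it on that shard has completed. A sketch of that barrier, with std::latch in place of BlockingCounter and a hypothetical ShardQueue type:

    #include <cstddef>
    #include <latch>
    #include <vector>

    // Post one sentinel per shard and wait for all of them. Because each
    // queue runs tasks in order, returning from wait() means all work
    // submitted before the sentinels has also finished.
    template <typename ShardQueue>
    void DrainAllShards(std::vector<ShardQueue>& shards) {
      std::latch sentinel(static_cast<std::ptrdiff_t>(shards.size()));
      for (auto& shard : shards) {
        shard.Add([&sentinel]() { sentinel.count_down(); });  // bc->Dec()
      }
      sentinel.wait();  // all earlier per-shard work is now complete
    }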

src/server/transaction.cc

Lines changed: 6 additions & 7 deletions
@@ -778,7 +778,7 @@ void Transaction::ScheduleInternal() {
       shard_set->Add(unique_shard_id_, &Transaction::ScheduleBatchInShard);
     }
   } else {
-    auto cb = [&schedule_ctx](unsigned) {
+    auto cb = [&schedule_ctx] {
       if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(),
                                                schedule_ctx.optimistic_execution)) {
         schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed);
@@ -824,8 +824,7 @@ void Transaction::ScheduleInternal() {
     // See https://github.com/dragonflydb/dragonfly/issues/150 for more info.
     if (should_poll_execution.load(memory_order_relaxed)) {
       IterateActiveShards([](const auto& sd, auto i) {
-        shard_set->Add(
-            i, [](unsigned) { EngineShard::tlocal()->PollExecution("cancel_cleanup", nullptr); });
+        shard_set->Add(i, [] { EngineShard::tlocal()->PollExecution("cancel_cleanup", nullptr); });
       });
     }
     InitTxTime();  // update time for next scheduling attempt
@@ -855,7 +854,7 @@ void Transaction::UnlockMulti() {
   DCHECK_EQ(shard_data_.size(), shard_set->size());
   for (ShardId i = 0; i < shard_data_.size(); ++i) {
     vector<LockFp> fps = std::move(sharded_keys[i]);
-    shard_set->Add(i, [this, fps = std::move(fps)](unsigned) {
+    shard_set->Add(i, [this, fps = std::move(fps)] {
       this->UnlockMultiShardCb(fps, EngineShard::tlocal());
       intrusive_ptr_release(this);
     });
@@ -941,7 +940,7 @@ void Transaction::DispatchHop() {

   use_count_.fetch_add(run_cnt, memory_order_relaxed);  // for each pointer from poll_cb

-  auto poll_cb = [this](unsigned) {
+  auto poll_cb = [this] {
     CHECK(namespace_ != nullptr);
     EngineShard::tlocal()->PollExecution("exec_cb", this);
     DVLOG(3) << "ptr_release " << DebugId();
@@ -1015,7 +1014,7 @@ void Transaction::ExpireBlocking(WaitKeys wkeys) {
   DVLOG(1) << "ExpireBlocking " << DebugId();
   run_barrier_.Start(unique_shard_cnt_);

-  auto expire_cb = [this, &wkeys](unsigned) {
+  auto expire_cb = [this, &wkeys] {
     EngineShard* es = EngineShard::tlocal();
     if (wkeys) {
       IndexSlice is(0, 1);
@@ -1186,7 +1185,7 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) {
   return true;
 }

-void Transaction::ScheduleBatchInShard(unsigned) {
+void Transaction::ScheduleBatchInShard() {
   EngineShard* shard = EngineShard::tlocal();
   auto& stats = shard->stats();
   stats.tx_batch_schedule_calls_total++;

src/server/transaction.h

Lines changed: 1 addition & 1 deletion
@@ -515,7 +515,7 @@ class Transaction {
   bool ScheduleInShard(EngineShard* shard, bool execute_optimistic);

   // Optimized extension of ScheduleInShard. Pulls several transactions queued for scheduling.
-  static void ScheduleBatchInShard(unsigned);
+  static void ScheduleBatchInShard();

   // Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier
   void DispatchHop();
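A side effect of dropping the unused parameter: ScheduleBatchInShard is now a plain no-argument static function, so a pointer to it converts directly to a void() callback without a wrapping lambda, as shard_set->Add(unique_shard_id_, &Transaction::ScheduleBatchInShard) does in transaction.cc. A tiny illustration with a hypothetical TxnSketch type:

    #include <functional>

    struct TxnSketch {  // hypothetical stand-in for Transaction
      // Static member with no parameters: binds to a std::function<void()>
      // callback directly.
      static void ScheduleBatchInShard() { /* pull queued transactions */ }
    };

    int main() {
      std::function<void()> cb = &TxnSketch::ScheduleBatchInShard;
      cb();  // invokes the static member through the type-erased callback
    }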
