@@ -47,6 +47,8 @@ ABSL_DECLARE_FLAG(string, dir);
4747ABSL_DECLARE_FLAG (string, dbfilename);
4848ABSL_DECLARE_FLAG (bool , df_snapshot_format);
4949
50+ ABSL_FLAG (bool , background_debug_jobs, false , " Use background fibers for debug jobs" );
51+
5052namespace dfly {
5153
5254using namespace util ;
@@ -153,11 +155,10 @@ struct ObjHist {
153155};
154156
155157// Returns number of O(1) steps executed.
156- unsigned AddObjHist (PrimeIterator it, ObjHist* hist) {
158+ void AddObjHist (PrimeIterator it, ObjHist* hist) {
157159 using namespace container_utils ;
158160 const PrimeValue& pv = it->second ;
159161 size_t val_len = 0 ;
160- unsigned steps = 1 ;
161162
162163 auto per_entry_cb = [&](ContainerEntry entry) {
163164 if (entry.value ) {
@@ -166,7 +167,6 @@ unsigned AddObjHist(PrimeIterator it, ObjHist* hist) {
166167 } else {
167168 val_len += 8 ; // size of long
168169 }
169- ++steps;
170170 return true ;
171171 };
172172
@@ -194,7 +194,6 @@ unsigned AddObjHist(PrimeIterator it, ObjHist* hist) {
194194 } else if (pv.ObjType () == OBJ_HASH) {
195195 IterateMap (pv, [&](ContainerEntry key, ContainerEntry value) {
196196 hist->entry_len .Add (key.length + value.length );
197- steps += 2 ;
198197 return true ;
199198 });
200199 if (pv.Encoding () == kEncodingListPack ) {
@@ -212,8 +211,6 @@ unsigned AddObjHist(PrimeIterator it, ObjHist* hist) {
212211
213212 if (pv.ObjType () != OBJ_STRING && pv.ObjType () != OBJ_JSON)
214213 hist->card .Add (pv.Size ());
215-
216- return steps;
217214}
218215
219216// ObjType -> ObjHist
@@ -235,33 +232,6 @@ void MergeObjHistMap(ObjHistMap&& src, ObjHistMap* dest) {
235232 }
236233}
237234
238- void DoBuildObjHist (EngineShard* shard, ConnectionContext* cntx, ObjHistMap* obj_hist_map) {
239- auto & db_slice = cntx->ns ->GetDbSlice (shard->shard_id ());
240- unsigned steps = 0 ;
241-
242- for (unsigned i = 0 ; i < db_slice.db_array_size (); ++i) {
243- DbTable* dbt = db_slice.GetDBTable (i);
244- if (dbt == nullptr )
245- continue ;
246- PrimeTable::Cursor cursor;
247- do {
248- cursor = dbt->prime .Traverse (cursor, [&](PrimeIterator it) {
249- unsigned obj_type = it->second .ObjType ();
250- auto & hist_ptr = (*obj_hist_map)[obj_type];
251- if (!hist_ptr) {
252- hist_ptr.reset (new ObjHist);
253- }
254- steps += AddObjHist (it, hist_ptr.get ());
255- });
256-
257- if (steps >= 20000 ) {
258- steps = 0 ;
259- ThisFiber::Yield ();
260- }
261- } while (cursor);
262- }
263- }
264-
265235struct SegmentInfo {
266236 base::Histogram hist;
267237};
@@ -579,6 +549,46 @@ IOStat& IOStat::operator-=(const IOStat& other) {
579549 return *this ;
580550}
581551
552+ // Traverse over all entries on all databases, manage cpu time automatically
553+ template <typename F> void TraverseAllEntries (bool background, ConnectionContext* cntx, F&& f) {
554+ util::fb2::BlockingCounter bc{0 };
555+ for (uint32_t i = 0 ; i < shard_set->size (); ++i) {
556+ bc->Add (1 );
557+ util::ProactorBase* dest = shard_set->pool ()->at (i);
558+
559+ auto cb = [f /* copy per thread */ , bc, cntx, background]() mutable {
560+ auto * shard = EngineShard::tlocal ();
561+ auto & db_slice = cntx->ns ->GetDbSlice (shard->shard_id ());
562+
563+ for (unsigned i = 0 ; i < db_slice.db_array_size (); ++i) {
564+ boost::intrusive_ptr<DbTable> dbt = db_slice.CopyDBTablePtr (i);
565+ if (!dbt)
566+ continue ;
567+
568+ PrimeTable::Cursor cursor;
569+ do {
570+ cursor = dbt->prime .Traverse (cursor, f);
571+ if (background) {
572+ ThisFiber::Yield ();
573+ } else if (base::CycleClock::ToUsec (ThisFiber::GetRunningTimeCycles ()) >= 500 ) {
574+ ThisFiber::Yield ();
575+ }
576+ } while (cursor);
577+ }
578+ bc->Dec ();
579+ };
580+ dest->DispatchBrief ([cb, background]() mutable {
581+ using namespace util ::fb2;
582+ Fiber::Opts opts{
583+ .priority = background ? FiberPriority::BACKGROUND : FiberPriority::NORMAL,
584+ .name = " Debug/Traverse" ,
585+ };
586+ Fiber (opts, std::move (cb)).Detach ();
587+ });
588+ }
589+ bc->Wait ();
590+ }
591+
582592} // namespace
583593
584594DebugCmd::DebugCmd (ServerFamily* owner, cluster::ClusterFamily* cf, ConnectionContext* cntx)
@@ -1138,10 +1148,15 @@ void DebugCmd::TxAnalysis(facade::SinkReplyBuilder* builder) {
11381148
11391149void DebugCmd::ObjHist (facade::SinkReplyBuilder* builder) {
11401150 vector<ObjHistMap> obj_hist_map_arr (shard_set->size ());
1141-
1142- shard_set->RunBlockingInParallel ([&](EngineShard* shard) {
1143- DoBuildObjHist (shard, cntx_, &obj_hist_map_arr[shard->shard_id ()]);
1144- });
1151+ auto cb = [&obj_hist_map_arr](PrimeIterator it) {
1152+ unsigned obj_type = it->second .ObjType ();
1153+ auto & hist_ptr = obj_hist_map_arr[EngineShard::tlocal ()->shard_id ()][obj_type];
1154+ if (!hist_ptr) {
1155+ hist_ptr.reset (new struct ObjHist );
1156+ }
1157+ AddObjHist (it, hist_ptr.get ());
1158+ };
1159+ TraverseAllEntries (absl::GetFlag (FLAGS_background_debug_jobs), cntx_, cb);
11451160
11461161 for (size_t i = shard_set->size () - 1 ; i > 0 ; --i) {
11471162 MergeObjHistMap (std::move (obj_hist_map_arr[i]), &obj_hist_map_arr[0 ]);
0 commit comments