44#define dout_prefix *_dout << " ceph_dedup_daemon: " \
55 << __func__ << " : "
66
7- ceph::shared_mutex glock = ceph::make_shared_mutex(" glock" );
8- class SampleDedupWorkerThread ;
9- bool all_stop = false ; // Accessed in the main thread and in other worker threads under glock
10-
117po::options_description make_usage () {
128 po::options_description desc (" Usage" );
139 desc.add_options ()
@@ -235,15 +231,42 @@ class SampleDedupWorkerThread : public Thread
235231 };
236232
237233 struct SampleDedupGlobal {
238- FpStore fp_store;
239- const double sampling_ratio = -1 ;
234+ public:
240235 SampleDedupGlobal (
241236 size_t chunk_threshold,
242237 int sampling_ratio,
243238 uint32_t report_period,
244239 size_t fpstore_threshold) :
245240 fp_store (chunk_threshold, report_period, fpstore_threshold),
246241 sampling_ratio (static_cast <double >(sampling_ratio) / 100 ) { }
242+
243+ bool is_all_stop () {
244+ std::shared_lock l{glock};
245+ return all_stop;
246+ }
247+ static void set_all_stop () {
248+ std::unique_lock l{glock};
249+ all_stop = true ;
250+ }
251+ static void handle_signal (int signum)
252+ {
253+ switch (signum) {
254+ case SIGINT:
255+ case SIGTERM:
256+ set_all_stop ();
257+ dout (0 ) << " got a signal(" << signum << " ), daemon wil be terminiated" << dendl;
258+ break ;
259+
260+ default :
261+ ceph_abort_msgf (" unexpected signal %d" , signum);
262+ }
263+ }
264+ friend class SampleDedupWorkerThread ;
265+ private:
266+ FpStore fp_store;
267+ const double sampling_ratio = -1 ;
268+ inline static ceph::shared_mutex glock = ceph::make_shared_mutex(" glock" );
269+ inline static bool all_stop = false ; // Accessed in the main thread and in other worker threads under glock
247270 };
248271
249272 SampleDedupWorkerThread (
@@ -317,9 +340,7 @@ class SampleDedupWorkerThread : public Thread
317340void SampleDedupWorkerThread::crawl ()
318341{
319342 ObjectCursor current_object = begin;
320- std::shared_lock l{glock};
321- while (!all_stop && current_object < end) {
322- l.unlock ();
343+ while (!sample_dedup_global.is_all_stop () && current_object < end) {
323344 std::vector<ObjectItem> objects;
324345 // Get the list of object IDs to deduplicate
325346 std::tie (objects, current_object) = get_objects (current_object, end, 100 );
@@ -347,16 +368,12 @@ void SampleDedupWorkerThread::crawl()
347368 } else {
348369 try_dedup_and_accumulate_result (target);
349370 }
350- l.lock ();
351- if (all_stop) {
371+ if (sample_dedup_global.is_all_stop ()) {
352372 oid_for_evict.clear ();
353373 break ;
354374 }
355- l.unlock ();
356375 }
357- l.lock ();
358376 }
359- l.unlock ();
360377
361378 vector<AioCompRef> evict_completions (oid_for_evict.size ());
362379 int i = 0 ;
@@ -576,7 +593,7 @@ int SampleDedupWorkerThread::do_chunk_dedup(chunk_t &chunk, snap_t snap)
576593 return ret;
577594}
578595
579- int make_crawling_daemon (const po::variables_map &opts)
596+ int run_crawling_daemon (const po::variables_map &opts)
580597{
581598 string base_pool_name = get_opts_pool_name (opts);
582599 string chunk_pool_name = get_opts_chunk_pool (opts);
@@ -662,16 +679,14 @@ int make_crawling_daemon(const po::variables_map &opts)
662679 << " )"
663680 << dendl;
664681
665- std::shared_lock l (glock);
682+ SampleDedupWorkerThread::SampleDedupGlobal state (
683+ chunk_dedup_threshold, sampling_ratio, report_period, fp_threshold);
684+ ret = 0 ;
666685
667- while (!all_stop) {
668- l.unlock ();
686+ while (!state.is_all_stop ()) {
669687 ObjectCursor begin = io_ctx.object_list_begin ();
670688 ObjectCursor end = io_ctx.object_list_end ();
671689
672- SampleDedupWorkerThread::SampleDedupGlobal sample_dedup_global (
673- chunk_dedup_threshold, sampling_ratio, report_period, fp_threshold);
674-
675690 std::list<SampleDedupWorkerThread> threads;
676691 size_t total_size = 0 ;
677692 size_t total_duplicate_size = 0 ;
@@ -695,7 +710,7 @@ int make_crawling_daemon(const po::variables_map &opts)
695710 chunk_size,
696711 fp_algo,
697712 chunk_algo,
698- sample_dedup_global ,
713+ state ,
699714 snap);
700715 threads.back ().create (" sample_dedup" );
701716 }
@@ -724,31 +739,14 @@ int make_crawling_daemon(const po::variables_map &opts)
724739 return -EINVAL;
725740 }
726741
727- l.lock ();
728742 if (run_once) {
729- all_stop = true ;
743+ state. set_all_stop () ;
730744 break ;
731745 }
732746 }
733- l.unlock ();
734747
735748 dout (0 ) << " done" << dendl;
736- return 0 ;
737- }
738-
739- static void handle_signal (int signum)
740- {
741- std::unique_lock l{glock};
742- switch (signum) {
743- case SIGINT:
744- case SIGTERM:
745- all_stop = true ;
746- dout (0 ) << " got a signal(" << signum << " ), daemon wil be terminiated" << dendl;
747- break ;
748-
749- default :
750- ceph_abort_msgf (" unexpected signal %d" , signum);
751- }
749+ return ret;
752750}
753751
754752int main (int argc, const char **argv)
@@ -805,13 +803,17 @@ int main(int argc, const char **argv)
805803 }
806804
807805 init_async_signal_handler ();
808- register_async_signal_handler_oneshot (SIGINT, handle_signal);
809- register_async_signal_handler_oneshot (SIGTERM, handle_signal);
806+ register_async_signal_handler_oneshot (SIGINT,
807+ SampleDedupWorkerThread::SampleDedupGlobal::handle_signal);
808+ register_async_signal_handler_oneshot (SIGTERM,
809+ SampleDedupWorkerThread::SampleDedupGlobal::handle_signal);
810810
811- int ret = make_crawling_daemon (opts);
811+ int ret = run_crawling_daemon (opts);
812812
813- unregister_async_signal_handler (SIGINT, handle_signal);
814- unregister_async_signal_handler (SIGTERM, handle_signal);
813+ unregister_async_signal_handler (SIGINT,
814+ SampleDedupWorkerThread::SampleDedupGlobal::handle_signal);
815+ unregister_async_signal_handler (SIGTERM,
816+ SampleDedupWorkerThread::SampleDedupGlobal::handle_signal);
815817 shutdown_async_signal_handler ();
816818
817819 return forker.signal_exit (ret);
0 commit comments