@@ -2198,6 +2198,55 @@ inline int RGWLC::advance_head(const std::string& lc_shard,
21982198 return ret;
21992199} /* advance head */
22002200
2201+ inline int RGWLC::check_if_shard_done (const std::string& lc_shard,
2202+ rgw::sal::Lifecycle::LCHead& head, int worker_ix)
2203+ {
2204+ int ret{0 };
2205+
2206+ if (head.get_marker ().empty ()) {
2207+ /* done with this shard */
2208+ ldpp_dout (this , 5 ) <<
2209+ " RGWLC::process() next_entry not found. cycle finished lc_shard="
2210+ << lc_shard << " worker=" << worker_ix
2211+ << dendl;
2212+ head.set_shard_rollover_date (ceph_clock_now ());
2213+ ret = sal_lc->put_head (lc_shard, head);
2214+ if (ret < 0 ) {
2215+ ldpp_dout (this , 0 ) << " RGWLC::process() failed to put head "
2216+ << lc_shard
2217+ << dendl;
2218+ }
2219+ ret = 1 ; // to mark that shard is done
2220+ }
2221+ return ret;
2222+ }
2223+
2224+ inline int RGWLC::update_head (const std::string& lc_shard,
2225+ rgw::sal::Lifecycle::LCHead& head,
2226+ rgw::sal::Lifecycle::LCEntry& entry,
2227+ time_t start_date, int worker_ix)
2228+ {
2229+ int ret{0 };
2230+
2231+ ret = advance_head (lc_shard, head, entry, start_date);
2232+ if (ret != 0 ) {
2233+ ldpp_dout (this , 0 ) << " RGWLC::update_head() failed to advance head "
2234+ << lc_shard
2235+ << dendl;
2236+ goto exit;
2237+ }
2238+
2239+ ret = check_if_shard_done (lc_shard, head, worker_ix);
2240+ if (ret < 0 ) {
2241+ ldpp_dout (this , 0 ) << " RGWLC::update_head() failed to check if shard is done "
2242+ << lc_shard
2243+ << dendl;
2244+ }
2245+
2246+ exit:
2247+ return ret;
2248+ }
2249+
22012250int RGWLC::process (int index, int max_lock_secs, LCWorker* worker,
22022251 bool once = false )
22032252{
@@ -2280,27 +2329,13 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
22802329 ret = sal_lc->get_entry (lc_shard, head->get_marker (), &entry);
22812330 if (ret == -ENOENT) {
22822331 /* skip to next entry */
2283- std::unique_ptr<rgw::sal::Lifecycle::LCEntry> tmp_entry = sal_lc->get_entry ();
2284- tmp_entry->set_bucket (head->get_marker ());
2285- if (advance_head (lc_shard, *head.get (), *tmp_entry.get (), now) < 0 ) {
2286- goto exit;
2287- }
2288- /* done with this shard */
2289- if (head->get_marker ().empty ()) {
2290- ldpp_dout (this , 5 ) <<
2291- " RGWLC::process() next_entry not found. cycle finished lc_shard="
2292- << lc_shard << " worker=" << worker->ix
2293- << dendl;
2294- head->set_shard_rollover_date (ceph_clock_now ());
2295- ret = sal_lc->put_head (lc_shard, *head.get ());
2296- if (ret < 0 ) {
2297- ldpp_dout (this , 0 ) << " RGWLC::process() failed to put head "
2298- << lc_shard
2299- << dendl;
2300- }
2301- goto exit;
2302- }
2303- continue ;
2332+ std::unique_ptr<rgw::sal::Lifecycle::LCEntry> tmp_entry = sal_lc->get_entry ();
2333+ tmp_entry->set_bucket (head->get_marker ());
2334+
2335+ if (update_head (lc_shard, *head.get (), *tmp_entry.get (), now, worker->ix ) != 0 ) {
2336+ goto exit;
2337+ }
2338+ continue ;
23042339 }
23052340 if (ret < 0 ) {
23062341 ldpp_dout (this , 0 ) << " RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) "
@@ -2321,51 +2356,21 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
23212356 << " RGWLC::process(): ACTIVE entry: " << entry
23222357 << " index: " << index << " worker ix: " << worker->ix << dendl;
23232358 /* skip to next entry */
2324- if (advance_head (lc_shard, *head.get (), *entry.get (), now) < 0 ) {
2325- goto exit;
2326- }
2327- /* done with this shard */
2328- if (head->get_marker ().empty ()) {
2329- ldpp_dout (this , 5 ) <<
2330- " RGWLC::process() cycle finished lc_shard="
2331- << lc_shard << " worker=" << worker->ix
2332- << dendl;
2333- head->set_shard_rollover_date (ceph_clock_now ());
2334- ret = sal_lc->put_head (lc_shard, *head.get ());
2335- if (ret < 0 ) {
2336- ldpp_dout (this , 0 ) << " RGWLC::process() failed to put head "
2337- << lc_shard
2338- << dendl;
2339- }
2340- goto exit;
2359+ if (update_head (lc_shard, *head.get (), *entry.get (), now, worker->ix ) != 0 ) {
2360+ goto exit;
23412361 }
23422362 continue ;
23432363 }
23442364 } else {
23452365 if ((entry->get_status () == lc_complete) &&
23462366 already_run_today (cct, entry->get_start_time ())) {
2347- /* skip to next entry */
2348- if (advance_head (lc_shard, *head.get (), *entry.get (), now) < 0 ) {
2349- goto exit;
2350- }
23512367 ldpp_dout (this , 5 ) << " RGWLC::process() worker ix: " << worker->ix
23522368 << " SKIP processing for already-processed bucket " << entry->get_bucket ()
23532369 << dendl;
2354- /* done with this shard */
2355- if (head->get_marker ().empty ()) {
2356- ldpp_dout (this , 5 ) <<
2357- " RGWLC::process() cycle finished lc_shard="
2358- << lc_shard << " worker=" << worker->ix
2359- << dendl;
2360- head->set_shard_rollover_date (ceph_clock_now ());
2361- ret = sal_lc->put_head (lc_shard, *head.get ());
2362- if (ret < 0 ) {
2363- ldpp_dout (this , 0 ) << " RGWLC::process() failed to put head "
2364- << lc_shard
2365- << dendl;
2366- }
2367- goto exit;
2368- }
2370+ /* skip to next entry */
2371+ if (update_head (lc_shard, *head.get (), *entry.get (), now, worker->ix ) != 0 ) {
2372+ goto exit;
2373+ }
23692374 continue ;
23702375 }
23712376 }
@@ -2447,19 +2452,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
24472452 }
24482453 }
24492454
2450- /* done with this shard */
2451- if (head->get_marker ().empty ()) {
2452- ldpp_dout (this , 5 ) <<
2453- " RGWLC::process() cycle finished lc_shard="
2454- << lc_shard << " worker=" << worker->ix
2455- << dendl;
2456- head->set_shard_rollover_date (ceph_clock_now ());
2457- ret = sal_lc->put_head (lc_shard, *head.get ());
2458- if (ret < 0 ) {
2459- ldpp_dout (this , 0 ) << " RGWLC::process() failed to put head "
2460- << lc_shard
2461- << dendl;
2462- }
2455+ if (check_if_shard_done (lc_shard, *head.get (), worker->ix ) != 0 ) {
24632456 goto exit;
24642457 }
24652458 } while (1 && !once && !going_down ());
0 commit comments