@@ -2198,6 +2198,55 @@ inline int RGWLC::advance_head(const std::string& lc_shard,
21982198 return ret;
21992199} /* advance head */
22002200
2201+ inline int RGWLC::check_if_shard_done (const std::string& lc_shard,
2202+ rgw::sal::Lifecycle::LCHead& head, int worker_ix)
2203+ {
2204+ int ret{0 };
2205+
2206+ if (head.get_marker ().empty ()) {
2207+ /* done with this shard */
2208+ ldpp_dout (this , 5 ) <<
2209+ " RGWLC::process() next_entry not found. cycle finished lc_shard="
2210+ << lc_shard << " worker=" << worker_ix
2211+ << dendl;
2212+ head.set_shard_rollover_date (ceph_clock_now ());
2213+ ret = sal_lc->put_head (lc_shard, head);
2214+ if (ret < 0 ) {
2215+ ldpp_dout (this , 0 ) << " RGWLC::process() failed to put head "
2216+ << lc_shard
2217+ << dendl;
2218+ }
2219+ ret = 1 ; // to mark that shard is done
2220+ }
2221+ return ret;
2222+ }
2223+
2224+ inline int RGWLC::update_head (const std::string& lc_shard,
2225+ rgw::sal::Lifecycle::LCHead& head,
2226+ rgw::sal::Lifecycle::LCEntry& entry,
2227+ time_t start_date, int worker_ix)
2228+ {
2229+ int ret{0 };
2230+
2231+ ret = advance_head (lc_shard, head, entry, start_date);
2232+ if (ret != 0 ) {
2233+ ldpp_dout (this , 0 ) << " RGWLC::update_head() failed to advance head "
2234+ << lc_shard
2235+ << dendl;
2236+ goto exit;
2237+ }
2238+
2239+ ret = check_if_shard_done (lc_shard, head, worker_ix);
2240+ if (ret < 0 ) {
2241+ ldpp_dout (this , 0 ) << " RGWLC::update_head() failed to check if shard is done "
2242+ << lc_shard
2243+ << dendl;
2244+ }
2245+
2246+ exit:
2247+ return ret;
2248+ }
2249+
22012250int RGWLC::process (int index, int max_lock_secs, LCWorker* worker,
22022251 bool once = false )
22032252{
@@ -2279,13 +2328,14 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
22792328 /* fetches the entry pointed to by head.bucket */
22802329 ret = sal_lc->get_entry (lc_shard, head->get_marker (), &entry);
22812330 if (ret == -ENOENT) {
2282- ret = sal_lc->get_next_entry (lc_shard, head->get_marker (), &entry);
2283- if (ret < 0 ) {
2284- ldpp_dout (this , 0 ) << " RGWLC::process() sal_lc->get_next_entry(lc_shard, "
2285- << " head.marker, entry) returned error ret==" << ret
2286- << dendl;
2287- goto exit;
2288- }
2331+ /* skip to next entry */
2332+ std::unique_ptr<rgw::sal::Lifecycle::LCEntry> tmp_entry = sal_lc->get_entry ();
2333+ tmp_entry->set_bucket (head->get_marker ());
2334+
2335+ if (update_head (lc_shard, *head.get (), *tmp_entry.get (), now, worker->ix ) != 0 ) {
2336+ goto exit;
2337+ }
2338+ continue ;
22892339 }
22902340 if (ret < 0 ) {
22912341 ldpp_dout (this , 0 ) << " RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) "
@@ -2306,51 +2356,21 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
23062356 << " RGWLC::process(): ACTIVE entry: " << entry
23072357 << " index: " << index << " worker ix: " << worker->ix << dendl;
23082358 /* skip to next entry */
2309- if (advance_head (lc_shard, *head.get (), *entry.get (), now) < 0 ) {
2310- goto exit;
2311- }
2312- /* done with this shard */
2313- if (head->get_marker ().empty ()) {
2314- ldpp_dout (this , 5 ) <<
2315- " RGWLC::process() cycle finished lc_shard="
2316- << lc_shard << " worker=" << worker->ix
2317- << dendl;
2318- head->set_shard_rollover_date (ceph_clock_now ());
2319- ret = sal_lc->put_head (lc_shard, *head.get ());
2320- if (ret < 0 ) {
2321- ldpp_dout (this , 0 ) << " RGWLC::process() failed to put head "
2322- << lc_shard
2323- << dendl;
2324- }
2325- goto exit;
2359+ if (update_head (lc_shard, *head.get (), *entry.get (), now, worker->ix ) != 0 ) {
2360+ goto exit;
23262361 }
23272362 continue ;
23282363 }
23292364 } else {
23302365 if ((entry->get_status () == lc_complete) &&
23312366 already_run_today (cct, entry->get_start_time ())) {
2332- /* skip to next entry */
2333- if (advance_head (lc_shard, *head.get (), *entry.get (), now) < 0 ) {
2334- goto exit;
2335- }
23362367 ldpp_dout (this , 5 ) << " RGWLC::process() worker ix: " << worker->ix
23372368 << " SKIP processing for already-processed bucket " << entry->get_bucket ()
23382369 << dendl;
2339- /* done with this shard */
2340- if (head->get_marker ().empty ()) {
2341- ldpp_dout (this , 5 ) <<
2342- " RGWLC::process() cycle finished lc_shard="
2343- << lc_shard << " worker=" << worker->ix
2344- << dendl;
2345- head->set_shard_rollover_date (ceph_clock_now ());
2346- ret = sal_lc->put_head (lc_shard, *head.get ());
2347- if (ret < 0 ) {
2348- ldpp_dout (this , 0 ) << " RGWLC::process() failed to put head "
2349- << lc_shard
2350- << dendl;
2351- }
2352- goto exit;
2353- }
2370+ /* skip to next entry */
2371+ if (update_head (lc_shard, *head.get (), *entry.get (), now, worker->ix ) != 0 ) {
2372+ goto exit;
2373+ }
23542374 continue ;
23552375 }
23562376 }
@@ -2432,19 +2452,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
24322452 }
24332453 }
24342454
2435- /* done with this shard */
2436- if (head->get_marker ().empty ()) {
2437- ldpp_dout (this , 5 ) <<
2438- " RGWLC::process() cycle finished lc_shard="
2439- << lc_shard << " worker=" << worker->ix
2440- << dendl;
2441- head->set_shard_rollover_date (ceph_clock_now ());
2442- ret = sal_lc->put_head (lc_shard, *head.get ());
2443- if (ret < 0 ) {
2444- ldpp_dout (this , 0 ) << " RGWLC::process() failed to put head "
2445- << lc_shard
2446- << dendl;
2447- }
2455+ if (check_if_shard_done (lc_shard, *head.get (), worker->ix ) != 0 ) {
24482456 goto exit;
24492457 }
24502458 } while (1 && !once && !going_down ());
0 commit comments