2929
3030#include " fmt/format.h"
3131
32+ #define dout_subsys ceph_subsys_rgw
3233namespace file ::listing {
3334
3435namespace bi = boost::intrusive;
@@ -281,7 +282,7 @@ struct BucketCache : public Notifiable
281282
282283 typedef std::tuple<BucketCacheEntry<D, B>*, uint32_t > GetBucketResult;
283284
284- GetBucketResult get_bucket (const std::string& name, uint32_t flags)
285+ GetBucketResult get_bucket (const DoutPrefixProvider* dpp, const std::string& name, uint32_t flags)
285286 {
286287 /* this fn returns a bucket locked appropriately, having atomically
287288 * found or inserted the required BucketCacheEntry in_avl*/
@@ -309,10 +310,11 @@ struct BucketCache : public Notifiable
309310 } else {
310311 /* BucketCacheEntry not in cache */
311312 if (! (flags & BucketCache<D, B>::FLAG_CREATE)) {
312- /* the caller does not want to instantiate a new cache
313+ /* the caller does not want to instantiate a new cache
313314 * entry (i.e., only wants to notify on an existing one) */
314- return result;
315- }
315+ lat.lock ->unlock ();
316+ return result;
317+ }
316318 /* we need to create it */
317319 b = static_cast <BucketCacheEntry<D, B>*>(
318320 lru.insert (&fac, cohort::lru::Edge::MRU, iflags));
@@ -399,7 +401,7 @@ struct BucketCache : public Notifiable
399401
400402 int rc __attribute__ ((unused)) = 0 ;
401403 GetBucketResult gbr =
402- get_bucket (sal_bucket->get_name (),
404+ get_bucket (dpp, sal_bucket->get_name (),
403405 BucketCache<D, B>::FLAG_LOCK | BucketCache<D, B>::FLAG_CREATE);
404406 auto [b /* BucketCacheEntry */ , flags] = gbr;
405407 if (b /* XXX again, can this fail? */ ) {
@@ -450,6 +452,10 @@ struct BucketCache : public Notifiable
450452 } else {
451453 /* position at start of index */
452454 auto rc = cursor.get (key, data, MDB_FIRST);
455+ if (rc == MDB_NOTFOUND) {
456+ /* no initial key */
457+ return 0 ;
458+ }
453459 if (rc == MDB_SUCCESS) {
454460 proc_result ();
455461 }
@@ -472,12 +478,12 @@ struct BucketCache : public Notifiable
472478 using namespace LMDBSafe ;
473479
474480 int rc{0 };
475- GetBucketResult gbr = get_bucket (bname, BucketCache<D, B>::FLAG_LOCK);
481+ GetBucketResult gbr = get_bucket (nullptr , bname, BucketCache<D, B>::FLAG_LOCK);
476482 auto [b /* BucketCacheEntry */ , flags] = gbr;
477483 if (b) {
478484 unique_lock ulk{b->mtx , std::adopt_lock};
479485 if ((b->name != bname) ||
480- (b != opaque) ||
486+ (opaque && ( b != opaque) ) ||
481487 (! (b->flags & BucketCacheEntry<D, B>::FLAG_FILLED))) {
482488 /* do nothing */
483489 return 0 ;
@@ -544,6 +550,78 @@ struct BucketCache : public Notifiable
544550 return rc;
545551 } /* notify */
546552
553+ int add_entry (const DoutPrefixProvider* dpp, std::string bname, rgw_bucket_dir_entry bde) {
554+ using namespace LMDBSafe ;
555+
556+ GetBucketResult gbr = get_bucket (dpp, bname, BucketCache<D, B>::FLAG_LOCK);
557+ auto [b /* BucketCacheEntry */ , flags] = gbr;
558+ if (b) {
559+ unique_lock ulk{b->mtx , std::adopt_lock};
560+ ulk.unlock ();
561+
562+ auto txn = b->env ->getRWTransaction ();
563+ auto concat_k = concat_key (bde.key );
564+ std::string ser_data;
565+ zpp::bits::out out (ser_data);
566+ struct timespec ts {
567+ ceph::real_clock::to_timespec (bde.meta.mtime)
568+ };
569+ auto errc =
570+ out (bde.key .name , bde.key .instance , /* XXX bde.key.ns, */
571+ bde.ver .pool , bde.ver .epoch , bde.exists , bde.meta .category ,
572+ bde.meta .size , ts.tv_sec , ts.tv_nsec , bde.meta .owner ,
573+ bde.meta .owner_display_name , bde.meta .accounted_size ,
574+ bde.meta .storage_class , bde.meta .appendable , bde.meta .etag );
575+ if (errc.code != std::errc{0 }) {
576+ abort ();
577+ }
578+ txn->put (b->dbi , concat_k, ser_data);
579+
580+ txn->commit ();
581+ lru.unref (b, cohort::lru::FLAG_NONE);
582+ } /* b */
583+
584+ return 0 ;
585+ } /* add_entry */
586+
587+ int remove_entry (const DoutPrefixProvider* dpp, std::string bname, cls_rgw_obj_key key) {
588+ using namespace LMDBSafe ;
589+
590+ GetBucketResult gbr = get_bucket (dpp, bname, BucketCache<D, B>::FLAG_LOCK);
591+ auto [b /* BucketCacheEntry */ , flags] = gbr;
592+ if (b) {
593+ unique_lock ulk{b->mtx , std::adopt_lock};
594+ ulk.unlock ();
595+
596+ auto txn = b->env ->getRWTransaction ();
597+ auto concat_k = concat_key (key);
598+ txn->del (b->dbi , concat_k);
599+ txn->commit ();
600+
601+ lru.unref (b, cohort::lru::FLAG_NONE);
602+ } /* b */
603+
604+ return 0 ;
605+ } /* remove_entry */
606+
607+ int invalidate_bucket (const DoutPrefixProvider* dpp, std::string bname) {
608+ using namespace LMDBSafe ;
609+
610+ GetBucketResult gbr = get_bucket (dpp, bname, BucketCache<D, B>::FLAG_LOCK);
611+ auto [b /* BucketCacheEntry */ , flags] = gbr;
612+ if (b) {
613+ unique_lock ulk{b->mtx , std::adopt_lock};
614+
615+ auto txn = b->env ->getRWTransaction ();
616+ mdb_drop (*txn, b->dbi , 0 );
617+ txn->commit ();
618+ b->flags &= ~BucketCacheEntry<D, B>::FLAG_FILLED;
619+
620+ ulk.unlock ();
621+ } /* b */
622+
623+ return 0 ;
624+ } /* invalidate_bucket */
547625}; /* BucketCache */
548626
549627} // namespace file::listing
0 commit comments