diff --git a/include/iocore/cache/CacheDefs.h b/include/iocore/cache/CacheDefs.h index c5b3e304bf4..17bf581b53d 100644 --- a/include/iocore/cache/CacheDefs.h +++ b/include/iocore/cache/CacheDefs.h @@ -95,6 +95,7 @@ enum CacheEventType { CACHE_EVENT_SCAN_OPERATION_BLOCKED = CACHE_EVENT_EVENTS_START + 23, CACHE_EVENT_SCAN_OPERATION_FAILED = CACHE_EVENT_EVENTS_START + 24, CACHE_EVENT_SCAN_DONE = CACHE_EVENT_EVENTS_START + 25, + CACHE_EVENT_OPEN_DIR_RETRY = CACHE_EVENT_EVENTS_START + 26, ////////////////////////// // Internal error codes // ////////////////////////// diff --git a/src/iocore/cache/Cache.cc b/src/iocore/cache/Cache.cc index 295a4eadb8c..2fe3d3fc3f3 100644 --- a/src/iocore/cache/Cache.cc +++ b/src/iocore/cache/Cache.cc @@ -109,6 +109,28 @@ DbgCtl dbg_ctl_cache_init{"cache_init"}; DbgCtl dbg_ctl_cache_hosting{"cache_hosting"}; DbgCtl dbg_ctl_cache_update{"cache_update"}; +CacheVC * +new_CacheVC_for_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const HttpConfigAccessor *params, + StripeSM *stripe) +{ + CacheVC *cache_vc = new_CacheVC(cont); + + cache_vc->first_key = *key; + cache_vc->key = *key; + cache_vc->earliest_key = *key; + cache_vc->stripe = stripe; + cache_vc->vio.op = VIO::READ; + cache_vc->op_type = static_cast(CacheOpType::Read); + cache_vc->frag_type = CACHE_FRAG_TYPE_HTTP; + cache_vc->params = params; + cache_vc->request.copy_shallow(request); + + ts::Metrics::Gauge::increment(cache_rsb.status[cache_vc->op_type].active); + ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[cache_vc->op_type].active); + + return cache_vc; +} + } // end anonymous namespace // Global list of the volumes created @@ -543,20 +565,24 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, OpenDirEntry *od = nullptr; CacheVC *c = nullptr; + // Read-While-Writer + // This OpenDirEntry lookup doesn't need stripe mutex lock because OpenDir has own reader-writer lock + od = stripe->open_read(key); + if (od != nullptr) { + c = new_CacheVC_for_read(cont, key, request, params, stripe); + c->od = od; + cont->handleEvent(CACHE_EVENT_OPEN_READ_RWW, nullptr); + SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter); + if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) { + return ACTION_RESULT_DONE; + } + return &c->_action; + } + { CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding); - if (!lock.is_locked() || (od = stripe->open_read(key)) || stripe->directory.probe(key, stripe, &result, &last_collision)) { - c = new_CacheVC(cont); - c->first_key = c->key = c->earliest_key = *key; - c->stripe = stripe; - c->vio.op = VIO::READ; - c->op_type = static_cast(CacheOpType::Read); - ts::Metrics::Gauge::increment(cache_rsb.status[c->op_type].active); - ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active); - c->request.copy_shallow(request); - c->frag_type = CACHE_FRAG_TYPE_HTTP; - c->params = params; - c->od = od; + if (!lock.is_locked() || stripe->directory.probe(key, stripe, &result, &last_collision)) { + c = new_CacheVC_for_read(cont, key, request, params, stripe); } if (!lock.is_locked()) { SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead); @@ -566,9 +592,7 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, if (!c) { goto Lmiss; } - if (c->od) { - goto Lwriter; - } + // hit c->dir = c->first_dir = result; c->last_collision = last_collision; @@ -587,13 +611,6 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[static_cast(CacheOpType::Read)].failure); cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast(-ECACHE_NO_DOC)); return ACTION_RESULT_DONE; -Lwriter: - cont->handleEvent(CACHE_EVENT_OPEN_READ_RWW, nullptr); - SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter); - if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) { - return ACTION_RESULT_DONE; - } - return &c->_action; Lcallreturn: if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) { return ACTION_RESULT_DONE; diff --git a/src/iocore/cache/CacheDir.cc b/src/iocore/cache/CacheDir.cc index 538fe2a53f4..04149ccce59 100644 --- a/src/iocore/cache/CacheDir.cc +++ b/src/iocore/cache/CacheDir.cc @@ -28,9 +28,12 @@ #include "PreservationTable.h" #include "Stripe.h" +#include "iocore/eventsystem/Event.h" #include "tscore/hugepages.h" #include "tscore/Random.h" #include "ts/ats_probe.h" +#include "tsutil/Bravo.h" +#include #ifdef LOOP_CHECK_MODE #define DIR_LOOP_THRESHOLD 1000 @@ -46,6 +49,7 @@ DbgCtl dbg_ctl_dir_clean{"dir_clean"}; #ifdef DEBUG DbgCtl dbg_ctl_cache_stats{"cache_stats"}; +DbgCtl dbg_ctl_cache_open_dir{"cache_open_dir"}; DbgCtl dbg_ctl_dir_probe_hit{"dir_probe_hit"}; DbgCtl dbg_ctl_dir_probe_tag{"dir_probe_tag"}; DbgCtl dbg_ctl_dir_probe_miss{"dir_probe_miss"}; @@ -78,10 +82,11 @@ OpenDir::OpenDir() int OpenDir::open_write(CacheVC *cont, int allow_if_writers, int max_writers) { - ink_assert(cont->stripe->mutex->thread_holding == this_ethread()); + std::lock_guard lock(_shared_mutex); + unsigned int h = cont->first_key.slice32(0); int b = h % OPEN_DIR_BUCKETS; - for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next) { + for (OpenDirEntry *d = _bucket[b].head; d; d = d->link.next) { if (!(d->writers.head->first_key == cont->first_key)) { continue; } @@ -107,17 +112,27 @@ OpenDir::open_write(CacheVC *cont, int allow_if_writers, int max_writers) dir_clear(&od->first_dir); cont->od = od; cont->write_vector = &od->vector; - bucket[b].push(od); + _bucket[b].push(od); return 1; } +/** + This event handler is called in two cases: + + 1. Direct call from OpenDir::close_write - writer lock is already acquired + 2. Self retry through event system - need to acquire writer lock + */ int -OpenDir::signal_readers(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) +OpenDir::signal_readers(int event, Event * /* ATS UNUSED*/) { + if (event == CACHE_EVENT_OPEN_DIR_RETRY) { + _shared_mutex.lock(); + } + Queue newly_delayed_readers; EThread *t = mutex->thread_holding; CacheVC *c = nullptr; - while ((c = delayed_readers.dequeue())) { + while ((c = _delayed_readers.dequeue())) { CACHE_TRY_LOCK(lock, c->mutex, t); if (lock.is_locked()) { c->f.open_read_timeout = 0; @@ -127,28 +142,34 @@ OpenDir::signal_readers(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) newly_delayed_readers.push(c); } if (newly_delayed_readers.head) { - delayed_readers = newly_delayed_readers; - EThread *t1 = newly_delayed_readers.head->mutex->thread_holding; + _delayed_readers = newly_delayed_readers; + EThread *t1 = newly_delayed_readers.head->mutex->thread_holding; if (!t1) { t1 = mutex->thread_holding; } - t1->schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); + t1->schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), CACHE_EVENT_OPEN_DIR_RETRY); } - return 0; + + if (event == CACHE_EVENT_OPEN_DIR_RETRY) { + _shared_mutex.unlock(); + } + + return EVENT_DONE; } int OpenDir::close_write(CacheVC *cont) { - ink_assert(cont->stripe->mutex->thread_holding == this_ethread()); + std::lock_guard lock(_shared_mutex); + cont->od->writers.remove(cont); cont->od->num_writers--; if (!cont->od->writers.head) { unsigned int h = cont->first_key.slice32(0); int b = h % OPEN_DIR_BUCKETS; - bucket[b].remove(cont->od); - delayed_readers.append(cont->od->readers); - signal_readers(0, nullptr); + _bucket[b].remove(cont->od); + _delayed_readers.append(cont->od->readers); + signal_readers(EVENT_CALL, nullptr); cont->od->vector.clear(); THREAD_FREE(cont->od, openDirEntryAllocator, cont->mutex->thread_holding); } @@ -159,9 +180,11 @@ OpenDir::close_write(CacheVC *cont) OpenDirEntry * OpenDir::open_read(const CryptoHash *key) const { + ts::bravo::shared_lock lock(_shared_mutex); + unsigned int h = key->slice32(0); int b = h % OPEN_DIR_BUCKETS; - for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next) { + for (OpenDirEntry *d = _bucket[b].head; d; d = d->link.next) { if (d->writers.head->first_key == *key) { return d; } diff --git a/src/iocore/cache/P_CacheDir.h b/src/iocore/cache/P_CacheDir.h index caf9647db7b..876cb4afbca 100644 --- a/src/iocore/cache/P_CacheDir.h +++ b/src/iocore/cache/P_CacheDir.h @@ -28,6 +28,7 @@ #include "iocore/eventsystem/Continuation.h" #include "iocore/aio/AIO.h" #include "tscore/Version.h" +#include "tsutil/Bravo.h" #include #include @@ -225,16 +226,23 @@ struct OpenDirEntry { } }; -struct OpenDir : public Continuation { - Queue delayed_readers; - DLL bucket[OPEN_DIR_BUCKETS]; +class OpenDir : public Continuation +{ +public: + OpenDir(); int open_write(CacheVC *c, int allow_if_writers, int max_writers); int close_write(CacheVC *c); OpenDirEntry *open_read(const CryptoHash *key) const; - int signal_readers(int event, Event *e); - OpenDir(); + // event handler + int signal_readers(int event, Event *e); + +private: + mutable ts::bravo::shared_mutex _shared_mutex; + + Queue _delayed_readers; + DLL _bucket[OPEN_DIR_BUCKETS]; }; struct CacheSync : public Continuation { diff --git a/src/iocore/cache/StripeSM.h b/src/iocore/cache/StripeSM.h index 3ae476ac69e..b7c211db2ec 100644 --- a/src/iocore/cache/StripeSM.h +++ b/src/iocore/cache/StripeSM.h @@ -101,14 +101,15 @@ class StripeSM : public Continuation, public Stripe int recover_data(); - int open_write(CacheVC *cont, int allow_if_writers, int max_writers); - int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers); - int close_write(CacheVC *cont); - int begin_read(CacheVC *cont) const; - // unused read-write interlock code - // currently http handles a write-lock failure by retrying the read + // OpenDir API + int open_write(CacheVC *cont, int allow_if_writers, int max_writers); + int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers); + int close_write(CacheVC *cont); OpenDirEntry *open_read(const CryptoHash *key) const; - int close_read(CacheVC *cont) const; + + // PreservationTable API + int begin_read(CacheVC *cont) const; + int close_read(CacheVC *cont) const; int clear_dir_aio(); int clear_dir();