diff --git a/doc/admin-guide/files/records.yaml.en.rst b/doc/admin-guide/files/records.yaml.en.rst index 2336a2de6ad..917f971d441 100644 --- a/doc/admin-guide/files/records.yaml.en.rst +++ b/doc/admin-guide/files/records.yaml.en.rst @@ -2659,6 +2659,22 @@ Cache Control :units: millisecond How long to wait between each write cycle when syncing the cache directory to disk. +.. ts:cv:: CONFIG proxy.config.cache.dir.sync_parallel_tasks INT 1 + + Number of parallel tasks to use for directory syncing. Tasks run on ET_TASK + threads, and each task syncs the directories of the drives assigned to it. + + ======= ================================================================== + Value Description + ======= ================================================================== + ``-1`` Unlimited - one task per drive (maximum parallelism) + ``1`` Sequential - one task for all drives (default, safe) + ``N`` Parallel - up to N tasks (drives) sync concurrently + ======= ================================================================== + + Default is ``1`` (sequential). Set to ``-1`` for maximum parallelism on + high-end NVMe arrays, or to a value from ``4`` to ``8`` for balanced + performance on multi-drive systems. .. 
ts:cv:: CONFIG proxy.config.cache.limits.http.max_alts INT 5 diff --git a/src/iocore/cache/Cache.cc b/src/iocore/cache/Cache.cc index 295a4eadb8c..512a5e7bf76 100644 --- a/src/iocore/cache/Cache.cc +++ b/src/iocore/cache/Cache.cc @@ -64,6 +64,7 @@ int cache_config_log_alternate_eviction = 0; int cache_config_dir_sync_frequency = 60; int cache_config_dir_sync_delay = 500; int cache_config_dir_sync_max_write = (2 * 1024 * 1024); +int cache_config_dir_sync_parallel_tasks = 1; int cache_config_permit_pinning = 0; int cache_config_select_alternate = 1; int cache_config_max_doc_size = 0; @@ -90,7 +91,6 @@ Cache *theCache = nullptr; std::vector> gdisks; int gndisks = 0; Cache *caches[NUM_CACHE_FRAG_TYPES] = {nullptr}; -CacheSync *cacheDirSync = nullptr; Store theCacheStore; StripeSM **gstripes = nullptr; std::atomic gnstripes = 0; @@ -884,6 +884,9 @@ ink_cache_init(ts::ModuleVersion v) cacheProcessor.wait_for_cache = RecGetRecordInt("proxy.config.http.wait_for_cache").value_or(0); + RecEstablishStaticConfigInt32(cache_config_dir_sync_parallel_tasks, "proxy.config.cache.dir.sync_parallel_tasks"); + Dbg(dbg_ctl_cache_init, "proxy.config.cache.dir.sync_parallel_tasks = %d", cache_config_dir_sync_parallel_tasks); + RecEstablishStaticConfigInt32(cache_config_persist_bad_disks, "proxy.config.cache.persist_bad_disks"); Dbg(dbg_ctl_cache_init, "proxy.config.cache.persist_bad_disks = %d", cache_config_persist_bad_disks); if (cache_config_persist_bad_disks) { diff --git a/src/iocore/cache/CacheDir.cc b/src/iocore/cache/CacheDir.cc index 55b3e953b41..097287b06ac 100644 --- a/src/iocore/cache/CacheDir.cc +++ b/src/iocore/cache/CacheDir.cc @@ -31,6 +31,9 @@ #include "tscore/hugepages.h" #include "tscore/Random.h" #include "ts/ats_probe.h" +#include "iocore/eventsystem/Tasks.h" + +#include #ifdef LOOP_CHECK_MODE #define DIR_LOOP_THRESHOLD 1000 @@ -862,14 +865,56 @@ dir_lookaside_remove(const CacheKey *key, StripeSM *stripe) return; } -// Cache Sync -// +// Cache Dir Sync void 
dir_sync_init() { - cacheDirSync = new CacheSync; - cacheDirSync->trigger = eventProcessor.schedule_in(cacheDirSync, HRTIME_SECONDS(cache_config_dir_sync_frequency)); + static std::vector> cache_syncs; + static bool initialized = false; + std::unordered_map> drive_stripe_map; + + if (initialized) { + Warning("dir_sync_init() called multiple times - ignoring"); + return; + } + initialized = true; + + for (int i = 0; i < gnstripes; i++) { + drive_stripe_map[gstripes[i]->disk].push_back(i); + } + + if (drive_stripe_map.empty()) { + Dbg(dbg_ctl_cache_dir_sync, "No stripes to sync - dir_sync_init complete"); + return; + } + + int num_tasks = std::max(1, (cache_config_dir_sync_parallel_tasks == -1) ? static_cast(drive_stripe_map.size()) : + cache_config_dir_sync_parallel_tasks); + + cache_syncs.resize(num_tasks); + for (int i = 0; i < num_tasks; i++) { + cache_syncs[i] = std::make_unique(); + } + + int task_idx = 0; + + for (auto &[disk, indices] : drive_stripe_map) { + int target_task = task_idx % num_tasks; + + Dbg(dbg_ctl_cache_dir_sync, "Disk %s: %zu stripe(s) assigned to task %d", disk->path, indices.size(), target_task); + for (int stripe_idx : indices) { + cache_syncs[target_task]->stripe_indices.push_back(stripe_idx); + } + task_idx++; + } + + for (int i = 0; i < num_tasks; i++) { + Dbg(dbg_ctl_cache_dir_sync, "Task %d: syncing %zu stripe(s)", i, cache_syncs[i]->stripe_indices.size()); + cache_syncs[i]->current_index = 0; + cache_syncs[i]->trigger = + eventProcessor.schedule_in(cache_syncs[i].get(), HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK); + } } void @@ -930,7 +975,7 @@ sync_cache_dir_on_shutdown() } int -CacheSync::mainEvent(int event, Event *e) +CacheSync::mainEvent(int event, Event * /* e ATS_UNUSED */) { if (trigger) { trigger->cancel_action(); @@ -938,26 +983,27 @@ CacheSync::mainEvent(int event, Event *e) } Lrestart: - if (stripe_index >= gnstripes) { - stripe_index = 0; + if (current_index >= static_cast(stripe_indices.size())) { + 
current_index = 0; +#if FREE_BUF_BETWEEN_CYCLES + // Free buffer between sync cycles to avoid holding large amounts of memory if (buf) { if (buf_huge) { ats_free_hugepage(buf, buflen); } else { ats_free(buf); } - buflen = 0; buf = nullptr; + buflen = 0; buf_huge = false; } - Dbg(dbg_ctl_cache_dir_sync, "sync done"); - if (event == EVENT_INTERVAL) { - trigger = e->ethread->schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency)); - } else { - trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency)); - } +#endif + Dbg(dbg_ctl_cache_dir_sync, "sync cycle done"); + trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK); return EVENT_CONT; } + stripe_index = stripe_indices[current_index]; + current_index++; StripeSM *stripe = gstripes[stripe_index]; // must be named "vol" to make STAT macros work. @@ -1007,6 +1053,7 @@ CacheSync::mainEvent(int event, Event *e) if (stripe->is_io_in_progress() || stripe->get_agg_buf_pos()) { Dbg(dbg_ctl_cache_dir_sync, "Dir %s: waiting for agg buffer", stripe->hash_text.get()); stripe->dir_sync_waiting = true; + stripe->waiting_dir_sync = this; if (!stripe->is_io_in_progress()) { stripe->aggWrite(EVENT_IMMEDIATE, nullptr); } @@ -1072,9 +1119,7 @@ CacheSync::mainEvent(int event, Event *e) return EVENT_CONT; } Ldone: - // done writepos = 0; - ++stripe_index; goto Lrestart; } diff --git a/src/iocore/cache/P_CacheDir.h b/src/iocore/cache/P_CacheDir.h index 6f46118ef9f..55e5ebfe029 100644 --- a/src/iocore/cache/P_CacheDir.h +++ b/src/iocore/cache/P_CacheDir.h @@ -28,6 +28,7 @@ #include "iocore/eventsystem/Continuation.h" #include "iocore/aio/AIO.h" #include "tscore/Version.h" +#include "tscore/hugepages.h" #include #include @@ -246,10 +247,25 @@ struct CacheSync : public Continuation { AIOCallback io; Event *trigger = nullptr; ink_hrtime start_time = 0; - int mainEvent(int event, Event *e); - void aio_write(int fd, char *b, int n, off_t o); + + 
std::vector stripe_indices; + int current_index{0}; + + int mainEvent(int event, Event *e); + void aio_write(int fd, char *b, int n, off_t o); CacheSync() : Continuation(new_ProxyMutex()) { SET_HANDLER(&CacheSync::mainEvent); } + + ~CacheSync() + { + if (buf) { + if (buf_huge) { + ats_free_hugepage(buf, buflen); + } else { + ats_free(buf); + } + } + } }; struct StripteHeaderFooter { diff --git a/src/iocore/cache/P_CacheInternal.h b/src/iocore/cache/P_CacheInternal.h index 8a8ee3c3aa7..674dd98059c 100644 --- a/src/iocore/cache/P_CacheInternal.h +++ b/src/iocore/cache/P_CacheInternal.h @@ -96,6 +96,7 @@ extern CacheStatsBlock cache_rsb; extern int cache_config_dir_sync_frequency; extern int cache_config_dir_sync_delay; extern int cache_config_dir_sync_max_write; +extern int cache_config_dir_sync_parallel_tasks; extern int cache_config_http_max_alts; extern int cache_config_log_alternate_eviction; extern int cache_config_permit_pinning; @@ -140,7 +141,6 @@ struct CacheRemoveCont : public Continuation { // Global Data extern ClassAllocator cacheVConnectionAllocator; extern ClassAllocator cacheEvacuateDocVConnectionAllocator; -extern CacheSync *cacheDirSync; // Function Prototypes int cache_write(CacheVC *, CacheHTTPInfoVector *); int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key); diff --git a/src/iocore/cache/StripeSM.cc b/src/iocore/cache/StripeSM.cc index 40a3877d2fa..6e28c1a0809 100644 --- a/src/iocore/cache/StripeSM.cc +++ b/src/iocore/cache/StripeSM.cc @@ -709,9 +709,9 @@ StripeSM::aggWriteDone(int event, Event *e) { cancel_trigger(); - // ensure we have the cacheDirSync lock if we intend to call it later + // ensure we have the waiting_dir_sync lock if we intend to call it later // retaking the current mutex recursively is a NOOP - CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding); + CACHE_TRY_LOCK(lock, dir_sync_waiting ? 
waiting_dir_sync->mutex : mutex, mutex->thread_holding); if (!lock.is_locked()) { eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); return EVENT_CONT; @@ -759,7 +759,8 @@ StripeSM::aggWriteDone(int event, Event *e) } if (dir_sync_waiting) { dir_sync_waiting = false; - cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr); + waiting_dir_sync->handleEvent(EVENT_IMMEDIATE, nullptr); + waiting_dir_sync = nullptr; } if (this->_write_buffer.get_pending_writers().head || sync.head) { return aggWrite(event, e); diff --git a/src/iocore/cache/StripeSM.h b/src/iocore/cache/StripeSM.h index 3ae476ac69e..40bb972babf 100644 --- a/src/iocore/cache/StripeSM.h +++ b/src/iocore/cache/StripeSM.h @@ -85,13 +85,14 @@ class StripeSM : public Continuation, public Stripe StripeInitInfo *init_info = nullptr; - Cache *cache = nullptr; - uint32_t last_sync_serial = 0; - uint32_t last_write_serial = 0; - bool recover_wrapped = false; - bool dir_sync_waiting = false; - bool dir_sync_in_progress = false; - bool writing_end_marker = false; + Cache *cache = nullptr; + uint32_t last_sync_serial = 0; + uint32_t last_write_serial = 0; + bool recover_wrapped = false; + bool dir_sync_waiting = false; + bool dir_sync_in_progress = false; + CacheSync *waiting_dir_sync = nullptr; + bool writing_end_marker = false; CacheKey first_fragment_key; int64_t first_fragment_offset = 0; diff --git a/src/records/RecordsConfig.cc b/src/records/RecordsConfig.cc index 8cd99ca6a9c..fd692ff606f 100644 --- a/src/records/RecordsConfig.cc +++ b/src/records/RecordsConfig.cc @@ -858,6 +858,8 @@ static constexpr RecordElement RecordsConfig[] = , {RECT_CONFIG, "proxy.config.cache.dir.sync_max_write", RECD_INT, "2097152", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL} , + {RECT_CONFIG, "proxy.config.cache.dir.sync_parallel_tasks", RECD_INT, "1", RECU_RESTART_TS, RR_NULL, RECC_NULL, nullptr, RECA_NULL} + , {RECT_CONFIG, "proxy.config.cache.hostdb.disable_reverse_lookup", RECD_INT, "0", 
RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL} , {RECT_CONFIG, "proxy.config.cache.select_alternate", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}