Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions doc/admin-guide/files/records.yaml.en.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2659,6 +2659,22 @@ Cache Control
:units: millisecond

How long to wait between each write cycle when syncing the cache directory to disk.

.. ts:cv:: CONFIG proxy.config.cache.dir.sync_parallel_tasks INT 1

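Number of parallel tasks to use for directory syncing. Stripes are grouped by
physical drive and the drives are distributed round-robin across the tasks, so
a task may sync more than one drive when there are fewer tasks than drives.
The tasks run on ET_TASK threads.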

======= ==================================================================
Value   Description
======= ==================================================================
``-1``  Unlimited - one task per drive (maximum parallelism)
``1``   Sequential - one task for all drives (default, safe)
``N``   Parallel - up to N tasks (drives) sync concurrently
======= ==================================================================

Default is ``1`` (sequential). Set to ``-1`` for maximum parallelism on
high-end NVMe arrays, or to a value between ``4`` and ``8`` for balanced
performance on multi-drive systems. Any other value below ``1`` is treated
as ``1``.

.. ts:cv:: CONFIG proxy.config.cache.limits.http.max_alts INT 5

Expand Down
5 changes: 4 additions & 1 deletion src/iocore/cache/Cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ int cache_config_log_alternate_eviction = 0;
int cache_config_dir_sync_frequency = 60;
int cache_config_dir_sync_delay = 500;
int cache_config_dir_sync_max_write = (2 * 1024 * 1024);
int cache_config_dir_sync_parallel_tasks = 1;
int cache_config_permit_pinning = 0;
int cache_config_select_alternate = 1;
int cache_config_max_doc_size = 0;
Expand All @@ -90,7 +91,6 @@ Cache *theCache = nullptr;
std::vector<std::unique_ptr<CacheDisk>> gdisks;
int gndisks = 0;
Cache *caches[NUM_CACHE_FRAG_TYPES] = {nullptr};
CacheSync *cacheDirSync = nullptr;
Store theCacheStore;
StripeSM **gstripes = nullptr;
std::atomic<int> gnstripes = 0;
Expand Down Expand Up @@ -884,6 +884,9 @@ ink_cache_init(ts::ModuleVersion v)

cacheProcessor.wait_for_cache = RecGetRecordInt("proxy.config.http.wait_for_cache").value_or(0);

RecEstablishStaticConfigInt32(cache_config_dir_sync_parallel_tasks, "proxy.config.cache.dir.sync_parallel_tasks");
Dbg(dbg_ctl_cache_init, "proxy.config.cache.dir.sync_parallel_tasks = %d", cache_config_dir_sync_parallel_tasks);

RecEstablishStaticConfigInt32(cache_config_persist_bad_disks, "proxy.config.cache.persist_bad_disks");
Dbg(dbg_ctl_cache_init, "proxy.config.cache.persist_bad_disks = %d", cache_config_persist_bad_disks);
if (cache_config_persist_bad_disks) {
Expand Down
77 changes: 61 additions & 16 deletions src/iocore/cache/CacheDir.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include "tscore/hugepages.h"
#include "tscore/Random.h"
#include "ts/ats_probe.h"
#include "iocore/eventsystem/Tasks.h"

#include <unordered_map>

#ifdef LOOP_CHECK_MODE
#define DIR_LOOP_THRESHOLD 1000
Expand Down Expand Up @@ -862,14 +865,56 @@ dir_lookaside_remove(const CacheKey *key, StripeSM *stripe)
return;
}

// Cache Sync
//
// Cache Dir Sync

// Set up periodic cache directory syncing.
//
// Stripes in gstripes are grouped by the CacheDisk they belong to, and each
// disk's stripes are handed as a unit to one of num_tasks CacheSync
// continuations (disks are dealt out round-robin). Every continuation is then
// scheduled on ET_TASK to fire after cache_config_dir_sync_frequency seconds.
void
dir_sync_init()
{
// NOTE(review): the next two statements drive a single global cacheDirSync, while the rest of
// this function builds per-task CacheSync objects - this looks like the pre-change
// implementation carried along by the diff view; confirm which version is intended.
cacheDirSync = new CacheSync;
cacheDirSync->trigger = eventProcessor.schedule_in(cacheDirSync, HRTIME_SECONDS(cache_config_dir_sync_frequency));
// Function-local statics: the CacheSync objects and the "already ran" flag persist across calls.
static std::vector<std::unique_ptr<CacheSync>> cache_syncs;
static bool initialized = false;
// Maps each disk to the indices (into gstripes) of the stripes located on it.
std::unordered_map<CacheDisk *, std::vector<int>> drive_stripe_map;

// Only the first call performs any setup; later calls just log a warning.
if (initialized) {
Warning("dir_sync_init() called multiple times - ignoring");
return;
}
initialized = true;

for (int i = 0; i < gnstripes; i++) {
drive_stripe_map[gstripes[i]->disk].push_back(i);
}

if (drive_stripe_map.empty()) {
Dbg(dbg_ctl_cache_dir_sync, "No stripes to sync - dir_sync_init complete");
return;
}

// -1 selects one task per disk; any other configured value below 1 is clamped to 1.
// A value larger than the number of disks is used as-is, so the surplus tasks are
// created and scheduled with an empty stripe_indices list.
int num_tasks = std::max(1, (cache_config_dir_sync_parallel_tasks == -1) ? static_cast<int>(drive_stripe_map.size()) :
cache_config_dir_sync_parallel_tasks);

cache_syncs.resize(num_tasks);
for (int i = 0; i < num_tasks; i++) {
cache_syncs[i] = std::make_unique<CacheSync>();
}

int task_idx = 0;

// Deal disks out to tasks round-robin. All stripes of one disk go to the same task.
// The iteration order of an unordered_map is unspecified, so which disk lands on
// which task is not fixed.
for (auto &[disk, indices] : drive_stripe_map) {
int target_task = task_idx % num_tasks;

Dbg(dbg_ctl_cache_dir_sync, "Disk %s: %zu stripe(s) assigned to task %d", disk->path, indices.size(), target_task);
for (int stripe_idx : indices) {
cache_syncs[target_task]->stripe_indices.push_back(stripe_idx);
}
task_idx++;
}

// Arm every task: start at the first assigned stripe and schedule the first run on ET_TASK.
for (int i = 0; i < num_tasks; i++) {
Dbg(dbg_ctl_cache_dir_sync, "Task %d: syncing %zu stripe(s)", i, cache_syncs[i]->stripe_indices.size());
cache_syncs[i]->current_index = 0;
cache_syncs[i]->trigger =
eventProcessor.schedule_in(cache_syncs[i].get(), HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK);
}
}

void
Expand Down Expand Up @@ -930,34 +975,35 @@ sync_cache_dir_on_shutdown()
}

int
CacheSync::mainEvent(int event, Event *e)
CacheSync::mainEvent(int event, Event * /* e ATS_UNUSED */)
{
if (trigger) {
trigger->cancel_action();
trigger = nullptr;
}

Lrestart:
if (stripe_index >= gnstripes) {
stripe_index = 0;
if (current_index >= static_cast<int>(stripe_indices.size())) {
current_index = 0;
#if FREE_BUF_BETWEEN_CYCLES
// Free buffer between sync cycles to avoid holding large amounts of memory
if (buf) {
if (buf_huge) {
ats_free_hugepage(buf, buflen);
} else {
ats_free(buf);
}
buflen = 0;
buf = nullptr;
buflen = 0;
buf_huge = false;
}
Dbg(dbg_ctl_cache_dir_sync, "sync done");
if (event == EVENT_INTERVAL) {
trigger = e->ethread->schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency));
} else {
trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency));
}
#endif
Dbg(dbg_ctl_cache_dir_sync, "sync cycle done");
trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK);
return EVENT_CONT;
}
stripe_index = stripe_indices[current_index];
current_index++;

StripeSM *stripe = gstripes[stripe_index]; // must be named "vol" to make STAT macros work.

Expand Down Expand Up @@ -1007,6 +1053,7 @@ CacheSync::mainEvent(int event, Event *e)
if (stripe->is_io_in_progress() || stripe->get_agg_buf_pos()) {
Dbg(dbg_ctl_cache_dir_sync, "Dir %s: waiting for agg buffer", stripe->hash_text.get());
stripe->dir_sync_waiting = true;
stripe->waiting_dir_sync = this;
if (!stripe->is_io_in_progress()) {
stripe->aggWrite(EVENT_IMMEDIATE, nullptr);
}
Expand Down Expand Up @@ -1072,9 +1119,7 @@ CacheSync::mainEvent(int event, Event *e)
return EVENT_CONT;
}
Ldone:
// done
writepos = 0;
++stripe_index;
goto Lrestart;
}

Expand Down
20 changes: 18 additions & 2 deletions src/iocore/cache/P_CacheDir.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "iocore/eventsystem/Continuation.h"
#include "iocore/aio/AIO.h"
#include "tscore/Version.h"
#include "tscore/hugepages.h"

#include <cstdint>
#include <ctime>
Expand Down Expand Up @@ -246,10 +247,25 @@ struct CacheSync : public Continuation {
AIOCallback io;
Event *trigger = nullptr;
ink_hrtime start_time = 0;
int mainEvent(int event, Event *e);
void aio_write(int fd, char *b, int n, off_t o);

std::vector<int> stripe_indices;
int current_index{0};

int mainEvent(int event, Event *e);
void aio_write(int fd, char *b, int n, off_t o);

// Constructs the continuation with a mutex obtained from new_ProxyMutex() and
// installs mainEvent as its event handler.
CacheSync() : Continuation(new_ProxyMutex()) { SET_HANDLER(&CacheSync::mainEvent); }

~CacheSync()
{
  // Nothing to release when no sync buffer is currently held.
  if (!buf) {
    return;
  }

  // Release the buffer with the deallocator that matches how buf_huge says it was obtained.
  if (!buf_huge) {
    ats_free(buf);
  } else {
    ats_free_hugepage(buf, buflen);
  }
}
};

struct StripteHeaderFooter {
Expand Down
2 changes: 1 addition & 1 deletion src/iocore/cache/P_CacheInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ extern CacheStatsBlock cache_rsb;
extern int cache_config_dir_sync_frequency;
extern int cache_config_dir_sync_delay;
extern int cache_config_dir_sync_max_write;
extern int cache_config_dir_sync_parallel_tasks;
extern int cache_config_http_max_alts;
extern int cache_config_log_alternate_eviction;
extern int cache_config_permit_pinning;
Expand Down Expand Up @@ -140,7 +141,6 @@ struct CacheRemoveCont : public Continuation {
// Global Data
extern ClassAllocator<CacheVC, false> cacheVConnectionAllocator;
extern ClassAllocator<CacheEvacuateDocVC, false> cacheEvacuateDocVConnectionAllocator;
extern CacheSync *cacheDirSync;
// Function Prototypes
int cache_write(CacheVC *, CacheHTTPInfoVector *);
int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key);
Expand Down
7 changes: 4 additions & 3 deletions src/iocore/cache/StripeSM.cc
Original file line number Diff line number Diff line change
Expand Up @@ -709,9 +709,9 @@ StripeSM::aggWriteDone(int event, Event *e)
{
cancel_trigger();

// ensure we have the cacheDirSync lock if we intend to call it later
// ensure we have the waiting_dir_sync lock if we intend to call it later
// retaking the current mutex recursively is a NOOP
CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
CACHE_TRY_LOCK(lock, dir_sync_waiting ? waiting_dir_sync->mutex : mutex, mutex->thread_holding);
if (!lock.is_locked()) {
eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
return EVENT_CONT;
Expand Down Expand Up @@ -759,7 +759,8 @@ StripeSM::aggWriteDone(int event, Event *e)
}
if (dir_sync_waiting) {
dir_sync_waiting = false;
cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr);
waiting_dir_sync->handleEvent(EVENT_IMMEDIATE, nullptr);
waiting_dir_sync = nullptr;
}
if (this->_write_buffer.get_pending_writers().head || sync.head) {
return aggWrite(event, e);
Expand Down
15 changes: 8 additions & 7 deletions src/iocore/cache/StripeSM.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@ class StripeSM : public Continuation, public Stripe

StripeInitInfo *init_info = nullptr;

Cache *cache = nullptr;
uint32_t last_sync_serial = 0;
uint32_t last_write_serial = 0;
bool recover_wrapped = false;
bool dir_sync_waiting = false;
bool dir_sync_in_progress = false;
bool writing_end_marker = false;
Cache *cache = nullptr;
uint32_t last_sync_serial = 0;
uint32_t last_write_serial = 0;
bool recover_wrapped = false;
bool dir_sync_waiting = false;
bool dir_sync_in_progress = false;
CacheSync *waiting_dir_sync = nullptr;
bool writing_end_marker = false;

CacheKey first_fragment_key;
int64_t first_fragment_offset = 0;
Expand Down
2 changes: 2 additions & 0 deletions src/records/RecordsConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,8 @@ static constexpr RecordElement RecordsConfig[] =
,
{RECT_CONFIG, "proxy.config.cache.dir.sync_max_write", RECD_INT, "2097152", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
,
{RECT_CONFIG, "proxy.config.cache.dir.sync_parallel_tasks", RECD_INT, "1", RECU_RESTART_TS, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
,
{RECT_CONFIG, "proxy.config.cache.hostdb.disable_reverse_lookup", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
,
{RECT_CONFIG, "proxy.config.cache.select_alternate", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
Expand Down