Skip to content

[NSFS | GLACIER] Add support for parallel recall and migrate #9183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,11 @@ config.NSFS_GLACIER_DMAPI_TPS_HTTP_HEADER = 'x-tape-meta-copy';
// but can be overridden to any numerical value
config.NSFS_GLACIER_DMAPI_PMIG_DAYS = config.S3_RESTORE_REQUEST_MAX_DAYS;

// NSFS_GLACIER_DMAPI_FINALIZE_RESTORE_ENABLE if enabled will force NooBaa to
// examine the DMAPI xattr of the file before finalizing the restore to prevent
// accidental blocking reads from happening.
config.NSFS_GLACIER_DMAPI_FINALIZE_RESTORE_ENABLE = false;

config.NSFS_STATFS_CACHE_SIZE = 10000;
config.NSFS_STATFS_CACHE_EXPIRY_MS = 1 * 1000;

Expand Down
124 changes: 35 additions & 89 deletions src/manage_nsfs/manage_nsfs_glacier.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,86 +9,53 @@ const { Glacier } = require('../sdk/glacier');
const native_fs_utils = require('../util/native_fs_utils');
const { is_desired_time, record_current_time } = require('./manage_nsfs_cli_utils');

const CLUSTER_LOCK = 'cluster.lock';
const SCAN_LOCK = 'scan.lock';

async function process_migrations() {
const fs_context = native_fs_utils.get_process_fs_context();
const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, CLUSTER_LOCK);
await native_fs_utils.lock_and_run(fs_context, lock_path, async () => {
const backend = Glacier.getBackend();

if (
await backend.low_free_space() ||
await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, Glacier.MIGRATE_TIMESTAMP_FILE) ||
await migrate_log_exceeds_threshold()
) {
await run_glacier_migrations(fs_context, backend);
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.MIGRATE_TIMESTAMP_FILE);
await record_current_time(fs_context, timestamp_file_path);
}
});
}

/**
* run_tape_migrations reads the migration WALs and attempts to migrate the
* files mentioned in the WAL.
* @param {nb.NativeFSContext} fs_context
* @param {import('../sdk/glacier').Glacier} backend
*/
async function run_glacier_migrations(fs_context, backend) {
await run_glacier_operation(fs_context, Glacier.MIGRATE_WAL_NAME, backend.migrate.bind(backend));
const backend = Glacier.getBackend();

if (
await backend.low_free_space() ||
await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, Glacier.MIGRATE_TIMESTAMP_FILE) ||
await migrate_log_exceeds_threshold()
) {
await backend.perform(prepare_galcier_fs_context(fs_context), "MIGRATION");
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.MIGRATE_TIMESTAMP_FILE);
await record_current_time(fs_context, timestamp_file_path);
}
}

async function process_restores() {
const fs_context = native_fs_utils.get_process_fs_context();
const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, CLUSTER_LOCK);
await native_fs_utils.lock_and_run(fs_context, lock_path, async () => {
const backend = Glacier.getBackend();
const backend = Glacier.getBackend();

if (
await backend.low_free_space() ||
!(await time_exceeded(fs_context, config.NSFS_GLACIER_RESTORE_INTERVAL, Glacier.RESTORE_TIMESTAMP_FILE))
) return;
if (
await backend.low_free_space() ||
!(await time_exceeded(fs_context, config.NSFS_GLACIER_RESTORE_INTERVAL, Glacier.RESTORE_TIMESTAMP_FILE))
) return;


await run_glacier_restore(fs_context, backend);
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.RESTORE_TIMESTAMP_FILE);
await record_current_time(fs_context, timestamp_file_path);
});
}

/**
* run_tape_restore reads the restore WALs and attempts to restore the
* files mentioned in the WAL.
* @param {nb.NativeFSContext} fs_context
* @param {import('../sdk/glacier').Glacier} backend
*/
async function run_glacier_restore(fs_context, backend) {
await run_glacier_operation(fs_context, Glacier.RESTORE_WAL_NAME, backend.restore.bind(backend));
await backend.perform(prepare_galcier_fs_context(fs_context), "RESTORE");
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.RESTORE_TIMESTAMP_FILE);
await record_current_time(fs_context, timestamp_file_path);
}

async function process_expiry() {
const fs_context = native_fs_utils.get_process_fs_context();
const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, SCAN_LOCK);
await native_fs_utils.lock_and_run(fs_context, lock_path, async () => {
const backend = Glacier.getBackend();
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.EXPIRY_TIMESTAMP_FILE);
if (
await backend.low_free_space() ||
await is_desired_time(
fs_context,
new Date(),
config.NSFS_GLACIER_EXPIRY_RUN_TIME,
config.NSFS_GLACIER_EXPIRY_RUN_DELAY_LIMIT_MINS,
timestamp_file_path,
config.NSFS_GLACIER_EXPIRY_TZ
)
) {
await backend.expiry(fs_context);
await record_current_time(fs_context, timestamp_file_path);
}
});
const backend = Glacier.getBackend();
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.EXPIRY_TIMESTAMP_FILE);
if (
await backend.low_free_space() ||
await is_desired_time(
fs_context,
new Date(),
config.NSFS_GLACIER_EXPIRY_RUN_TIME,
config.NSFS_GLACIER_EXPIRY_RUN_DELAY_LIMIT_MINS,
timestamp_file_path,
config.NSFS_GLACIER_EXPIRY_TZ
)
) {
await backend.perform(prepare_galcier_fs_context(fs_context), "EXPIRY");
await record_current_time(fs_context, timestamp_file_path);
}
}


Expand Down Expand Up @@ -137,27 +104,6 @@ async function migrate_log_exceeds_threshold(threshold = config.NSFS_GLACIER_MIG
return log_size > threshold;
}

/**
* run_glacier_operations takes a log_namespace and a callback and executes the
* callback on each log file in that namespace. It will also generate a failure
* log file and persist the failures in that log file.
* @param {nb.NativeFSContext} fs_context
* @param {string} log_namespace
* @param {Function} cb
*/
async function run_glacier_operation(fs_context, log_namespace, cb) {
const log = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, log_namespace, { locking: 'EXCLUSIVE' });

fs_context = prepare_galcier_fs_context(fs_context);
try {
await log.process(async (entry, failure_recorder) => cb(fs_context, entry, failure_recorder));
} catch (error) {
console.error('failed to process log in namespace:', log_namespace);
} finally {
await log.close();
}
}

/**
* prepare_galcier_fs_context returns a shallow copy of given
* fs_context with backend set to 'GPFS'.
Expand Down
150 changes: 142 additions & 8 deletions src/sdk/glacier.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
/* Copyright (C) 2024 NooBaa */
'use strict';

const path = require('path');
const nb_native = require('../util/nb_native');
const s3_utils = require('../endpoint/s3/s3_utils');
const { round_up_to_next_time_of_day } = require('../util/time_utils');
const dbg = require('../util/debug_module')(__filename);
const config = require('../../config');
const { PersistentLogger } = require('../util/persistent_logger');
const native_fs_utils = require('../util/native_fs_utils');

/** @import {LogFile} from "../util/persistent_logger" */

class Glacier {
// These names start with the word 'timestamp' so as to assure
Expand Down Expand Up @@ -41,6 +46,8 @@ class Glacier {

static STORAGE_CLASS_XATTR = 'user.storage_class';

static XATTR_STAGE_MIGRATE = 'user.noobaa.migrate.staged';

/**
* GPFS_DMAPI_XATTR_TAPE_INDICATOR if set on a file indicates that the file is on tape.
*
Expand All @@ -65,7 +72,9 @@ class Glacier {
static GPFS_DMAPI_XATTR_TAPE_TPS = 'dmapi.IBMTPS';

static MIGRATE_WAL_NAME = 'migrate';
static MIGRATE_STAGE_WAL_NAME = 'stage.migrate';
static RESTORE_WAL_NAME = 'restore';
static RESTORE_STAGE_WAL_NAME = 'stage.restore';

/** @type {nb.RestoreState} */
static RESTORE_STATUS_CAN_RESTORE = 'CAN_RESTORE';
Expand All @@ -74,17 +83,46 @@ class Glacier {
/** @type {nb.RestoreState} */
static RESTORE_STATUS_RESTORED = 'RESTORED';

static GLACIER_CLUSTER_LOCK = 'glacier.cluster.lock';
static GLACIER_MIGRATE_CLUSTER_LOCK = 'glacier.cluster.migrate.lock';
static GLACIER_RESTORE_CLUSTER_LOCK = 'glacier.cluster.restore.lock';
static GLACIER_SCAN_LOCK = 'glacier.scan.lock';

/**
* stage_migrate must take a LogFile object (this should be from the
* `GLACIER.MIGRATE_WAL_NAME` namespace) which will have
* newline seperated entries of filenames which needs to be
* migrated to GLACIER and should stage the files for migration.
*
* The function should return false if it needs the log file to be
* preserved.
* @param {nb.NativeFSContext} fs_context
* @param {LogFile} log_file log filename
* @param {(entry: string) => Promise<void>} failure_recorder
* @returns {Promise<boolean>}
*/
async stage_migrate(fs_context, log_file, failure_recorder) {
try {
await log_file.collect(Glacier.MIGRATE_STAGE_WAL_NAME, async (entry, batch_recorder) => batch_recorder(entry));
return true;
} catch (error) {
dbg.error('Glacier.stage_migrate error:', error);
throw error;
}
}

/**
* migrate must take a file name which will have newline seperated
* entries of filenames which needs to be migrated to GLACIER and
* should perform migration of those files if feasible.
 * migrate must take a LogFile object (this should be from the
* `GLACIER.MIGRATE_STAGE_WAL_NAME` namespace) which will have newline
* separated entries of filenames which needs to be migrated to GLACIER
* and should perform migration of those files if feasible.
*
* The function should return false if it needs the log file to be
* preserved.
*
* NOTE: This needs to be implemented by each backend.
* @param {nb.NativeFSContext} fs_context
* @param {string} log_file log filename
* @param {LogFile} log_file log filename
* @param {(entry: string) => Promise<void>} failure_recorder
* @returns {Promise<boolean>}
*/
Expand All @@ -93,16 +131,39 @@ class Glacier {
}

/**
* restore must take a file name which will have newline seperated
* entries of filenames which needs to be restored from GLACIER and
* should perform restore of those files if feasible
 * stage_restore must take a log file (from the `Glacier.RESTORE_WAL_NAME`
 * namespace) which will have newline separated entries of filenames which
 * need to be restored from GLACIER and should stage the files for restore.
*
* The function should return false if it needs the log file to be
* preserved.
* @param {nb.NativeFSContext} fs_context
* @param {LogFile} log_file log filename
* @param {(entry: string) => Promise<void>} failure_recorder
* @returns {Promise<boolean>}
*/
async stage_restore(fs_context, log_file, failure_recorder) {
try {
await log_file.collect(Glacier.RESTORE_STAGE_WAL_NAME, async (entry, batch_recorder) => batch_recorder(entry));
return true;
} catch (error) {
dbg.error('Glacier.stage_restore error:', error);
throw error;
}
}

/**
 * restore must take a log file (from `Glacier.RESTORE_STAGE_WAL_NAME`) which
 * will have newline separated entries of filenames which need to be
 * restored from GLACIER and should perform restore of those files if
 * feasible
*
* The function should return false if it needs the log file to be
* preserved.
*
* NOTE: This needs to be implemented by each backend.
* @param {nb.NativeFSContext} fs_context
* @param {string} log_file log filename
* @param {LogFile} log_file log filename
* @param {(entry: string) => Promise<void>} failure_recorder
* @returns {Promise<boolean>}
*/
Expand Down Expand Up @@ -136,6 +197,78 @@ class Glacier {
throw new Error('Unimplementented');
}

/**
* @param {nb.NativeFSContext} fs_context
* @param {"MIGRATION" | "RESTORE" | "EXPIRY"} type
*/
async perform(fs_context, type) {
const lock_path = lock_file => path.join(config.NSFS_GLACIER_LOGS_DIR, lock_file);

if (type === 'EXPIRY') {
await native_fs_utils.lock_and_run(fs_context, lock_path(Glacier.GLACIER_SCAN_LOCK), async () => {
await this.expiry(fs_context);
});
}

/** @typedef {(
* fs_context: nb.NativeFSContext,
* file: LogFile,
* failure_recorder: (entry: string) => Promise<void>
* ) => Promise<boolean>} log_cb */

/**
* @param {string} namespace
* @param {log_cb} cb
*/
const process_glacier_logs = async (namespace, cb) => {
const logs = new PersistentLogger(
config.NSFS_GLACIER_LOGS_DIR,
namespace, { locking: 'EXCLUSIVE' },
);
await logs.process(async (entry, failure_recorder) => cb(fs_context, entry, failure_recorder));
};

/**
*
* @param {string} primary_log_ns
* @param {string} staged_log_ns
* @param {log_cb} process_staged_fn
* @param {log_cb} process_primary_fn
* @param {string} stage_lock_file
*/
const run_operation = async (primary_log_ns, staged_log_ns, process_staged_fn, process_primary_fn, stage_lock_file) => {
// Acquire a cluster wide lock for all the operations for staging
await native_fs_utils.lock_and_run(fs_context, lock_path(Glacier.GLACIER_CLUSTER_LOCK), async () => {
await process_glacier_logs(primary_log_ns, process_staged_fn);
});

// Acquire a type specific lock to consume staged logs
await native_fs_utils.lock_and_run(
fs_context, lock_path(stage_lock_file), async () => {
await process_glacier_logs(staged_log_ns, process_primary_fn);
}
);
};

if (type === 'MIGRATION') {
await run_operation(
Glacier.MIGRATE_WAL_NAME,
Glacier.MIGRATE_STAGE_WAL_NAME,
this.stage_migrate.bind(this),
this.migrate.bind(this),
Glacier.GLACIER_MIGRATE_CLUSTER_LOCK,
);
} else if (type === 'RESTORE') {
await run_operation(
Glacier.RESTORE_WAL_NAME,
Glacier.RESTORE_STAGE_WAL_NAME,
this.stage_restore.bind(this),
this.restore.bind(this),
Glacier.GLACIER_RESTORE_CLUSTER_LOCK,
);
}
}

/**
* should_migrate returns true if the given file must be migrated
*
Expand Down Expand Up @@ -319,6 +452,7 @@ class Glacier {
xattr_get_keys: [
Glacier.XATTR_RESTORE_REQUEST,
Glacier.STORAGE_CLASS_XATTR,
Glacier.XATTR_STAGE_MIGRATE,
],
});
}
Expand Down
Loading