diff --git a/config.js b/config.js index 5a270ae03c..7396009a56 100644 --- a/config.js +++ b/config.js @@ -916,6 +916,11 @@ config.NSFS_GLACIER_DMAPI_TPS_HTTP_HEADER = 'x-tape-meta-copy'; // but can be overridden to any numberical value config.NSFS_GLACIER_DMAPI_PMIG_DAYS = config.S3_RESTORE_REQUEST_MAX_DAYS; +// NSFS_GLACIER_DMAPI_FINALIZE_RESTORE_ENABLE if enabled will force NooBaa to +// examine the DMAPI xattr of the file before finalizing the restore to prevent +// accidental blocking reads from happening. +config.NSFS_GLACIER_DMAPI_FINALIZE_RESTORE_ENABLE = false; + config.NSFS_STATFS_CACHE_SIZE = 10000; config.NSFS_STATFS_CACHE_EXPIRY_MS = 1 * 1000; diff --git a/src/manage_nsfs/manage_nsfs_glacier.js b/src/manage_nsfs/manage_nsfs_glacier.js index 1f7069fac4..7c7f3a5104 100644 --- a/src/manage_nsfs/manage_nsfs_glacier.js +++ b/src/manage_nsfs/manage_nsfs_glacier.js @@ -9,86 +9,53 @@ const { Glacier } = require('../sdk/glacier'); const native_fs_utils = require('../util/native_fs_utils'); const { is_desired_time, record_current_time } = require('./manage_nsfs_cli_utils'); -const CLUSTER_LOCK = 'cluster.lock'; -const SCAN_LOCK = 'scan.lock'; - async function process_migrations() { const fs_context = native_fs_utils.get_process_fs_context(); - const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, CLUSTER_LOCK); - await native_fs_utils.lock_and_run(fs_context, lock_path, async () => { - const backend = Glacier.getBackend(); - - if ( - await backend.low_free_space() || - await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, Glacier.MIGRATE_TIMESTAMP_FILE) || - await migrate_log_exceeds_threshold() - ) { - await run_glacier_migrations(fs_context, backend); - const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.MIGRATE_TIMESTAMP_FILE); - await record_current_time(fs_context, timestamp_file_path); - } - }); -} - -/** - * run_tape_migrations reads the migration WALs and attempts to migrate the - * files mentioned in the WAL. 
- * @param {nb.NativeFSContext} fs_context - * @param {import('../sdk/glacier').Glacier} backend - */ -async function run_glacier_migrations(fs_context, backend) { - await run_glacier_operation(fs_context, Glacier.MIGRATE_WAL_NAME, backend.migrate.bind(backend)); + const backend = Glacier.getBackend(); + + if ( + await backend.low_free_space() || + await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, Glacier.MIGRATE_TIMESTAMP_FILE) || + await migrate_log_exceeds_threshold() + ) { + await backend.perform(prepare_galcier_fs_context(fs_context), "MIGRATION"); + const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.MIGRATE_TIMESTAMP_FILE); + await record_current_time(fs_context, timestamp_file_path); + } } async function process_restores() { const fs_context = native_fs_utils.get_process_fs_context(); - const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, CLUSTER_LOCK); - await native_fs_utils.lock_and_run(fs_context, lock_path, async () => { - const backend = Glacier.getBackend(); + const backend = Glacier.getBackend(); - if ( - await backend.low_free_space() || - !(await time_exceeded(fs_context, config.NSFS_GLACIER_RESTORE_INTERVAL, Glacier.RESTORE_TIMESTAMP_FILE)) - ) return; + if ( + await backend.low_free_space() || + !(await time_exceeded(fs_context, config.NSFS_GLACIER_RESTORE_INTERVAL, Glacier.RESTORE_TIMESTAMP_FILE)) + ) return; - - await run_glacier_restore(fs_context, backend); - const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.RESTORE_TIMESTAMP_FILE); - await record_current_time(fs_context, timestamp_file_path); - }); -} - -/** - * run_tape_restore reads the restore WALs and attempts to restore the - * files mentioned in the WAL. - * @param {nb.NativeFSContext} fs_context - * @param {import('../sdk/glacier').Glacier} backend - */ -async function run_glacier_restore(fs_context, backend) { - await run_glacier_operation(fs_context, Glacier.RESTORE_WAL_NAME, backend.restore.bind(backend)); + await backend.perform(prepare_galcier_fs_context(fs_context), "RESTORE"); + const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.RESTORE_TIMESTAMP_FILE); + await record_current_time(fs_context, timestamp_file_path); } async function process_expiry() { const fs_context = native_fs_utils.get_process_fs_context(); - const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, SCAN_LOCK); - await native_fs_utils.lock_and_run(fs_context, lock_path, async () => { - const backend = Glacier.getBackend(); - const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.EXPIRY_TIMESTAMP_FILE); - if ( - await backend.low_free_space() || - await is_desired_time( - fs_context, - new Date(), - config.NSFS_GLACIER_EXPIRY_RUN_TIME, - config.NSFS_GLACIER_EXPIRY_RUN_DELAY_LIMIT_MINS, - timestamp_file_path, - config.NSFS_GLACIER_EXPIRY_TZ - ) - ) { - await backend.expiry(fs_context); - await record_current_time(fs_context, timestamp_file_path); - } - }); + const backend = Glacier.getBackend(); + const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.EXPIRY_TIMESTAMP_FILE); + if ( + await backend.low_free_space() || + await is_desired_time( + fs_context, + new Date(), + config.NSFS_GLACIER_EXPIRY_RUN_TIME, + config.NSFS_GLACIER_EXPIRY_RUN_DELAY_LIMIT_MINS, + timestamp_file_path, + config.NSFS_GLACIER_EXPIRY_TZ + ) + ) { + await backend.perform(prepare_galcier_fs_context(fs_context), "EXPIRY"); + await record_current_time(fs_context, timestamp_file_path); + } } @@ -137,27 +104,6 @@ async 
function migrate_log_exceeds_threshold(threshold = config.NSFS_GLACIER_MIG return log_size > threshold; } -/** - * run_glacier_operations takes a log_namespace and a callback and executes the - * callback on each log file in that namespace. It will also generate a failure - * log file and persist the failures in that log file. - * @param {nb.NativeFSContext} fs_context - * @param {string} log_namespace - * @param {Function} cb - */ -async function run_glacier_operation(fs_context, log_namespace, cb) { - const log = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, log_namespace, { locking: 'EXCLUSIVE' }); - - fs_context = prepare_galcier_fs_context(fs_context); - try { - await log.process(async (entry, failure_recorder) => cb(fs_context, entry, failure_recorder)); - } catch (error) { - console.error('failed to process log in namespace:', log_namespace); - } finally { - await log.close(); - } -} - /** * prepare_galcier_fs_context returns a shallow copy of given * fs_context with backend set to 'GPFS'. diff --git a/src/sdk/glacier.js b/src/sdk/glacier.js index 70bdf9fa6f..dcc589e287 100644 --- a/src/sdk/glacier.js +++ b/src/sdk/glacier.js @@ -1,11 +1,16 @@ /* Copyright (C) 2024 NooBaa */ 'use strict'; +const path = require('path'); const nb_native = require('../util/nb_native'); const s3_utils = require('../endpoint/s3/s3_utils'); const { round_up_to_next_time_of_day } = require('../util/time_utils'); const dbg = require('../util/debug_module')(__filename); const config = require('../../config'); +const { PersistentLogger } = require('../util/persistent_logger'); +const native_fs_utils = require('../util/native_fs_utils'); + +/** @import {LogFile} from "../util/persistent_logger" */ class Glacier { // These names start with the word 'timestamp' so as to assure @@ -41,6 +46,8 @@ class Glacier { static STORAGE_CLASS_XATTR = 'user.storage_class'; + static XATTR_STAGE_MIGRATE = 'user.noobaa.migrate.staged'; + /** * GPFS_DMAPI_XATTR_TAPE_INDICATOR if set on a file indicates that the file is on tape. * @@ -65,7 +72,9 @@ class Glacier { static GPFS_DMAPI_XATTR_TAPE_TPS = 'dmapi.IBMTPS'; static MIGRATE_WAL_NAME = 'migrate'; + static MIGRATE_STAGE_WAL_NAME = 'stage.migrate'; static RESTORE_WAL_NAME = 'restore'; + static RESTORE_STAGE_WAL_NAME = 'stage.restore'; /** @type {nb.RestoreState} */ static RESTORE_STATUS_CAN_RESTORE = 'CAN_RESTORE'; @@ -74,17 +83,46 @@ class Glacier { /** @type {nb.RestoreState} */ static RESTORE_STATUS_RESTORED = 'RESTORED'; + static GLACIER_CLUSTER_LOCK = 'glacier.cluster.lock'; + static GLACIER_MIGRATE_CLUSTER_LOCK = 'glacier.cluster.migrate.lock'; + static GLACIER_RESTORE_CLUSTER_LOCK = 'glacier.cluster.restore.lock'; + static GLACIER_SCAN_LOCK = 'glacier.scan.lock'; + + /** + * stage_migrate must take a LogFile object (this should be from the + * `GLACIER.MIGRATE_WAL_NAME` namespace) which will have + * newline seperated entries of filenames which needs to be + * migrated to GLACIER and should stage the files for migration. + * + * The function should return false if it needs the log file to be + * preserved. 
+ * @param {nb.NativeFSContext} fs_context + * @param {LogFile} log_file log filename + * @param {(entry: string) => Promise} failure_recorder + * @returns {Promise} + */ + async stage_migrate(fs_context, log_file, failure_recorder) { + try { + await log_file.collect(Glacier.MIGRATE_STAGE_WAL_NAME, async (entry, batch_recorder) => batch_recorder(entry)); + return true; + } catch (error) { + dbg.error('Glacier.stage_migrate error:', error); + throw error; + } + } + /** - * migrate must take a file name which will have newline seperated - * entries of filenames which needs to be migrated to GLACIER and - * should perform migration of those files if feasible. + * migrate must take a LogFile object (this should be from the + * `Glacier.MIGRATE_STAGE_WAL_NAME` namespace) which will have newline + * separated entries of filenames which need to be migrated to GLACIER + * and should perform migration of those files if feasible. * * The function should return false if it needs the log file to be * preserved. * * NOTE: This needs to be implemented by each backend. * @param {nb.NativeFSContext} fs_context - * @param {string} log_file log filename + * @param {LogFile} log_file log filename * @param {(entry: string) => Promise} failure_recorder * @returns {Promise} */ @@ -93,16 +131,39 @@ class Glacier { } /** - * restore must take a file name which will have newline seperated - * entries of filenames which needs to be restored from GLACIER and - * should perform restore of those files if feasible + * stage_restore must take a log file (from `Glacier.RESTORE_WAL_NAME`) + * which will have newline separated entries of filenames which need to be + * restored from GLACIER and should stage the files for restore. + * + * The function should return false if it needs the log file to be + * preserved. + * @param {nb.NativeFSContext} fs_context + * @param {LogFile} log_file log filename + * @param {(entry: string) => Promise} failure_recorder + * @returns {Promise} + */ + async stage_restore(fs_context, log_file, failure_recorder) { + try { + await log_file.collect(Glacier.RESTORE_STAGE_WAL_NAME, async (entry, batch_recorder) => batch_recorder(entry)); + return true; + } catch (error) { + dbg.error('Glacier.stage_restore error:', error); + throw error; + } + } + + /** + * restore must take a log file (from `Glacier.RESTORE_STAGE_WAL_NAME`) which will + * have newline separated entries of filenames which need to be + * restored from GLACIER and should perform restore of those files if + * feasible * * The function should return false if it needs the log file to be + * preserved. * * NOTE: This needs to be implemented by each backend. 
* @param {nb.NativeFSContext} fs_context - * @param {string} log_file log filename + * @param {LogFile} log_file log filename * @param {(entry: string) => Promise} failure_recorder * @returns {Promise} */ @@ -136,6 +197,78 @@ class Glacier { throw new Error('Unimplementented'); } + /** + * @param {nb.NativeFSContext} fs_context + * @param {"MIGRATION" | "RESTORE" | "EXPIRY"} type + */ + async perform(fs_context, type) { + const lock_path = lock_file => path.join(config.NSFS_GLACIER_LOGS_DIR, lock_file); + + if (type === 'EXPIRY') { + await native_fs_utils.lock_and_run(fs_context, lock_path(Glacier.GLACIER_SCAN_LOCK), async () => { + await this.expiry(fs_context); + }); + } + + /** @typedef {( + * fs_context: nb.NativeFSContext, + * file: LogFile, + * failure_recorder: (entry: string) => Promise + * ) => Promise} log_cb */ + + /** + * @param {string} namespace + * @param {log_cb} cb + */ + const process_glacier_logs = async (namespace, cb) => { + const logs = new PersistentLogger( + config.NSFS_GLACIER_LOGS_DIR, + namespace, { locking: 'EXCLUSIVE' }, + ); + await logs.process(async (entry, failure_recorder) => cb(fs_context, entry, failure_recorder)); + }; + + /** + * + * @param {string} primary_log_ns + * @param {string} staged_log_ns + * @param {log_cb} process_staged_fn + * @param {log_cb} process_primary_fn + * @param {string} stage_lock_file + */ + const run_operation = async (primary_log_ns, staged_log_ns, process_staged_fn, process_primary_fn, stage_lock_file) => { + // Acquire a cluster wide lock for all the operations for staging + await native_fs_utils.lock_and_run(fs_context, lock_path(Glacier.GLACIER_CLUSTER_LOCK), async () => { + await process_glacier_logs(primary_log_ns, process_staged_fn); + }); + + // Acquire a type specific lock to consume staged logs + await native_fs_utils.lock_and_run( + fs_context, lock_path(stage_lock_file), async () => { + await process_glacier_logs(staged_log_ns, process_primary_fn); + } + ); + }; + + if (type === 'MIGRATION') { + await run_operation( + Glacier.MIGRATE_WAL_NAME, + Glacier.MIGRATE_STAGE_WAL_NAME, + this.stage_migrate.bind(this), + this.migrate.bind(this), + Glacier.GLACIER_MIGRATE_CLUSTER_LOCK, + ); + } else if (type === 'RESTORE') { + await run_operation( + Glacier.RESTORE_WAL_NAME, + Glacier.RESTORE_STAGE_WAL_NAME, + this.stage_restore.bind(this), + this.restore.bind(this), + Glacier.GLACIER_RESTORE_CLUSTER_LOCK, + ); + } + } + /** * should_migrate returns true if the given file must be migrated * @@ -319,6 +452,7 @@ class Glacier { xattr_get_keys: [ Glacier.XATTR_RESTORE_REQUEST, Glacier.STORAGE_CLASS_XATTR, + Glacier.XATTR_STAGE_MIGRATE, ], }); } diff --git a/src/sdk/glacier_tapecloud.js b/src/sdk/glacier_tapecloud.js index 90184e6b6c..9333eee1f4 100644 --- a/src/sdk/glacier_tapecloud.js +++ b/src/sdk/glacier_tapecloud.js @@ -5,7 +5,6 @@ const { spawn } = require("child_process"); const events = require('events'); const os = require("os"); const path = require("path"); -const { LogFile } = require("../util/persistent_logger"); const { NewlineReader, NewlineReaderEntry } = require('../util/file_reader'); const { Glacier } = require("./glacier"); const config = require('../../config'); @@ -16,6 +15,7 @@ const dbg = require('../util/debug_module')(__filename); const ERROR_DUPLICATE_TASK = "GLESM431E"; +/** @import {LogFile} from "../util/persistent_logger" */ function get_bin_path(bin_name) { return path.join(config.NSFS_GLACIER_TAPECLOUD_BIN_DIR, bin_name); @@ -119,9 +119,14 @@ class TapeCloudUtils { const { stdout } = error; // 
Find the line in the stdout which has the line 'task ID is, ' and extract id - const match = stdout.match(/task ID is (\d+)/); - if (match.length !== 2) { - throw error; + // ID in latest version must look like 1005:1111:4444 + let match = stdout.match(/task ID is (\d+:\d+:\d+)/); + if (!match || match.length !== 2) { + // Fallback to the older task ID extraction in case we fail to extract new one + match = stdout.match(/task ID is (\d+)/); + if (!match || match.length !== 2) { + throw error; + } } const task_id = match[1]; @@ -190,17 +195,32 @@ class TapeCloudUtils { } class TapeCloudGlacier extends Glacier { - async migrate(fs_context, log_file, failure_recorder) { - dbg.log2('TapeCloudGlacier.migrate starting for', log_file); - - const file = new LogFile(fs_context, log_file); + /** + * @param {nb.NativeFSContext} fs_context + * @param {LogFile} log_file + * @param {(entry: string) => Promise} failure_recorder + * @returns {Promise} + */ + async stage_migrate(fs_context, log_file, failure_recorder) { + dbg.log2('TapeCloudGlacier.stage_migrate starting for', log_file.log_path); try { - await file.collect_and_process(async (entry, batch_recorder) => { + await log_file.collect(Glacier.MIGRATE_STAGE_WAL_NAME, async (entry, batch_recorder) => { + let entry_fh; let should_migrate = true; try { - should_migrate = await this.should_migrate(fs_context, entry); + entry_fh = await nb_native().fs.open(fs_context, entry); + const stat = await entry_fh.stat(fs_context, { + xattr_get_keys: [ + Glacier.XATTR_RESTORE_REQUEST, + Glacier.XATTR_RESTORE_EXPIRY, + Glacier.STORAGE_CLASS_XATTR, + ], + }); + should_migrate = await this.should_migrate(fs_context, entry, stat); } catch (err) { + await entry_fh?.close(fs_context); + if (err.code === 'ENOENT') { // Skip this file return; @@ -221,40 +241,116 @@ class TapeCloudGlacier extends Glacier { // Skip the file if it shouldn't be migrated if (!should_migrate) return; - // Can't really do anything if this fails - provider - // needs to make sure that appropriate error handling - // is being done there - await batch_recorder(entry); - }, - async batch => { - // This will throw error only if our eeadm error handler - // panics as well and at that point it's okay to - // not handle the error and rather keep the log file around - await this._migrate(batch, failure_recorder); + // Mark the file staged + try { + await entry_fh.replacexattr(fs_context, { [Glacier.XATTR_STAGE_MIGRATE]: Date.now().toString() }); + await batch_recorder(entry); + } catch (error) { + dbg.error('failed to mark the entry migrate staged', error); + + // Can't really do anything if this fails - provider + // needs to make sure that appropriate error handling + // is being done there + await failure_recorder(entry); + } finally { + entry_fh?.close(fs_context); + } }); return true; } catch (error) { - dbg.error('unexpected error in processing migrate:', error, 'for:', log_file); + dbg.error('unexpected error in staging migrate:', error, 'for:', log_file.log_path); return false; } } - async restore(fs_context, log_file, failure_recorder) { - dbg.log2('TapeCloudGlacier.restore starting for', log_file); + /** + * @param {nb.NativeFSContext} fs_context + * @param {LogFile} log_file + * @param {(entry: string) => Promise} failure_recorder + * @returns {Promise} + */ + async migrate(fs_context, log_file, failure_recorder) { + dbg.log2('TapeCloudGlacier.migrate starting for', log_file.log_path); + try { + // This will throw error only if our eeadm error handler + // panics as well and at that point 
it's okay to + // not handle the error and rather keep the log file around + await this._migrate(log_file.log_path, failure_recorder); + + // Un-stage all the files - We don't need to deal with the cases + // where some files have migrated and some have not as that is + // not important for staging/un-staging. + await log_file.collect_and_process(async entry => { + let fh; + try { + fh = await nb_native().fs.open(fs_context, entry); + await fh.replacexattr(fs_context, {}, Glacier.XATTR_STAGE_MIGRATE); + } catch (error) { + if (error.code === 'ENOENT') { + // This is OK + return; + } + + dbg.error('failed to remove stage marker:', error, 'for:', entry); + + // Add the enty to the failure log - This could be wasteful as it might + // add entries which have already been migrated but this is a better + // retry. + await failure_recorder(entry); + } finally { + await fh?.close(fs_context); + } + }); + + return true; + } catch (error) { + dbg.error('unexpected error in processing migrate:', error, 'for:', log_file.log_path); + return false; + } + } + + /** + * @param {nb.NativeFSContext} fs_context + * @param {LogFile} log_file + * @param {(entry: string) => Promise} failure_recorder + * @returns {Promise} + */ + async stage_restore(fs_context, log_file, failure_recorder) { + dbg.log2('TapeCloudGlacier.stage_restore starting for', log_file.log_path); - const file = new LogFile(fs_context, log_file); try { - await file.collect_and_process(async (entry, batch_recorder) => { + await log_file.collect(Glacier.RESTORE_STAGE_WAL_NAME, async (entry, batch_recorder) => { + let fh; try { - const should_restore = await Glacier.should_restore(fs_context, entry); + fh = await nb_native().fs.open(fs_context, entry); + const stat = await fh.stat(fs_context, { + xattr_get_keys: [ + Glacier.XATTR_RESTORE_REQUEST, + Glacier.STORAGE_CLASS_XATTR, + Glacier.XATTR_STAGE_MIGRATE, + ], + }); + + const should_restore = await Glacier.should_restore(fs_context, entry, stat); if (!should_restore) { // Skip this file return; } - // Add entry to the tempwal - await batch_recorder(entry); + // If the file staged for migrate then add it to failure log for retry + // + // It doesn't matter if we read the appropriate value for this xattr or not + // 1. If we read nothing we are sure that this file is not staged + // 2. If we read the appropriate value then the file is either being migrated + // or has recently completed migration but hasn't been unmarked. + // 3. If we read corrupt value then either the file is getting staged or is + // getting un-staged - In either case we must requeue. + if (stat.xattr[Glacier.XATTR_STAGE_MIGRATE]) { + await failure_recorder(entry); + } else { + await batch_recorder(entry); + } } catch (error) { if (error.code === 'ENOENT') { // Skip this file @@ -266,34 +362,52 @@ class TapeCloudGlacier extends Glacier { 'to failure recorder due to error', error, ); await failure_recorder(entry); - } - }, - async batch => { - const success = await this._recall( - batch, - async entry_path => { - dbg.log2('TapeCloudGlacier.restore.partial_failure - entry:', entry_path); - await failure_recorder(entry_path); - }, - async entry_path => { - dbg.log2('TapeCloudGlacier.restore.partial_success - entry:', entry_path); - await this._finalize_restore(fs_context, entry_path); - } - ); - - // We will iterate through the entire log file iff and we get a success message from - // the recall call. 
- if (success) { - const batch_file = new LogFile(fs_context, batch); - await batch_file.collect_and_process(async (entry_path, batch_recorder) => { - dbg.log2('TapeCloudGlacier.restore.batch - entry:', entry_path); - await this._finalize_restore(fs_context, entry_path); - }); + } finally { + await fh?.close(fs_context); } }); + + return true; + } catch (error) { + dbg.error('unexpected error in staging restore:', error, 'for:', log_file.log_path); + return false; + } + } + + /** + * @param {nb.NativeFSContext} fs_context + * @param {LogFile} log_file + * @param {(entry: string) => Promise} failure_recorder + * @returns {Promise} + */ + async restore(fs_context, log_file, failure_recorder) { + dbg.log2('TapeCloudGlacier.restore starting for', log_file.log_path); + + try { + const success = await this._recall( + log_file.log_path, + async entry_path => { + dbg.log2('TapeCloudGlacier.restore.partial_failure - entry:', entry_path); + await failure_recorder(entry_path); + }, + async entry_path => { + dbg.log2('TapeCloudGlacier.restore.partial_success - entry:', entry_path); + await this._finalize_restore(fs_context, entry_path, failure_recorder); + } + ); + + // We will iterate through the entire log file iff and we get a success message from + // the recall call. + if (success) { + await log_file.collect_and_process(async (entry_path, batch_recorder) => { + dbg.log2('TapeCloudGlacier.restore.batch - entry:', entry_path); + await this._finalize_restore(fs_context, entry_path, failure_recorder); + }); + } + return true; } catch (error) { - dbg.error('unexpected error in processing restore:', error, 'for:', log_file); + dbg.error('unexpected error in processing restore:', error, 'for:', log_file.log_path); return false; } } @@ -352,8 +466,9 @@ class TapeCloudGlacier extends Glacier { * * @param {nb.NativeFSContext} fs_context * @param {string} entry_path + * @param {(entry: string) => Promise} [failure_recorder] */ - async _finalize_restore(fs_context, entry_path) { + async _finalize_restore(fs_context, entry_path, failure_recorder) { dbg.log2('TapeCloudGlacier.restore._finalize_restore - entry:', entry_path); const entry = new NewlineReaderEntry(fs_context, entry_path); @@ -371,11 +486,35 @@ class TapeCloudGlacier extends Glacier { throw error; } - const stat = await fh.stat(fs_context, { - xattr_get_keys: [ - Glacier.XATTR_RESTORE_REQUEST, - ] - }); + const xattr_get_keys = [Glacier.XATTR_RESTORE_REQUEST]; + if (fs_context.use_dmapi) { + xattr_get_keys.push(Glacier.GPFS_DMAPI_XATTR_TAPE_PREMIG); + } + const stat = await fh.stat(fs_context, { xattr_get_keys }); + + // This is a hacky solution and would work only if + // config.NSFS_GLACIER_DMAPI_ENABLE is enabled. This prevents + // the following case: + // 1. PUT obj + // 2. NOOBAA MIGRATE TRIGGERED (staging xattr placed) + // 3. PUT obj (staging xattr gone) + // 4. RESTORE-OBJECT obj + // 5. NOOBAA RESTORE TRIGGERED + // 6. EEADM RESTORE TRIGGERED - File is resident + // 7. EEADM MIGRATE MIGRATES the resident file + // 9. 
NooBaa finalizes the restore + if ( + config.NSFS_GLACIER_DMAPI_FINALIZE_RESTORE_ENABLE && + !stat.xattr[Glacier.GPFS_DMAPI_XATTR_TAPE_PREMIG] + ) { + dbg.warn("TapeCloudGlacier._finalize_restore: file not premig yet - will retry - for", entry_path); + if (!failure_recorder) { + throw new Error('restored file not actually restored'); + } + + await failure_recorder(entry_path); + return; + } const days = Number(stat.xattr[Glacier.XATTR_RESTORE_REQUEST]); diff --git a/src/test/unit_tests/nsfs/test_nsfs_glacier_backend.js b/src/test/unit_tests/nsfs/test_nsfs_glacier_backend.js index 1dc2e637ff..63a742e756 100644 --- a/src/test/unit_tests/nsfs/test_nsfs_glacier_backend.js +++ b/src/test/unit_tests/nsfs/test_nsfs_glacier_backend.js @@ -13,7 +13,6 @@ const NamespaceFS = require('../../../sdk/namespace_fs'); const s3_utils = require('../../../endpoint/s3/s3_utils'); const buffer_utils = require('../../../util/buffer_utils'); const endpoint_stats_collector = require('../../../sdk/endpoint_stats_collector'); -const { NewlineReader } = require('../../../util/file_reader'); const { TapeCloudGlacier, TapeCloudUtils } = require('../../../sdk/glacier_tapecloud'); const { PersistentLogger } = require('../../../util/persistent_logger'); const { Glacier } = require('../../../sdk/glacier'); @@ -87,6 +86,17 @@ class NoOpWriteStream extends Stream.Writable { } } +function get_patched_backend() { + const backend = new TapeCloudGlacier(); + + // Patch backend for test + backend._migrate = async () => true; + backend._recall = async () => true; + backend._process_expired = async () => null; + + return backend; +} + /* Justification: Disable max-lines-per-function for test functions as it is not much helpful in the sense that "describe" function capture entire test suite instead of being a logical abstraction */ @@ -161,49 +171,40 @@ mocha.describe('nsfs_glacier', function() { const xattr = { key: 'value', key2: 'value2' }; xattr[s3_utils.XATTR_SORT_SYMBOL] = true; - const backend = new TapeCloudGlacier(); - - // Patch backend for test - backend._migrate = async () => true; - backend._recall = async () => true; - backend._process_expired = async () => { /**noop*/ }; + const backend = get_patched_backend(); - mocha.it('upload to GLACIER should work', async function() { + mocha.it('upload to GLACIER should work', async function() { const data = crypto.randomBytes(100); const upload_res = await glacier_ns.upload_object({ bucket: upload_bkt, key: upload_key, - storage_class: s3_utils.STORAGE_CLASS_GLACIER, + storage_class: s3_utils.STORAGE_CLASS_GLACIER, xattr, source_stream: buffer_utils.buffer_to_read_stream(data) }, dummy_object_sdk); console.log('upload_object response', inspect(upload_res)); - // Check if the log contains the entry - let found = false; - await NamespaceFS.migrate_wal._process(async file => { - const fs_context = glacier_ns.prepare_fs_context(dummy_object_sdk); - const reader = new NewlineReader(fs_context, file, { lock: 'EXCLUSIVE' }); - - await reader.forEachFilePathEntry(async entry => { - if (entry.path.endsWith(`${upload_key}`)) { - found = true; - - // Not only should the file exist, it should be ready for - // migration as well - assert(backend.should_migrate(fs_context, entry.path)); - } + // Check if the log contains the entry + let found = false; + await NamespaceFS.migrate_wal._process(async file => { + const fs_context = glacier_ns.prepare_fs_context(dummy_object_sdk); + await file.collect_and_process(async entry => { + if (entry.endsWith(`${upload_key}`)) { + found = true; - return 
true; - }); + // Not only should the file exist, it should be ready for + // migration as well + assert(backend.should_migrate(fs_context, entry)); + } + }); - // Don't delete the file - return false; - }); + // Don't delete the file + return false; + }); - assert(found); - }); + assert(found); + }); mocha.it('restore-object should successfully restore', async function() { const now = Date.now(); @@ -232,13 +233,7 @@ mocha.describe('nsfs_glacier', function() { await assert.rejects(glacier_ns.read_object_stream(params, dummy_object_sdk, dummy_stream_writer)); // Issue restore - await NamespaceFS.restore_wal._process(async file => { - const fs_context = glacier_ns.prepare_fs_context(dummy_object_sdk); - await backend.restore(fs_context, file); - - // Don't delete the file - return false; - }); + await backend.perform(glacier_ns.prepare_fs_context(dummy_object_sdk), "RESTORE"); // Ensure object is restored const md = await glacier_ns.read_object_md(params, dummy_object_sdk); @@ -308,12 +303,7 @@ mocha.describe('nsfs_glacier', function() { const fs_context = glacier_ns.prepare_fs_context(dummy_object_sdk); // Issue restore - await NamespaceFS.restore_wal._process(async file => { - await failure_backend.restore(fs_context, file, async () => { /*noop*/ }); - - // Don't delete the file - return false; - }); + await failure_backend.perform(fs_context, "RESTORE"); // Ensure success object is restored const success_md = await glacier_ns.read_object_md(success_params, dummy_object_sdk); diff --git a/src/util/bucket_logs_utils.js b/src/util/bucket_logs_utils.js index aa4a07baed..44f53a4eb7 100644 --- a/src/util/bucket_logs_utils.js +++ b/src/util/bucket_logs_utils.js @@ -6,7 +6,7 @@ const config = require('../../config'); const stream = require('stream'); const crypto = require('crypto'); const path = require('path'); -const { PersistentLogger, LogFile } = require('../util/persistent_logger'); +const { PersistentLogger } = require('../util/persistent_logger'); const { format_aws_date } = require('../util/time_utils'); const nsfs_schema_utils = require('../manage_nsfs/nsfs_schema_utils'); const semaphore = require('../util/semaphore'); @@ -36,7 +36,7 @@ async function export_logs_to_target(fs_context, s3_connection, bucket_to_owner_ if (!entry.name.endsWith('.log')) return; const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, path.parse(entry.name).name, { locking: 'EXCLUSIVE' }); try { - return log.process(async file => _upload_to_targets(fs_context, s3_connection, file, bucket_to_owner_keys_func)); + return log.process(async file => _upload_to_targets(s3_connection, file, bucket_to_owner_keys_func)); } catch (err) { dbg.error('processing log file failed', log.file); throw err; @@ -51,19 +51,17 @@ async function export_logs_to_target(fs_context, s3_connection, bucket_to_owner_ * This function gets a persistent log file, will go over it's entries one by one, * and will upload the entry to the target_bucket using the provided s3 connection * in order to know which user to use to upload to each bucket we will need to provide bucket_to_owner_keys_func - * @param {nb.NativeFSContext} fs_context * @param {AWS.S3} s3_connection - * @param {string} log_file + * @param {import('../util/persistent_logger').LogFile} log_file * @param {function} bucket_to_owner_keys_func * @returns {Promise} */ -async function _upload_to_targets(fs_context, s3_connection, log_file, bucket_to_owner_keys_func) { +async function _upload_to_targets(s3_connection, log_file, bucket_to_owner_keys_func) { const 
bucket_streams = {}; const promises = []; try { - const file = new LogFile(fs_context, log_file); - dbg.log1('uploading file to target buckets', log_file); - await file.collect_and_process(async entry => { + dbg.log1('uploading file to target buckets', log_file.log_path); + await log_file.collect_and_process(async entry => { const log_entry = JSON.parse(entry); nsfs_schema_utils.validate_log_schema(log_entry); const target_bucket = log_entry.log_bucket; @@ -100,7 +98,7 @@ async function _upload_to_targets(fs_context, s3_connection, log_file, bucket_to Object.values(bucket_streams).forEach(st => st.end()); await Promise.all(promises); } catch (error) { - dbg.error('unexpected error in upload to bucket:', error, 'for:', log_file); + dbg.error('unexpected error in upload to bucket:', error, 'for:', log_file.log_path); return false; } return true; diff --git a/src/util/file_reader.js b/src/util/file_reader.js index a8be5e9184..af409eebbc 100644 --- a/src/util/file_reader.js +++ b/src/util/file_reader.js @@ -199,8 +199,18 @@ class NewlineReader { } } + /** + * close will close the file descriptor and will + * set the internal file handle to `null`. HOWEVER, + * the reader can still be used after close is called + * as the reader will initialize the file handle + * again if a read is attempted. + */ async close() { - if (this.fh) await this.fh.close(this.fs_context); + const fh = this.fh; + this.fh = null; + + if (fh) await fh.close(this.fs_context); } } diff --git a/src/util/native_fs_utils.js b/src/util/native_fs_utils.js index 4a12a80b66..a461754090 100644 --- a/src/util/native_fs_utils.js +++ b/src/util/native_fs_utils.js @@ -719,7 +719,7 @@ function translate_error_codes(err, entity) { * acquiring the lock * @param {nb.NativeFSContext} fs_context * @param {string} lock_path - * @param {Function} cb + * @param {() => Promise} cb */ async function lock_and_run(fs_context, lock_path, cb) { const lockfd = await nb_native().fs.open(fs_context, lock_path, 'w'); @@ -732,6 +732,80 @@ async function lock_and_run(fs_context, lock_path, cb) { } } +/** + * open_with_lock tries to open a file and return it only after successfully + * acquiring the lock on the file. Once the lock is acquired, it also performs + * several checks. By default, open_with_lock behaves the same as + * open. + * - If locking is needed then cfg.lock must be specified. + * - If the function fails to take a lock then it will return + * undefined. + * - If no flag is given and 'EXCLUSIVE' locking is required then + * the flag is automatically updated to O_RDWR. + * @param {nb.NativeFSContext} fs_context + * @param {string} file_path + * @param {string} [flags] + * @param {number} [mode] + * @param {{ + * lock?: "EXCLUSIVE" | "SHARED", + * retries?: number + * backoff?: number + * }} [cfg] + * @returns {Promise} + */ +async function open_with_lock(fs_context, file_path, flags, mode, cfg) { + cfg = cfg || {}; + + let file; + let retries = cfg.retries || 10; + const backoff = cfg.backoff || 5; + flags = flags || (cfg.lock === 'EXCLUSIVE' ? 
'+*' : 'r'); + + const includes_any = (string, ...chars) => { + for (const ch of chars) { + if (string.includes(ch)) return true; + } + }; + + // Validate flag and lock compatibility + if (cfg.lock === 'EXCLUSIVE' && !includes_any(flags, "w", "a", "+")) { + throw new Error('incompatible lock and open flags'); + } + if (cfg.lock === 'SHARED' && !includes_any(flags, "r", "+")) { + throw new Error('incompatible lock and open flags'); + } + + for (; retries > 0; retries -= 1) { + try { + file = await nb_native().fs.open(fs_context, file_path, flags, mode); + + if (cfg.lock) { + await file.fcntllock(fs_context, cfg.lock); + } + + const fh_stat = await file.stat(fs_context); + const path_stat = await nb_native().fs.stat(fs_context, file_path); + + if (fh_stat.ino === path_stat.ino && fh_stat.nlink > 0) { + return file; + } + + dbg.log0('failed to open_with_lock: file_path:', file_path); + + // Close the file before retrying + await file.close(fs_context); + await P.delay(backoff * (1 + Math.random())); + } catch (error) { + await file?.close(fs_context); + throw error; + } + } + + // If we are here then it means we exceeded retries + // and failed to acquire a lock. + await file?.close(fs_context); +} + exports.get_umasked_mode = get_umasked_mode; exports._make_path_dirs = _make_path_dirs; exports._create_path = _create_path; @@ -743,6 +817,7 @@ exports.get_user_by_distinguished_name = get_user_by_distinguished_name; exports.get_config_files_tmpdir = get_config_files_tmpdir; exports.stat_ignore_enoent = stat_ignore_enoent; exports.stat_if_exists = stat_if_exists; +exports.open_with_lock = open_with_lock; exports._is_gpfs = _is_gpfs; exports.safe_move = safe_move; diff --git a/src/util/notifications_util.js b/src/util/notifications_util.js index 1ab8d59bc4..095a49cfc8 100644 --- a/src/util/notifications_util.js +++ b/src/util/notifications_util.js @@ -3,7 +3,7 @@ const dbg = require('../util/debug_module')(__filename); const config = require('../../config'); -const { PersistentLogger, LogFile } = require('../util/persistent_logger'); +const { PersistentLogger } = require('../util/persistent_logger'); const Kafka = require('node-rdkafka'); const os = require('os'); const fs = require('fs'); @@ -97,7 +97,7 @@ class Notificator { dbg.log1("process_notification_files node_namespace =", node_namespace, ", file =", entry.name); const log = get_notification_logger('EXCLUSIVE', node_namespace); try { - await log.process(async (file, failure_append) => await this._notify(this.fs_context, file, failure_append)); + await log.process(async (file, failure_append) => await this._notify(file, failure_append)); } catch (err) { dbg.error('processing notifications log file failed', log.file); throw err; @@ -113,12 +113,11 @@ class Notificator { } /** - * @param {nb.NativeFSContext} fs_context - * @param {string} log_file + * @param {import('../util/persistent_logger').LogFile} file + * @param {(entry: string) => Promise} failure_append * @returns {Promise} */ - async _notify(fs_context, log_file, failure_append) { - const file = new LogFile(fs_context, log_file); + async _notify(file, failure_append) { let send_promises = []; let notif; await file.collect_and_process(async str => { diff --git a/src/util/persistent_logger.js b/src/util/persistent_logger.js index 0e55bc745d..ba58aa78f7 100644 --- a/src/util/persistent_logger.js +++ b/src/util/persistent_logger.js @@ -4,7 +4,6 @@ const path = require('path'); const nb_native = require('./nb_native'); const native_fs_utils = require('./native_fs_utils'); -const P = 
require('./promise'); const semaphore = require('./semaphore'); const { NewlineReader } = require('./file_reader'); const dbg = require('./debug_module')(__filename); @@ -52,51 +51,16 @@ class PersistentLogger { return this.init_lock.surround(async () => { if (this.fh) return this.fh; - const total_retries = 10; - const backoff = 5; - - for (let retries = 0; retries < total_retries; retries++) { - let fh = null; - try { - fh = await this._open(); - if (this.locking) await fh.fcntllock(this.fs_context, this.locking); - - const fh_stat = await fh.stat(this.fs_context); - const path_stat = await nb_native().fs.stat(this.fs_context, this.active_path); - - if (fh_stat.ino === path_stat.ino && fh_stat.nlink > 0) { - this.fh = fh; - this.local_size = 0; - this.fh_stat = fh_stat; - - // Prevent closing the fh if we succedded in the init - fh = null; - - return this.fh; - } - - dbg.log0( - 'failed to init active log file, retry:', retries + 1, - 'active path:', this.active_path, - ); - await P.delay(backoff * (1 + Math.random())); - } catch (error) { - dbg.log0( - 'an error occured during init:', error, - 'active path:', this.active_path, - ); - throw error; - } finally { - if (fh) await fh.close(this.fs_context); - } - } - - dbg.log0( - 'init retries exceeded, total retries:', - total_retries, - 'active path:', this.active_path, + await native_fs_utils._create_path(this.dir, this.fs_context); + this.fh = await native_fs_utils.open_with_lock( + this.fs_context, + this.active_path, + 'as+', + undefined, + { lock: this.locking, retries: 10, backoff: 5 } ); - throw new Error('init retries exceeded'); + + return this.fh; }); } @@ -133,7 +97,7 @@ class PersistentLogger { /** * process_inactive takes a callback and runs it on all past WAL files. * It does so in lexographically sorted order. - * @param {(file: string) => Promise} cb callback + * @param {(file: LogFile) => Promise} cb callback * @param {boolean} replace_active */ async _process(cb, replace_active = true) { @@ -155,11 +119,30 @@ class PersistentLogger { let result = true; for (const file of filtered_files) { dbg.log1('Processing', this.dir, file); - const delete_processed = await cb(path.join(this.dir, file.name)); - if (delete_processed) { - await nb_native().fs.unlink(this.fs_context, path.join(this.dir, file.name)); - } else { - result = false; + + // Classes are hoisted - this eslint rule doesn't make sense for classes (and functions?) + // eslint-disable-next-line no-use-before-define + const log_file = new LogFile(this.fs_context, path.join(this.dir, file.name)); + await log_file.init(); + dbg.log1('Initialized log file -', log_file.log_path); + + const delete_processed = await cb(log_file); + + try { + if (delete_processed) { + await nb_native().fs.unlink(this.fs_context, log_file.log_path); + } else { + result = false; + } + } catch (error) { + dbg.warn('failed to delete the log file:', log_file.log_path); + throw error; + } finally { + // It is safe to call this even if the callback itself has + // for some reason chosen to deinit the log file early (maybe + // for early release of lock) as deinit wouldn't do anything + // if it doesn't hold a file handle. 
+ await log_file.deinit(); } } return result; @@ -168,7 +151,7 @@ class PersistentLogger { /** * process is a safe wrapper around _process function which creates a failure logger for the * callback function which allows persisting failures to disk - * @param {(file: string, failure_recorder: (entry: string) => Promise) => Promise} cb callback + * @param {(file: LogFile, failure_recorder: (entry: string) => Promise) => Promise} cb callback */ async process(cb) { let failure_log = null; @@ -264,6 +247,78 @@ class LogFile { constructor(fs_context, log_path) { this.fs_context = fs_context; this.log_path = log_path; + this.log_reader = new NewlineReader( + this.fs_context, + this.log_path, { lock: 'EXCLUSIVE', skip_overflow_lines: true, skip_leftover_line: true }, + ); + } + + /** + * init eagerly initializes the underlying log reader which also means + * that it will immediately acquire an EX lock on the underlying file. + * + * Calling this method isn't necessary as both `collect` and `collect_and_process` + * lazily initializes the reader. However, explicit initializing can help + * manage the lifecycle of the underlying lock. + */ + async init() { + if (this.log_reader.fh) return; + await this.log_reader.init(); + } + + /** + * deinit closes the underlying log file and will reset the reader as well. + * This also means that deinit will lose the lock hence this should be called + * with great care. + */ + async deinit() { + if (!this.log_reader.fh) return; + await this.log_reader.close(); + this.log_reader.reset(); + } + + /** + * collect is the simpler alternative of `collect_and_process` where it takes + * the namespace under which the filtered file should be written and takes a batch_recorder + * which will write the given entry to the filtered log. + * + * The use case for simple collect is when the user intends to create a filtered log from + * main log but wants to process the filtered log at a later stage. + * + * The filtered log is generated via `PersistentLogger` hence all the operations available + * in the logger can be performed against the filtered log given the same namespace. + * @param {string} namespace + * @param {(entry: string, batch_recorder: (entry: string) => Promise) => Promise} collect + */ + async collect(namespace, collect) { + let filtered_log = null; + try { + filtered_log = new PersistentLogger( + path.dirname(this.log_path), + namespace, + { locking: 'EXCLUSIVE' } + ); + + // Reset before each call - It is OK to reset it here even + // if the reader hasn't been initialized yet. 
+ this.log_reader.reset(); + + await this.log_reader.forEach(async entry => { + await collect(entry, filtered_log.append.bind(filtered_log)); + return true; + }); + + if (filtered_log.local_size === 0) return; + + await filtered_log.close(); + } catch (error) { + dbg.error('unexpected error in consuming log file:', this.log_path); + + // bubble the error to the caller + throw error; + } finally { + await filtered_log?.close(); + } } /** @@ -280,7 +335,6 @@ class LogFile { * @returns {Promise} */ async collect_and_process(collect, process) { - let log_reader = null; let filtered_log = null; try { filtered_log = new PersistentLogger( @@ -288,11 +342,11 @@ class LogFile { `tmp_consume_${Date.now().toString()}`, { locking: 'EXCLUSIVE' } ); - log_reader = new NewlineReader( - this.fs_context, - this.log_path, { lock: 'EXCLUSIVE', skip_overflow_lines: true, skip_leftover_line: true }, - ); - await log_reader.forEach(async entry => { + // Reset before each call - It is OK to reset it here even + // if the reader hasn't been initialized yet. + this.log_reader.reset(); + + await this.log_reader.forEach(async entry => { await collect(entry, filtered_log.append.bind(filtered_log)); return true; }); @@ -307,10 +361,6 @@ class LogFile { // bubble the error to the caller throw error; } finally { - if (log_reader) { - await log_reader.close(); - } - if (filtered_log) { await filtered_log.close(); await filtered_log.remove();
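
Note on the new flow: `Glacier.perform()` now owns the locking and WAL orchestration that `run_glacier_operation()` used to do in manage_nsfs_glacier.js. Below is a minimal sketch of how a caller drives one migration cycle after this change; the function name `run_migration_cycle` is illustrative only, and the interval/free-space checks plus the `prepare_galcier_fs_context()` wrapping done by the real callers are omitted.

```js
// Sketch only - mirrors what process_migrations() does after this patch.
// perform('MIGRATION') first takes GLACIER_CLUSTER_LOCK and runs stage_migrate()
// over the 'migrate' WAL (writing entries into the 'stage.migrate' WAL), then
// takes GLACIER_MIGRATE_CLUSTER_LOCK and runs migrate() over the staged WAL.
const { Glacier } = require('../sdk/glacier');
const native_fs_utils = require('../util/native_fs_utils');

async function run_migration_cycle() {
    const fs_context = native_fs_utils.get_process_fs_context();
    const backend = Glacier.getBackend();
    await backend.perform(fs_context, 'MIGRATION');
}
```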
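Similarly, `PersistentLogger.init()` now delegates to the new `native_fs_utils.open_with_lock()` helper. A rough usage sketch follows; the wrapper name `with_locked_file` is hypothetical, and the flags/cfg values are the ones used by the PersistentLogger call site above.

```js
// Sketch only: open_with_lock() resolves to the locked file handle, or to
// undefined once the retries are exhausted, so callers must check the result.
// Closing the handle is what releases the fcntl lock.
const native_fs_utils = require('../util/native_fs_utils');

async function with_locked_file(fs_context, file_path, work) {
    const fh = await native_fs_utils.open_with_lock(
        fs_context, file_path, 'as+', undefined,
        { lock: 'EXCLUSIVE', retries: 10, backoff: 5 },
    );
    if (!fh) throw new Error(`failed to lock ${file_path}`);
    try {
        return await work(fh);
    } finally {
        await fh.close(fs_context);
    }
}
```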