[NSFS | GLACIER] Support newline and backslash character in Glacier logs #9142

Open · wants to merge 1 commit into base: master
26 changes: 26 additions & 0 deletions src/sdk/glacier.js
@@ -300,6 +300,32 @@ class Glacier {
return restore_status.state === Glacier.RESTORE_STATUS_CAN_RESTORE;
}

/**
* encode_log takes a raw log entry and encodes it for storage in
* the backend's log.
*
* This method must be overridden by backends that need a different
* encoding for their logs. The base implementation is the identity.
* @param {string} data
* @returns {string}
*/
encode_log(data) {
return data;
}

/**
* decode_log takes an encoded log entry and decodes it back to the
* raw data.
*
* This method must be overridden by backends that need a different
* encoding for their logs. The base implementation is the identity.
* @param {string} data
* @returns {string}
*/
decode_log(data) {
return data;
}

/**
* get_restore_status returns status of the object at the given
* file_path
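The base class ships the identity encoding, so a backend whose log format cannot tolerate raw newlines overrides both hooks. A minimal sketch of such an override, mirroring the escaping this PR adds to TapeCloudGlacier below (the standalone class name here is illustrative, not part of the diff):

class NewlineSafeGlacier extends Glacier {
    // Escape backslashes before newlines - the reverse order would
    // double the backslash just introduced for the newline.
    encode_log(data) {
        return data.replace(/\\/g, '\\\\').replace(/\n/g, '\\n');
    }

    // Undo the escaping in a single pass so a backslash followed by
    // the letter 'n' round-trips correctly instead of becoming a newline.
    decode_log(data) {
        return data.replace(/\\(\\|n)/g, (_, ch) => (ch === 'n' ? '\n' : '\\'));
    }
}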
69 changes: 58 additions & 11 deletions src/sdk/glacier_tapecloud.js
@@ -195,6 +195,8 @@ class TapeCloudUtils {
}

class TapeCloudGlacier extends Glacier {
static LOG_DELIM = ' -- ';

/**
* @param {nb.NativeFSContext} fs_context
* @param {LogFile} log_file
@@ -204,8 +206,14 @@
async stage_migrate(fs_context, log_file, failure_recorder) {
dbg.log2('TapeCloudGlacier.stage_migrate starting for', log_file.log_path);

// Wrap failure recorder to make sure we correctly encode the entries
// before appending them to the failure log
const encoded_failure_recorder = async failure => failure_recorder(this.encode_log(failure));

try {
await log_file.collect(Glacier.MIGRATE_STAGE_WAL_NAME, async (entry, batch_recorder) => {
entry = this.decode_log(entry);

let entry_fh;
let should_migrate = true;
try {
@@ -234,7 +242,7 @@
// Can't really do anything if this fails - provider
// needs to make sure that appropriate error handling
// is being done there
await failure_recorder(entry);
await encoded_failure_recorder(entry);
return;
}

@@ -244,14 +252,14 @@
// Mark the file staged
try {
await entry_fh.replacexattr(fs_context, { [Glacier.XATTR_STAGE_MIGRATE]: Date.now().toString() });
await batch_recorder(entry);
await batch_recorder(this.encode_log(entry));
} catch (error) {
dbg.error('failed to mark the entry migrate staged', error);

// Can't really do anything if this fails - provider
// needs to make sure that appropriate error handling
// is being done there
await failure_recorder(entry);
await encoded_failure_recorder(entry);
} finally {
entry_fh?.close(fs_context);
}
@@ -272,16 +280,23 @@
*/
async migrate(fs_context, log_file, failure_recorder) {
dbg.log2('TapeCloudGlacier.migrate starting for', log_file.log_path);

// Wrap failure recorder to make sure we correctly encode the entries
// before appending them to the failure log
const encoded_failure_recorder = async failure => failure_recorder(this.encode_log(failure));

try {
// This will throw error only if our eeadm error handler
// panics as well and at that point it's okay to
// not handle the error and rather keep the log file around
await this._migrate(log_file.log_path, failure_recorder);
await this._migrate(log_file.log_path, encoded_failure_recorder);

// Un-stage all the files - We don't need to deal with the cases
// where some files have migrated and some have not as that is
// not important for staging/un-staging.
await log_file.collect_and_process(async entry => {
entry = this.decode_log(entry);

let fh;
try {
fh = await nb_native().fs.open(fs_context, entry);
@@ -297,7 +312,7 @@
// Add the entry to the failure log - this could be wasteful as it
// might add entries which have already been migrated, but it makes
// the retry safer.
await failure_recorder(entry);
await encoded_failure_recorder(entry);
} finally {
await fh?.close(fs_context);
}
@@ -319,8 +334,14 @@
async stage_restore(fs_context, log_file, failure_recorder) {
dbg.log2('TapeCloudGlacier.stage_restore starting for', log_file.log_path);

// Wrap failure recorder to make sure we correctly encode the entries
// before appending them to the failure log
const encoded_failure_recorder = async failure => failure_recorder(this.encode_log(failure));

try {
await log_file.collect(Glacier.RESTORE_STAGE_WAL_NAME, async (entry, batch_recorder) => {
entry = this.decode_log(entry);

let fh;
try {
fh = await nb_native().fs.open(fs_context, entry);
@@ -347,9 +368,9 @@
// 3. If we read a corrupt value then either the file is getting staged or is
// getting un-staged - in either case we must requeue.
if (stat.xattr[Glacier.XATTR_STAGE_MIGRATE]) {
await failure_recorder(entry);
await encoded_failure_recorder(entry);
} else {
await batch_recorder(entry);
await batch_recorder(this.encode_log(entry));
}
} catch (error) {
if (error.code === 'ENOENT') {
@@ -361,7 +382,7 @@
'adding log entry', entry,
'to failure recorder due to error', error,
);
await failure_recorder(entry);
await encoded_failure_recorder(entry);
} finally {
await fh?.close(fs_context);
}
@@ -383,25 +404,32 @@
async restore(fs_context, log_file, failure_recorder) {
dbg.log2('TapeCloudGlacier.restore starting for', log_file.log_path);

// Wrap failure recorder to make sure we correctly encode the entries
// before appending them to the failure log
const encoded_failure_recorder = async failure => failure_recorder(this.encode_log(failure));

try {
const success = await this._recall(
log_file.log_path,
async entry_path => {
entry_path = this.decode_log(entry_path);
dbg.log2('TapeCloudGlacier.restore.partial_failure - entry:', entry_path);
await failure_recorder(entry_path);
await encoded_failure_recorder(entry_path);
},
async entry_path => {
entry_path = this.decode_log(entry_path);
dbg.log2('TapeCloudGlacier.restore.partial_success - entry:', entry_path);
await this._finalize_restore(fs_context, entry_path, failure_recorder);
await this._finalize_restore(fs_context, entry_path, encoded_failure_recorder);
}
);

// We will iterate through the entire log file iff we get a success
// message from the recall call.
if (success) {
await log_file.collect_and_process(async (entry_path, batch_recorder) => {
entry_path = this.decode_log(entry_path);
dbg.log2('TapeCloudGlacier.restore.batch - entry:', entry_path);
await this._finalize_restore(fs_context, entry_path, failure_recorder);
await this._finalize_restore(fs_context, entry_path, encoded_failure_recorder);
});
}

@@ -425,6 +453,25 @@
return result.toLowerCase().trim() === 'true';
}

/**
* Encodes the given entry by escaping backslashes and newlines and
* prefixing the result with the log delimiter.
* @param {string} data
* @returns {string}
*/
encode_log(data) {
const encoded = data.replace(/\\/g, '\\\\').replace(/\n/g, '\\n');
return `${TapeCloudGlacier.LOG_DELIM}${encoded}`;
}

/**
* Decodes the given log entry by stripping the log delimiter prefix
* and unescaping newlines and backslashes.
* @param {string} data
* @returns {string}
*/
decode_log(data) {
// Unescape in a single pass so that an escaped backslash followed by
// the letter 'n' (encoded as "\\\\n") is not misread as a newline
return data
.substring(TapeCloudGlacier.LOG_DELIM.length)
.replace(/\\(\\|n)/g, (_, ch) => (ch === 'n' ? '\n' : '\\'));
}

// ============= PRIVATE FUNCTIONS =============

/**
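A quick round-trip check of the scheme above (an illustrative sketch, assuming the single-pass decode shown in the diff; the sample entries are made up):

const glacier = new TapeCloudGlacier();

const samples = [
    'restore_key_2_\n',   // key with an embedded newline
    'back\\slash',        // key with an embedded backslash
    'literal\\n',         // backslash followed by the letter n
];

for (const entry of samples) {
    const encoded = glacier.encode_log(entry);  // ' -- ' prefix + escaped entry
    console.assert(!encoded.includes('\n'), 'encoded entry must stay on one line');
    console.assert(glacier.decode_log(encoded) === entry, 'round-trip must be lossless');
}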
4 changes: 2 additions & 2 deletions src/sdk/namespace_fs.js
@@ -3697,13 +3697,13 @@ class NamespaceFS {
async append_to_migrate_wal(entry) {
if (!config.NSFS_GLACIER_LOGS_ENABLED) return;

await NamespaceFS.migrate_wal.append(entry);
await NamespaceFS.migrate_wal.append(Glacier.getBackend().encode_log(entry));
}

async append_to_restore_wal(entry) {
if (!config.NSFS_GLACIER_LOGS_ENABLED) return;

await NamespaceFS.restore_wal.append(entry);
await NamespaceFS.restore_wal.append(Glacier.getBackend().encode_log(entry));
}

static get migrate_wal() {
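The effect of the two changes above is a single-line invariant on the WALs: writers encode every entry before appending, and readers decode each collected line before use, re-encoding anything they forward to another log. A sketch of that contract (hedged; the collect callback shape follows the diff, the surrounding wiring is assumed):

// Writer side - encode before append so a newline in an object key
// cannot split one WAL entry across two lines.
await NamespaceFS.migrate_wal.append(Glacier.getBackend().encode_log(entry));

// Reader side - decode each line back to the raw path, and re-encode
// anything written onward, as stage_migrate() does with batch_recorder.
await log_file.collect(Glacier.MIGRATE_STAGE_WAL_NAME, async (line, batch_recorder) => {
    const entry = backend.decode_log(line);
    // ... decide whether to migrate the entry ...
    await batch_recorder(backend.encode_log(entry));
});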
126 changes: 124 additions & 2 deletions src/test/unit_tests/nsfs/test_nsfs_glacier_backend.js
@@ -166,6 +166,8 @@ mocha.describe('nsfs_glacier', function() {
});

mocha.describe('nsfs_glacier_tapecloud', async function() {
const restore_key_spcl_char_1 = 'restore_key_2_\n';
const restore_key_spcl_char_2 = 'restore_key_2_\n_2';
const upload_key = 'upload_key_1';
const restore_key = 'restore_key_1';
const xattr = { key: 'value', key2: 'value2' };
@@ -280,10 +282,10 @@
failure_backend._process_expired = async () => { /**noop*/ };
failure_backend._recall = async (_file, failure_recorder, success_recorder) => {
// This unintentionally also replicates duplicate entries in WAL
await failure_recorder(failed_file_path);
await failure_recorder(failure_backend.encode_log(failed_file_path));

// This unintentionally also replicates duplicate entries in WAL
await success_recorder(success_file_path);
await success_recorder(failure_backend.encode_log(success_file_path));

return false;
};
@@ -324,6 +326,126 @@
assert(failure_stats.xattr[Glacier.XATTR_RESTORE_REQUEST]);
});

mocha.it('restore-object should successfully restore objects with special characters', async function() {
const now = Date.now();
const data = crypto.randomBytes(100);
const all_params = [
{
bucket: upload_bkt,
key: restore_key_spcl_char_1,
storage_class: s3_utils.STORAGE_CLASS_GLACIER,
xattr,
days: 1,
source_stream: buffer_utils.buffer_to_read_stream(data)
},
{
bucket: upload_bkt,
key: restore_key_spcl_char_2,
storage_class: s3_utils.STORAGE_CLASS_GLACIER,
xattr,
days: 1,
source_stream: buffer_utils.buffer_to_read_stream(data)
}
];

for (const params of all_params) {
const upload_res = await glacier_ns.upload_object(params, dummy_object_sdk);
console.log('upload_object response', inspect(upload_res));

const restore_res = await glacier_ns.restore_object(params, dummy_object_sdk);
assert(restore_res);

// Issue restore
await backend.perform(glacier_ns.prepare_fs_context(dummy_object_sdk), "RESTORE");


// Ensure object is restored
const md = await glacier_ns.read_object_md(params, dummy_object_sdk);

assert(!md.restore_status.ongoing);

const expected_expiry = Glacier.generate_expiry(new Date(), params.days, '', config.NSFS_GLACIER_EXPIRY_TZ);
assert(expected_expiry.getTime() >= md.restore_status.expiry_time.getTime());
assert(now <= md.restore_status.expiry_time.getTime());
}
});

mocha.it('restore-object should not restore failed item with special characters', async function() {
const now = Date.now();
const data = crypto.randomBytes(100);
const failed_restore_key = `${restore_key_spcl_char_1}_failured`;
const success_restore_key = `${restore_key_spcl_char_1}_success`;

const failed_params = {
bucket: upload_bkt,
key: failed_restore_key,
storage_class: s3_utils.STORAGE_CLASS_GLACIER,
xattr,
days: 1,
source_stream: buffer_utils.buffer_to_read_stream(data)
};

const success_params = {
bucket: upload_bkt,
key: success_restore_key,
storage_class: s3_utils.STORAGE_CLASS_GLACIER,
xattr,
days: 1,
source_stream: buffer_utils.buffer_to_read_stream(data)
};

const failed_file_path = glacier_ns._get_file_path(failed_params);
const success_file_path = glacier_ns._get_file_path(success_params);

const failure_backend = new TapeCloudGlacier();
failure_backend._migrate = async () => true;
failure_backend._process_expired = async () => { /**noop*/ };
failure_backend._recall = async (_file, failure_recorder, success_recorder) => {
// This unintentionally also replicates duplicate entries in WAL
await failure_recorder(failure_backend.encode_log(failed_file_path));

// This unintentionally also replicates duplicate entries in WAL
await success_recorder(failure_backend.encode_log(success_file_path));

return false;
};

const upload_res_1 = await glacier_ns.upload_object(failed_params, dummy_object_sdk);
console.log('upload_object response', inspect(upload_res_1));

const upload_res_2 = await glacier_ns.upload_object(success_params, dummy_object_sdk);
console.log('upload_object response', inspect(upload_res_2));

const restore_res_1 = await glacier_ns.restore_object(failed_params, dummy_object_sdk);
assert(restore_res_1);

const restore_res_2 = await glacier_ns.restore_object(success_params, dummy_object_sdk);
assert(restore_res_2);

const fs_context = glacier_ns.prepare_fs_context(dummy_object_sdk);

// Issue restore
await failure_backend.perform(glacier_ns.prepare_fs_context(dummy_object_sdk), "RESTORE");

// Ensure success object is restored
const success_md = await glacier_ns.read_object_md(success_params, dummy_object_sdk);

assert(!success_md.restore_status.ongoing);

const expected_expiry = Glacier.generate_expiry(new Date(), success_params.days, '', config.NSFS_GLACIER_EXPIRY_TZ);
assert(expected_expiry.getTime() >= success_md.restore_status.expiry_time.getTime());
assert(now <= success_md.restore_status.expiry_time.getTime());

// Ensure failed object is NOT restored
const failure_stats = await nb_native().fs.stat(
fs_context,
failed_file_path,
);

assert(!failure_stats.xattr[Glacier.XATTR_RESTORE_EXPIRY] || failure_stats.xattr[Glacier.XATTR_RESTORE_EXPIRY] === '');
assert(failure_stats.xattr[Glacier.XATTR_RESTORE_REQUEST]);
});

mocha.it('_finalize_restore should tolerate deleted objects', async function() {
// should not throw error if the path does not exist
await backend._finalize_restore(glacier_ns.prepare_fs_context(dummy_object_sdk), '/path/does/not/exist');