Skip to content

Commit 197213b

Browse files
committed
add support for parallel recall and migrate
Signed-off-by: Utkarsh Srivastava <[email protected]> remove fcntltrylock Signed-off-by: Utkarsh Srivastava <[email protected]> add support for new task ID format Signed-off-by: Utkarsh Srivastava <[email protected]>
1 parent 704e34d commit 197213b

File tree

10 files changed

+623
-277
lines changed

10 files changed

+623
-277
lines changed

config.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,11 @@ config.NSFS_GLACIER_DMAPI_TPS_HTTP_HEADER = 'x-tape-meta-copy';
916916
// but can be overridden to any numerical value
917917
config.NSFS_GLACIER_DMAPI_PMIG_DAYS = config.S3_RESTORE_REQUEST_MAX_DAYS;
918918

919+
// NSFS_GLACIER_DMAPI_FINALIZE_RESTORE_ENABLE if enabled will force NooBaa to
920+
// examine the DMAPI xattr of the file before finalizing the restore to prevent
921+
// accidental blocking reads from happening.
922+
config.NSFS_GLACIER_DMAPI_FINALIZE_RESTORE_ENABLE = false;
923+
919924
config.NSFS_STATFS_CACHE_SIZE = 10000;
920925
config.NSFS_STATFS_CACHE_EXPIRY_MS = 1 * 1000;
921926

src/manage_nsfs/manage_nsfs_glacier.js

Lines changed: 35 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -9,86 +9,53 @@ const { Glacier } = require('../sdk/glacier');
99
const native_fs_utils = require('../util/native_fs_utils');
1010
const { is_desired_time, record_current_time } = require('./manage_nsfs_cli_utils');
1111

12-
const CLUSTER_LOCK = 'cluster.lock';
13-
const SCAN_LOCK = 'scan.lock';
14-
1512
async function process_migrations() {
1613
const fs_context = native_fs_utils.get_process_fs_context();
17-
const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, CLUSTER_LOCK);
18-
await native_fs_utils.lock_and_run(fs_context, lock_path, async () => {
19-
const backend = Glacier.getBackend();
20-
21-
if (
22-
await backend.low_free_space() ||
23-
await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, Glacier.MIGRATE_TIMESTAMP_FILE) ||
24-
await migrate_log_exceeds_threshold()
25-
) {
26-
await run_glacier_migrations(fs_context, backend);
27-
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.MIGRATE_TIMESTAMP_FILE);
28-
await record_current_time(fs_context, timestamp_file_path);
29-
}
30-
});
31-
}
32-
33-
/**
34-
* run_tape_migrations reads the migration WALs and attempts to migrate the
35-
* files mentioned in the WAL.
36-
* @param {nb.NativeFSContext} fs_context
37-
* @param {import('../sdk/glacier').Glacier} backend
38-
*/
39-
async function run_glacier_migrations(fs_context, backend) {
40-
await run_glacier_operation(fs_context, Glacier.MIGRATE_WAL_NAME, backend.migrate.bind(backend));
14+
const backend = Glacier.getBackend();
15+
16+
if (
17+
await backend.low_free_space() ||
18+
await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, Glacier.MIGRATE_TIMESTAMP_FILE) ||
19+
await migrate_log_exceeds_threshold()
20+
) {
21+
await backend.perform(prepare_galcier_fs_context(fs_context), "MIGRATION");
22+
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.MIGRATE_TIMESTAMP_FILE);
23+
await record_current_time(fs_context, timestamp_file_path);
24+
}
4125
}
4226

4327
async function process_restores() {
4428
const fs_context = native_fs_utils.get_process_fs_context();
45-
const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, CLUSTER_LOCK);
46-
await native_fs_utils.lock_and_run(fs_context, lock_path, async () => {
47-
const backend = Glacier.getBackend();
29+
const backend = Glacier.getBackend();
4830

49-
if (
50-
await backend.low_free_space() ||
51-
!(await time_exceeded(fs_context, config.NSFS_GLACIER_RESTORE_INTERVAL, Glacier.RESTORE_TIMESTAMP_FILE))
52-
) return;
31+
if (
32+
await backend.low_free_space() ||
33+
!(await time_exceeded(fs_context, config.NSFS_GLACIER_RESTORE_INTERVAL, Glacier.RESTORE_TIMESTAMP_FILE))
34+
) return;
5335

54-
55-
await run_glacier_restore(fs_context, backend);
56-
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.RESTORE_TIMESTAMP_FILE);
57-
await record_current_time(fs_context, timestamp_file_path);
58-
});
59-
}
60-
61-
/**
62-
* run_tape_restore reads the restore WALs and attempts to restore the
63-
* files mentioned in the WAL.
64-
* @param {nb.NativeFSContext} fs_context
65-
* @param {import('../sdk/glacier').Glacier} backend
66-
*/
67-
async function run_glacier_restore(fs_context, backend) {
68-
await run_glacier_operation(fs_context, Glacier.RESTORE_WAL_NAME, backend.restore.bind(backend));
36+
await backend.perform(prepare_galcier_fs_context(fs_context), "RESTORE");
37+
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.RESTORE_TIMESTAMP_FILE);
38+
await record_current_time(fs_context, timestamp_file_path);
6939
}
7040

7141
async function process_expiry() {
7242
const fs_context = native_fs_utils.get_process_fs_context();
73-
const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, SCAN_LOCK);
74-
await native_fs_utils.lock_and_run(fs_context, lock_path, async () => {
75-
const backend = Glacier.getBackend();
76-
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.EXPIRY_TIMESTAMP_FILE);
77-
if (
78-
await backend.low_free_space() ||
79-
await is_desired_time(
80-
fs_context,
81-
new Date(),
82-
config.NSFS_GLACIER_EXPIRY_RUN_TIME,
83-
config.NSFS_GLACIER_EXPIRY_RUN_DELAY_LIMIT_MINS,
84-
timestamp_file_path,
85-
config.NSFS_GLACIER_EXPIRY_TZ
86-
)
87-
) {
88-
await backend.expiry(fs_context);
89-
await record_current_time(fs_context, timestamp_file_path);
90-
}
91-
});
43+
const backend = Glacier.getBackend();
44+
const timestamp_file_path = path.join(config.NSFS_GLACIER_LOGS_DIR, Glacier.EXPIRY_TIMESTAMP_FILE);
45+
if (
46+
await backend.low_free_space() ||
47+
await is_desired_time(
48+
fs_context,
49+
new Date(),
50+
config.NSFS_GLACIER_EXPIRY_RUN_TIME,
51+
config.NSFS_GLACIER_EXPIRY_RUN_DELAY_LIMIT_MINS,
52+
timestamp_file_path,
53+
config.NSFS_GLACIER_EXPIRY_TZ
54+
)
55+
) {
56+
await backend.perform(prepare_galcier_fs_context(fs_context), "EXPIRY");
57+
await record_current_time(fs_context, timestamp_file_path);
58+
}
9259
}
9360

9461

@@ -137,27 +104,6 @@ async function migrate_log_exceeds_threshold(threshold = config.NSFS_GLACIER_MIG
137104
return log_size > threshold;
138105
}
139106

140-
/**
141-
* run_glacier_operations takes a log_namespace and a callback and executes the
142-
* callback on each log file in that namespace. It will also generate a failure
143-
* log file and persist the failures in that log file.
144-
* @param {nb.NativeFSContext} fs_context
145-
* @param {string} log_namespace
146-
* @param {Function} cb
147-
*/
148-
async function run_glacier_operation(fs_context, log_namespace, cb) {
149-
const log = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, log_namespace, { locking: 'EXCLUSIVE' });
150-
151-
fs_context = prepare_galcier_fs_context(fs_context);
152-
try {
153-
await log.process(async (entry, failure_recorder) => cb(fs_context, entry, failure_recorder));
154-
} catch (error) {
155-
console.error('failed to process log in namespace:', log_namespace);
156-
} finally {
157-
await log.close();
158-
}
159-
}
160-
161107
/**
162108
* prepare_galcier_fs_context returns a shallow copy of given
163109
* fs_context with backend set to 'GPFS'.

src/sdk/glacier.js

Lines changed: 142 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
/* Copyright (C) 2024 NooBaa */
22
'use strict';
33

4+
const path = require('path');
45
const nb_native = require('../util/nb_native');
56
const s3_utils = require('../endpoint/s3/s3_utils');
67
const { round_up_to_next_time_of_day } = require('../util/time_utils');
78
const dbg = require('../util/debug_module')(__filename);
89
const config = require('../../config');
10+
const { PersistentLogger } = require('../util/persistent_logger');
11+
const native_fs_utils = require('../util/native_fs_utils');
12+
13+
/** @import {LogFile} from "../util/persistent_logger" */
914

1015
class Glacier {
1116
// These names start with the word 'timestamp' so as to assure
@@ -41,6 +46,8 @@ class Glacier {
4146

4247
static STORAGE_CLASS_XATTR = 'user.storage_class';
4348

49+
static XATTR_STAGE_MIGRATE = 'user.noobaa.migrate.staged';
50+
4451
/**
4552
* GPFS_DMAPI_XATTR_TAPE_INDICATOR if set on a file indicates that the file is on tape.
4653
*
@@ -65,7 +72,9 @@ class Glacier {
6572
static GPFS_DMAPI_XATTR_TAPE_TPS = 'dmapi.IBMTPS';
6673

6774
static MIGRATE_WAL_NAME = 'migrate';
75+
static MIGRATE_STAGE_WAL_NAME = 'stage.migrate';
6876
static RESTORE_WAL_NAME = 'restore';
77+
static RESTORE_STAGE_WAL_NAME = 'stage.restore';
6978

7079
/** @type {nb.RestoreState} */
7180
static RESTORE_STATUS_CAN_RESTORE = 'CAN_RESTORE';
@@ -74,17 +83,46 @@ class Glacier {
7483
/** @type {nb.RestoreState} */
7584
static RESTORE_STATUS_RESTORED = 'RESTORED';
7685

86+
static GLACIER_CLUSTER_LOCK = 'glacier.cluster.lock';
87+
static GLACIER_MIGRATE_CLUSTER_LOCK = 'glacier.cluster.migrate.lock';
88+
static GLACIER_RESTORE_CLUSTER_LOCK = 'glacier.cluster.restore.lock';
89+
static GLACIER_SCAN_LOCK = 'glacier.scan.lock';
90+
91+
/**
92+
* stage_migrate must take a LogFile object (this should be from the
93+
* `Glacier.MIGRATE_WAL_NAME` namespace) which will have
94+
* newline separated entries of filenames which need to be
95+
* migrated to GLACIER and should stage the files for migration.
96+
*
97+
* The function should return false if it needs the log file to be
98+
* preserved.
99+
* @param {nb.NativeFSContext} fs_context
100+
* @param {LogFile} log_file log file object
101+
* @param {(entry: string) => Promise<void>} failure_recorder
102+
* @returns {Promise<boolean>}
103+
*/
104+
async stage_migrate(fs_context, log_file, failure_recorder) {
105+
try {
106+
await log_file.collect(Glacier.MIGRATE_STAGE_WAL_NAME, async (entry, batch_recorder) => batch_recorder(entry));
107+
return true;
108+
} catch (error) {
109+
dbg.error('Glacier.stage_migrate error:', error);
110+
throw error;
111+
}
112+
}
113+
77114
/**
78-
* migrate must take a file name which will have newline seperated
79-
* entries of filenames which needs to be migrated to GLACIER and
80-
* should perform migration of those files if feasible.
115+
* migrate must take a LogFile object (this should be from the
116+
* `Glacier.MIGRATE_STAGE_WAL_NAME` namespace) which will have newline
117+
* separated entries of filenames which need to be migrated to GLACIER
118+
* and should perform migration of those files if feasible.
81119
*
82120
* The function should return false if it needs the log file to be
83121
* preserved.
84122
*
85123
* NOTE: This needs to be implemented by each backend.
86124
* @param {nb.NativeFSContext} fs_context
87-
* @param {string} log_file log filename
125+
* @param {LogFile} log_file log file object
88126
* @param {(entry: string) => Promise<void>} failure_recorder
89127
* @returns {Promise<boolean>}
90128
*/
@@ -93,16 +131,39 @@ class Glacier {
93131
}
94132

95133
/**
96-
* restore must take a file name which will have newline seperated
97-
* entries of filenames which needs to be restored from GLACIER and
98-
* should perform restore of those files if feasible
134+
* stage_restore must take a log file (from `Glacier.RESTORE_WAL_NAME`)
135+
* which will have newline separated entries of filenames which need to be
136+
* restored from GLACIER and should stage the files for restore.
137+
*
138+
* The function should return false if it needs the log file to be
139+
* preserved.
140+
* @param {nb.NativeFSContext} fs_context
141+
* @param {LogFile} log_file log file object
142+
* @param {(entry: string) => Promise<void>} failure_recorder
143+
* @returns {Promise<boolean>}
144+
*/
145+
async stage_restore(fs_context, log_file, failure_recorder) {
146+
try {
147+
await log_file.collect(Glacier.RESTORE_STAGE_WAL_NAME, async (entry, batch_recorder) => batch_recorder(entry));
148+
return true;
149+
} catch (error) {
150+
dbg.error('Glacier.stage_restore error:', error);
151+
throw error;
152+
}
153+
}
154+
155+
/**
156+
* restore must take a log file (from `Glacier.RESTORE_STAGE_WAL_NAME`) which
157+
* will have newline separated entries of filenames which need to be
158+
* restored from GLACIER and should perform restore of those files if
159+
* feasible
99160
*
100161
* The function should return false if it needs the log file to be
101162
* preserved.
102163
*
103164
* NOTE: This needs to be implemented by each backend.
104165
* @param {nb.NativeFSContext} fs_context
105-
* @param {string} log_file log filename
166+
* @param {LogFile} log_file log file object
106167
* @param {(entry: string) => Promise<void>} failure_recorder
107168
* @returns {Promise<boolean>}
108169
*/
@@ -136,6 +197,78 @@ class Glacier {
136197
throw new Error('Unimplementented');
137198
}
138199

200+
/**
201+
* @param {nb.NativeFSContext} fs_context
202+
* @param {"MIGRATION" | "RESTORE" | "EXPIRY"} type
203+
*/
204+
async perform(fs_context, type) {
205+
const lock_path = lock_file => path.join(config.NSFS_GLACIER_LOGS_DIR, lock_file);
206+
207+
if (type === 'EXPIRY') {
208+
await native_fs_utils.lock_and_run(fs_context, lock_path(Glacier.GLACIER_SCAN_LOCK), async () => {
209+
await this.expiry(fs_context);
210+
});
211+
}
212+
213+
/** @typedef {(
214+
* fs_context: nb.NativeFSContext,
215+
* file: LogFile,
216+
* failure_recorder: (entry: string) => Promise<void>
217+
* ) => Promise<boolean>} log_cb */
218+
219+
/**
220+
* @param {string} namespace
221+
* @param {log_cb} cb
222+
*/
223+
const process_glacier_logs = async (namespace, cb) => {
224+
const logs = new PersistentLogger(
225+
config.NSFS_GLACIER_LOGS_DIR,
226+
namespace, { locking: 'EXCLUSIVE' },
227+
);
228+
await logs.process(async (entry, failure_recorder) => cb(fs_context, entry, failure_recorder));
229+
};
230+
231+
/**
232+
*
233+
* @param {string} primary_log_ns
234+
* @param {string} staged_log_ns
235+
* @param {log_cb} process_staged_fn
236+
* @param {log_cb} process_primary_fn
237+
* @param {string} stage_lock_file
238+
*/
239+
const run_operation = async (primary_log_ns, staged_log_ns, process_staged_fn, process_primary_fn, stage_lock_file) => {
240+
// Acquire a cluster wide lock for all the operations for staging
241+
await native_fs_utils.lock_and_run(fs_context, lock_path(Glacier.GLACIER_CLUSTER_LOCK), async () => {
242+
await process_glacier_logs(primary_log_ns, process_staged_fn);
243+
});
244+
245+
// Acquire a type specific lock to consume staged logs
246+
await native_fs_utils.lock_and_run(
247+
fs_context, lock_path(stage_lock_file), async () => {
248+
await process_glacier_logs(staged_log_ns, process_primary_fn);
249+
}
250+
);
251+
};
252+
253+
if (type === 'MIGRATION') {
254+
await run_operation(
255+
Glacier.MIGRATE_WAL_NAME,
256+
Glacier.MIGRATE_STAGE_WAL_NAME,
257+
this.stage_migrate.bind(this),
258+
this.migrate.bind(this),
259+
Glacier.GLACIER_MIGRATE_CLUSTER_LOCK,
260+
);
261+
} else if (type === 'RESTORE') {
262+
await run_operation(
263+
Glacier.RESTORE_WAL_NAME,
264+
Glacier.RESTORE_STAGE_WAL_NAME,
265+
this.stage_restore.bind(this),
266+
this.restore.bind(this),
267+
Glacier.GLACIER_RESTORE_CLUSTER_LOCK,
268+
);
269+
}
270+
}
271+
139272
/**
140273
* should_migrate returns true if the given file must be migrated
141274
*
@@ -319,6 +452,7 @@ class Glacier {
319452
xattr_get_keys: [
320453
Glacier.XATTR_RESTORE_REQUEST,
321454
Glacier.STORAGE_CLASS_XATTR,
455+
Glacier.XATTR_STAGE_MIGRATE,
322456
],
323457
});
324458
}

0 commit comments

Comments
 (0)