From 21420b08f80f8dd25fdabdbb7180e8a5ad6c6e07 Mon Sep 17 00:00:00 2001 From: Guy Margalit Date: Tue, 14 Jan 2025 01:11:22 +0200 Subject: [PATCH] nsfs refactor file_reader and file_writer Signed-off-by: Guy Margalit --- config.js | 3 +- src/sdk/namespace_fs.js | 263 +++++++----------- .../internal/test_file_reader.test.js | 133 +++++++++ src/tools/file_writer_hashing.js | 9 +- src/util/file_reader.js | 225 ++++++++++++++- src/util/file_writer.js | 73 +++-- src/util/native_fs_utils.js | 120 +++++++- 7 files changed, 616 insertions(+), 210 deletions(-) create mode 100644 src/test/unit_tests/internal/test_file_reader.test.js diff --git a/config.js b/config.js index 1e043f49b7..9ce125eefd 100644 --- a/config.js +++ b/config.js @@ -985,7 +985,8 @@ config.NSFS_GLACIER_RESERVED_BUCKET_TAGS = {}; // anonymous account name config.ANONYMOUS_ACCOUNT_NAME = 'anonymous'; -config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024; +config.NSFS_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024; +config.NSFS_DOWNLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024; // we want to change our handling related to EACCESS error config.NSFS_LIST_IGNORE_ENTRY_ON_EACCES = true; diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index e835d3b060..b250cd3144 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -8,13 +8,13 @@ const fs = require('fs'); const path = require('path'); const util = require('util'); const mime = require('mime-types'); +const stream = require('stream'); const P = require('../util/promise'); const dbg = require('../util/debug_module')(__filename); const config = require('../../config'); const crypto = require('crypto'); const s3_utils = require('../endpoint/s3/s3_utils'); const error_utils = require('../util/error_utils'); -const stream_utils = require('../util/stream_utils'); const buffer_utils = require('../util/buffer_utils'); const size_utils = require('../util/size_utils'); const http_utils = require('../util/http_utils'); @@ -28,6 +28,7 @@ const lifecycle_utils = require('../util/lifecycle_utils'); const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent; const { PersistentLogger } = require('../util/persistent_logger'); const { Glacier } = require('./glacier'); +const { FileReader } = require('../util/file_reader'); const multi_buffer_pool = new buffer_utils.MultiSizeBuffersPool({ sorted_buf_sizes: [ @@ -102,7 +103,7 @@ function sort_entries_by_name(a, b) { return 0; } -function _get_version_id_by_stat({ino, mtimeNsBigint}) { +function _get_version_id_by_stat({ ino, mtimeNsBigint }) { // TODO: GPFS might require generation number to be added to version_id return 'mtime-' + mtimeNsBigint.toString(36) + '-ino-' + ino.toString(36); } @@ -236,21 +237,6 @@ function is_symbolic_link(stat) { } } -/** - * NOTICE that even files that were written sequentially, can still be identified as sparse: - * 1. After writing, but before all the data is synced, the size is higher than blocks size. - * 2. For files that were moved to an archive tier. - * 3. For files that fetch and cache data from remote storage, which are still not in the cache. - * It's not good enough for avoiding recall storms as needed by _fail_if_archived_or_sparse_file. - * However, using this check is useful for guessing that a reads is going to take more time - * and avoid holding off large buffers from the buffers_pool. 
- * @param {nb.NativeFSStats} stat - * @returns {boolean} - */ -function is_sparse_file(stat) { - return (stat.blocks * 512 < stat.size); -} - /** * @param {fs.Dirent} e * @returns {string} @@ -512,7 +498,6 @@ class NamespaceFS { this.versioning = (config.NSFS_VERSIONING_ENABLED && versioning) || VERSIONING_STATUS_ENUM.VER_DISABLED; this.stats = stats; this.force_md5_etag = force_md5_etag; - this.warmup_buffer = nb_native().fs.dio_buffer_alloc(4096); } /** @@ -834,7 +819,7 @@ class NamespaceFS { if (version_id_marker) start_marker = version_id_marker; marker_index = _.findIndex( sorted_entries, - {name: start_marker} + { name: start_marker } ) + 1; } else { marker_index = _.sortedLastIndexBy( @@ -879,7 +864,7 @@ class NamespaceFS { try { dbg.warn('NamespaceFS: open dir streaming', dir_path, 'size', cached_dir.stat.size); dir_handle = await nb_native().fs.opendir(fs_context, dir_path); //, { bufferSize: 128 }); - for (;;) { + for (; ;) { const dir_entry = await dir_handle.read(fs_context); if (!dir_entry) break; await process_entry(dir_entry); @@ -975,8 +960,9 @@ class NamespaceFS { let isDir; let retries = (this._is_versioning_enabled() || this._is_versioning_suspended()) ? config.NSFS_RENAME_RETRIES : 0; try { - for (;;) { + for (; ;) { try { + object_sdk.throw_if_aborted(); file_path = await this._find_version_path(fs_context, params, true); await this._check_path_in_bucket_boundaries(fs_context, file_path); await this._load_bucket(params, fs_context); @@ -1002,6 +988,7 @@ class NamespaceFS { dbg.warn(`NamespaceFS.read_object_md: retrying retries=${retries} file_path=${file_path}`, err); retries -= 1; if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(err)) throw err; + object_sdk.throw_if_aborted(); await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50)); } } @@ -1029,25 +1016,33 @@ class NamespaceFS { } catch (err) { //failed to get object new NoobaaEvent(NoobaaEvent.OBJECT_GET_FAILED).create_event(params.key, - {bucket_path: this.bucket_path, object_name: params.key}, err); + { bucket_path: this.bucket_path, object_name: params.key }, err); dbg.log0('NamespaceFS: read_object_stream couldnt find dir content xattr', err); } } return false; } - // eslint-disable-next-line max-statements + /** + * + * @param {*} params + * @param {nb.ObjectSDK} object_sdk + * @param {nb.S3Response|stream.Writable} res + * @returns + */ async read_object_stream(params, object_sdk, res) { - let buffer_pool_cleanup = null; const fs_context = this.prepare_fs_context(object_sdk); + const signal = object_sdk.abort_controller.signal; let file_path; let file; + try { await this._load_bucket(params, fs_context); let retries = (this._is_versioning_enabled() || this._is_versioning_suspended()) ? config.NSFS_RENAME_RETRIES : 0; let stat; - for (;;) { + for (; ;) { try { + object_sdk.throw_if_aborted(); file_path = await this._find_version_path(fs_context, params); await this._check_path_in_bucket_boundaries(fs_context, file_path); @@ -1057,7 +1052,8 @@ class NamespaceFS { if (await this._is_empty_directory_content(file_path, fs_context, params)) { res.end(); // since we don't write anything to the stream wait_finished might not be needed. 
added just in case there is a delay - await stream_utils.wait_finished(res, { signal: object_sdk.abort_controller.signal }); + object_sdk.throw_if_aborted(); + await stream.promises.finished(res, { signal }); return null; } @@ -1082,9 +1078,10 @@ class NamespaceFS { retries -= 1; if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(err)) { new NoobaaEvent(NoobaaEvent.OBJECT_GET_FAILED).create_event(params.key, - {bucket_path: this.bucket_path, object_name: params.key}, err); + { bucket_path: this.bucket_path, object_name: params.key }, err); throw err; } + object_sdk.throw_if_aborted(); await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50)); } } @@ -1098,121 +1095,58 @@ class NamespaceFS { const start = Number(params.start) || 0; const end = isNaN(Number(params.end)) ? Infinity : Number(params.end); - let num_bytes = 0; - let num_buffers = 0; - const log2_size_histogram = {}; - let drain_promise = null; + object_sdk.throw_if_aborted(); - dbg.log0('NamespaceFS: read_object_stream', { + dbg.log1('NamespaceFS: read_object_stream', { file_path, start, end, size: stat.size, }); - let count = 1; - for (let pos = start; pos < end;) { - object_sdk.throw_if_aborted(); - - // Our buffer pool keeps large buffers and we want to avoid spending - // all our large buffers and then have them waiting for high latency calls - // such as reading from archive/on-demand cache files. - // Instead, we detect the case where a file is "sparse", - // and then use just a small buffer to wait for a tiny read, - // which will recall the file from archive or load from remote into cache, - // and once it returns we can continue to the full fledged read. - if (config.NSFS_BUF_WARMUP_SPARSE_FILE_READS && is_sparse_file(stat)) { - dbg.log0('NamespaceFS: read_object_stream - warmup sparse file', { - file_path, pos, size: stat.size, blocks: stat.blocks, - }); - await file.read(fs_context, this.warmup_buffer, 0, 1, pos); - } + const file_reader = new FileReader({ + fs_context, + file, + file_path, + start, + end, + stat, + multi_buffer_pool, + signal, + stats: this.stats, + bucket: params.bucket, + namespace_resource_id: this.namespace_resource_id, + }); - const remain_size = Math.min(Math.max(0, end - pos), stat.size); - - // allocate or reuse buffer - // TODO buffers_pool and the underlying semaphore should support abort signal - // to avoid sleeping inside the semaphore until the timeout while the request is already aborted. 
- const { buffer, callback } = await multi_buffer_pool.get_buffers_pool(remain_size).get_buffer(); - buffer_pool_cleanup = callback; // must be called ***IMMEDIATELY*** after get_buffer - object_sdk.throw_if_aborted(); - - // read from file - const read_size = Math.min(buffer.length, remain_size); - const bytesRead = await file.read(fs_context, buffer, 0, read_size, pos); - if (!bytesRead) { - buffer_pool_cleanup = null; - callback(); - break; - } - object_sdk.throw_if_aborted(); - const data = buffer.slice(0, bytesRead); - - // update stats - pos += bytesRead; - num_bytes += bytesRead; - num_buffers += 1; - const log2_size = Math.ceil(Math.log2(bytesRead)); - log2_size_histogram[log2_size] = (log2_size_histogram[log2_size] || 0) + 1; - - // collect read stats - this.stats?.update_nsfs_read_stats({ - namespace_resource_id: this.namespace_resource_id, - bucket_name: params.bucket, - size: bytesRead, - count - }); - // clear count for next updates - count = 0; - - // wait for response buffer to drain before adding more data if needed - - // this occurs when the output network is slower than the input file - if (drain_promise) { - await drain_promise; - drain_promise = null; - object_sdk.throw_if_aborted(); - } + const start_time = process.hrtime.bigint(); + await file_reader.read_into_stream(res); + const took_ms = Number(process.hrtime.bigint() - start_time) / 1e6; - // write the data out to response - buffer_pool_cleanup = null; // cleanup is now in the socket responsibility - const write_ok = res.write(data, null, callback); - if (!write_ok) { - drain_promise = stream_utils.wait_drain(res, { signal: object_sdk.abort_controller.signal }); - drain_promise.catch(() => undefined); // this avoids UnhandledPromiseRejection - } - } + // end the response stream to complete the response + res.end(); - await this._glacier_force_expire_on_get(fs_context, file_path, file, stat); + dbg.log1('NamespaceFS: read_object_stream completed', { + file_path, start, end, size: stat.size, took_ms, + num_bytes: file_reader.num_bytes, + num_buffers: file_reader.num_buffers, + avg_buffer: file_reader.num_bytes / file_reader.num_buffers, + log2_size_histogram: file_reader.log2_size_histogram, + }); await file.close(fs_context); file = null; - object_sdk.throw_if_aborted(); - - // wait for the last drain if pending. 
- if (drain_promise) { - await drain_promise; - drain_promise = null; - object_sdk.throw_if_aborted(); - } - - // end the stream - res.end(); - await stream_utils.wait_finished(res, { signal: object_sdk.abort_controller.signal }); + // wait for the response to finish to make sure we handled the error if any object_sdk.throw_if_aborted(); + await stream.promises.finished(res, { signal }); - dbg.log0('NamespaceFS: read_object_stream completed file', file_path, { - num_bytes, - num_buffers, - avg_buffer: num_bytes / num_buffers, - log2_size_histogram, - }); + await this._glacier_force_expire_on_get(fs_context, file_path, file, stat); - // return null to signal the caller that we already handled the response + // return null to let the caller know that we already handled the response return null; } catch (err) { dbg.log0('NamespaceFS: read_object_stream error file', file_path, err); //failed to get object new NoobaaEvent(NoobaaEvent.OBJECT_STREAM_GET_FAILED).create_event(params.key, - {bucket_path: this.bucket_path, object_name: params.key}, err); + { bucket_path: this.bucket_path, object_name: params.key }, err); throw native_fs_utils.translate_error_codes(err, native_fs_utils.entity_enum.OBJECT); } finally { @@ -1224,18 +1158,6 @@ class NamespaceFS { } catch (err) { dbg.warn('NamespaceFS: read_object_stream file close error', err); } - try { - // release buffer back to pool if needed - if (buffer_pool_cleanup) { - dbg.log0('NamespaceFS: read_object_stream finally buffer_pool_cleanup', file_path); - buffer_pool_cleanup(); - } - } catch (err) { - //failed to get object - new NoobaaEvent(NoobaaEvent.OBJECT_CLEANUP_FAILED).create_event(params.key, - { bucket_path: this.bucket_path, object_name: params.key }, err); - dbg.warn('NamespaceFS: read_object_stream buffer pool cleanup error', err); - } } } @@ -1276,7 +1198,7 @@ class NamespaceFS { this.run_update_issues_report(object_sdk, err); //filed to put object new NoobaaEvent(NoobaaEvent.OBJECT_UPLOAD_FAILED).create_event(params.key, - {bucket_path: this.bucket_path, object_name: params.key}, err); + { bucket_path: this.bucket_path, object_name: params.key }, err); dbg.warn('NamespaceFS: upload_object buffer pool cleanup error', err); throw native_fs_utils.translate_error_codes(err, native_fs_utils.entity_enum.OBJECT); } finally { @@ -1388,7 +1310,7 @@ class NamespaceFS { // xattr_copy = false implies on non server side copy fallback copy (copy status = FALLBACK) // target file can be undefined when it's a folder created and size is 0 async _finish_upload({ fs_context, params, open_mode, target_file, upload_path, file_path, digest = undefined, - copy_res = undefined, offset }) { + copy_res = undefined, offset }) { const part_upload = file_path === upload_path; const same_inode = params.copy_source && copy_res === COPY_STATUS_ENUM.SAME_INODE; const should_replace_xattr = params.copy_source ? 
copy_res === COPY_STATUS_ENUM.FALLBACK : true; @@ -1480,7 +1402,7 @@ class NamespaceFS { dbg.log2('_move_to_dest', fs_context, source_path, dest_path, target_file, open_mode, key); let retries = config.NSFS_RENAME_RETRIES; // will retry renaming a file in case of parallel deleting of the destination path - for (;;) { + for (; ;) { try { if (this._is_versioning_disabled()) { await native_fs_utils._make_path_dirs(dest_path, fs_context); @@ -1528,7 +1450,7 @@ class NamespaceFS { const is_gpfs = native_fs_utils._is_gpfs(fs_context); const is_dir_content = this._is_directory_content(latest_ver_path, key); let retries = config.NSFS_RENAME_RETRIES; - for (;;) { + for (; ;) { try { let new_ver_info; let latest_ver_info; @@ -1663,7 +1585,6 @@ class NamespaceFS { async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) { const { copy_source } = params; try { - // Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy const md5_enabled = this._is_force_md5_enabled(object_sdk); const file_writer = new FileWriter({ target_file, @@ -1672,7 +1593,6 @@ class NamespaceFS { md5_enabled, stats: this.stats, bucket: params.bucket, - large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size, namespace_resource_id: this.namespace_resource_id, }); file_writer.on('error', err => dbg.error('namespace_fs._upload_stream: error occured on FileWriter: ', err)); @@ -1684,8 +1604,9 @@ class NamespaceFS { } else if (params.source_params) { await params.source_ns.read_object_stream(params.source_params, object_sdk, file_writer); } else { - await stream_utils.pipeline([params.source_stream, file_writer]); - await stream_utils.wait_finished(file_writer); + await file_writer.write_entire_stream(params.source_stream, { + signal: object_sdk.abort_controller.signal + }); } return { digest: file_writer.digest, total_bytes: file_writer.total_bytes }; } catch (error) { @@ -1759,8 +1680,8 @@ class NamespaceFS { fs_context, path.join(params.mpu_path, 'create_object_upload'), Buffer.from(create_params), { - mode: native_fs_utils.get_umasked_mode(config.BASE_MODE_FILE), - }, + mode: native_fs_utils.get_umasked_mode(config.BASE_MODE_FILE), + }, ); return { obj_id: params.obj_id }; } catch (err) { @@ -1856,18 +1777,18 @@ class NamespaceFS { const entries = await nb_native().fs.readdir(fs_context, params.mpu_path); const multiparts = await Promise.all( entries - .filter(e => e.name.startsWith('part-')) - .map(async e => { - const num = Number(e.name.slice('part-'.length)); - const part_path = path.join(params.mpu_path, e.name); - const stat = await nb_native().fs.stat(fs_context, part_path); - return { - num, - size: stat.size, - etag: this._get_etag(stat), - last_modified: new Date(stat.mtime), - }; - }) + .filter(e => e.name.startsWith('part-')) + .map(async e => { + const num = Number(e.name.slice('part-'.length)); + const part_path = path.join(params.mpu_path, e.name); + const stat = await nb_native().fs.stat(fs_context, part_path); + return { + num, + size: stat.size, + etag: this._get_etag(stat), + last_modified: new Date(stat.mtime), + }; + }) ); return { is_truncated: false, @@ -2179,14 +2100,14 @@ class NamespaceFS { dbg.error(`NamespaceFS.delete_object_tagging: failed in dir ${file_path} with error: `, err); throw native_fs_utils.translate_error_codes(err, native_fs_utils.entity_enum.OBJECT); } - return {version_id: params.version_id}; + return { version_id: params.version_id }; } async put_object_tagging(params, object_sdk) { const fs_xattr = {}; const 
tagging = params.tagging && Object.fromEntries(params.tagging.map(tag => ([tag.key, tag.value]))); for (const [xattr_key, xattr_value] of Object.entries(tagging)) { - fs_xattr[XATTR_TAG + xattr_key] = xattr_value; + fs_xattr[XATTR_TAG + xattr_key] = xattr_value; } const fs_context = this.prepare_fs_context(object_sdk); const file_path = await this._find_version_path(fs_context, params, true); @@ -2373,7 +2294,7 @@ class NamespaceFS { // INTERNALS // /////////////// - _get_file_path({key}) { + _get_file_path({ key }) { // not allowing keys with dots follow by slash which can be treated as relative paths and "leave" the bucket_path // We are not using `path.isAbsolute` as path like '/../..' will return true and we can still "leave" the bucket_path if (key.includes('./')) throw new Error('Bad relative path key ' + key); @@ -2580,7 +2501,7 @@ class NamespaceFS { * @param {fs.Dirent} ent * @returns {string} */ - _get_version_entry_key(dir_key, ent) { + _get_version_entry_key(dir_key, ent) { return dir_key + HIDDEN_VERSIONS_PATH + '/' + ent.name; } @@ -2628,6 +2549,7 @@ class NamespaceFS { const storage_class = Glacier.storage_class_from_xattr(stat.xattr); const size = Number(stat.xattr?.[XATTR_DIR_CONTENT] || stat.size); const tag_count = stat.xattr ? this._number_of_tags_fs_xttr(stat.xattr) : 0; + const restore_status = Glacier.get_restore_status(stat.xattr, new Date(), this._get_file_path({ key })); const nc_noncurrent_time = (stat.xattr?.[XATTR_NON_CURRENT_TIMESTASMP] && Number(stat.xattr[XATTR_NON_CURRENT_TIMESTASMP])) || stat.ctime.getTime(); @@ -2644,7 +2566,7 @@ class NamespaceFS { is_latest, delete_marker, storage_class, - restore_status: Glacier.get_restore_status(stat.xattr, new Date(), this._get_file_path({key})), + restore_status, xattr: to_xattr(stat.xattr), tag_count, tagging: get_tags_from_xattr(stat.xattr), @@ -2701,6 +2623,7 @@ class NamespaceFS { async _load_bucket(params, fs_context) { try { + // TODO(guymguym): can we find a way to avoid this stat? 
await nb_native().fs.stat(fs_context, this.bucket_path); } catch (err) { dbg.warn('_load_bucket failed, on bucket_path', this.bucket_path, 'got error', err); @@ -2972,7 +2895,7 @@ class NamespaceFS { } _get_version_id_by_xattr(stat) { - return (stat && stat.xattr[XATTR_VERSION_ID]) || 'null'; + return (stat && stat.xattr[XATTR_VERSION_ID]) || 'null'; } _get_versions_dir_path(key, is_dir_content) { @@ -3132,7 +3055,7 @@ class NamespaceFS { const is_lifecycle_deletion = this.is_lifecycle_deletion_flow(params); const is_gpfs = native_fs_utils._is_gpfs(fs_context); - for (;;) { + for (; ;) { let file_path; let files; try { @@ -3278,7 +3201,7 @@ class NamespaceFS { dbg.log1('Namespace_fs._promote_version_to_latest', params, deleted_version_info, latest_ver_path); let retries = config.NSFS_RENAME_RETRIES; - for (;;) { + for (; ;) { try { const latest_version_info = await this._get_version_info(fs_context, latest_ver_path); if (latest_version_info) return; @@ -3334,7 +3257,7 @@ class NamespaceFS { let retries = config.NSFS_RENAME_RETRIES; let latest_ver_info; let versioned_path; - for (;;) { + for (; ;) { try { latest_ver_info = await this._get_version_info(fs_context, latest_ver_path); dbg.log1('Namespace_fs._delete_latest_version:', latest_ver_info); @@ -3402,7 +3325,7 @@ class NamespaceFS { const is_gpfs = native_fs_utils._is_gpfs(fs_context); let retries = config.NSFS_RENAME_RETRIES; - for (;;) { + for (; ;) { try { const null_versioned_path_info = await this._get_version_info(fs_context, null_versioned_path); dbg.log1('Namespace_fs._delete_null_version_from_versions_directory:', null_versioned_path, null_versioned_path_info); @@ -3430,7 +3353,7 @@ class NamespaceFS { let retries = config.NSFS_RENAME_RETRIES; let upload_params; let delete_marker_version_id; - for (;;) { + for (; ;) { try { upload_params = await this._start_upload(fs_context, undefined, undefined, params, 'w'); @@ -3483,12 +3406,12 @@ class NamespaceFS { // find max past version by comparing the mtimeNsBigint val const max_entry_info = arr.reduce((acc, cur) => (cur && cur.mtimeNsBigint > acc.mtimeNsBigint ? 
cur : acc), - { mtimeNsBigint: BigInt(0), name: undefined }); + { mtimeNsBigint: BigInt(0), name: undefined }); return max_entry_info.mtimeNsBigint > BigInt(0) && this._get_version_info(fs_context, path.join(versions_dir, max_entry_info.name)); } catch (err) { dbg.warn('namespace_fs.find_max_version_past: .versions/ folder could not be found', err); - } + } } _is_hidden_version_path(dir_key) { @@ -3536,7 +3459,7 @@ class NamespaceFS { } return { move_to_versions: { src_file: dst_file, dir_file }, - move_to_dst: { src_file, dst_file, dir_file} + move_to_dst: { src_file, dst_file, dir_file } }; } catch (err) { dbg.warn('NamespaceFS._open_files couldn\'t open files', err); diff --git a/src/test/unit_tests/internal/test_file_reader.test.js b/src/test/unit_tests/internal/test_file_reader.test.js new file mode 100644 index 0000000000..46642d9cca --- /dev/null +++ b/src/test/unit_tests/internal/test_file_reader.test.js @@ -0,0 +1,133 @@ +/* Copyright (C) 2020 NooBaa */ +'use strict'; + +const fs = require('fs'); +const path = require('path'); +const assert = require('assert'); +const buffer_utils = require('../../../util/buffer_utils'); +const native_fs_utils = require('../../../util/native_fs_utils'); +const { FileReader } = require('../../../util/file_reader'); +const { multi_buffer_pool } = require('../../../sdk/namespace_fs'); + +const fs_context = {}; + +describe('FileReader', () => { + + const test_files = fs.readdirSync(__dirname).map(file => path.join(__dirname, file)); + + /** + * @param {(file_path: string, start?: number, end?: number) => void} tester + */ + function describe_read_cases(tester) { + describe('list files and read entire', () => { + for (const file_path of test_files) { + tester(file_path); + } + }); + describe('skip start cases', () => { + tester(__filename, 1, Infinity); + tester(__filename, 3, Infinity); + tester(__filename, 11, Infinity); + tester(__filename, 1023, Infinity); + tester(__filename, 1024, Infinity); + tester(__filename, 1025, Infinity); + }); + describe('edge cases', () => { + tester(__filename, 0, 1); + tester(__filename, 0, 2); + tester(__filename, 0, 3); + tester(__filename, 1, 2); + tester(__filename, 1, 3); + tester(__filename, 2, 3); + tester(__filename, 0, 1023); + tester(__filename, 0, 1024); + tester(__filename, 0, 1025); + tester(__filename, 1, 1023); + tester(__filename, 1, 1024); + tester(__filename, 1, 1025); + tester(__filename, 1023, 1024); + tester(__filename, 1023, 1025); + tester(__filename, 1024, 1025); + tester(__filename, 123, 345); + tester(__filename, 1000000000, Infinity); + }); + } + + describe('as stream.Readable', () => { + + describe_read_cases(tester); + + function tester(file_path, start = 0, end = Infinity) { + const basename = path.basename(file_path); + it(`test read ${start}-${end} ${basename}`, async () => { + await native_fs_utils.use_file({ + fs_context, + bucket_path: file_path, + open_path: file_path, + scope: async file => { + const stat = await file.stat(fs_context); + const aborter = new AbortController(); + const signal = aborter.signal; + const file_reader = new FileReader({ + fs_context, + file, + file_path, + stat, + start, + end, + signal, + multi_buffer_pool, + highWaterMark: 1024, // bytes + }); + const data = await buffer_utils.read_stream_join(file_reader); + const node_fs_stream = fs.createReadStream(file_path, { start, end: end > 0 ? 
end - 1 : 0 }); + const node_fs_data = await buffer_utils.read_stream_join(node_fs_stream); + assert.strictEqual(data.length, node_fs_data.length); + assert.strictEqual(data.toString(), node_fs_data.toString()); + } + }); + }); + } + }); + + describe('read_into_stream with buffer pooling', () => { + + describe_read_cases(tester); + + function tester(file_path, start = 0, end = Infinity) { + const basename = path.basename(file_path); + it(`test read ${start}-${end} ${basename}`, async () => { + await native_fs_utils.use_file({ + fs_context, + bucket_path: file_path, + open_path: file_path, + scope: async file => { + const stat = await file.stat(fs_context); + const aborter = new AbortController(); + const signal = aborter.signal; + const file_reader = new FileReader({ + fs_context, + file, + file_path, + stat, + start, + end, + signal, + multi_buffer_pool, + highWaterMark: 1024, // bytes + }); + const writable = buffer_utils.write_stream(); + await file_reader.read_into_stream(writable); + const data = writable.join(); + const node_fs_stream = fs.createReadStream(file_path, { start, end: end > 0 ? end - 1 : 0 }); + const node_fs_data = await buffer_utils.read_stream_join(node_fs_stream); + assert.strictEqual(data.length, node_fs_data.length); + assert.strictEqual(data.toString(), node_fs_data.toString()); + } + }); + }); + } + + }); + +}); diff --git a/src/tools/file_writer_hashing.js b/src/tools/file_writer_hashing.js index e3f2c980dc..6542a9d3ed 100644 --- a/src/tools/file_writer_hashing.js +++ b/src/tools/file_writer_hashing.js @@ -6,7 +6,6 @@ const assert = require('assert'); const FileWriter = require('../util/file_writer'); const config = require('../../config'); const nb_native = require('../util/nb_native'); -const stream_utils = require('../util/stream_utils'); const P = require('../util/promise'); const stream = require('stream'); const fs = require('fs'); @@ -72,12 +71,11 @@ async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) }()); const target = new TargetHash(); const file_writer = new FileWriter({ - target_file: target, + target_file: /**@type {any}*/ (target), fs_context: DEFAULT_FS_CONFIG, namespace_resource_id: 'MajesticSloth' }); - await stream_utils.pipeline([source_stream, file_writer]); - await stream_utils.wait_finished(file_writer); + await file_writer.write_entire_stream(source_stream); const write_hash = target.digest(); console.log( 'Hash target', @@ -114,8 +112,7 @@ async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) fs_context: DEFAULT_FS_CONFIG, namespace_resource_id: 'MajesticSloth' }); - await stream_utils.pipeline([source_stream, file_writer]); - await stream_utils.wait_finished(file_writer); + await file_writer.write_entire_stream(source_stream); if (XATTR) { await target_file.replacexattr( DEFAULT_FS_CONFIG, diff --git a/src/util/file_reader.js b/src/util/file_reader.js index a8be5e9184..a6ce6c3347 100644 --- a/src/util/file_reader.js +++ b/src/util/file_reader.js @@ -1,8 +1,230 @@ /* Copyright (C) 2024 NooBaa */ - 'use strict'; +const stream = require('stream'); +const config = require('../../config'); const nb_native = require('./nb_native'); +const stream_utils = require('./stream_utils'); +const native_fs_utils = require('./native_fs_utils'); + +/** @typedef {import('./buffer_utils').MultiSizeBuffersPool} MultiSizeBuffersPool */ + +/** + * FileReader is a Readable stream that reads data from a filesystem file. 
+ * + * The Readable interface is easy to use, however, for us, it is not efficient enough + * because it has to allocate a new buffer for each chunk of data read from the file. + * This allocation and delayed garbage collection becomes expensive in high throughputs + * (which is something to improve in nodejs itself). + * + * To solve this, we added the optimized method read_into_stream(target_stream) which uses + * a buffer pool to recycle the buffers and avoid the allocation overhead. + * + * The target_stream should be a Writable stream that will not use the buffer after the + * write callback, since we will release the buffer back to the pool in the callback. + */ +class FileReader extends stream.Readable { + + /** + * @param {{ + * fs_context: nb.NativeFSContext, + * file: nb.NativeFile, + * file_path: string, + * stat: nb.NativeFSStats, + * start: number, + * end: number, + * multi_buffer_pool: MultiSizeBuffersPool, + * signal: AbortSignal, + * stats?: import('../sdk/endpoint_stats_collector').EndpointStatsCollector, + * bucket?: string, + * namespace_resource_id?: string, + * highWaterMark?: number, + * }} params + */ + constructor({ fs_context, + file, + file_path, + start, + end, + stat, + multi_buffer_pool, + signal, + stats, + bucket, + namespace_resource_id, + highWaterMark = config.NSFS_DOWNLOAD_STREAM_MEM_THRESHOLD, + }) { + super({ highWaterMark }); + this.fs_context = fs_context; + this.file = file; + this.file_path = file_path; + this.stat = stat; + this.start = start || 0; + this.end = Math.min(stat.size, end ?? Infinity); + this.pos = this.start; + this.multi_buffer_pool = multi_buffer_pool; + this.signal = signal; + this.stats = stats; + this.stats_count_once = 1; + this.bucket = bucket; + this.namespace_resource_id = namespace_resource_id; + this.num_bytes = 0; + this.num_buffers = 0; + this.log2_size_histogram = {}; + } + + /** + * Readable stream implementation + * @param {number} [size] + */ + async _read(size) { + try { + size ||= this.readableHighWaterMark; + const remain_size = this.end - this.pos; + if (remain_size <= 0) { + this.push(null); + return; + } + const read_size = Math.min(size, remain_size); + const buffer = Buffer.allocUnsafe(read_size); + const nread = await this.read_into_buffer(buffer, 0, read_size); + if (nread === read_size) { + this.push(buffer); + } else if (nread > 0) { + this.push(buffer.subarray(0, nread)); + } else { + this.push(null); + } + } catch (err) { + this.emit('error', err); + } + } + + /** + * @param {Buffer} buf + * @param {number} offset + * @param {number} length + * @returns {Promise} + */ + async read_into_buffer(buf, offset, length) { + await this._warmup_sparse_file(this.pos); + this.signal.throwIfAborted(); + const nread = await this.file.read(this.fs_context, buf, offset, length, this.pos); + if (nread) { + this.pos += nread; + this._update_stats(nread); + } + return nread; + } + + + /** + * Alternative implementation without using Readable stream API + * This allows to use a buffer pool to avoid creating new buffers. + * + * The target_stream should be a Writable stream that will not use the buffer after the + * write callback, since we will release the buffer back to the pool in the callback. + * This means Transforms should not be used as target_stream. 
+ * + * @param {stream.Writable} target_stream + */ + async read_into_stream(target_stream) { + if (target_stream instanceof stream.Transform) { + throw new Error('FileReader read_into_stream must be called with a Writable stream, not a Transform stream'); + } + + let buffer_pool_cleanup = null; + let drain_promise = null; + + try { + while (this.pos < this.end) { + // prefer to warmup sparse file before allocating a buffer + await this._warmup_sparse_file(this.pos); + + // allocate or reuse buffer + // TODO buffers_pool and the underlying semaphore should support abort signal + // to avoid sleeping inside the semaphore until the timeout while the request is already aborted. + this.signal.throwIfAborted(); + const remain_size = this.end - this.pos; + const { buffer, callback } = await this.multi_buffer_pool.get_buffers_pool(remain_size).get_buffer(); + buffer_pool_cleanup = callback; // must be called ***IMMEDIATELY*** after get_buffer + this.signal.throwIfAborted(); + + // read from file + const read_size = Math.min(buffer.length, remain_size); + const nread = await this.read_into_buffer(buffer, 0, read_size); + if (!nread) { + buffer_pool_cleanup = null; + callback(); + break; + } + + // wait for response buffer to drain before adding more data if needed - + // this occurs when the output network is slower than the input file + if (drain_promise) { + this.signal.throwIfAborted(); + await drain_promise; + drain_promise = null; + this.signal.throwIfAborted(); + } + + // write the data out to response + const data = buffer.subarray(0, nread); + buffer_pool_cleanup = null; // cleanup is now in the socket responsibility + const write_ok = target_stream.write(data, null, callback); + if (!write_ok) { + drain_promise = stream_utils.wait_drain(target_stream, { signal: this.signal }); + drain_promise.catch(() => undefined); // this avoids UnhandledPromiseRejection + } + } + + // wait for the last drain if pending. 
+ if (drain_promise) { + this.signal.throwIfAborted(); + await drain_promise; + drain_promise = null; + this.signal.throwIfAborted(); + } + + } finally { + if (buffer_pool_cleanup) buffer_pool_cleanup(); + } + } + + /** + * @param {number} size + */ + _update_stats(size) { + this.num_bytes += size; + this.num_buffers += 1; + const log2_size = Math.ceil(Math.log2(size)); + this.log2_size_histogram[log2_size] = (this.log2_size_histogram[log2_size] || 0) + 1; + + // update stats collector but count the entire read operation just once + const count = this.stats_count_once; + this.stats_count_once = 0; // counting the entire operation just once + this.stats?.update_nsfs_write_stats({ + namespace_resource_id: this.namespace_resource_id, + size, + count, + bucket_name: this.bucket, + }); + } + + /** + * @param {number} pos + */ + async _warmup_sparse_file(pos) { + if (!config.NSFS_BUF_WARMUP_SPARSE_FILE_READS) return; + if (!native_fs_utils.is_sparse_file(this.stat)) return; + this.signal.throwIfAborted(); + await native_fs_utils.warmup_sparse_file(this.fs_context, this.file, this.file_path, this.stat, pos); + } + + +} + + class NewlineReaderFilePathEntry { constructor(fs_context, filepath) { @@ -206,3 +428,4 @@ class NewlineReader { exports.NewlineReader = NewlineReader; exports.NewlineReaderEntry = NewlineReaderFilePathEntry; +exports.FileReader = FileReader; diff --git a/src/util/file_writer.js b/src/util/file_writer.js index c8df126719..36ed64502e 100644 --- a/src/util/file_writer.js +++ b/src/util/file_writer.js @@ -4,7 +4,7 @@ const stream = require('stream'); const config = require('../../config'); const nb_native = require('./nb_native'); -const dbg = require('../util/debug_module')(__filename); +const dbg = require('./debug_module')(__filename); /** * FileWriter is a Writable stream that write data to a filesystem file, @@ -14,18 +14,17 @@ class FileWriter extends stream.Writable { /** * @param {{ - * target_file: object, - * fs_context: object, - * namespace_resource_id: string, - * md5_enabled: boolean, - * stats: import('../sdk/endpoint_stats_collector').EndpointStatsCollector, + * target_file: nb.NativeFile, + * fs_context: nb.NativeFSContext, + * md5_enabled?: boolean, * offset?: number, + * stats?: import('../sdk/endpoint_stats_collector').EndpointStatsCollector, * bucket?: string, - * large_buf_size?: number, + * namespace_resource_id?: string, * }} params */ - constructor({ target_file, fs_context, namespace_resource_id, md5_enabled, stats, offset, bucket, large_buf_size }) { - super({ highWaterMark: config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD }); + constructor({ target_file, fs_context, md5_enabled, offset, stats, bucket, namespace_resource_id }) { + super({ highWaterMark: config.NSFS_UPLOAD_STREAM_MEM_THRESHOLD }); this.target_file = target_file; this.fs_context = fs_context; this.offset = offset; @@ -34,12 +33,48 @@ class FileWriter extends stream.Writable { this.stats = stats; this.bucket = bucket; this.namespace_resource_id = namespace_resource_id; - this.large_buf_size = large_buf_size || config.NSFS_BUF_SIZE_L; this.MD5Async = md5_enabled ? new (nb_native().crypto.MD5Async)() : undefined; const platform_iov_max = nb_native().fs.PLATFORM_IOV_MAX; this.iov_max = platform_iov_max ? 
Math.min(platform_iov_max, config.NSFS_DEFAULT_IOV_MAX) : config.NSFS_DEFAULT_IOV_MAX; } + /** + * @param {stream.Readable} source_stream + * @param {import('events').Abortable} [options] + */ + async write_entire_stream(source_stream, options) { + await stream.promises.pipeline(source_stream, this, options); + await stream.promises.finished(this, options); + } + + /** + * Ingests an array of buffers and writes them to the target file, + * while handling MD5 calculation and stats update. + * @param {Buffer[]} buffers + * @param {number} size + */ + async write_buffers(buffers, size) { + await Promise.all([ + this.MD5Async && this._update_md5(buffers, size), + this._write_all_buffers(buffers, size), + ]); + this._update_stats(size); + } + + /** + * Finalizes the MD5 calculation and sets the digest. + */ + async finalize() { + if (this.MD5Async) { + const digest = await this.MD5Async.digest(); + this.digest = digest.toString('hex'); + } + } + + /////////////// + // INTERNALS // + /////////////// + /** * @param {number} size */ @@ -66,6 +101,8 @@ class FileWriter extends stream.Writable { } /** + * Writes an array of buffers to the target file, + * splitting them into batches if it exceeds the platform's IOV_MAX. * @param {Buffer[]} buffers * @param {number} size */ @@ -85,6 +122,8 @@ class FileWriter extends stream.Writable { } /** + * Writes an array of buffers to the target file, + * updating the offset and total bytes * @param {Buffer[]} buffers * @param {number} size */ @@ -96,6 +135,7 @@ class FileWriter extends stream.Writable { } /** + * Implements the write method of Writable stream. * @param {Array<{ chunk: Buffer; encoding: BufferEncoding; }>} chunks * @param {(error?: Error | null) => void} callback */ @@ -106,11 +146,7 @@ class FileWriter extends stream.Writable { size += it.chunk.length; return it.chunk; }); - await Promise.all([ - this.MD5Async && this._update_md5(buffers, size), - this._write_all_buffers(buffers, size), - ]); - this._update_stats(size); + await this.write_buffers(buffers, size); return callback(); } catch (err) { console.error('FileWriter._writev: failed', err); @@ -119,15 +155,12 @@ class FileWriter extends stream.Writable { } /** + * Implements the final method of Writable stream. 
* @param {(error?: Error | null) => void} callback */ async _final(callback) { try { - if (this.MD5Async) { - const digest = await this.MD5Async.digest(); - this.digest = digest.toString('hex'); - } - + await this.finalize(); return callback(); } catch (err) { console.error('FileWriter._final: failed', err); diff --git a/src/util/native_fs_utils.js b/src/util/native_fs_utils.js index 4a12a80b66..5f59ce1cd0 100644 --- a/src/util/native_fs_utils.js +++ b/src/util/native_fs_utils.js @@ -10,6 +10,7 @@ const crypto = require('crypto'); const config = require('../../config'); const RpcError = require('../rpc/rpc_error'); const nb_native = require('../util/nb_native'); +const error_utils = require('../util/error_utils'); const gpfs_link_unlink_retry_err = 'EEXIST'; const gpfs_unlink_retry_catch = 'GPFS_UNLINK_RETRY'; @@ -66,25 +67,30 @@ async function _generate_unique_path(fs_context, tmp_dir_path) { * @param {string} open_mode */ // opens open_path on POSIX, and on GPFS it will open open_path parent folder -async function open_file(fs_context, bucket_path, open_path, open_mode = config.NSFS_OPEN_READ_MODE, - file_permissions = config.BASE_MODE_FILE) { +async function open_file( + fs_context, + bucket_path, + open_path, + open_mode = config.NSFS_OPEN_READ_MODE, + file_permissions = config.BASE_MODE_FILE, +) { let retries = config.NSFS_MKDIR_PATH_RETRIES; const dir_path = path.dirname(open_path); const actual_open_path = open_mode === 'wt' ? dir_path : open_path; const should_create_path_dirs = (open_mode === 'wt' || open_mode === 'w') && dir_path !== bucket_path; - for (;;) { + for (; ;) { try { if (should_create_path_dirs) { - dbg.log1(`NamespaceFS._open_file: mode=${open_mode} creating dirs`, open_path, bucket_path); + dbg.log1(`native_fs_utils.open_file: mode=${open_mode} creating dirs`, open_path, bucket_path); await _make_path_dirs(open_path, fs_context); } - dbg.log1(`NamespaceFS._open_file: mode=${open_mode}`, open_path); + dbg.log1(`native_fs_utils.open_file: mode=${open_mode}`, open_path); // for 'wt' open the tmpfile with the parent dir path const fd = await nb_native().fs.open(fs_context, actual_open_path, open_mode, get_umasked_mode(file_permissions)); return fd; } catch (err) { - dbg.warn(`native_fs_utils.open_file Retrying retries=${retries} mode=${open_mode} open_path=${open_path} dir_path=${dir_path} actual_open_path=${actual_open_path}`, err); + dbg.warn(`native_fs_utils.open_file: Retrying retries=${retries} mode=${open_mode} open_path=${open_path} dir_path=${dir_path} actual_open_path=${actual_open_path}`, err); if (err.code !== 'ENOENT') throw err; // case of concurrennt deletion of the dir_path if (retries <= 0 || !should_create_path_dirs) throw err; @@ -93,6 +99,56 @@ async function open_file(fs_context, bucket_path, open_path, open_mode = config. } } +/** + * Open a file and close it after the async scope function is done. 
+ * + * @template T + * @param {{ + * fs_context: nb.NativeFSContext, + * bucket_path: string, + * open_path: string, + * open_mode?: string, + * file_permissions?: number, + * scope: (file: nb.NativeFile, file_path: string) => Promise, + * }} params + * @returns {Promise} + */ +async function use_file({ + fs_context, + bucket_path, + open_path, + open_mode, + file_permissions, + scope, +}) { + let file; + let ret; + + try { + file = await open_file(fs_context, bucket_path, open_path, open_mode, file_permissions); + } catch (err) { + dbg.error('native_fs_utils.use_file: open failed', open_path, err); + throw err; + } + + try { + ret = await scope(file, open_path); + } catch (err) { + dbg.error('native_fs_utils.use_file: scope failed', open_path, err); + throw err; + } finally { + if (file) { + try { + await file.close(fs_context); + } catch (err) { + dbg.warn('native_fs_utils.use_file: close failed', open_path, err); + } + } + } + + return ret; +} + /** * @param {MultiSizeBuffersPool} multi_buffers_pool * @param {nb.NativeFSContext} fs_context @@ -110,7 +166,7 @@ async function copy_bytes(multi_buffers_pool, fs_context, src_file, dst_file, si let bytes_written = 0; const total_bytes_to_write = Number(size); let write_pos = write_offset >= 0 ? write_offset : 0; - for (;;) { + for (; ;) { const total_bytes_left = total_bytes_to_write - bytes_written; if (total_bytes_left <= 0) break; const { buffer, callback } = await multi_buffers_pool.get_buffers_pool(total_bytes_left).get_buffer(); @@ -350,9 +406,7 @@ async function create_config_file(fs_context, schema_dir, config_path, config_da // validate config file doesn't exist try { await nb_native().fs.stat(fs_context, config_path); - const err = new Error('configuration file already exists'); - err.code = 'EEXIST'; - throw err; + throw error_utils.new_error_code('EEXIST', 'configuration file already exists'); } catch (err) { if (err.code !== 'ENOENT') throw err; } @@ -470,7 +524,7 @@ async function update_config_file(fs_context, schema_dir, config_path, config_da // moving tmp file to config path atomically dbg.log1('native_fs_utils: update_config_file moving from:', open_path, 'to:', config_path, 'is_gpfs=', is_gpfs); let retries = config.NSFS_RENAME_RETRIES; - for (;;) { + for (; ;) { try { const src_stat = is_gpfs ? undefined : await nb_native().fs.stat(fs_context, open_path); await safe_move(fs_context, open_path, config_path, src_stat, gpfs_options, tmp_dir_path); @@ -732,11 +786,52 @@ async function lock_and_run(fs_context, lock_path, cb) { } } +/** + * NOTICE that even files that were written sequentially, can still be identified as sparse: + * 1. After writing, but before all the data is synced, the size is higher than blocks size. + * 2. For files that were moved to an archive tier. + * 3. For files that fetch and cache data from remote storage, which are still not in the cache. + * It's not good enough for avoiding recall storms as needed by _fail_if_archived_or_sparse_file. + * However, using this check is useful for guessing that a reads is going to take more time + * and avoid holding off large buffers from the buffers_pool. + * @param {nb.NativeFSStats} stat + * @returns {boolean} + */ +function is_sparse_file(stat) { + return (stat.blocks * 512 < stat.size); +} + +let warmup_buffer; + +/** + * Our buffer pool keeps large buffers and we want to avoid spending + * all our large buffers and then have them waiting for high latency calls + * such as reading from archive/on-demand cache files. 
+ * Instead, we detect the case where a file is "sparse", + * and then use just a small buffer to wait for a tiny read, + * which will recall the file from archive or load from remote into cache, + * and once it returns we can continue to the full fledged read. + * @param {nb.NativeFSContext} fs_context + * @param {nb.NativeFile} file + * @param {nb.NativeFSStats} stat + * @param {number} pos + */ +async function warmup_sparse_file(fs_context, file, file_path, stat, pos) { + dbg.log0('warmup_sparse_file', { + file_path, pos, size: stat.size, blocks: stat.blocks, + }); + if (!warmup_buffer) { + warmup_buffer = nb_native().fs.dio_buffer_alloc(4096); + } + await file.read(fs_context, warmup_buffer, 0, 1, pos); +} + exports.get_umasked_mode = get_umasked_mode; exports._make_path_dirs = _make_path_dirs; exports._create_path = _create_path; exports._generate_unique_path = _generate_unique_path; exports.open_file = open_file; +exports.use_file = use_file; exports.copy_bytes = copy_bytes; exports.finally_close_files = finally_close_files; exports.get_user_by_distinguished_name = get_user_by_distinguished_name; @@ -774,5 +869,6 @@ exports.get_bucket_tmpdir_full_path = get_bucket_tmpdir_full_path; exports.get_bucket_tmpdir_name = get_bucket_tmpdir_name; exports.entity_enum = entity_enum; exports.translate_error_codes = translate_error_codes; - exports.lock_and_run = lock_and_run; +exports.is_sparse_file = is_sparse_file; +exports.warmup_sparse_file = warmup_sparse_file;
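
Usage sketch for the refactored API, modeled on the new unit test (src/test/unit_tests/internal/test_file_reader.test.js) and on src/tools/file_writer_hashing.js from this patch. It is illustrative only and not part of the diff: the helper names, the empty fs_context, the source/target streams, and the require paths (written as in the new unit test, so they would need adjusting to the caller's location) are assumptions; only FileReader, FileWriter, native_fs_utils.use_file and the multi_buffer_pool export come from the patch itself.

'use strict';

const { FileReader } = require('../../../util/file_reader');
const FileWriter = require('../../../util/file_writer');
const native_fs_utils = require('../../../util/native_fs_utils');
const { multi_buffer_pool } = require('../../../sdk/namespace_fs');

// Stream a byte range of file_path into any Writable (e.g. an HTTP response),
// recycling buffers from the shared pool instead of allocating one per chunk.
async function stream_file_range(target_stream, file_path, start = 0, end = Infinity) {
    const fs_context = {}; // illustrative - real callers pass uid/gid/backend etc.
    const aborter = new AbortController();
    await native_fs_utils.use_file({
        fs_context,
        bucket_path: file_path,
        open_path: file_path,
        scope: async file => {
            const stat = await file.stat(fs_context);
            const file_reader = new FileReader({
                fs_context, file, file_path, stat, start, end,
                multi_buffer_pool,
                signal: aborter.signal,
            });
            // read_into_stream() releases each pooled buffer in the write callback,
            // so target_stream must not reuse the buffer afterwards (no Transform).
            await file_reader.read_into_stream(target_stream);
            target_stream.end(); // the reader does not end the target stream
        },
    });
}

// Write an entire Readable into an already-open target file,
// optionally computing an MD5 digest along the way.
async function write_stream_to_file(source_stream, target_file, fs_context) {
    const file_writer = new FileWriter({
        target_file,
        fs_context,
        md5_enabled: true,
    });
    await file_writer.write_entire_stream(source_stream);
    return { digest: file_writer.digest, total_bytes: file_writer.total_bytes };
}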