Skip to content

Commit b1910bd

Browse files
committed
add a generic line reader
Signed-off-by: Utkarsh Srivastava <[email protected]> change glacier logs delim Signed-off-by: Utkarsh Srivastava <[email protected]>
1 parent 9ab748b commit b1910bd

File tree

4 files changed

+87
-45
lines changed

4 files changed

+87
-45
lines changed

src/sdk/glacier_tapecloud.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ class TapeCloudGlacier extends Glacier {
193193
async migrate(fs_context, log_file, failure_recorder) {
194194
dbg.log2('TapeCloudGlacier.migrate starting for', log_file);
195195

196-
const file = new LogFile(fs_context, log_file);
196+
const file = new LogFile(fs_context, log_file, '\n-- ');
197197

198198
try {
199199
await file.collect_and_process(async (entry, batch_recorder) => {
@@ -243,7 +243,7 @@ class TapeCloudGlacier extends Glacier {
243243
async restore(fs_context, log_file, failure_recorder) {
244244
dbg.log2('TapeCloudGlacier.restore starting for', log_file);
245245

246-
const file = new LogFile(fs_context, log_file);
246+
const file = new LogFile(fs_context, log_file, '\n-- ');
247247
try {
248248
await file.collect_and_process(async (entry, batch_recorder) => {
249249
try {

src/sdk/namespace_fs.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3687,13 +3687,13 @@ class NamespaceFS {
36873687
async append_to_migrate_wal(entry) {
36883688
if (!config.NSFS_GLACIER_LOGS_ENABLED) return;
36893689

3690-
await NamespaceFS.migrate_wal.append(entry);
3690+
await NamespaceFS.migrate_wal.append(`-- ${entry}`);
36913691
}
36923692

36933693
async append_to_restore_wal(entry) {
36943694
if (!config.NSFS_GLACIER_LOGS_ENABLED) return;
36953695

3696-
await NamespaceFS.restore_wal.append(entry);
3696+
await NamespaceFS.restore_wal.append(`-- ${entry}`);
36973697
}
36983698

36993699
static get migrate_wal() {

src/util/file_reader.js

Lines changed: 66 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,38 +4,30 @@
44

55
const nb_native = require('./nb_native');
66

7-
class NewlineReaderFilePathEntry {
8-
constructor(fs_context, filepath) {
9-
this.fs_context = fs_context;
10-
this.path = filepath;
11-
}
12-
13-
async open(mode = 'rw*') {
14-
return nb_native().fs.open(this.fs_context, this.path, mode);
15-
}
16-
}
17-
18-
class NewlineReader {
7+
class LineReader {
198
/**
20-
* Newline character code
21-
*/
22-
static NL_CODE = 10;
23-
24-
/**
25-
* NewlineReader allows to read a file line by line.
9+
* LineReader allows to read a file line by line.
2610
* @param {nb.NativeFSContext} fs_context
2711
* @param {string} filepath
12+
* @param {number | string} delim
2813
* @param {{
2914
* lock?: 'EXCLUSIVE' | 'SHARED'
3015
* bufsize?: number;
3116
* skip_leftover_line?: boolean;
3217
* skip_overflow_lines?: boolean;
3318
* read_file_offset?: number;
19+
* encoding?: BufferEncoding;
3420
* }} [cfg]
3521
**/
36-
constructor(fs_context, filepath, cfg) {
22+
constructor(fs_context, filepath, delim, cfg) {
23+
if (!['number', 'string'].includes(typeof(delim))) {
24+
throw new Error('delim must be of type string or a number');
25+
}
26+
3727
this.path = filepath;
3828
this.lock = cfg?.lock;
29+
this.delim = delim;
30+
this.encoding = cfg?.encoding || 'utf8';
3931
this.skip_leftover_line = Boolean(cfg?.skip_leftover_line);
4032
this.skip_overflow_lines = Boolean(cfg?.skip_overflow_lines);
4133

@@ -69,20 +61,22 @@ class NewlineReader {
6961
async nextline() {
7062
if (!this.fh) await this.init();
7163

64+
const delim_length = typeof this.delim === 'string' ? Buffer.byteLength(this.delim, this.encoding) : 1;
65+
7266
// TODO - in case more data will be appended to the file - after each read the reader must set reader.eof = false if someone will keep on reading from a file while it is being written.
7367
while (!this.eof) {
7468
// extract next line if terminated in current buffer
7569
if (this.start < this.end) {
76-
const term_idx = this.buf.subarray(this.start, this.end).indexOf(NewlineReader.NL_CODE);
70+
const term_idx = this.buf.subarray(this.start, this.end).indexOf(this.delim, null, this.encoding);
7771
if (term_idx >= 0) {
7872
if (this.overflow_state) {
7973
console.warn('line too long finally terminated:', this.info());
8074
this.overflow_state = false;
81-
this.start += term_idx + 1;
75+
this.start += term_idx + delim_length;
8276
continue;
8377
}
84-
const line = this.buf.toString('utf8', this.start, this.start + term_idx);
85-
this.start += term_idx + 1;
78+
const line = this.buf.toString(this.encoding, this.start, this.start + term_idx);
79+
this.start += term_idx + delim_length;
8680
this.next_line_file_offset = this.read_file_offset - (this.end - this.start);
8781
return line;
8882
}
@@ -120,7 +114,7 @@ class NewlineReader {
120114
} else if (this.overflow_state) {
121115
console.warn('line too long finally terminated at eof:', this.info());
122116
} else {
123-
const line = this.buf.toString('utf8', this.start, this.end);
117+
const line = this.buf.toString(this.encoding, this.start, this.end);
124118
this.start = this.end;
125119
this.next_line_file_offset = this.read_file_offset;
126120
return line;
@@ -158,17 +152,6 @@ class NewlineReader {
158152
return [count, true];
159153
}
160154

161-
/**
162-
* forEachFilePathEntry is a wrapper around `forEach` where each entry in
163-
* log file is assumed to be a file path and the given callback function
164-
* is invoked with that entry wrapped in a class with some convenient wrappers.
165-
* @param {(entry: NewlineReaderFilePathEntry) => Promise<boolean>} cb
166-
* @returns {Promise<[number, boolean]>}
167-
*/
168-
async forEachFilePathEntry(cb) {
169-
return this.forEach(entry => cb(new NewlineReaderFilePathEntry(this.fs_context, entry)));
170-
}
171-
172155
// reset will reset the reader and will allow reading the file from
173156
// the beginning again, this does not reopens the file so if the file
174157
// was moved, this will still keep on reading from the previous FD.
@@ -204,5 +187,53 @@ class NewlineReader {
204187
}
205188
}
206189

190+
class NewlineReaderFilePathEntry {
191+
constructor(fs_context, filepath) {
192+
this.fs_context = fs_context;
193+
this.path = filepath;
194+
}
195+
196+
async open(mode = 'rw*') {
197+
return nb_native().fs.open(this.fs_context, this.path, mode);
198+
}
199+
}
200+
201+
class NewlineReader extends LineReader {
202+
/**
203+
* Newline character code
204+
*/
205+
static NL_CODE = 10;
206+
207+
/**
208+
* NewlineReader allows to read a file line by line.
209+
* @param {nb.NativeFSContext} fs_context
210+
* @param {string} filepath
211+
* @param {{
212+
* lock?: 'EXCLUSIVE' | 'SHARED'
213+
* bufsize?: number;
214+
* skip_leftover_line?: boolean;
215+
* skip_overflow_lines?: boolean;
216+
* read_file_offset?: number;
217+
* encoding?: BufferEncoding;
218+
* }} [cfg]
219+
**/
220+
constructor(fs_context, filepath, cfg) {
221+
super(fs_context, filepath, NewlineReader.NL_CODE, cfg);
222+
}
223+
224+
/**
225+
* forEachFilePathEntry is a wrapper around `forEach` where each entry in
226+
* log file is assumed to be a file path and the given callback function
227+
* is invoked with that entry wrapped in a class with some convenient wrappers.
228+
* @param {(entry: NewlineReaderFilePathEntry) => Promise<boolean>} cb
229+
* @returns {Promise<[number, boolean]>}
230+
*/
231+
async forEachFilePathEntry(cb) {
232+
return this.forEach(entry => cb(new NewlineReaderFilePathEntry(this.fs_context, entry)));
233+
}
234+
}
235+
207236
exports.NewlineReader = NewlineReader;
208237
exports.NewlineReaderEntry = NewlineReaderFilePathEntry;
238+
exports.LineReader = LineReader;
239+

src/util/persistent_logger.js

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const nb_native = require('./nb_native');
66
const native_fs_utils = require('./native_fs_utils');
77
const P = require('./promise');
88
const semaphore = require('./semaphore');
9-
const { NewlineReader } = require('./file_reader');
9+
const { NewlineReader, LineReader } = require('./file_reader');
1010
const dbg = require('./debug_module')(__filename);
1111

1212
/**
@@ -260,10 +260,12 @@ class LogFile {
260260
/**
261261
* @param {nb.NativeFSContext} fs_context
262262
* @param {string} log_path
263+
* @param {string | number} [delim]
263264
*/
264-
constructor(fs_context, log_path) {
265+
constructor(fs_context, log_path, delim) {
265266
this.fs_context = fs_context;
266267
this.log_path = log_path;
268+
this.delim = delim;
267269
}
268270

269271
/**
@@ -288,10 +290,19 @@ class LogFile {
288290
`tmp_consume_${Date.now().toString()}`, { locking: 'EXCLUSIVE' }
289291
);
290292

291-
log_reader = new NewlineReader(
292-
this.fs_context,
293-
this.log_path, { lock: 'EXCLUSIVE', skip_overflow_lines: true, skip_leftover_line: true },
294-
);
293+
if (this.delim) {
294+
log_reader = new LineReader(
295+
this.fs_context,
296+
this.log_path,
297+
this.delim,
298+
{ lock: 'EXCLUSIVE', skip_overflow_lines: true, skip_leftover_line: true }
299+
);
300+
} else {
301+
log_reader = new NewlineReader(
302+
this.fs_context,
303+
this.log_path, { lock: 'EXCLUSIVE', skip_overflow_lines: true, skip_leftover_line: true },
304+
);
305+
}
295306
await log_reader.forEach(async entry => {
296307
await collect(entry, filtered_log.append.bind(filtered_log));
297308
return true;

0 commit comments

Comments
 (0)