|
4 | 4 |
|
5 | 5 | const nb_native = require('./nb_native');
|
6 | 6 |
|
7 |
| -class NewlineReaderFilePathEntry { |
8 |
| - constructor(fs_context, filepath) { |
9 |
| - this.fs_context = fs_context; |
10 |
| - this.path = filepath; |
11 |
| - } |
12 |
| - |
13 |
| - async open(mode = 'rw*') { |
14 |
| - return nb_native().fs.open(this.fs_context, this.path, mode); |
15 |
| - } |
16 |
| -} |
17 |
| - |
18 |
| -class NewlineReader { |
| 7 | +class LineReader { |
19 | 8 | /**
|
20 |
| - * Newline character code |
21 |
| - */ |
22 |
| - static NL_CODE = 10; |
23 |
| - |
24 |
| - /** |
25 |
| - * NewlineReader allows to read a file line by line. |
| 9 | + * LineReader allows to read a file line by line. |
26 | 10 | * @param {nb.NativeFSContext} fs_context
|
27 | 11 | * @param {string} filepath
|
| 12 | + * @param {number | string} delim |
28 | 13 | * @param {{
|
29 | 14 | * lock?: 'EXCLUSIVE' | 'SHARED'
|
30 | 15 | * bufsize?: number;
|
31 | 16 | * skip_leftover_line?: boolean;
|
32 | 17 | * skip_overflow_lines?: boolean;
|
33 | 18 | * read_file_offset?: number;
|
| 19 | + * encoding?: BufferEncoding; |
34 | 20 | * }} [cfg]
|
35 | 21 | **/
|
36 |
| - constructor(fs_context, filepath, cfg) { |
| 22 | + constructor(fs_context, filepath, delim, cfg) { |
| 23 | + if (!['number', 'string'].includes(typeof(delim))) { |
| 24 | + throw new Error('delim must be of type string or a number'); |
| 25 | + } |
| 26 | + |
37 | 27 | this.path = filepath;
|
38 | 28 | this.lock = cfg?.lock;
|
| 29 | + this.delim = delim; |
| 30 | + this.encoding = cfg?.encoding || 'utf8'; |
39 | 31 | this.skip_leftover_line = Boolean(cfg?.skip_leftover_line);
|
40 | 32 | this.skip_overflow_lines = Boolean(cfg?.skip_overflow_lines);
|
41 | 33 |
|
@@ -69,20 +61,22 @@ class NewlineReader {
|
69 | 61 | async nextline() {
|
70 | 62 | if (!this.fh) await this.init();
|
71 | 63 |
|
| 64 | + const delim_length = typeof this.delim === 'string' ? Buffer.byteLength(this.delim, this.encoding) : 1; |
| 65 | + |
72 | 66 | // TODO - in case more data will be appended to the file - after each read the reader must set reader.eof = false if someone will keep on reading from a file while it is being written.
|
73 | 67 | while (!this.eof) {
|
74 | 68 | // extract next line if terminated in current buffer
|
75 | 69 | if (this.start < this.end) {
|
76 |
| - const term_idx = this.buf.subarray(this.start, this.end).indexOf(NewlineReader.NL_CODE); |
| 70 | + const term_idx = this.buf.subarray(this.start, this.end).indexOf(this.delim, null, this.encoding); |
77 | 71 | if (term_idx >= 0) {
|
78 | 72 | if (this.overflow_state) {
|
79 | 73 | console.warn('line too long finally terminated:', this.info());
|
80 | 74 | this.overflow_state = false;
|
81 |
| - this.start += term_idx + 1; |
| 75 | + this.start += term_idx + delim_length; |
82 | 76 | continue;
|
83 | 77 | }
|
84 |
| - const line = this.buf.toString('utf8', this.start, this.start + term_idx); |
85 |
| - this.start += term_idx + 1; |
| 78 | + const line = this.buf.toString(this.encoding, this.start, this.start + term_idx); |
| 79 | + this.start += term_idx + delim_length; |
86 | 80 | this.next_line_file_offset = this.read_file_offset - (this.end - this.start);
|
87 | 81 | return line;
|
88 | 82 | }
|
@@ -120,7 +114,7 @@ class NewlineReader {
|
120 | 114 | } else if (this.overflow_state) {
|
121 | 115 | console.warn('line too long finally terminated at eof:', this.info());
|
122 | 116 | } else {
|
123 |
| - const line = this.buf.toString('utf8', this.start, this.end); |
| 117 | + const line = this.buf.toString(this.encoding, this.start, this.end); |
124 | 118 | this.start = this.end;
|
125 | 119 | this.next_line_file_offset = this.read_file_offset;
|
126 | 120 | return line;
|
@@ -158,17 +152,6 @@ class NewlineReader {
|
158 | 152 | return [count, true];
|
159 | 153 | }
|
160 | 154 |
|
161 |
| - /** |
162 |
| - * forEachFilePathEntry is a wrapper around `forEach` where each entry in |
163 |
| - * log file is assumed to be a file path and the given callback function |
164 |
| - * is invoked with that entry wrapped in a class with some convenient wrappers. |
165 |
| - * @param {(entry: NewlineReaderFilePathEntry) => Promise<boolean>} cb |
166 |
| - * @returns {Promise<[number, boolean]>} |
167 |
| - */ |
168 |
| - async forEachFilePathEntry(cb) { |
169 |
| - return this.forEach(entry => cb(new NewlineReaderFilePathEntry(this.fs_context, entry))); |
170 |
| - } |
171 |
| - |
172 | 155 | // reset will reset the reader and will allow reading the file from
|
173 | 156 | // the beginning again, this does not reopens the file so if the file
|
174 | 157 | // was moved, this will still keep on reading from the previous FD.
|
@@ -204,5 +187,53 @@ class NewlineReader {
|
204 | 187 | }
|
205 | 188 | }
|
206 | 189 |
|
| 190 | +class NewlineReaderFilePathEntry { |
| 191 | + constructor(fs_context, filepath) { |
| 192 | + this.fs_context = fs_context; |
| 193 | + this.path = filepath; |
| 194 | + } |
| 195 | + |
| 196 | + async open(mode = 'rw*') { |
| 197 | + return nb_native().fs.open(this.fs_context, this.path, mode); |
| 198 | + } |
| 199 | +} |
| 200 | + |
| 201 | +class NewlineReader extends LineReader { |
| 202 | + /** |
| 203 | + * Newline character code |
| 204 | + */ |
| 205 | + static NL_CODE = 10; |
| 206 | + |
| 207 | + /** |
| 208 | + * NewlineReader allows to read a file line by line. |
| 209 | + * @param {nb.NativeFSContext} fs_context |
| 210 | + * @param {string} filepath |
| 211 | + * @param {{ |
| 212 | + * lock?: 'EXCLUSIVE' | 'SHARED' |
| 213 | + * bufsize?: number; |
| 214 | + * skip_leftover_line?: boolean; |
| 215 | + * skip_overflow_lines?: boolean; |
| 216 | + * read_file_offset?: number; |
| 217 | + * encoding?: BufferEncoding; |
| 218 | + * }} [cfg] |
| 219 | + **/ |
| 220 | + constructor(fs_context, filepath, cfg) { |
| 221 | + super(fs_context, filepath, NewlineReader.NL_CODE, cfg); |
| 222 | + } |
| 223 | + |
| 224 | + /** |
| 225 | + * forEachFilePathEntry is a wrapper around `forEach` where each entry in |
| 226 | + * log file is assumed to be a file path and the given callback function |
| 227 | + * is invoked with that entry wrapped in a class with some convenient wrappers. |
| 228 | + * @param {(entry: NewlineReaderFilePathEntry) => Promise<boolean>} cb |
| 229 | + * @returns {Promise<[number, boolean]>} |
| 230 | + */ |
| 231 | + async forEachFilePathEntry(cb) { |
| 232 | + return this.forEach(entry => cb(new NewlineReaderFilePathEntry(this.fs_context, entry))); |
| 233 | + } |
| 234 | +} |
| 235 | + |
207 | 236 | exports.NewlineReader = NewlineReader;
|
208 | 237 | exports.NewlineReaderEntry = NewlineReaderFilePathEntry;
|
| 238 | +exports.LineReader = LineReader; |
| 239 | + |
0 commit comments