Skip to content

Commit 703a128

Browse files
authored
Merge pull request #145 from albe/record-separator
Separate documents by a unique sequence and make partition backwards scannable
2 parents 11677db + e1f5f5e commit 703a128

File tree

6 files changed

+228
-35
lines changed

6 files changed

+228
-35
lines changed

src/Partition/ReadOnlyPartition.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class ReadOnlyPartition extends WatchesFile(ReadablePartition) {
1818
* @param {string} filename
1919
*/
2020
onChange(filename) {
21+
/* istanbul ignore if */
2122
if (!this.fd) {
2223
return;
2324
}

src/Partition/ReadablePartition.js

Lines changed: 87 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
const fs = require('fs');
22
const path = require('path');
33
const events = require('events');
4-
const { assert } = require('../util');
4+
const { assert, alignTo } = require('../util');
55

66
const DEFAULT_READ_BUFFER_SIZE = 64 * 1024;
77
const DOCUMENT_HEADER_SIZE = 16;
88
const DOCUMENT_ALIGNMENT = 4;
9+
const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n";
10+
const DOCUMENT_FOOTER_SIZE = 4 /* additional data size footer */ + DOCUMENT_SEPARATOR.length;
911

10-
// node-event-store partition V02
11-
const HEADER_MAGIC = "nesprt02";
12+
// node-event-store partition V03
13+
const HEADER_MAGIC = "nesprt03";
1214

1315
class CorruptFileError extends Error {}
1416
class InvalidDataSizeError extends Error {}
@@ -20,6 +22,7 @@ class InvalidDataSizeError extends Error {}
2022
* @returns {number}
2123
*/
2224
function hash(str) {
25+
/* istanbul ignore if */
2326
if (str.length === 0) {
2427
return 0;
2528
}
@@ -160,8 +163,8 @@ class ReadablePartition extends events.EventEmitter {
160163
* @returns {number} The size of the data including header, padded to 16 bytes alignment and ended with a line break.
161164
*/
162165
documentWriteSize(dataSize) {
163-
const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 1) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT;
164-
return dataSize + 1 + padSize + DOCUMENT_HEADER_SIZE;
166+
const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT);
167+
return DOCUMENT_HEADER_SIZE + dataSize + padSize + DOCUMENT_FOOTER_SIZE;
165168
}
166169

167170
/**
@@ -208,7 +211,7 @@ class ReadablePartition extends events.EventEmitter {
208211
* @param {number} offset The position inside the buffer to start reading from.
209212
* @param {number} position The file position to start reading from.
210213
* @param {number} [size] The expected byte size of the document at the given position.
211-
* @returns {{ dataSize: number, sequenceNumber, number, time64: number }} The metadata fields of the document
214+
* @returns {{ dataSize: number, sequenceNumber: number, time64: number }} The metadata fields of the document
212215
* @throws {Error} if the storage entry at the given position is corrupted.
213216
* @throws {InvalidDataSizeError} if the document size at the given position does not match the provided size.
214217
* @throws {CorruptFileError} if the document at the given position can not be read completely.
@@ -221,7 +224,8 @@ class ReadablePartition extends events.EventEmitter {
221224
throw new InvalidDataSizeError(`Invalid document size ${dataSize} at position ${position}, expected ${size}.`);
222225
}
223226

224-
if (position + dataSize + DOCUMENT_HEADER_SIZE > this.size) {
227+
const writeSize = this.documentWriteSize(dataSize);
228+
if (position + writeSize > this.size) {
225229
throw new CorruptFileError(`Invalid document at position ${position}. This may be caused by an unfinished write.`);
226230
}
227231

@@ -249,6 +253,25 @@ class ReadablePartition extends events.EventEmitter {
249253
return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength });
250254
}
251255

256+
/**
257+
* Prepare the read buffer for reading *before* the specified position. Don't try to reader *after* the returned cursor.
258+
*
259+
* @protected
260+
* @param {number} position The position in the file to prepare the read buffer for reading before.
261+
* @returns {{ buffer: Buffer|null, cursor: number, length: number }} A reader object with properties `buffer`, `cursor` and `length`.
262+
*/
263+
prepareReadBufferBackwards(position) {
264+
if (position < 0) {
265+
return ({ buffer: null, cursor: 0, length: 0 });
266+
}
267+
let bufferCursor = position - this.readBufferPos;
268+
if (this.readBufferPos < 0 || (this.readBufferPos > 0 && bufferCursor < DOCUMENT_FOOTER_SIZE)) {
269+
this.fillBuffer(Math.max(position - this.readBuffer.byteLength, 0));
270+
bufferCursor = position - this.readBufferPos;
271+
}
272+
return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength });
273+
}
274+
252275
/**
253276
* Read the data from the given position.
254277
*
@@ -265,7 +288,7 @@ class ReadablePartition extends events.EventEmitter {
265288
return false;
266289
}
267290

268-
assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`);
291+
assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position ${position}. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`);
269292

270293
const reader = this.prepareReadBuffer(position);
271294
if (reader.length < size + DOCUMENT_HEADER_SIZE) {
@@ -290,22 +313,75 @@ class ReadablePartition extends events.EventEmitter {
290313
return reader.buffer.toString('utf8', dataPosition, dataPosition + dataSize);
291314
}
292315

316+
/**
317+
* Find the start position of the document that precedes the given position.
318+
*
319+
* @protected
320+
* @param {number} position The file position to read backwards from.
321+
* @returns {number|boolean} The start position of the first document before the given position or false if no header could be found.
322+
*/
323+
findDocumentPositionBefore(position) {
324+
assert(this.fd, 'Partition is not opened.');
325+
position -= (position % DOCUMENT_ALIGNMENT);
326+
if (position <= 0) {
327+
return false;
328+
}
329+
330+
const separatorSize = DOCUMENT_SEPARATOR.length;
331+
// Optimization if we are at an exact document boundary, where we can just read the document size
332+
let reader = this.prepareReadBufferBackwards(position);
333+
const block = reader.buffer.toString('ascii', reader.cursor - separatorSize, reader.cursor);
334+
if (block === DOCUMENT_SEPARATOR) {
335+
const dataSize = reader.buffer.readUInt32BE(reader.cursor - separatorSize - 4);
336+
return position - this.documentWriteSize(dataSize);
337+
}
338+
339+
do {
340+
reader = this.prepareReadBufferBackwards(position - separatorSize);
341+
342+
const bufferSeparatorPosition = reader.buffer.lastIndexOf(DOCUMENT_SEPARATOR, reader.cursor - separatorSize, 'ascii');
343+
if (bufferSeparatorPosition >= 0) {
344+
position = this.readBufferPos + bufferSeparatorPosition + separatorSize;
345+
break;
346+
}
347+
position -= this.readBufferLength;
348+
} while (position > 0);
349+
return position;
350+
}
351+
293352
/**
294353
* @api
354+
* @param {number} [after] The document position to start reading from.
295355
* @returns {Generator<string>} A generator that returns all documents in this partition.
296356
*/
297-
*readAll() {
298-
let position = 0;
357+
*readAll(after = 0) {
358+
let position = after < 0 ? this.size + after + 1 : after;
299359
let data;
300360
while ((data = this.readFrom(position)) !== false) {
301361
yield data;
302362
position += this.documentWriteSize(Buffer.byteLength(data, 'utf8'));
303363
}
304364
}
305365

366+
/**
367+
* @api
368+
* @param {number} [before] The document position to start reading backward from.
369+
* @returns {Generator<string>} A generator that returns all documents in this partition in reverse order.
370+
*/
371+
*readAllBackwards(before = -1) {
372+
let position = before < 0 ? this.size + before + 1 : before;
373+
while ((position = this.findDocumentPositionBefore(position)) !== false) {
374+
const data = this.readFrom(position);
375+
yield data;
376+
}
377+
}
306378
}
307379

308380
module.exports = ReadablePartition;
309381
module.exports.CorruptFileError = CorruptFileError;
310382
module.exports.InvalidDataSizeError = InvalidDataSizeError;
311-
module.exports.HEADER_MAGIC = HEADER_MAGIC;
383+
module.exports.HEADER_MAGIC = HEADER_MAGIC;
384+
module.exports.DOCUMENT_SEPARATOR = DOCUMENT_SEPARATOR;
385+
module.exports.DOCUMENT_ALIGNMENT = DOCUMENT_ALIGNMENT;
386+
module.exports.DOCUMENT_HEADER_SIZE = DOCUMENT_HEADER_SIZE;
387+
module.exports.DOCUMENT_FOOTER_SIZE = DOCUMENT_FOOTER_SIZE;

src/Partition/WritablePartition.js

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,15 @@
11
const fs = require('fs');
22
const mkdirpSync = require('mkdirp').sync;
33
const ReadablePartition = require('./ReadablePartition');
4-
const { assert, buildMetadataHeader } = require('../util');
4+
const { assert, buildMetadataHeader, alignTo } = require('../util');
55
const Clock = require('../Clock');
66

77
const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
8-
const DOCUMENT_HEADER_SIZE = 16;
9-
const DOCUMENT_ALIGNMENT = 4;
10-
const DOCUMENT_PAD = ' '.repeat(15) + "\n";
8+
const { DOCUMENT_ALIGNMENT, DOCUMENT_SEPARATOR, DOCUMENT_HEADER_SIZE, DOCUMENT_FOOTER_SIZE } = ReadablePartition;
9+
const DOCUMENT_PAD = ' '.repeat(DOCUMENT_ALIGNMENT);
1110

1211
const NES_EPOCH = new Date('2020-01-01T00:00:00');
1312

14-
/**
15-
* @param {number} dataSize
16-
* @returns {string} The data padded to 16 bytes alignment and ended with a line break.
17-
*/
18-
function padData(dataSize) {
19-
const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 1) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT;
20-
return DOCUMENT_PAD.substr(-padSize - 1);
21-
}
22-
2313
/**
2414
* A partition is a single file where the storage will write documents to depending on some partitioning rules.
2515
* In the case of an event store, this is most likely the (write) streams.
@@ -83,11 +73,10 @@ class WritablePartition extends ReadablePartition {
8373
this.flushCallbacks = [];
8474

8575
if (super.open() === false) {
86-
const stat = fs.statSync(this.fileName);
87-
if (stat.size !== 0) {
76+
if (this.size !== -this.headerSize) {
77+
// If file is not empty, we can not open and initialize it
8878
return false;
8979
}
90-
this.metadata.epoch = Date.now();
9180
this.writeMetadata();
9281
this.size = 0;
9382
}
@@ -122,6 +111,7 @@ class WritablePartition extends ReadablePartition {
122111
* @returns void
123112
*/
124113
writeMetadata() {
114+
this.metadata.epoch = Date.now();
125115
const metadataBuffer = buildMetadataHeader(ReadablePartition.HEADER_MAGIC, this.metadata);
126116
fs.writeSync(this.fd, metadataBuffer, 0, metadataBuffer.byteLength, 0);
127117
this.headerSize = metadataBuffer.byteLength;
@@ -218,6 +208,12 @@ class WritablePartition extends ReadablePartition {
218208
let bytesWritten = 0;
219209
bytesWritten += fs.writeSync(this.fd, dataHeader);
220210
bytesWritten += fs.writeSync(this.fd, data);
211+
const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT);
212+
bytesWritten += fs.writeSync(this.fd, DOCUMENT_PAD.substr(0, padSize));
213+
const dataSizeBuffer = Buffer.alloc(4);
214+
dataSizeBuffer.writeUInt32BE(dataSize, 0);
215+
bytesWritten += fs.writeSync(this.fd, dataSizeBuffer);
216+
bytesWritten += fs.writeSync(this.fd, DOCUMENT_SEPARATOR);
221217
if (typeof callback === 'function') {
222218
process.nextTick(callback);
223219
}
@@ -234,12 +230,17 @@ class WritablePartition extends ReadablePartition {
234230
* @returns {number} Number of bytes written.
235231
*/
236232
writeBuffered(data, dataSize, sequenceNumber, callback) {
237-
const bytesToWrite = Buffer.byteLength(data, 'utf8') + DOCUMENT_HEADER_SIZE;
233+
const bytesToWrite = this.documentWriteSize(Buffer.byteLength(data, 'utf8'));
238234
this.flushIfWriteBufferTooSmall(bytesToWrite);
239235

240236
let bytesWritten = 0;
241237
bytesWritten += this.writeDocumentHeader(this.writeBuffer, this.writeBufferCursor, dataSize, sequenceNumber);
242238
bytesWritten += this.writeBuffer.write(data, this.writeBufferCursor + bytesWritten, 'utf8');
239+
const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT);
240+
bytesWritten += this.writeBuffer.write(DOCUMENT_PAD.substr(0, padSize), this.writeBufferCursor + bytesWritten, 'utf8');
241+
this.writeBuffer.writeUInt32BE(dataSize, this.writeBufferCursor + bytesWritten);
242+
bytesWritten += 4;
243+
bytesWritten += this.writeBuffer.write(DOCUMENT_SEPARATOR, this.writeBufferCursor + bytesWritten, 'utf8');
243244
this.writeBufferCursor += bytesWritten;
244245
this.writeBufferDocuments++;
245246
if (typeof callback === 'function') {
@@ -267,7 +268,6 @@ class WritablePartition extends ReadablePartition {
267268
const dataSize = Buffer.byteLength(data, 'utf8');
268269
assert(dataSize <= 64 * 1024 * 1024, 'Document is too large! Maximum is 64 MB');
269270

270-
data += padData(dataSize);
271271
const dataPosition = this.size;
272272
if (dataSize + DOCUMENT_HEADER_SIZE >= this.writeBuffer.byteLength * 4 / 5) {
273273
this.size += this.writeUnbuffered(data, dataSize, sequenceNumber, callback);
@@ -319,9 +319,7 @@ class WritablePartition extends ReadablePartition {
319319
if (after > this.size) {
320320
return;
321321
}
322-
if (after < 0) {
323-
after = 0;
324-
}
322+
after = Math.max(0, after);
325323
this.flush();
326324

327325
let position = after, data;

src/util.js

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ function assert(condition, message, ErrorType = Error) {
2626
}
2727
}
2828

29+
/**
30+
* Return the amount required to align value to the given alignment.
31+
* It calculates the difference of the alignment and the modulo of value by alignment.
32+
* @param {number} value
33+
* @param {number} alignment
34+
* @returns {number}
35+
*/
36+
function alignTo(value, alignment) {
37+
return (alignment - (value % alignment)) % alignment;
38+
}
39+
2940
/**
3041
* @param {string} secret The secret to use for calculating further HMACs
3142
* @returns {function(string)} A function that calculates the HMAC for a given string
@@ -181,5 +192,6 @@ module.exports = {
181192
matches,
182193
buildMetadataForMatcher,
183194
buildMatcherFromMetadata,
184-
buildMetadataHeader
195+
buildMetadataHeader,
196+
alignTo
185197
};

0 commit comments

Comments
 (0)