Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Partition/ReadOnlyPartition.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ReadOnlyPartition extends WatchesFile(ReadablePartition) {
* @param {string} filename
*/
onChange(filename) {
/* istanbul ignore if */
if (!this.fd) {
return;
}
Expand Down
98 changes: 87 additions & 11 deletions src/Partition/ReadablePartition.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
const fs = require('fs');
const path = require('path');
const events = require('events');
const { assert } = require('../util');
const { assert, alignTo } = require('../util');

const DEFAULT_READ_BUFFER_SIZE = 64 * 1024;
const DOCUMENT_HEADER_SIZE = 16;
const DOCUMENT_ALIGNMENT = 4;
const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n";
const DOCUMENT_FOOTER_SIZE = 4 /* additional data size footer */ + DOCUMENT_SEPARATOR.length;

// node-event-store partition V02
const HEADER_MAGIC = "nesprt02";
// node-event-store partition V03
const HEADER_MAGIC = "nesprt03";

class CorruptFileError extends Error {}
class InvalidDataSizeError extends Error {}
Expand All @@ -20,6 +22,7 @@ class InvalidDataSizeError extends Error {}
* @returns {number}
*/
function hash(str) {
/* istanbul ignore if */
if (str.length === 0) {
return 0;
}
Expand Down Expand Up @@ -160,8 +163,8 @@ class ReadablePartition extends events.EventEmitter {
* @returns {number} The size of the data including header, padded to 16 bytes alignment and ended with a line break.
*/
documentWriteSize(dataSize) {
const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 1) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT;
return dataSize + 1 + padSize + DOCUMENT_HEADER_SIZE;
const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT);
return DOCUMENT_HEADER_SIZE + dataSize + padSize + DOCUMENT_FOOTER_SIZE;
}

/**
Expand Down Expand Up @@ -208,7 +211,7 @@ class ReadablePartition extends events.EventEmitter {
* @param {number} offset The position inside the buffer to start reading from.
* @param {number} position The file position to start reading from.
* @param {number} [size] The expected byte size of the document at the given position.
* @returns {{ dataSize: number, sequenceNumber, number, time64: number }} The metadata fields of the document
* @returns {{ dataSize: number, sequenceNumber: number, time64: number }} The metadata fields of the document
* @throws {Error} if the storage entry at the given position is corrupted.
* @throws {InvalidDataSizeError} if the document size at the given position does not match the provided size.
* @throws {CorruptFileError} if the document at the given position can not be read completely.
Expand All @@ -221,7 +224,8 @@ class ReadablePartition extends events.EventEmitter {
throw new InvalidDataSizeError(`Invalid document size ${dataSize} at position ${position}, expected ${size}.`);
}

if (position + dataSize + DOCUMENT_HEADER_SIZE > this.size) {
const writeSize = this.documentWriteSize(dataSize);
if (position + writeSize > this.size) {
throw new CorruptFileError(`Invalid document at position ${position}. This may be caused by an unfinished write.`);
}

Expand Down Expand Up @@ -249,6 +253,25 @@ class ReadablePartition extends events.EventEmitter {
return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength });
}

/**
* Prepare the read buffer for reading *before* the specified position. Don't try to reader *after* the returned cursor.
*
* @protected
* @param {number} position The position in the file to prepare the read buffer for reading before.
* @returns {{ buffer: Buffer|null, cursor: number, length: number }} A reader object with properties `buffer`, `cursor` and `length`.
*/
prepareReadBufferBackwards(position) {
if (position < 0) {
return ({ buffer: null, cursor: 0, length: 0 });
}
let bufferCursor = position - this.readBufferPos;
if (this.readBufferPos < 0 || (this.readBufferPos > 0 && bufferCursor < DOCUMENT_FOOTER_SIZE)) {
this.fillBuffer(Math.max(position - this.readBuffer.byteLength, 0));
bufferCursor = position - this.readBufferPos;
}
return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength });
}

/**
* Read the data from the given position.
*
Expand All @@ -265,7 +288,7 @@ class ReadablePartition extends events.EventEmitter {
return false;
}

assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`);
assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position ${position}. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`);

const reader = this.prepareReadBuffer(position);
if (reader.length < size + DOCUMENT_HEADER_SIZE) {
Expand All @@ -290,22 +313,75 @@ class ReadablePartition extends events.EventEmitter {
return reader.buffer.toString('utf8', dataPosition, dataPosition + dataSize);
}

/**
* Find the start position of the document that precedes the given position.
*
* @protected
* @param {number} position The file position to read backwards from.
* @returns {number|boolean} The start position of the first document before the given position or false if no header could be found.
*/
findDocumentPositionBefore(position) {
assert(this.fd, 'Partition is not opened.');
position -= (position % DOCUMENT_ALIGNMENT);
if (position <= 0) {
return false;
}

const separatorSize = DOCUMENT_SEPARATOR.length;
// Optimization if we are at an exact document boundary, where we can just read the document size
let reader = this.prepareReadBufferBackwards(position);
const block = reader.buffer.toString('ascii', reader.cursor - separatorSize, reader.cursor);
if (block === DOCUMENT_SEPARATOR) {
const dataSize = reader.buffer.readUInt32BE(reader.cursor - separatorSize - 4);
return position - this.documentWriteSize(dataSize);
}

do {
reader = this.prepareReadBufferBackwards(position - separatorSize);

const bufferSeparatorPosition = reader.buffer.lastIndexOf(DOCUMENT_SEPARATOR, reader.cursor - separatorSize, 'ascii');
if (bufferSeparatorPosition >= 0) {
position = this.readBufferPos + bufferSeparatorPosition + separatorSize;
break;
}
position -= this.readBufferLength;
} while (position > 0);
return position;
}

/**
* @api
* @param {number} [after] The document position to start reading from.
* @returns {Generator<string>} A generator that returns all documents in this partition.
*/
*readAll() {
let position = 0;
*readAll(after = 0) {
let position = after < 0 ? this.size + after + 1 : after;
let data;
while ((data = this.readFrom(position)) !== false) {
yield data;
position += this.documentWriteSize(Buffer.byteLength(data, 'utf8'));
}
}

/**
* @api
* @param {number} [before] The document position to start reading backward from.
* @returns {Generator<string>} A generator that returns all documents in this partition in reverse order.
*/
*readAllBackwards(before = -1) {
let position = before < 0 ? this.size + before + 1 : before;
while ((position = this.findDocumentPositionBefore(position)) !== false) {
const data = this.readFrom(position);
yield data;
}
}
}

module.exports = ReadablePartition;
module.exports.CorruptFileError = CorruptFileError;
module.exports.InvalidDataSizeError = InvalidDataSizeError;
module.exports.HEADER_MAGIC = HEADER_MAGIC;
module.exports.HEADER_MAGIC = HEADER_MAGIC;
module.exports.DOCUMENT_SEPARATOR = DOCUMENT_SEPARATOR;
module.exports.DOCUMENT_ALIGNMENT = DOCUMENT_ALIGNMENT;
module.exports.DOCUMENT_HEADER_SIZE = DOCUMENT_HEADER_SIZE;
module.exports.DOCUMENT_FOOTER_SIZE = DOCUMENT_FOOTER_SIZE;
40 changes: 19 additions & 21 deletions src/Partition/WritablePartition.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,15 @@
const fs = require('fs');
const mkdirpSync = require('mkdirp').sync;
const ReadablePartition = require('./ReadablePartition');
const { assert, buildMetadataHeader } = require('../util');
const { assert, buildMetadataHeader, alignTo } = require('../util');
const Clock = require('../Clock');

const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
const DOCUMENT_HEADER_SIZE = 16;
const DOCUMENT_ALIGNMENT = 4;
const DOCUMENT_PAD = ' '.repeat(15) + "\n";
const { DOCUMENT_ALIGNMENT, DOCUMENT_SEPARATOR, DOCUMENT_HEADER_SIZE, DOCUMENT_FOOTER_SIZE } = ReadablePartition;
const DOCUMENT_PAD = ' '.repeat(DOCUMENT_ALIGNMENT);

const NES_EPOCH = new Date('2020-01-01T00:00:00');

/**
* @param {number} dataSize
* @returns {string} The data padded to 16 bytes alignment and ended with a line break.
*/
function padData(dataSize) {
const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 1) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT;
return DOCUMENT_PAD.substr(-padSize - 1);
}

/**
* A partition is a single file where the storage will write documents to depending on some partitioning rules.
* In the case of an event store, this is most likely the (write) streams.
Expand Down Expand Up @@ -83,11 +73,10 @@ class WritablePartition extends ReadablePartition {
this.flushCallbacks = [];

if (super.open() === false) {
const stat = fs.statSync(this.fileName);
if (stat.size !== 0) {
if (this.size !== -this.headerSize) {
// If file is not empty, we can not open and initialize it
return false;
}
this.metadata.epoch = Date.now();
this.writeMetadata();
this.size = 0;
}
Expand Down Expand Up @@ -122,6 +111,7 @@ class WritablePartition extends ReadablePartition {
* @returns void
*/
writeMetadata() {
this.metadata.epoch = Date.now();
const metadataBuffer = buildMetadataHeader(ReadablePartition.HEADER_MAGIC, this.metadata);
fs.writeSync(this.fd, metadataBuffer, 0, metadataBuffer.byteLength, 0);
this.headerSize = metadataBuffer.byteLength;
Expand Down Expand Up @@ -218,6 +208,12 @@ class WritablePartition extends ReadablePartition {
let bytesWritten = 0;
bytesWritten += fs.writeSync(this.fd, dataHeader);
bytesWritten += fs.writeSync(this.fd, data);
const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT);
bytesWritten += fs.writeSync(this.fd, DOCUMENT_PAD.substr(0, padSize));
const dataSizeBuffer = Buffer.alloc(4);
dataSizeBuffer.writeUInt32BE(dataSize, 0);
bytesWritten += fs.writeSync(this.fd, dataSizeBuffer);
bytesWritten += fs.writeSync(this.fd, DOCUMENT_SEPARATOR);
if (typeof callback === 'function') {
process.nextTick(callback);
}
Expand All @@ -234,12 +230,17 @@ class WritablePartition extends ReadablePartition {
* @returns {number} Number of bytes written.
*/
writeBuffered(data, dataSize, sequenceNumber, callback) {
const bytesToWrite = Buffer.byteLength(data, 'utf8') + DOCUMENT_HEADER_SIZE;
const bytesToWrite = this.documentWriteSize(Buffer.byteLength(data, 'utf8'));
this.flushIfWriteBufferTooSmall(bytesToWrite);

let bytesWritten = 0;
bytesWritten += this.writeDocumentHeader(this.writeBuffer, this.writeBufferCursor, dataSize, sequenceNumber);
bytesWritten += this.writeBuffer.write(data, this.writeBufferCursor + bytesWritten, 'utf8');
const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT);
bytesWritten += this.writeBuffer.write(DOCUMENT_PAD.substr(0, padSize), this.writeBufferCursor + bytesWritten, 'utf8');
this.writeBuffer.writeUInt32BE(dataSize, this.writeBufferCursor + bytesWritten);
bytesWritten += 4;
bytesWritten += this.writeBuffer.write(DOCUMENT_SEPARATOR, this.writeBufferCursor + bytesWritten, 'utf8');
this.writeBufferCursor += bytesWritten;
this.writeBufferDocuments++;
if (typeof callback === 'function') {
Expand Down Expand Up @@ -267,7 +268,6 @@ class WritablePartition extends ReadablePartition {
const dataSize = Buffer.byteLength(data, 'utf8');
assert(dataSize <= 64 * 1024 * 1024, 'Document is too large! Maximum is 64 MB');

data += padData(dataSize);
const dataPosition = this.size;
if (dataSize + DOCUMENT_HEADER_SIZE >= this.writeBuffer.byteLength * 4 / 5) {
this.size += this.writeUnbuffered(data, dataSize, sequenceNumber, callback);
Expand Down Expand Up @@ -319,9 +319,7 @@ class WritablePartition extends ReadablePartition {
if (after > this.size) {
return;
}
if (after < 0) {
after = 0;
}
after = Math.max(0, after);
this.flush();

let position = after, data;
Expand Down
14 changes: 13 additions & 1 deletion src/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ function assert(condition, message, ErrorType = Error) {
}
}

/**
* Return the amount required to align value to the given alignment.
* It calculates the difference of the alignment and the modulo of value by alignment.
* @param {number} value
* @param {number} alignment
* @returns {number}
*/
function alignTo(value, alignment) {
return (alignment - (value % alignment)) % alignment;
}

/**
* @param {string} secret The secret to use for calculating further HMACs
* @returns {function(string)} A function that calculates the HMAC for a given string
Expand Down Expand Up @@ -177,5 +188,6 @@ module.exports = {
matches,
buildMetadataForMatcher,
buildMatcherFromMetadata,
buildMetadataHeader
buildMetadataHeader,
alignTo
};
Loading