diff --git a/src/Partition/ReadOnlyPartition.js b/src/Partition/ReadOnlyPartition.js index 9b8f492..948168e 100644 --- a/src/Partition/ReadOnlyPartition.js +++ b/src/Partition/ReadOnlyPartition.js @@ -18,6 +18,7 @@ class ReadOnlyPartition extends WatchesFile(ReadablePartition) { * @param {string} filename */ onChange(filename) { + /* istanbul ignore if */ if (!this.fd) { return; } diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index e3dded9..b462ad9 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -1,14 +1,16 @@ const fs = require('fs'); const path = require('path'); const events = require('events'); -const { assert } = require('../util'); +const { assert, alignTo } = require('../util'); const DEFAULT_READ_BUFFER_SIZE = 64 * 1024; const DOCUMENT_HEADER_SIZE = 16; const DOCUMENT_ALIGNMENT = 4; +const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n"; +const DOCUMENT_FOOTER_SIZE = 4 /* additional data size footer */ + DOCUMENT_SEPARATOR.length; -// node-event-store partition V02 -const HEADER_MAGIC = "nesprt02"; +// node-event-store partition V03 +const HEADER_MAGIC = "nesprt03"; class CorruptFileError extends Error {} class InvalidDataSizeError extends Error {} @@ -20,6 +22,7 @@ class InvalidDataSizeError extends Error {} * @returns {number} */ function hash(str) { + /* istanbul ignore if */ if (str.length === 0) { return 0; } @@ -160,8 +163,8 @@ class ReadablePartition extends events.EventEmitter { * @returns {number} The size of the data including header, padded to 16 bytes alignment and ended with a line break. 
*/ documentWriteSize(dataSize) { - const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 1) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; - return dataSize + 1 + padSize + DOCUMENT_HEADER_SIZE; + const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT); + return DOCUMENT_HEADER_SIZE + dataSize + padSize + DOCUMENT_FOOTER_SIZE; } /** @@ -208,7 +211,7 @@ class ReadablePartition extends events.EventEmitter { * @param {number} offset The position inside the buffer to start reading from. * @param {number} position The file position to start reading from. * @param {number} [size] The expected byte size of the document at the given position. - * @returns {{ dataSize: number, sequenceNumber, number, time64: number }} The metadata fields of the document + * @returns {{ dataSize: number, sequenceNumber: number, time64: number }} The metadata fields of the document * @throws {Error} if the storage entry at the given position is corrupted. * @throws {InvalidDataSizeError} if the document size at the given position does not match the provided size. * @throws {CorruptFileError} if the document at the given position can not be read completely. @@ -221,7 +224,8 @@ class ReadablePartition extends events.EventEmitter { throw new InvalidDataSizeError(`Invalid document size ${dataSize} at position ${position}, expected ${size}.`); } - if (position + dataSize + DOCUMENT_HEADER_SIZE > this.size) { + const writeSize = this.documentWriteSize(dataSize); + if (position + writeSize > this.size) { throw new CorruptFileError(`Invalid document at position ${position}. This may be caused by an unfinished write.`); } @@ -249,6 +253,25 @@ class ReadablePartition extends events.EventEmitter { return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength }); } + /** + * Prepare the read buffer for reading *before* the specified position. Don't try to read *after* the returned cursor. 
+ * + * @protected + * @param {number} position The position in the file to prepare the read buffer for reading before. + * @returns {{ buffer: Buffer|null, cursor: number, length: number }} A reader object with properties `buffer`, `cursor` and `length`. + */ + prepareReadBufferBackwards(position) { + if (position < 0) { + return ({ buffer: null, cursor: 0, length: 0 }); + } + let bufferCursor = position - this.readBufferPos; + if (this.readBufferPos < 0 || (this.readBufferPos > 0 && bufferCursor < DOCUMENT_FOOTER_SIZE)) { + this.fillBuffer(Math.max(position - this.readBuffer.byteLength, 0)); + bufferCursor = position - this.readBufferPos; + } + return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength }); + } + /** * Read the data from the given position. * @@ -265,7 +288,7 @@ class ReadablePartition extends events.EventEmitter { return false; } - assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`); + assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position ${position}. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`); const reader = this.prepareReadBuffer(position); if (reader.length < size + DOCUMENT_HEADER_SIZE) { @@ -290,12 +313,49 @@ class ReadablePartition extends events.EventEmitter { return reader.buffer.toString('utf8', dataPosition, dataPosition + dataSize); } + /** + * Find the start position of the document that precedes the given position. + * + * @protected + * @param {number} position The file position to read backwards from. + * @returns {number|boolean} The start position of the first document before the given position or false if no header could be found. 
+ */ + findDocumentPositionBefore(position) { + assert(this.fd, 'Partition is not opened.'); + position -= (position % DOCUMENT_ALIGNMENT); + if (position <= 0) { + return false; + } + + const separatorSize = DOCUMENT_SEPARATOR.length; + // Optimization if we are at an exact document boundary, where we can just read the document size + let reader = this.prepareReadBufferBackwards(position); + const block = reader.buffer.toString('ascii', reader.cursor - separatorSize, reader.cursor); + if (block === DOCUMENT_SEPARATOR) { + const dataSize = reader.buffer.readUInt32BE(reader.cursor - separatorSize - 4); + return position - this.documentWriteSize(dataSize); + } + + do { + reader = this.prepareReadBufferBackwards(position - separatorSize); + + const bufferSeparatorPosition = reader.buffer.lastIndexOf(DOCUMENT_SEPARATOR, reader.cursor - separatorSize, 'ascii'); + if (bufferSeparatorPosition >= 0) { + position = this.readBufferPos + bufferSeparatorPosition + separatorSize; + break; + } + position -= this.readBufferLength; + } while (position > 0); + return position; + } + /** * @api + * @param {number} [after] The document position to start reading from. * @returns {Generator} A generator that returns all documents in this partition. */ - *readAll() { - let position = 0; + *readAll(after = 0) { + let position = after < 0 ? this.size + after + 1 : after; let data; while ((data = this.readFrom(position)) !== false) { yield data; @@ -303,9 +363,25 @@ class ReadablePartition extends events.EventEmitter { } } + /** + * @api + * @param {number} [before] The document position to start reading backward from. + * @returns {Generator} A generator that returns all documents in this partition in reverse order. + */ + *readAllBackwards(before = -1) { + let position = before < 0 ? 
this.size + before + 1 : before; + while ((position = this.findDocumentPositionBefore(position)) !== false) { + const data = this.readFrom(position); + yield data; + } + } } module.exports = ReadablePartition; module.exports.CorruptFileError = CorruptFileError; module.exports.InvalidDataSizeError = InvalidDataSizeError; -module.exports.HEADER_MAGIC = HEADER_MAGIC; \ No newline at end of file +module.exports.HEADER_MAGIC = HEADER_MAGIC; +module.exports.DOCUMENT_SEPARATOR = DOCUMENT_SEPARATOR; +module.exports.DOCUMENT_ALIGNMENT = DOCUMENT_ALIGNMENT; +module.exports.DOCUMENT_HEADER_SIZE = DOCUMENT_HEADER_SIZE; +module.exports.DOCUMENT_FOOTER_SIZE = DOCUMENT_FOOTER_SIZE; \ No newline at end of file diff --git a/src/Partition/WritablePartition.js b/src/Partition/WritablePartition.js index 5ef1719..b7835cb 100644 --- a/src/Partition/WritablePartition.js +++ b/src/Partition/WritablePartition.js @@ -1,25 +1,15 @@ const fs = require('fs'); const mkdirpSync = require('mkdirp').sync; const ReadablePartition = require('./ReadablePartition'); -const { assert, buildMetadataHeader } = require('../util'); +const { assert, buildMetadataHeader, alignTo } = require('../util'); const Clock = require('../Clock'); const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024; -const DOCUMENT_HEADER_SIZE = 16; -const DOCUMENT_ALIGNMENT = 4; -const DOCUMENT_PAD = ' '.repeat(15) + "\n"; +const { DOCUMENT_ALIGNMENT, DOCUMENT_SEPARATOR, DOCUMENT_HEADER_SIZE, DOCUMENT_FOOTER_SIZE } = ReadablePartition; +const DOCUMENT_PAD = ' '.repeat(DOCUMENT_ALIGNMENT); const NES_EPOCH = new Date('2020-01-01T00:00:00'); -/** - * @param {number} dataSize - * @returns {string} The data padded to 16 bytes alignment and ended with a line break. 
- */ -function padData(dataSize) { - const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 1) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT; - return DOCUMENT_PAD.substr(-padSize - 1); -} - /** * A partition is a single file where the storage will write documents to depending on some partitioning rules. * In the case of an event store, this is most likely the (write) streams. @@ -83,11 +73,10 @@ class WritablePartition extends ReadablePartition { this.flushCallbacks = []; if (super.open() === false) { - const stat = fs.statSync(this.fileName); - if (stat.size !== 0) { + if (this.size !== -this.headerSize) { + // If file is not empty, we can not open and initialize it return false; } - this.metadata.epoch = Date.now(); this.writeMetadata(); this.size = 0; } @@ -122,6 +111,7 @@ class WritablePartition extends ReadablePartition { * @returns void */ writeMetadata() { + this.metadata.epoch = Date.now(); const metadataBuffer = buildMetadataHeader(ReadablePartition.HEADER_MAGIC, this.metadata); fs.writeSync(this.fd, metadataBuffer, 0, metadataBuffer.byteLength, 0); this.headerSize = metadataBuffer.byteLength; @@ -218,6 +208,12 @@ class WritablePartition extends ReadablePartition { let bytesWritten = 0; bytesWritten += fs.writeSync(this.fd, dataHeader); bytesWritten += fs.writeSync(this.fd, data); + const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT); + bytesWritten += fs.writeSync(this.fd, DOCUMENT_PAD.substr(0, padSize)); + const dataSizeBuffer = Buffer.alloc(4); + dataSizeBuffer.writeUInt32BE(dataSize, 0); + bytesWritten += fs.writeSync(this.fd, dataSizeBuffer); + bytesWritten += fs.writeSync(this.fd, DOCUMENT_SEPARATOR); if (typeof callback === 'function') { process.nextTick(callback); } @@ -234,12 +230,17 @@ class WritablePartition extends ReadablePartition { * @returns {number} Number of bytes written. 
*/ writeBuffered(data, dataSize, sequenceNumber, callback) { - const bytesToWrite = Buffer.byteLength(data, 'utf8') + DOCUMENT_HEADER_SIZE; + const bytesToWrite = this.documentWriteSize(Buffer.byteLength(data, 'utf8')); this.flushIfWriteBufferTooSmall(bytesToWrite); let bytesWritten = 0; bytesWritten += this.writeDocumentHeader(this.writeBuffer, this.writeBufferCursor, dataSize, sequenceNumber); bytesWritten += this.writeBuffer.write(data, this.writeBufferCursor + bytesWritten, 'utf8'); + const padSize = alignTo(dataSize + DOCUMENT_FOOTER_SIZE, DOCUMENT_ALIGNMENT); + bytesWritten += this.writeBuffer.write(DOCUMENT_PAD.substr(0, padSize), this.writeBufferCursor + bytesWritten, 'utf8'); + this.writeBuffer.writeUInt32BE(dataSize, this.writeBufferCursor + bytesWritten); + bytesWritten += 4; + bytesWritten += this.writeBuffer.write(DOCUMENT_SEPARATOR, this.writeBufferCursor + bytesWritten, 'utf8'); this.writeBufferCursor += bytesWritten; this.writeBufferDocuments++; if (typeof callback === 'function') { @@ -267,7 +268,6 @@ class WritablePartition extends ReadablePartition { const dataSize = Buffer.byteLength(data, 'utf8'); assert(dataSize <= 64 * 1024 * 1024, 'Document is too large! Maximum is 64 MB'); - data += padData(dataSize); const dataPosition = this.size; if (dataSize + DOCUMENT_HEADER_SIZE >= this.writeBuffer.byteLength * 4 / 5) { this.size += this.writeUnbuffered(data, dataSize, sequenceNumber, callback); @@ -319,9 +319,7 @@ class WritablePartition extends ReadablePartition { if (after > this.size) { return; } - if (after < 0) { - after = 0; - } + after = Math.max(0, after); this.flush(); let position = after, data; diff --git a/src/util.js b/src/util.js index aad331c..703d805 100644 --- a/src/util.js +++ b/src/util.js @@ -26,6 +26,17 @@ function assert(condition, message, ErrorType = Error) { } } +/** + * Return the amount required to align value to the given alignment. + * It calculates the difference of the alignment and the modulo of value by alignment. 
+ * @param {number} value + * @param {number} alignment + * @returns {number} + */ +function alignTo(value, alignment) { + return (alignment - (value % alignment)) % alignment; +} + /** * @param {string} secret The secret to use for calculating further HMACs * @returns {function(string)} A function that calculates the HMAC for a given string @@ -177,5 +188,6 @@ module.exports = { matches, buildMetadataForMatcher, buildMatcherFromMetadata, - buildMetadataHeader + buildMetadataHeader, + alignTo }; \ No newline at end of file diff --git a/test/Partition.spec.js b/test/Partition.spec.js index ba1f261..a42b668 100644 --- a/test/Partition.spec.js +++ b/test/Partition.spec.js @@ -26,8 +26,8 @@ describe('Partition', function() { /** * @returns {ReadOnlyPartition} */ - function createReader() { - const reader = new Partition.ReadOnly(partition.name, { dataDirectory }); + function createReader(options = {}) { + const reader = new Partition.ReadOnly(partition.name, { ...options, dataDirectory }); readers[readers.length] = reader; return reader; } @@ -156,6 +156,80 @@ describe('Partition', function() { }); + describe('readAll', function() { + + it('reads all documents in write order', function() { + partition.open(); + fillPartition(50, i => 'foo-' + i.toString()); + partition.close(); + partition.open(); + let i = 1; + for (let data of partition.readAll()) { + expect(data).to.be('foo-' + i.toString()); + i++; + } + expect(i).to.be(51); + }); + + it('reads all documents in write order from arbitrary position', function() { + partition.open(); + fillPartition(50, i => 'foo-' + i.toString()); + partition.close(); + partition.open(); + let i = 50; + for (let data of partition.readAll(-partition.documentWriteSize('foo-50'.length)-1)) { + expect(data).to.be('foo-' + i.toString()); + i++; + } + expect(i).to.be(51); + }); + + it('reads all documents in backwards write order', function() { + partition.open(); + fillPartition(50, i => 'foo-' + i.toString()); + partition.close(); + 
partition.open(); + let i = 50; + for (let data of partition.readAllBackwards()) { + expect(data).to.be('foo-' + i.toString()); + i--; + } + expect(i).to.be(0); + }); + + it('reads all documents in backwards write order from arbitary position', function() { + partition.open(); + fillPartition(50, i => 'foo-' + i.toString()); + partition.close(); + partition.open(); + let i = 50; + for (let data of partition.readAllBackwards(-9)) { + expect(data).to.be('foo-' + i.toString()); + i--; + } + expect(i).to.be(0); + i = 50; + for (let data of partition.readAllBackwards(partition.size - 12)) { + expect(data).to.be('foo-' + i.toString()); + i--; + } + expect(i).to.be(0); + }); + + it('can find document boundaries by scanning across readbuffers', function() { + partition.open(); + fillPartition(2, i => '0xFF'.repeat(64)); + const lastPosition = partition.write('0xFF'.repeat(64)); + partition.close(); + + const reader = createReader({ readBufferSize: 64 }); + reader.open(); + expect(reader.findDocumentPositionBefore(reader.size - 8)).to.be(lastPosition); + reader.close(); + }); + + }); + describe('readFrom', function() { it('returns false when partition is not open', function() { diff --git a/test/Storage.spec.js b/test/Storage.spec.js index 3691ec2..f72e91e 100644 --- a/test/Storage.spec.js +++ b/test/Storage.spec.js @@ -239,6 +239,23 @@ describe('Storage', function() { expect(index.isOpen()).to.be(true); }); + it('works with arbitrarily sized documents', function() { + storage = createStorage({ writeBufferSize: 1024 }); + storage.open(); + + for (let i = 1; i <= 10; i++) { + storage.write({ foo: i, pad: ' '.repeat(storage.partitionConfig.writeBufferSize * i / 12) }); + } + + storage.close(); + storage = createStorage(); + storage.open(); + + for (let i = 1; i <= 8; i++) { + expect(storage.read(i).foo).to.eql(i); + } + }); + }); describe('readRange', function() { @@ -981,5 +998,20 @@ describe('Storage', function() { reader.close(); }); + it('partitions are opened only once', 
function(done){ + storage = createStorage({ syncOnFlush: true, partitioner: (document, number) => document.type }); + storage.open(); + + let reader = createReader(); + reader.open(); + reader.on('partition-created', (id) => { + const partitionInstance = reader.getPartition(id); + expect(reader.getPartition(id)).to.be(partitionInstance); + reader.close(); + done(); + }); + + storage.getPartition(''); + }); }); });