diff --git a/README.md b/README.md index f4c51d6..d7af612 100644 --- a/README.md +++ b/README.md @@ -413,7 +413,18 @@ leads to every single document being flushed directly. #### Consistency -Since the storage is append-only, consistency is automatically guaranteed. +Since the storage is append-only, consistency is automatically guaranteed for all successful writes. Writes that fail in +the middle, e.g. because the machine crashes before the full write buffer is flushed, will lead to a torn write. This is +a partial, and therefore invalid, write. To recover from such a state, the storage will detect torn writes and truncate them when an existing +lock is reclaimed. This can be done by instantiating the store with the following option: + +```javascript +const eventstore = new EventStore('my-event-store', { storageConfig: { lock: EventStore.LOCK_RECLAIM } }); +``` + +Note that this option will effectively bypass the lock that prevents multiple instances from being created, so you should +not use this carelessly. Having multiple instances write to the same files will lead to inconsistent data that cannot be +easily recovered from. #### Isolation diff --git a/src/Consumer.js b/src/Consumer.js index aae189a..cb2e887 100644 --- a/src/Consumer.js +++ b/src/Consumer.js @@ -1,8 +1,7 @@ const stream = require('stream'); const fs = require('fs'); const path = require('path'); -const mkdirpSync = require('mkdirp').sync; -const { assert } = require('./util'); +const { assert, ensureDirectory } = require('./util'); const Storage = require('./Storage/ReadableStorage'); const MAX_CATCHUP_BATCH = 10; @@ -59,9 +58,7 @@ class Consumer extends stream.Readable { this.indexName = indexName; const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers'); this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' 
+ identifier); - if (!fs.existsSync(consumerDirectory)) { - mkdirpSync(consumerDirectory); - } else { + if (ensureDirectory(consumerDirectory)) { this.cleanUpFailedWrites(); } } diff --git a/src/EventStore.js b/src/EventStore.js index 2fd12be..f31c9af 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -397,4 +397,6 @@ class EventStore extends events.EventEmitter { module.exports = EventStore; module.exports.ExpectedVersion = ExpectedVersion; -module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError; \ No newline at end of file +module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError; +module.exports.LOCK_THROW = Storage.LOCK_THROW; +module.exports.LOCK_RECLAIM = Storage.LOCK_RECLAIM; diff --git a/src/Index/WritableIndex.js b/src/Index/WritableIndex.js index 2e735de..6ba1edc 100644 --- a/src/Index/WritableIndex.js +++ b/src/Index/WritableIndex.js @@ -1,7 +1,6 @@ const fs = require('fs'); -const mkdirpSync = require('mkdirp').sync; const ReadableIndex = require('./ReadableIndex'); -const { assertEqual, buildMetadataHeader } = require('../util'); +const { assertEqual, buildMetadataHeader, ensureDirectory } = require('../util'); /** * An index is a simple append-only file that stores an ordered list of entry elements pointing to the actual file position @@ -45,9 +44,7 @@ class WritableIndex extends ReadableIndex { */ initialize(options) { super.initialize(options); - if (!fs.existsSync(options.dataDirectory)) { - mkdirpSync(options.dataDirectory); - } + ensureDirectory(options.dataDirectory); this.fileMode = 'a+'; this.writeBuffer = Buffer.allocUnsafe(options.writeBufferSize >>> 0); // jshint ignore:line diff --git a/src/Partition/ReadablePartition.js b/src/Partition/ReadablePartition.js index 616d21a..71c7c52 100644 --- a/src/Partition/ReadablePartition.js +++ b/src/Partition/ReadablePartition.js @@ -123,6 +123,21 @@ class ReadablePartition extends events.EventEmitter { return true; } + /** + * @returns {number} -1 if the partition 
is ok and the sequence number of the broken document if a torn write was detected. + */ + checkTornWrite() { + const reader = this.prepareReadBufferBackwards(this.size); + const separator = reader.buffer.toString('ascii', reader.cursor - DOCUMENT_SEPARATOR.length, reader.cursor); + if (separator !== DOCUMENT_SEPARATOR) { + const position = this.findDocumentPositionBefore(this.size); + const reader = this.prepareReadBuffer(position); + const { sequenceNumber } = this.readDocumentHeader(reader.buffer, reader.cursor, position); + return sequenceNumber; + } + return -1; + } + /** * Read the partition metadata from the file. * @@ -348,7 +363,7 @@ class ReadablePartition extends events.EventEmitter { } position -= this.readBufferLength; } while (position > 0); - return position; + return Math.max(0, position); } /** diff --git a/src/Partition/WritablePartition.js b/src/Partition/WritablePartition.js index 545d0b9..014387a 100644 --- a/src/Partition/WritablePartition.js +++ b/src/Partition/WritablePartition.js @@ -1,7 +1,6 @@ const fs = require('fs'); -const mkdirpSync = require('mkdirp').sync; const ReadablePartition = require('./ReadablePartition'); -const { assert, buildMetadataHeader, alignTo } = require('../util'); +const { assert, buildMetadataHeader, alignTo, ensureDirectory } = require('../util'); const Clock = require('../Clock'); const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024; @@ -40,9 +39,7 @@ class WritablePartition extends ReadablePartition { config.metadata = Object.assign(defaults.metadata, config.metadata); config = Object.assign(defaults, config); super(name, config); - if (!fs.existsSync(this.dataDirectory)) { - mkdirpSync(this.dataDirectory); - } + ensureDirectory(this.dataDirectory); this.fileMode = 'a+'; this.writeBufferSize = config.writeBufferSize >>> 0; // jshint ignore:line this.maxWriteBufferDocuments = config.maxWriteBufferDocuments >>> 0; // jshint ignore:line diff --git a/src/Storage/ReadableStorage.js b/src/Storage/ReadableStorage.js index 
47ea87b..8b69e62 100644 --- a/src/Storage/ReadableStorage.js +++ b/src/Storage/ReadableStorage.js @@ -138,6 +138,7 @@ class ReadableStorage extends events.EventEmitter { for (let file of files) { if (file.substr(-6) === '.index') continue; if (file.substr(-7) === '.branch') continue; + if (file.substr(-5) === '.lock') continue; if (file.substr(0, this.storageFile.length) !== this.storageFile) continue; const partition = this.createPartition(file, this.partitionConfig); diff --git a/src/Storage/WritableStorage.js b/src/Storage/WritableStorage.js index 051ef1e..4b5cf03 100644 --- a/src/Storage/WritableStorage.js +++ b/src/Storage/WritableStorage.js @@ -1,13 +1,15 @@ const fs = require('fs'); -const mkdirpSync = require('mkdirp').sync; const path = require('path'); const WritablePartition = require('../Partition/WritablePartition'); const WritableIndex = require('../Index/WritableIndex'); const ReadableStorage = require('./ReadableStorage'); -const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata } = require('../util'); +const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata, ensureDirectory } = require('../util'); const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024; +const LOCK_RECLAIM = 0x1; +const LOCK_THROW = 0x2; + class StorageLockedError extends Error {} /** @@ -37,6 +39,7 @@ class WritableStorage extends ReadableStorage { * @param {function(object, number): string} [config.partitioner] A function that takes a document and sequence number and returns a partition name that the document should be stored in. Defaults to write all documents to the primary partition. * @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction. * @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes. + * @param {number} [config.lock] One of LOCK_* constants that defines how an existing lock should be handled. 
*/ constructor(storageName = 'storage', config = {}) { if (typeof storageName !== 'string') { @@ -53,13 +56,13 @@ class WritableStorage extends ReadableStorage { }; config = Object.assign(defaults, config); config.indexOptions = Object.assign({ syncOnFlush: config.syncOnFlush }, config.indexOptions); - if (!fs.existsSync(config.dataDirectory)) { - try { - mkdirpSync(config.dataDirectory); - } catch (e) { - } - } + ensureDirectory(config.dataDirectory); super(storageName, config); + + this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock'); + if (config.lock === LOCK_RECLAIM) { + this.unlock(); + } this.partitioner = config.partitioner; } @@ -75,6 +78,26 @@ class WritableStorage extends ReadableStorage { return super.open(); } + /** + * Check all partitions for torn writes and truncate the storage to the position before the first torn write. + * This might delete correctly written events in partitions, if their sequence number is higher than the + * torn write in another partition. + */ + checkTornWrites() { + let lastValidSequenceNumber = Number.MAX_SAFE_INTEGER; + this.forEachPartition(partition => { + partition.open(); + const tornSequenceNumber = partition.checkTornWrite(); + if (tornSequenceNumber >= 0) { + lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSequenceNumber); + } + }); + if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) { + this.truncate(lastValidSequenceNumber); + } + this.forEachPartition(partition => partition.close()); + } + /** * Attempt to lock this storage by means of a lock directory. * @returns {boolean} True if the lock was created or false if the lock is already in place. 
@@ -85,7 +108,6 @@ class WritableStorage extends ReadableStorage { if (this.locked) { return false; } - this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock'); try { fs.mkdirSync(this.lockFile); this.locked = true; @@ -105,7 +127,12 @@ class WritableStorage extends ReadableStorage { * Current implementation just deletes a lock file that is named like the storage. */ unlock() { - fs.rmdirSync(this.lockFile); + if (fs.existsSync(this.lockFile)) { + if (!this.locked) { + this.checkTornWrites(); + } + fs.rmdirSync(this.lockFile); + } this.locked = false; } @@ -370,4 +397,6 @@ class WritableStorage extends ReadableStorage { } module.exports = WritableStorage; -module.exports.StorageLockedError = StorageLockedError; \ No newline at end of file +module.exports.StorageLockedError = StorageLockedError; +module.exports.LOCK_THROW = LOCK_THROW; +module.exports.LOCK_RECLAIM = LOCK_RECLAIM; \ No newline at end of file diff --git a/src/util.js b/src/util.js index db13dbe..0b92ed7 100644 --- a/src/util.js +++ b/src/util.js @@ -1,4 +1,6 @@ const crypto = require('crypto'); +const fs = require('fs'); +const mkdirpSync = require('mkdirp').sync; /** * Assert that actual and expected match or throw an Error with the given message appended by information about expected and actual value. @@ -183,6 +185,23 @@ function wrapAndCheck(index, length) { return index; } +/** + * Ensure that the given directory exists. 
+ * @param {string} dirName + * @return {boolean} true if the directory existed already + */ +function ensureDirectory(dirName) { + if (!fs.existsSync(dirName)) { + try { + mkdirpSync(dirName); + } catch (e) { + } + return false; + } + return true; +} + + module.exports = { assert, assertEqual, @@ -193,5 +212,6 @@ module.exports = { buildMetadataForMatcher, buildMatcherFromMetadata, buildMetadataHeader, - alignTo + alignTo, + ensureDirectory }; \ No newline at end of file diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js index 5335aa8..e7d3ad8 100644 --- a/test/EventStore.spec.js +++ b/test/EventStore.spec.js @@ -60,6 +60,33 @@ describe('EventStore', function() { fs.readdir = originalReaddir; }); + it('repairs torn writes', function(done) { + eventstore = new EventStore({ + storageDirectory + }); + + const events = [{foo: 'bar'.repeat(500)}]; + eventstore.on('ready', () => { + eventstore.commit('foo-bar', events, () => { + // Simulate a torn write (but indexes are still written) + fs.truncateSync(eventstore.storage.getPartition('foo-bar').fileName, 512); + + // The previous instance was not closed, so the lock still exists + eventstore = new EventStore({ + storageDirectory, + storageConfig: { + lock: EventStore.LOCK_RECLAIM + } + }); + eventstore.on('ready', () => { + expect(eventstore.length).to.be(0); + expect(eventstore.getStreamVersion('foo-bar')).to.be(0); + done(); + }); + }); + }); + }); + it('throws when trying to open non-existing store read-only', function() { expect(() => new EventStore({ storageDirectory,