Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,18 @@ leads to every single document being flushed directly.

#### Consistency

Since the storage is append-only, consistency is automatically guaranteed.
Since the storage is append-only, consistency is automatically guaranteed for all successful writes. Writes that fail in
the middle, e.g. because the machine crashes before the full write buffer is flushed, will lead to a torn write. This is
a partial invalid write. To recover from such a state, the storage will detect torn writes and truncate them when an existing
lock is reclaimed. This can be done by instantiating the store with the following option:

```javascript
const eventstore = new EventStore('my-event-store', { storageConfig: { lock: EventStore.LOCK_RECLAIM } });
```

Note that this option will effectively bypass the lock that prevents multiple instances from being created, so you should
not use this carelessly. Having multiple instances write to the same files will lead to inconsistent data that can not be
easily recovered from.

#### Isolation

Expand Down
7 changes: 2 additions & 5 deletions src/Consumer.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirpSync = require('mkdirp').sync;
const { assert } = require('./util');
const { assert, ensureDirectory } = require('./util');

const Storage = require('./Storage/ReadableStorage');
const MAX_CATCHUP_BATCH = 10;
Expand Down Expand Up @@ -59,9 +58,7 @@ class Consumer extends stream.Readable {
this.indexName = indexName;
const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers');
this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
if (!fs.existsSync(consumerDirectory)) {
mkdirpSync(consumerDirectory);
} else {
if (ensureDirectory(consumerDirectory)) {
this.cleanUpFailedWrites();
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/EventStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,6 @@ class EventStore extends events.EventEmitter {

module.exports = EventStore;
module.exports.ExpectedVersion = ExpectedVersion;
module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError;
module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError;
module.exports.LOCK_THROW = Storage.LOCK_THROW;
module.exports.LOCK_RECLAIM = Storage.LOCK_RECLAIM;
7 changes: 2 additions & 5 deletions src/Index/WritableIndex.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const fs = require('fs');
const mkdirpSync = require('mkdirp').sync;
const ReadableIndex = require('./ReadableIndex');
const { assertEqual, buildMetadataHeader } = require('../util');
const { assertEqual, buildMetadataHeader, ensureDirectory } = require('../util');

/**
* An index is a simple append-only file that stores an ordered list of entry elements pointing to the actual file position
Expand Down Expand Up @@ -45,9 +44,7 @@ class WritableIndex extends ReadableIndex {
*/
initialize(options) {
super.initialize(options);
if (!fs.existsSync(options.dataDirectory)) {
mkdirpSync(options.dataDirectory);
}
ensureDirectory(options.dataDirectory);

this.fileMode = 'a+';
this.writeBuffer = Buffer.allocUnsafe(options.writeBufferSize >>> 0); // jshint ignore:line
Expand Down
17 changes: 16 additions & 1 deletion src/Partition/ReadablePartition.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,21 @@ class ReadablePartition extends events.EventEmitter {
return true;
}

/**
* @returns {number} -1 if the partition is ok and the sequence number of the broken document if a torn write was detected.
*/
checkTornWrite() {
const reader = this.prepareReadBufferBackwards(this.size);
const separator = reader.buffer.toString('ascii', reader.cursor - DOCUMENT_SEPARATOR.length, reader.cursor);
if (separator !== DOCUMENT_SEPARATOR) {
const position = this.findDocumentPositionBefore(this.size);
const reader = this.prepareReadBuffer(position);
const { sequenceNumber } = this.readDocumentHeader(reader.buffer, reader.cursor, position);
return sequenceNumber;
}
return -1;
}

/**
* Read the partition metadata from the file.
*
Expand Down Expand Up @@ -348,7 +363,7 @@ class ReadablePartition extends events.EventEmitter {
}
position -= this.readBufferLength;
} while (position > 0);
return position;
return Math.max(0, position);
}

/**
Expand Down
7 changes: 2 additions & 5 deletions src/Partition/WritablePartition.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const fs = require('fs');
const mkdirpSync = require('mkdirp').sync;
const ReadablePartition = require('./ReadablePartition');
const { assert, buildMetadataHeader, alignTo } = require('../util');
const { assert, buildMetadataHeader, alignTo, ensureDirectory } = require('../util');
const Clock = require('../Clock');

const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
Expand Down Expand Up @@ -40,9 +39,7 @@ class WritablePartition extends ReadablePartition {
config.metadata = Object.assign(defaults.metadata, config.metadata);
config = Object.assign(defaults, config);
super(name, config);
if (!fs.existsSync(this.dataDirectory)) {
mkdirpSync(this.dataDirectory);
}
ensureDirectory(this.dataDirectory);
this.fileMode = 'a+';
this.writeBufferSize = config.writeBufferSize >>> 0; // jshint ignore:line
this.maxWriteBufferDocuments = config.maxWriteBufferDocuments >>> 0; // jshint ignore:line
Expand Down
1 change: 1 addition & 0 deletions src/Storage/ReadableStorage.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class ReadableStorage extends events.EventEmitter {
for (let file of files) {
if (file.substr(-6) === '.index') continue;
if (file.substr(-7) === '.branch') continue;
if (file.substr(-5) === '.lock') continue;
if (file.substr(0, this.storageFile.length) !== this.storageFile) continue;

const partition = this.createPartition(file, this.partitionConfig);
Expand Down
51 changes: 40 additions & 11 deletions src/Storage/WritableStorage.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
const fs = require('fs');
const mkdirpSync = require('mkdirp').sync;
const path = require('path');
const WritablePartition = require('../Partition/WritablePartition');
const WritableIndex = require('../Index/WritableIndex');
const ReadableStorage = require('./ReadableStorage');
const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata } = require('../util');
const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata, ensureDirectory } = require('../util');

const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;

const LOCK_RECLAIM = 0x1;
const LOCK_THROW = 0x2;

class StorageLockedError extends Error {}

/**
Expand Down Expand Up @@ -37,6 +39,7 @@ class WritableStorage extends ReadableStorage {
* @param {function(object, number): string} [config.partitioner] A function that takes a document and sequence number and returns a partition name that the document should be stored in. Defaults to write all documents to the primary partition.
* @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
* @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
* @param {number} [config.lock] One of LOCK_* constants that defines how an existing lock should be handled.
*/
constructor(storageName = 'storage', config = {}) {
if (typeof storageName !== 'string') {
Expand All @@ -53,13 +56,13 @@ class WritableStorage extends ReadableStorage {
};
config = Object.assign(defaults, config);
config.indexOptions = Object.assign({ syncOnFlush: config.syncOnFlush }, config.indexOptions);
if (!fs.existsSync(config.dataDirectory)) {
try {
mkdirpSync(config.dataDirectory);
} catch (e) {
}
}
ensureDirectory(config.dataDirectory);
super(storageName, config);

this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
if (config.lock === LOCK_RECLAIM) {
this.unlock();
}
this.partitioner = config.partitioner;
}

Expand All @@ -75,6 +78,26 @@ class WritableStorage extends ReadableStorage {
return super.open();
}

/**
* Check all partitions torn writes and truncate the storage to the position before the first torn write.
* This might delete correctly written events in partitions, if their sequence number is higher than the
* torn write in another partition.
*/
checkTornWrites() {
let lastValidSequenceNumber = Number.MAX_SAFE_INTEGER;
this.forEachPartition(partition => {
partition.open();
const tornSequenceNumber = partition.checkTornWrite();
if (tornSequenceNumber >= 0) {
lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSequenceNumber);
}
});
if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) {
this.truncate(lastValidSequenceNumber);
}
this.forEachPartition(partition => partition.close());
}

/**
* Attempt to lock this storage by means of a lock directory.
* @returns {boolean} True if the lock was created or false if the lock is already in place.
Expand All @@ -85,7 +108,6 @@ class WritableStorage extends ReadableStorage {
if (this.locked) {
return false;
}
this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
try {
fs.mkdirSync(this.lockFile);
this.locked = true;
Expand All @@ -105,7 +127,12 @@ class WritableStorage extends ReadableStorage {
* Current implementation just deletes a lock file that is named like the storage.
*/
unlock() {
fs.rmdirSync(this.lockFile);
if (fs.existsSync(this.lockFile)) {
if (!this.locked) {
this.checkTornWrites();
}
fs.rmdirSync(this.lockFile);
}
this.locked = false;
}

Expand Down Expand Up @@ -370,4 +397,6 @@ class WritableStorage extends ReadableStorage {
}

module.exports = WritableStorage;
module.exports.StorageLockedError = StorageLockedError;
module.exports.StorageLockedError = StorageLockedError;
module.exports.LOCK_THROW = LOCK_THROW;
module.exports.LOCK_RECLAIM = LOCK_RECLAIM;
22 changes: 21 additions & 1 deletion src/util.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const crypto = require('crypto');
const fs = require('fs');
const mkdirpSync = require('mkdirp').sync;

/**
* Assert that actual and expected match or throw an Error with the given message appended by information about expected and actual value.
Expand Down Expand Up @@ -183,6 +185,23 @@ function wrapAndCheck(index, length) {
return index;
}

/**
* Ensure that the given directory exists.
* @param {string} dirName
* @return {boolean} true if the directory existed already
*/
function ensureDirectory(dirName) {
if (!fs.existsSync(dirName)) {
try {
mkdirpSync(dirName);
} catch (e) {
}
return false;
}
return true;
}


module.exports = {
assert,
assertEqual,
Expand All @@ -193,5 +212,6 @@ module.exports = {
buildMetadataForMatcher,
buildMatcherFromMetadata,
buildMetadataHeader,
alignTo
alignTo,
ensureDirectory
};
27 changes: 27 additions & 0 deletions test/EventStore.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,33 @@ describe('EventStore', function() {
fs.readdir = originalReaddir;
});

it('repairs torn writes', function(done) {
eventstore = new EventStore({
storageDirectory
});

const events = [{foo: 'bar'.repeat(500)}];
eventstore.on('ready', () => {
eventstore.commit('foo-bar', events, () => {
// Simulate a torn write (but indexes are still written)
fs.truncateSync(eventstore.storage.getPartition('foo-bar').fileName, 512);

// The previous instance was not closed, so the lock still exists
eventstore = new EventStore({
storageDirectory,
storageConfig: {
lock: EventStore.LOCK_RECLAIM
}
});
eventstore.on('ready', () => {
expect(eventstore.length).to.be(0);
expect(eventstore.getStreamVersion('foo-bar')).to.be(0);
done();
});
});
});
});

it('throws when trying to open non-existing store read-only', function() {
expect(() => new EventStore({
storageDirectory,
Expand Down