From f7b9cd7fe425e3b7ffecf352172e52ead1585941 Mon Sep 17 00:00:00 2001 From: Thibault Poncelet Date: Tue, 18 Feb 2014 16:55:59 +0100 Subject: [PATCH 01/14] Use a cursor to fetch all events from mongodb database, remove async from redis event bus reciever --- lib/domain_repository.coffee | 7 +++--- lib/event_bus/common/receiver.js | 37 ++++++++++++++++---------------- lib/event_store/mongodb.coffee | 24 ++++++++++++--------- 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/lib/domain_repository.coffee b/lib/domain_repository.coffee index 67ca0ae..3988323 100644 --- a/lib/domain_repository.coffee +++ b/lib/domain_repository.coffee @@ -75,16 +75,17 @@ class DomainRepository entityInstantiator.findByUid uid, callback replayAllEvents: (callback) -> - @store.findAllEvents (err, events) => + @store.findAllEvents (err, events, batchCallback) => if events.length > 0 eventQueue = async.queue (event, eventTaskCallback) => event.replayed = true @_publishEvent event, eventTaskCallback , 1 - eventQueue.drain = callback + eventQueue.drain = batchCallback eventQueue.push events else - callback() + batchCallback() + , callback getLastPublishedEvents: () -> @emitter.lastEmittedEvents diff --git a/lib/event_bus/common/receiver.js b/lib/event_bus/common/receiver.js index adf1c52..d118421 100644 --- a/lib/event_bus/common/receiver.js +++ b/lib/event_bus/common/receiver.js @@ -69,25 +69,26 @@ CommonEventBusReceiver.prototype._handleEvent = function (event, callback) { return callback(); } - var parallelHandling = async.queue(function (eventHandler, callback) { - eventHandler(event, function (err) { - if (err) { - self.logger.warning("CommonEventBusReceiver#_handleEvent", "an error occurred in an event handler: " + (err.stack || err.message || err)); - errors.push(err); + var pendingHandlers = 0; + eventHandlers.forEach(function(eventHandler){ + pendingHandlers++ + eventHandler(event, function (err) { + if (err) { + self.logger.warning("CommonEventBusReceiver#_handleEvent", "an error occurred in an event handler: " + (err.stack || err.message || err)); + errors.push(err); + } + pendingHandlers--; + if (pendingHandlers == 0) { + if (errors.length > 0) { + var error = new Error("Some errors occured when handling the event: [" + errors.join(" | ") + "]"); + callback(error); + } else { + self.lastEvent = event; + callback(null); } - callback(err); - }); - }, Infinity); - - parallelHandling.drain = function () { - if (errors.length > 0) { - var err = new Error("Some errors occured when handling the event: [" + errors.join(" | ") + "]"); - return callback(err); - } - self.lastEvent = event; - callback(); - }; - parallelHandling.push(eventHandlers); + } + }); + }); }; CommonEventBusReceiver.prototype.stop = function (callback) { diff --git a/lib/event_store/mongodb.coffee b/lib/event_store/mongodb.coffee index 96c3886..b6c3ce7 100644 --- a/lib/event_store/mongodb.coffee +++ b/lib/event_store/mongodb.coffee @@ -70,18 +70,22 @@ class MongoDbEventStore extends Base uid = uuid.v4() callback null, uid - findAllEvents: (callback) -> + findAllEvents: (batchCallback, callback) -> p = new Profiler "MongoDbEventStore#_find(db request)", @logger p.start() - @eventCollection.find({}).sort("timestamp":1).toArray (err, items) => - p.end() - - if err? - callback err - else if not items? - callback null, [] - else - @_instantiateEventsFromRows items, callback + cursor = @eventCollection.find({}).sort("timestamp":1) + retrieve = => + cursor.nextObject (err, item) => + return callback err if err? + if item? + @_instantiateEventsFromRows [item], (err, events) -> + batchCallback err, events, (err) -> + return callback err if err? + retrieve() + else + p.end() + callback null + retrieve() findAllEventsByEntityUid: (entityUid, order, callback) -> [order, callback] = [null, order] unless callback? From 291ba7055d45cd1bcd9bd7dc354aecd0266c33bc Mon Sep 17 00:00:00 2001 From: Thibault Poncelet Date: Tue, 1 Apr 2014 11:50:51 +0200 Subject: [PATCH 02/14] Refactor replay mechanism Fetch events domain events one by one instead all in memory --- lib/domain_repository.coffee | 12 ++------ lib/event_store/base.coffee | 5 +++- lib/event_store/mongodb.coffee | 53 +++++++++++++++++++--------------- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/lib/domain_repository.coffee b/lib/domain_repository.coffee index 3988323..4070f1c 100644 --- a/lib/domain_repository.coffee +++ b/lib/domain_repository.coffee @@ -75,16 +75,8 @@ class DomainRepository entityInstantiator.findByUid uid, callback replayAllEvents: (callback) -> - @store.findAllEvents (err, events, batchCallback) => - if events.length > 0 - eventQueue = async.queue (event, eventTaskCallback) => - event.replayed = true - @_publishEvent event, eventTaskCallback - , 1 - eventQueue.drain = batchCallback - eventQueue.push events - else - batchCallback() + @store.findAllEventsOneByOne (err, event, eventHandlerCallback) => + @_publishEvent event, eventHandlerCallback , callback getLastPublishedEvents: () -> diff --git a/lib/event_store/base.coffee b/lib/event_store/base.coffee index 600e6c0..39e7b9c 100644 --- a/lib/event_store/base.coffee +++ b/lib/event_store/base.coffee @@ -9,6 +9,9 @@ class BaseEventStore findAllEvents: (options, callback) -> throw new Error "implement me" + findAllEventsOneByOne: (options, callback) -> + throw new Error "implement me" + findAllEventsByEntityUid: (entityUid, options, callback) -> throw new Error "Implement me" @@ -23,4 +26,4 @@ class BaseEventStore # implement me if you want snapshots in your store callback? null -module.exports = BaseEventStore +module.exports = BaseEventStore \ No newline at end of file diff --git a/lib/event_store/mongodb.coffee b/lib/event_store/mongodb.coffee index b6c3ce7..e387620 100644 --- a/lib/event_store/mongodb.coffee +++ b/lib/event_store/mongodb.coffee @@ -70,7 +70,7 @@ class MongoDbEventStore extends Base uid = uuid.v4() callback null, uid - findAllEvents: (batchCallback, callback) -> + findAllEventsOneByOne: (eventHandler, callback) -> p = new Profiler "MongoDbEventStore#_find(db request)", @logger p.start() cursor = @eventCollection.find({}).sort("timestamp":1) @@ -78,8 +78,8 @@ class MongoDbEventStore extends Base cursor.nextObject (err, item) => return callback err if err? if item? - @_instantiateEventsFromRows [item], (err, events) -> - batchCallback err, events, (err) -> + @_instantiateEventFromRow item, (err, event) -> + eventHandler err, event, (err) -> return callback err if err? retrieve() else @@ -215,27 +215,8 @@ class MongoDbEventStore extends Base return callback null, events if rows.length is 0 rowsQueue = async.queue (row, rowCallback) => - uid = row.uid - name = row.name - entityUid = row.entityUid - data = row.data - timestamp = row.timestamp - version = row.version - - @_loadAttachmentsFromRow row, (err, attachments) -> - return rowCallback err if err? - - for attachmentName, attachmentBody of attachments - data[attachmentName] = attachmentBody - - event = new Event - name: name - data: data - uid: uid - entityUid: entityUid - timestamp: timestamp - version: version - + @_instantiateEventFromRow row, (err, event) -> + return callback err if err? events.push event defer rowCallback , 1 @@ -245,6 +226,30 @@ class MongoDbEventStore extends Base rowsQueue.push rows + _instantiateEventFromRow: (row, callback) -> + uid = row.uid + name = row.name + entityUid = row.entityUid + data = row.data + timestamp = row.timestamp + version = row.version + + @_loadAttachmentsFromRow row, (err, attachments) -> + return rowCallback err if err? + + for attachmentName, attachmentBody of attachments + data[attachmentName] = attachmentBody + + event = new Event + name: name + data: data + uid: uid + entityUid: entityUid + timestamp: timestamp + version: version + + callback null, event + _loadAttachmentsFromRow: (row, callback) -> attachments = {} for attachmentName, attachmentBody of row._attachments From c8cadbec7d626b6f9f0a3ad8022e08bf69cd3426 Mon Sep 17 00:00:00 2001 From: Thibault Poncelet Date: Tue, 1 Apr 2014 11:52:37 +0200 Subject: [PATCH 03/14] Bump coffee-script --- lib/main.js | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/main.js b/lib/main.js index b56cdbd..fc14563 100644 --- a/lib/main.js +++ b/lib/main.js @@ -1,4 +1,4 @@ -require("coffee-script"); +require("coffee-script/register"); module.exports = { Assembler: require("./assembler"), diff --git a/package.json b/package.json index b50e3e7..d1bb5e8 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,7 @@ "framework" ], "dependencies": { - "coffee-script": "1.3.3", + "coffee-script": "1.7.1", "async": "0.1.22", "node-uuid": "1.3.3", "redis": "0.9.0", From 897a44d3b5471a6b0106ef1272c001ce1d53e4d7 Mon Sep 17 00:00:00 2001 From: Thibault Poncelet Date: Thu, 20 Feb 2014 11:54:53 +0100 Subject: [PATCH 04/14] Bump hiredis --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index d1bb5e8..ec1457e 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "async": "0.1.22", "node-uuid": "1.3.3", "redis": "0.9.0", - "hiredis": "~0.1.14", + "hiredis": "0.1.16", "nano": "~3.3.6", "devnull": "0.0.10", "mongodb": "~1.2.12", From 2df27bb24bb77587c4e53aa87b85c84741c73d10 Mon Sep 17 00:00:00 2001 From: Thibault Poncelet Date: Fri, 21 Feb 2014 13:57:07 +0100 Subject: [PATCH 05/14] Callback executeCommand even when a validation exists on the command handler and the repository is alted oterwise it could create deadlocks --- lib/assembler.js | 1 - lib/command_bus.coffee | 1 + lib/command_bus_client.coffee | 5 ++--- lib/command_bus_server.coffee | 1 - lib/domain_repository.coffee | 1 + 5 files changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/assembler.js b/lib/assembler.js index 02465d2..bdd3e41 100644 --- a/lib/assembler.js +++ b/lib/assembler.js @@ -241,7 +241,6 @@ Assembler.prototype.tearDownApp = function (callback) { var self = this; // TODO: review this, anything else to unload/destroy? - async.series([ function (next) { self.destroyEventBusEmitter(next); diff --git a/lib/command_bus.coffee b/lib/command_bus.coffee index e62d401..fc8b2ac 100644 --- a/lib/command_bus.coffee +++ b/lib/command_bus.coffee @@ -57,6 +57,7 @@ class CommandBus p2.end() p.end() done args... + transaction.callback = callback else transaction = (done) -> p.start() diff --git a/lib/command_bus_client.coffee b/lib/command_bus_client.coffee index a7d0159..1b99deb 100644 --- a/lib/command_bus_client.coffee +++ b/lib/command_bus_client.coffee @@ -36,8 +36,7 @@ class CommandBusClient request = @_makeRequest path: "/commands" stream = request.stream - request.on "error", (err) -> - callback err + request.on "error", callback request.on "response", (response) => @_processResponse response, callback @@ -96,4 +95,4 @@ class CommandBusClient logger.error "CommandBusClient", "command failed remotely: #{error.stack || error.message || error}" callback error -module.exports = CommandBusClient +module.exports = CommandBusClient \ No newline at end of file diff --git a/lib/command_bus_server.coffee b/lib/command_bus_server.coffee index 79094de..4f38c72 100644 --- a/lib/command_bus_server.coffee +++ b/lib/command_bus_server.coffee @@ -87,7 +87,6 @@ class CommandBusServer logger.log "CommandBusServer", "deserialize command \"#{commandName}\"" @commandBus.deserializeCommand commandName, payload, (err, command) => - logger.log "CommandBusServer", "start command \"#{commandName}\"" @commandBus.executeCommand command, (err) -> if err? logger.alert "CommandBusServer", "error while executing command (#{err})" diff --git a/lib/domain_repository.coffee b/lib/domain_repository.coffee index 4070f1c..469c358 100644 --- a/lib/domain_repository.coffee +++ b/lib/domain_repository.coffee @@ -22,6 +22,7 @@ class DomainRepository @transactionQueue = async.queue (transaction, done) => if @halted @logger.warning "transaction", "skipped (#{@transactionQueue.length()} more transaction(s) in queue)" + transaction.callback() if transaction.callback? done() else @transacting = true From 8c43181b6cddfed92ecea6de05d4bac0b4366faf Mon Sep 17 00:00:00 2001 From: Thibault Poncelet Date: Tue, 1 Apr 2014 12:07:16 +0200 Subject: [PATCH 06/14] Handle errors appropriately in mongoDB event store closeConnectionAndReturn() --- lib/event_store/mongodb.coffee | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/event_store/mongodb.coffee b/lib/event_store/mongodb.coffee index e387620..ec27502 100644 --- a/lib/event_store/mongodb.coffee +++ b/lib/event_store/mongodb.coffee @@ -43,14 +43,19 @@ class MongoDbEventStore extends Base destroy: (callback) -> @_closeConnectionAndReturn @db, null, (err) => + return callback err if err? @db = null @eventCollection = null @snapshotCollection = null callback null _closeConnectionAndReturn: (db, err, callback) -> - db.close() if db? - callback err + if db? + db.close (closeErr) -> + return callback closeErr if closeErr? + callback err + else + callback err setup: (callback) -> async.series [ From 5fd0201814e727b842d85bddaad619b603c94677 Mon Sep 17 00:00:00 2001 From: Thibault Poncelet Date: Tue, 1 Apr 2014 12:08:24 +0200 Subject: [PATCH 07/14] Handle erors appropriately in async queues for Assembler --- lib/assembler.js | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/lib/assembler.js b/lib/assembler.js index bdd3e41..52b1bd4 100644 --- a/lib/assembler.js +++ b/lib/assembler.js @@ -276,19 +276,19 @@ Assembler.prototype.destroyReporters = function (callback) { if (self.reporters.length == 0) return callback(); - var queue = async.queue(function (reporter, callback) { + var queue = async.queue(function (reporter, taskCallback) { reporter.destroy(function (err) { - if (err) + if (err) { self.logger.error("Assembler#unloadReporters", "destroying reporter failed: " + err); - callback(err); + return callback(err); + } + taskCallback(); }); }, Infinity); - queue.drain = function (err) { - if (err) - self.logger.error("Assembler#unloadReporters", "error: " + err); + queue.drain = function() { self.reporters = []; - callback(err); + callback(); }; queue.push(self.reporters); }; @@ -299,23 +299,23 @@ Assembler.prototype.destroyReportStores = function (callback) { if (self.reportStores.length == 0) return callback(); - var queue = async.queue(function (reportStore, callback) { + var queue = async.queue(function (reportStore, taskCallback) { if (reportStore.destroy) { reportStore.destroy(function (err) { - if (err) + if (err) { self.logger.error("Assembler#destroyReportStores", "destroying report store failed: " + err); - callback(err); + return callback(err); + } + taskCallback(); }); } else { - callback(); + taskCallback(); } }, Infinity); - queue.drain = function (err) { - if (err) - self.logger.error("Assembler#destroyReportStores", "error: " + err); + queue.drain = function() { self.reportStores = []; - callback(err); + callback(); }; queue.push(self.reportStores); }; From c9605f291f7c13c60b66dce2f645f286b3e4893f Mon Sep 17 00:00:00 2001 From: Thibault Poncelet Date: Mon, 24 Feb 2014 10:50:38 +0100 Subject: [PATCH 08/14] Set replayed flag on events during replay --- lib/domain_repository.coffee | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/domain_repository.coffee b/lib/domain_repository.coffee index 469c358..d4de410 100644 --- a/lib/domain_repository.coffee +++ b/lib/domain_repository.coffee @@ -77,6 +77,7 @@ class DomainRepository replayAllEvents: (callback) -> @store.findAllEventsOneByOne (err, event, eventHandlerCallback) => + event.replayed = true @_publishEvent event, eventHandlerCallback , callback From aaa20bdc2df04c9da905409767fd02529fbffd07 Mon Sep 17 00:00:00 2001 From: Thibault Poncelet Date: Mon, 24 Feb 2014 15:01:53 +0100 Subject: [PATCH 09/14] Use Entity instead of finalCtor to initialize Entity bases, otherwize may be undefined --- lib/entity.coffee | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/entity.coffee b/lib/entity.coffee index e1d8a4b..04f8d57 100644 --- a/lib/entity.coffee +++ b/lib/entity.coffee @@ -35,9 +35,9 @@ Entity = (name, finalCtor, Ctor) -> Base::_initializeAtVersion = (version) -> @$version = version @$appliedEvents = [] - @$domainRepository = finalCtor.$domainRepository - @$commandBus = finalCtor.$commandBus - @$logger = finalCtor.$logger + @$domainRepository = Entity.domainRepository + @$commandBus = Entity.commandBus + @$logger = Entity.logger Base::_serialize = (contained) -> throw new Error "References between entity are forbidden" if contained From 7a4ca9942d93fbb75719924948a885efdcb29613 Mon Sep 17 00:00:00 2001 From: Thibault Poncelet Date: Tue, 25 Feb 2014 10:26:17 +0100 Subject: [PATCH 10/14] Initialize domain objects after app instantiation --- lib/assembler.js | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/assembler.js b/lib/assembler.js index 52b1bd4..52b2b7e 100644 --- a/lib/assembler.js +++ b/lib/assembler.js @@ -100,7 +100,7 @@ Assembler.prototype.assembleApp = function (callback) { } } - var createApp = function createApp() { + var createApp = function createApp(callback) { self.app = new self.App(appOptions); callback(); }; @@ -111,10 +111,15 @@ Assembler.prototype.assembleApp = function (callback) { return callback(err); appOptions.domainRepository = self.domainRepository; appOptions.commandBus = self.commandBus; - createApp(); + createApp(function(){ + DomainObject.initializeConstructors(); + callback(); + }); }); } else { - createApp(); + createApp(function() { + callback(); + }); } }; @@ -410,8 +415,6 @@ Assembler.prototype._assembleDomainInfrastructure = function (callback) { logger: self.logger }); - DomainObject.initializeConstructors(); - self.eventBusEmitter = eventBusEmitter; self.domainRepository = domainRepository; self.commandBus = commandBus; From 6264ee5763d316eb7f059bb1f8aedbc16e2a5822 Mon Sep 17 00:00:00 2001 From: David Jeusette Date: Thu, 6 Mar 2014 21:05:02 +0100 Subject: [PATCH 11/14] Add logger in CommandHandler --- lib/command_handler.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/command_handler.js b/lib/command_handler.js index 03408c0..86f2501 100644 --- a/lib/command_handler.js +++ b/lib/command_handler.js @@ -28,6 +28,7 @@ CommandHandler.registerAllInDirectoryOnBus = function (path, options) { var logger = options.logger; fs.readdirSync(path).forEach(function (fileName) { var CommandHandler = require(path + "/" + fileName); + CommandHandler.logger = logger; logger.log("CommandHandler", "loading command handler for command \"" + CommandHandler.getCommandName() + "\" (" + fileName + ")"); commandBus.registerCommandHandler(CommandHandler); }); From 9fbad9b651f14fc6941402751d6da78d58bebf01 Mon Sep 17 00:00:00 2001 From: David Jeusette Date: Sat, 15 Mar 2014 09:16:16 +0100 Subject: [PATCH 12/14] Avoid returning an error when entity could not be found --- lib/entity.coffee | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/entity.coffee b/lib/entity.coffee index 04f8d57..dcfb5e1 100644 --- a/lib/entity.coffee +++ b/lib/entity.coffee @@ -96,12 +96,12 @@ Entity = (name, finalCtor, Ctor) -> deferred = Q.defer() deferred.promise.nodeify callback if uid? - @$domainRepository.findEntityByUid @, uid, (err, entity) -> + @$domainRepository.findEntityByUid @, uid, (err, entity) => if err? deferred.reject err - else if not entity - deferred.reject new Error "Could not find entity with UID " + uid else + if not entity + @$logger.error "findByUid", "Could not find entity with UID #{uid}" deferred.resolve entity else deferred.reject(new Error 'Please provide a UID to find') From 16e67744fb9984333c8986378aa7bffa157f6d7f Mon Sep 17 00:00:00 2001 From: David Jeusette Date: Sat, 15 Mar 2014 20:21:39 +0100 Subject: [PATCH 13/14] Refactor condition statement --- lib/entity.coffee | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/entity.coffee b/lib/entity.coffee index dcfb5e1..0e635db 100644 --- a/lib/entity.coffee +++ b/lib/entity.coffee @@ -100,8 +100,7 @@ Entity = (name, finalCtor, Ctor) -> if err? deferred.reject err else - if not entity - @$logger.error "findByUid", "Could not find entity with UID #{uid}" + @$logger.error "findByUid", "Could not find entity with UID #{uid}" if not entity deferred.resolve entity else deferred.reject(new Error 'Please provide a UID to find') From edef836a6b2ddcc2429cd9b3035d8f2701444980 Mon Sep 17 00:00:00 2001 From: David Jeusette Date: Mon, 29 Sep 2014 10:11:43 +0200 Subject: [PATCH 14/14] Refactor callback function --- lib/assembler.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/assembler.js b/lib/assembler.js index 52b2b7e..1de706d 100644 --- a/lib/assembler.js +++ b/lib/assembler.js @@ -117,9 +117,7 @@ Assembler.prototype.assembleApp = function (callback) { }); }); } else { - createApp(function() { - callback(); - }); + createApp(callback); } };