diff --git a/lib/assembler.js b/lib/assembler.js index 02465d2..b355558 100644 --- a/lib/assembler.js +++ b/lib/assembler.js @@ -8,12 +8,15 @@ var AggregateRoot = require("./aggregate_root"); var Service = require("./service"); var logger = require("./logger"); -function Assembler(App, configurationPath) { +function Assembler(App, configurationPath, options) { if (!App) throw new Error("Missing app constructor"); if (!configurationPath) throw new Error("Missing configuration file path"); + if (!options) + options = {}; + this.App = App; this.configurationPath = configurationPath; this.logger = logger; @@ -22,6 +25,7 @@ function Assembler(App, configurationPath) { this.reportStores = []; this.eventBusReceivers = []; this.env = process.env.NODE_ENV || "development"; + this.replaying = !!options.replaying; } // Assemble an app, load the reporters. @@ -30,8 +34,9 @@ Assembler.prototype.loadCompleteApp = function (options, callback) { if (!callback) { callback = options; - options = {} + options = {}; } + if (typeof options.clearQueues == "undefined") options.clearQueues = false; if (typeof options.setupEventStore == "undefined") @@ -40,10 +45,12 @@ Assembler.prototype.loadCompleteApp = function (options, callback) { options.setupReportStores = false; if (typeof options.startReporters == "undefined") options.startReporters = true; + if (typeof options.readOnly == "undefined") + options.readOnly = false; async.series([ function (next) { - self.assembleApp(next); + self.assembleApp(options, next); }, function (next) { if (options.clearQueues) @@ -81,7 +88,7 @@ Assembler.prototype.loadCompleteApp = function (options, callback) { }; // Assemble an app only, no reporters -Assembler.prototype.assembleApp = function (callback) { +Assembler.prototype.assembleApp = function (options, callback) { var self = this; if (self.app) @@ -100,21 +107,24 @@ Assembler.prototype.assembleApp = function (callback) { } } - var createApp = function createApp() { + var createApp = function createApp(callback) { self.app = new self.App(appOptions); callback(); }; if (self.configuration.domain) { - self._assembleDomainInfrastructure(function(err){ + self._assembleDomainInfrastructure(options, function(err){ if (err) return callback(err); appOptions.domainRepository = self.domainRepository; appOptions.commandBus = self.commandBus; - createApp(); + createApp(function(){ + DomainObject.initializeConstructors(); + callback(); + }); }); } else { - createApp(); + createApp(callback); } }; @@ -141,7 +151,8 @@ Assembler.prototype.loadReporters = function (options, callback) { var eventBusReceiverConfig = { queueName: reporterSettings.eventBusReceiver.queue, logger: self.logger, - scope: self.configuration.scope + scope: self.configuration.scope, + replaying: self.replaying }; if (reporterSettings.eventBusReceiver.options) { @@ -154,7 +165,8 @@ Assembler.prototype.loadReporters = function (options, callback) { var reporterOptions = { eventBusReceiver: eventBusReceiver, logger: logger, - app: self.app + app: self.app, + replaying: self.replaying }; var reporter = new reporterSettings.ctor(reporterOptions); @@ -189,7 +201,8 @@ Assembler.prototype.initializeReports = function (callback) { var ReportStore = reportSettings.store.ctor; var reportStore = new ReportStore({ uri: reportSettings.store.uri, - logger: self.logger + logger: self.logger, + replaying: self.replaying }); Report.initialize({store: reportStore, logger: self.logger}); self.reports.push(Report); @@ -241,7 +254,6 @@ Assembler.prototype.tearDownApp = function (callback) { var self = this; // TODO: review this, anything else to unload/destroy? - async.series([ function (next) { self.destroyEventBusEmitter(next); @@ -277,19 +289,19 @@ Assembler.prototype.destroyReporters = function (callback) { if (self.reporters.length == 0) return callback(); - var queue = async.queue(function (reporter, callback) { + var queue = async.queue(function (reporter, taskCallback) { reporter.destroy(function (err) { - if (err) + if (err) { self.logger.error("Assembler#unloadReporters", "destroying reporter failed: " + err); - callback(err); + return callback(err); + } + taskCallback(); }); }, Infinity); - queue.drain = function (err) { - if (err) - self.logger.error("Assembler#unloadReporters", "error: " + err); + queue.drain = function() { self.reporters = []; - callback(err); + callback(); }; queue.push(self.reporters); }; @@ -300,23 +312,23 @@ Assembler.prototype.destroyReportStores = function (callback) { if (self.reportStores.length == 0) return callback(); - var queue = async.queue(function (reportStore, callback) { + var queue = async.queue(function (reportStore, taskCallback) { if (reportStore.destroy) { reportStore.destroy(function (err) { - if (err) + if (err) { self.logger.error("Assembler#destroyReportStores", "destroying report store failed: " + err); - callback(err); + return callback(err); + } + taskCallback(); }); } else { - callback(); + taskCallback(); } }, Infinity); - queue.drain = function (err) { - if (err) - self.logger.error("Assembler#destroyReportStores", "error: " + err); + queue.drain = function() { self.reportStores = []; - callback(err); + callback(); }; queue.push(self.reportStores); }; @@ -351,7 +363,7 @@ Assembler.prototype.unloadEventBusReceivers = function (callback) { queue.push(self.eventBusReceivers); }; -Assembler.prototype._assembleDomainInfrastructure = function (callback) { +Assembler.prototype._assembleDomainInfrastructure = function (options, callback) { var self = this; var EventStore = self.configuration.domain.eventStore.ctor; @@ -360,12 +372,13 @@ Assembler.prototype._assembleDomainInfrastructure = function (callback) { logger: self.logger }); - function initializeDomainInfrastructure(callback) { + function initializeDomainInfrastructure(options, callback) { var EventBusEmitter = self.configuration.domain.eventBusEmitter.ctor; var eventBusEmitterOptions = { logger: self.logger, - scope: self.configuration.scope + scope: self.configuration.scope, + replaying: self.replaying }; if (self.configuration.domain.eventBusEmitter.options) { @@ -379,12 +392,19 @@ Assembler.prototype._assembleDomainInfrastructure = function (callback) { var domainRepository = new DomainRepository({ store: eventStore, emitter: eventBusEmitter, - logger: self.logger + logger: self.logger, + replaying: self.replaying }); + + var port; + if (!options.readOnly) + port = self.configuration.domain.commandBus && self.configuration.domain.commandBus.port; + var commandBus = new CommandBus({ - domainRepository: domainRepository, - port: self.configuration.domain.commandBus && self.configuration.domain.commandBus.port, - logger: self.logger + domainRepository : domainRepository, + port : port, + logger : self.logger, + replaying : self.replaying }); // These initializations take place on the constructor, not instances. @@ -411,8 +431,6 @@ Assembler.prototype._assembleDomainInfrastructure = function (callback) { logger: self.logger }); - DomainObject.initializeConstructors(); - self.eventBusEmitter = eventBusEmitter; self.domainRepository = domainRepository; self.commandBus = commandBus; @@ -425,10 +443,10 @@ Assembler.prototype._assembleDomainInfrastructure = function (callback) { eventStore.initialize(function(err){ if (err) return callback(err); - initializeDomainInfrastructure(callback); + initializeDomainInfrastructure(options, callback); }); } else { - initializeDomainInfrastructure(callback); + initializeDomainInfrastructure(options, callback); } }; diff --git a/lib/command_bus.coffee b/lib/command_bus.coffee index e62d401..9684dbd 100644 --- a/lib/command_bus.coffee +++ b/lib/command_bus.coffee @@ -4,11 +4,11 @@ Profiler = require "./profiler" class CommandBus - constructor: ({@domainRepository, @logger, @port}) -> + constructor: ({@domainRepository, @logger, @port, @replaying}) -> throw new Error "Missing domain repository" unless @domainRepository? throw new Error "Missing logger" unless @logger? - if @port + if @port && !@replaying @server = new CommandBusServer commandBus: @, port: @port, logger: @logger @server.listen @port @@ -16,7 +16,7 @@ class CommandBus registerCommandHandler: (commandHandler) -> commandName = commandHandler.getCommandName() - throw new Error "A command and its handler for command named \"#{command}\" were already registered" if @commandHandlers[commandName]? + throw new Error "A command and its handler for command named \"#{commandName}\" were already registered" if @commandHandlers[commandName]? @commandHandlers[commandName] = commandHandler createNewUid: (callback) -> @@ -48,6 +48,7 @@ class CommandBus commandHandler.validate (err, result) -> p1.end() if err? + logger.warning "executeCommand", "validation error", err callback err done err return @@ -57,6 +58,7 @@ class CommandBus p2.end() p.end() done args... + transaction.callback = callback else transaction = (done) -> p.start() diff --git a/lib/command_bus_client.coffee b/lib/command_bus_client.coffee index a7d0159..1b99deb 100644 --- a/lib/command_bus_client.coffee +++ b/lib/command_bus_client.coffee @@ -36,8 +36,7 @@ class CommandBusClient request = @_makeRequest path: "/commands" stream = request.stream - request.on "error", (err) -> - callback err + request.on "error", callback request.on "response", (response) => @_processResponse response, callback @@ -96,4 +95,4 @@ class CommandBusClient logger.error "CommandBusClient", "command failed remotely: #{error.stack || error.message || error}" callback error -module.exports = CommandBusClient +module.exports = CommandBusClient \ No newline at end of file diff --git a/lib/command_bus_server.coffee b/lib/command_bus_server.coffee index 79094de..62adad8 100644 --- a/lib/command_bus_server.coffee +++ b/lib/command_bus_server.coffee @@ -87,7 +87,10 @@ class CommandBusServer logger.log "CommandBusServer", "deserialize command \"#{commandName}\"" @commandBus.deserializeCommand commandName, payload, (err, command) => - logger.log "CommandBusServer", "start command \"#{commandName}\"" + if err? + logger.error "CommandBusServer", "error while deserializing command", err + return djump res, 500, error: err + @commandBus.executeCommand command, (err) -> if err? logger.alert "CommandBusServer", "error while executing command (#{err})" @@ -104,4 +107,4 @@ djump = (res, code, obj) -> else res.end() -module.exports = CommandBusServer \ No newline at end of file +module.exports = CommandBusServer diff --git a/lib/command_handler.js b/lib/command_handler.js index 03408c0..86f2501 100644 --- a/lib/command_handler.js +++ b/lib/command_handler.js @@ -28,6 +28,7 @@ CommandHandler.registerAllInDirectoryOnBus = function (path, options) { var logger = options.logger; fs.readdirSync(path).forEach(function (fileName) { var CommandHandler = require(path + "/" + fileName); + CommandHandler.logger = logger; logger.log("CommandHandler", "loading command handler for command \"" + CommandHandler.getCommandName() + "\" (" + fileName + ")"); commandBus.registerCommandHandler(CommandHandler); }); diff --git a/lib/domain_repository.coffee b/lib/domain_repository.coffee index 67ca0ae..31bbae4 100644 --- a/lib/domain_repository.coffee +++ b/lib/domain_repository.coffee @@ -1,27 +1,25 @@ -async = require "async" Profiler = require "./profiler" EntityInstantiator = require "./entity_instantiator" util = require "util" -defer = require "./defer" - -COUCHDB_STORE = "couchdb" -REDIS_STORE = "redis" +Queue = require "./queue" class DomainRepository - constructor: ({@store, @emitter, @logger}) -> + constructor: ({@store, @emitter, @logger, @replaying}) -> throw new Error "Missing store" unless @store? throw new Error "Missing event bus emitter" unless @emitter? throw new Error "Missing logger" unless @logger? + @entityEvents = {} @directListeners = {} @nextDirectListenerKey = 0 @transacting = false @halted = false @silent = false - @transactionQueue = async.queue (transaction, done) => + @transactionQueue = new Queue (transaction, done) => if @halted @logger.warning "transaction", "skipped (#{@transactionQueue.length()} more transaction(s) in queue)" + transaction.callback() if transaction.callback? done() else @transacting = true @@ -38,7 +36,8 @@ class DomainRepository done() else @logger.log "transaction", "succeeded, comitting" - @_commit => + @_commit (err) => + throw err if err? @logger.log "transaction", "committed (#{@transactionQueue.length()} more transaction(s) in queue)" @transacting = false done() @@ -74,17 +73,19 @@ class DomainRepository entityInstantiator = new EntityInstantiator store: @store, Entity: Entity, logger: @logger entityInstantiator.findByUid uid, callback - replayAllEvents: (callback) -> - @store.findAllEvents (err, events) => - if events.length > 0 - eventQueue = async.queue (event, eventTaskCallback) => - event.replayed = true - @_publishEvent event, eventTaskCallback - , 1 - eventQueue.drain = callback - eventQueue.push events - else - callback() + replayAllEvents: (options, callback) -> + return callback new Error("Replay mode not set") unless @replaying + + [options, callback] = [{}, options] unless callback? + lastEvent = null + + @store.iterateOverAllEvents options, (event, eventHandlerCallback) => + event.replayed = true + lastEvent = event + @_publishEvent event, eventHandlerCallback + , (err) -> + return callback err if err? + callback null, lastEvent getLastPublishedEvents: () -> @emitter.lastEmittedEvents @@ -93,6 +94,7 @@ class DomainRepository @store.findAllEventsByEntityUid entityUid, callback add: (entity) -> + throw new Error "Cannot write during replay" if @replaying throw new Error "Entity is missing its UID" unless entity.uid? @entityEvents[entity.uid] ?= [] addedEvents = @entityEvents[entity.uid] @@ -140,21 +142,19 @@ class DomainRepository savedEvents = []; - entityQueue = async.queue (entityAppliedEvents, entityTaskCallback) => + entityQueue = new Queue (entityAppliedEvents, entityTaskCallback) => firstEvent = entityAppliedEvents.shift() if firstEvent? - queue = async.queue (event, eventTaskCallback) => + queue = new Queue (event, eventTaskCallback) => nextEvent = entityAppliedEvents.shift() queue.push nextEvent if nextEvent? @logger.log "commit", "saving event \"#{event.name}\" for entity #{event.entityUid}" - @store.saveEvent event, (err, event) => - savedEvents.push(event); - if err? - eventTaskCallback err - else - eventTaskCallback null + @store.saveEvent event, (err, event) -> + return callback err if err? + savedEvents.push event + eventTaskCallback null , 1 queue.drain = entityTaskCallback @@ -164,17 +164,21 @@ class DomainRepository , Infinity # TODO determine if it is safe to treat all entitys in parallel? - entityQueue.drain = (err) => - return callback err if err? + entityQueue.drain = => return callback null unless savedEvents.length > 0 - publicationQueue = async.queue (event, publicationCallback) => - @_publishEvent event, publicationCallback + + publicationQueue = new Queue (event, publicationCallback) => + @_publishEvent event, (err) -> + return callback err if err? + publicationCallback() , Infinity + publicationQueue.drain = callback publicationQueue.push savedEvents for entityUid, entityEvents of @entityEvents entityQueue.push [entityEvents] + @entityEvents = {} _rollback: (callback) -> @@ -182,30 +186,37 @@ class DomainRepository callback() _publishEvent: (event, callback) -> - defer => - @logger.log "publishEvent", "publishing \"#{event.name}\" from entity #{event.entityUid} to direct listeners" - @_publishEventToDirectListeners event, (err) => - @logger.warn "publishEvent", "a direct listener failed: #{err}" if err? - @logger.log "publishEvent", "publishing \"#{event.name}\" from entity #{event.entityUid} to event bus" - if @silent - callback() - else - @emitter.emit event, (err) => - @logger.log "publishEvent", "event publication failed: #{err}" if err? - callback err + @_publishEventToDirectListeners event, (err) => + return callback err if err? + @_publishEventToEventBus event, callback - _publishEventToDirectListeners: (event, callback) -> - directListeners = @directListeners[event.name] + _publishEventToEventBus: (event, callback) -> + if @silent + callback null + else + @emitter.emit event, callback - queue = async.queue (directListener, callback) -> - directListener event, callback - , Infinity + _publishEventToDirectListeners: (event, callback) -> + directListeners = @directListeners[event.name] + pending = 0 + errors = [] + queuedListeners = false + logger = @logger - queue.drain = callback - queuedListeners = false for _, directListener of directListeners - queuedListeners = true unless queuedListeners - queue.push directListener + if !queuedListeners + queuedListeners = true + logger.log "DomainRepository#_publishEventToDirectListeners", "publishing \"#{event.name}\" from entity #{event.entityUid} to direct listeners" + + pending++ + directListener event, (err) -> + if err? + logger.error "DomainRepository#_publishEventToDirectListeners", "a direct listener failed: #{err}" + errors.push err + pending-- + if pending is 0 + error = if errors.length > 0 then new Error "Some direct listener(s) failed" else null + callback error callback null unless queuedListeners diff --git a/lib/entity.coffee b/lib/entity.coffee index e1d8a4b..82984c0 100644 --- a/lib/entity.coffee +++ b/lib/entity.coffee @@ -1,4 +1,3 @@ -async = require "async" Q = require "q" DomainObject = require "./domain_object" inherit = require "./inherit" @@ -35,9 +34,9 @@ Entity = (name, finalCtor, Ctor) -> Base::_initializeAtVersion = (version) -> @$version = version @$appliedEvents = [] - @$domainRepository = finalCtor.$domainRepository - @$commandBus = finalCtor.$commandBus - @$logger = finalCtor.$logger + @$domainRepository = Entity.domainRepository + @$commandBus = Entity.commandBus + @$logger = Entity.logger Base::_serialize = (contained) -> throw new Error "References between entity are forbidden" if contained @@ -96,15 +95,14 @@ Entity = (name, finalCtor, Ctor) -> deferred = Q.defer() deferred.promise.nodeify callback if uid? - @$domainRepository.findEntityByUid @, uid, (err, entity) -> + @$domainRepository.findEntityByUid @, uid, (err, entity) => if err? deferred.reject err - else if not entity - deferred.reject new Error "Could not find entity with UID " + uid else + @$logger.error "findByUid", "Could not find entity <#{name}> with UID #{uid}" if not entity deferred.resolve entity else - deferred.reject(new Error 'Please provide a UID to find') + deferred.reject(new Error "Please provide a UID to find") deferred.promise Base.findAllEvents = (uid, callback) -> diff --git a/lib/entity_instantiator.js b/lib/entity_instantiator.js index 0062e17..d35cdd3 100644 --- a/lib/entity_instantiator.js +++ b/lib/entity_instantiator.js @@ -27,56 +27,72 @@ EntityInstantiator.prototype.findByUid = function (uid, callback) { }; EntityInstantiator.prototype._instantiateFromSnapshot = function (snapshot, callback) { - var self = this; - - var entity = DomainObject.deserialize(snapshot.contents); + var self = this; + var entity = DomainObject.deserialize(snapshot.contents); var uid = snapshot.entityUid; var version = snapshot.version; + entity._initializeAtVersion(version); - self.logger.log("EntityInstantiator", "looking for events of entity \""+ uid + "\" after version " + version); - self.store.findAllEventsByEntityUidAfterVersion(uid, version, function (err, events) { - self.logger.log("EntityInstantiator", "found " + events.length + " event(s) of entity \""+ uid + "\" after version " + version); + var eventHandler = function(event, callback) { + entity.applyEvent(event, callback); + }; + + self.store.iterateOverEntityEventsAfterVersion(uid, version, eventHandler, function (err) { if (err) return callback(err); - entity.applyEvents(events, function (err) { - if (err) return callback(err); - var snapshotAge = entity.$version - version; - if (snapshotAge > SNAPSHOT_MAX_AGE) { - self.logger.log("EntityInstantiator", "updating outdated snapshot (" + snapshotAge + " versions) of entity \""+ uid + "\""); - self._snapEntity(entity); - } - self.logger.log("EntityInstantiator", "instantiated entity \""+ uid + "\" from snapshot at version " + version); - callback(null, entity); + + self.logger.log("EntityInstantiator", "instantiated entity \""+ uid + "\" from snapshot at version " + version); + callback(null, entity); + + var snapshotAge = entity.$version - version; + + self._snapEntityIfNeeded(entity, snapshotAge, function(err) { + if (err) self.logger.error("EntityInstantiator", "Snap entity failed", err); }); }); }; EntityInstantiator.prototype._instantiateThroughEventsFromUid = function (uid, callback) { - var self = this; + var self = this; + var entity = new self.Entity(); + + var eventHandler = function(event, callback) { + entity.applyEvent(event, callback); + }; - self.store.findAllEventsByEntityUid(uid, function (err, events) { - if (err) { - callback(err); - } else if (events.length == 0) { + self.store.iterateOverEntityEvents(uid, eventHandler, function (err) { + if (err) return callback(err); + + if (entity.$version === 0) callback(null, null); - } else { - self.Entity.buildFromEvents(events, function (err, entity) { - if (err) return callback(err); - if (entity.$version > SNAPSHOT_MAX_AGE) - self._snapEntity(entity); - self.logger.log("EntityInstantiator", "instantiated entity \""+ uid + "\" at version " + entity.$version + " from " + events.length + " events"); - callback(null, entity); + else { + self.logger.log("EntityInstantiator", "instantiated entity \"" + uid + "\" at version " + entity.$version); + callback(null, entity); + + self._snapEntityIfNeeded(entity, entity.$version, function(err) { + if (err) self.logger.error("EntityInstantiator", "Snap entity failed", err); }); } }); }; -EntityInstantiator.prototype._snapEntity = function (entity) { +EntityInstantiator.prototype._snapEntityIfNeeded = function (entity, snapshotAge, callback) { + if (snapshotAge > SNAPSHOT_MAX_AGE) { + this.logger.log("EntityInstantiator", "updating snapshot (" + snapshotAge + " versions) of entity \"" + entity.uid + "\""); + this._snapEntity(entity, callback); + } else + callback(); +}; + +EntityInstantiator.prototype._snapEntity = function (entity, callback) { var self = this; self.logger.log("EntityInstantiator", "snapping entity \"" + entity.uid +"\" at version " + entity.$version); var snapshot = Snapshot.makeFromEntity(entity); - self.store.saveSnapshot(snapshot); + self.store.saveSnapshot(snapshot, function(err) { + if (err) return callback(err); + callback(); + }); }; module.exports = EntityInstantiator; diff --git a/lib/event_bus/common/receiver.js b/lib/event_bus/common/receiver.js index adf1c52..d118421 100644 --- a/lib/event_bus/common/receiver.js +++ b/lib/event_bus/common/receiver.js @@ -69,25 +69,26 @@ CommonEventBusReceiver.prototype._handleEvent = function (event, callback) { return callback(); } - var parallelHandling = async.queue(function (eventHandler, callback) { - eventHandler(event, function (err) { - if (err) { - self.logger.warning("CommonEventBusReceiver#_handleEvent", "an error occurred in an event handler: " + (err.stack || err.message || err)); - errors.push(err); + var pendingHandlers = 0; + eventHandlers.forEach(function(eventHandler){ + pendingHandlers++ + eventHandler(event, function (err) { + if (err) { + self.logger.warning("CommonEventBusReceiver#_handleEvent", "an error occurred in an event handler: " + (err.stack || err.message || err)); + errors.push(err); + } + pendingHandlers--; + if (pendingHandlers == 0) { + if (errors.length > 0) { + var error = new Error("Some errors occured when handling the event: [" + errors.join(" | ") + "]"); + callback(error); + } else { + self.lastEvent = event; + callback(null); } - callback(err); - }); - }, Infinity); - - parallelHandling.drain = function () { - if (errors.length > 0) { - var err = new Error("Some errors occured when handling the event: [" + errors.join(" | ") + "]"); - return callback(err); - } - self.lastEvent = event; - callback(); - }; - parallelHandling.push(eventHandlers); + } + }); + }); }; CommonEventBusReceiver.prototype.stop = function (callback) { diff --git a/lib/event_bus/redis/emitter.js b/lib/event_bus/redis/emitter.js index 42d9438..6df31ba 100644 --- a/lib/event_bus/redis/emitter.js +++ b/lib/event_bus/redis/emitter.js @@ -18,7 +18,8 @@ function RedisEventBusEmitter(options) { this.host = options.host; this.port = options.port; this.queuedCalls = []; - this.lastEmittedEvents = {} + this.lastEmittedEvents = {}; + this.replaying = !!options.replaying; }; RedisEventBusEmitter.prototype.emit = function (event, callback) { @@ -47,7 +48,7 @@ RedisEventBusEmitter.prototype.emit = function (event, callback) { self.queueManager.hget(subscribedKey, event.name, function (err, subscribed){ if (err) return callback(err); if (subscribed) { - self.logger.log("RedisEventBusEmitter", "pushing event \"" + event.name + "\" to key \"" + key + "\""); + self.logger.log("RedisEventBusEmitter", "pushing event \"" + event.name + "\" from entity " + event.entityUid + " to key \"" + key + "\""); transaction.lpush(key, value); self.lastEmittedEvents[queueName] = event; } diff --git a/lib/event_bus/redis/queue.js b/lib/event_bus/redis/queue.js index 40c6ecd..9da9743 100644 --- a/lib/event_bus/redis/queue.js +++ b/lib/event_bus/redis/queue.js @@ -9,29 +9,36 @@ var RedisEventBus; inherit(RedisEventBusQueue, CommonEventBusQueue); -var QUEUE_KEY_SEPARATOR = ":"; -var QUEUE_KEY_PREFIX = "event-bus" + QUEUE_KEY_SEPARATOR + "queues" + QUEUE_KEY_SEPARATOR; -var IN_QUEUE_KEY_PREFIX = "in" + QUEUE_KEY_SEPARATOR; -var OUT_QUEUE_KEY_PREFIX = "out" + QUEUE_KEY_SEPARATOR; -var LAST_KEY_INDEX = -1; -var QUEUE_KEY_SUBSCRIBED = "events" + QUEUE_KEY_SEPARATOR; +var QUEUE_KEY_SEPARATOR = ":"; +var QUEUE_KEY_PREFIX = "event-bus" + QUEUE_KEY_SEPARATOR + "queues" + QUEUE_KEY_SEPARATOR; +var IN_QUEUE_KEY_PREFIX = "in" + QUEUE_KEY_SEPARATOR; +var OUT_QUEUE_KEY_PREFIX = "out" + QUEUE_KEY_SEPARATOR; +var LAST_KEY_INDEX = -1; +var QUEUE_KEY_SUBSCRIBED = "events" + QUEUE_KEY_SEPARATOR; +var QUEUE_KEY_REPLAYING_PREFIX = "replay" + QUEUE_KEY_SEPARATOR; function RedisEventBusQueue(options) { CommonEventBusQueue.call(this, options); - RedisEventBus = RedisEventBus || require("../redis"); - this.retries = 0; - this.scope = options.scope; - this.host = options.host; - this.port = options.port; - this.stopped = true; - this.canStart = true; + RedisEventBus = RedisEventBus || require("../redis"); + this.retries = 0; + this.scope = options.scope; + this.host = options.host; + this.port = options.port; + this.stopped = true; + this.canStart = true; + this.replaying = !!options.replaying; } RedisEventBusQueue.prototype.getQueueKeyPrefix = function () { + var result = QUEUE_KEY_PREFIX; + if (this.scope) - return this.scope + QUEUE_KEY_SEPARATOR + QUEUE_KEY_PREFIX; - else - return QUEUE_KEY_PREFIX + QUEUE_KEY_SEPARATOR; + result = this.scope + QUEUE_KEY_SEPARATOR + result; + + if (this.replaying) + result += QUEUE_KEY_REPLAYING_PREFIX; + + return result; }; RedisEventBusQueue.prototype.getQueueSetKey = RedisEventBusQueue.prototype.getQueueKeyPrefix; diff --git a/lib/event_bus/redis/receiver.js b/lib/event_bus/redis/receiver.js index 8c3f721..b83c802 100644 --- a/lib/event_bus/redis/receiver.js +++ b/lib/event_bus/redis/receiver.js @@ -6,9 +6,10 @@ inherit(RedisEventBusReceiver, CommonEventBusReceiver); function RedisEventBusReceiver(options) { CommonEventBusReceiver.call(this, options); - this.scope = options.scope; - this.host = options.host; - this.port = options.port; + this.scope = options.scope; + this.host = options.host; + this.port = options.port; + this.replaying = !!options.replaying; } RedisEventBusReceiver.prototype.initialize = function (callback) { @@ -19,7 +20,8 @@ RedisEventBusReceiver.prototype.initialize = function (callback) { host: self.host, port: self.port, logger: self.logger, - scope: self.scope + scope: self.scope, + replaying : self.replaying }); self.queue.registerHandler(function (event, callback) { diff --git a/lib/event_store.coffee b/lib/event_store.coffee index 6f4110b..4747f28 100644 --- a/lib/event_store.coffee +++ b/lib/event_store.coffee @@ -1,6 +1,8 @@ -CouchDbEventStore = require "./event_store/couchdb" -MongoDbEventStore = require "./event_store/mongodb" +CouchDbEventStore = require "./event_store/couchdb" +MongoDbEventStore = require "./event_store/mongodb" +PostgresqlEventStore = require "./event_store/postgresql" module.exports = - CouchDb: CouchDbEventStore - MongoDb: MongoDbEventStore + CouchDb : CouchDbEventStore + MongoDb : MongoDbEventStore + Postgresql : PostgresqlEventStore diff --git a/lib/event_store/base.coffee b/lib/event_store/base.coffee index 600e6c0..619648e 100644 --- a/lib/event_store/base.coffee +++ b/lib/event_store/base.coffee @@ -9,6 +9,9 @@ class BaseEventStore findAllEvents: (options, callback) -> throw new Error "implement me" + iterateOverAllEvents: (options, eventHandler, callback) -> + throw new Error "implement me" + findAllEventsByEntityUid: (entityUid, options, callback) -> throw new Error "Implement me" diff --git a/lib/event_store/mongodb.coffee b/lib/event_store/mongodb.coffee index 96c3886..c84c4b6 100644 --- a/lib/event_store/mongodb.coffee +++ b/lib/event_store/mongodb.coffee @@ -22,6 +22,10 @@ class MongoDbEventStore extends Base @eventCollectionName = "events" @snapshotCollectionName = "snapshots" + createNewUid: (callback) -> + uid = uuid.v4() + callback null, uid + initialize: (callback) -> async.waterfall [ (next) => @@ -43,45 +47,86 @@ class MongoDbEventStore extends Base destroy: (callback) -> @_closeConnectionAndReturn @db, null, (err) => + return callback err if err? @db = null @eventCollection = null @snapshotCollection = null callback null _closeConnectionAndReturn: (db, err, callback) -> - db.close() if db? - callback err + if db? + db.close (closeErr) -> + return callback closeErr if closeErr? + callback err + else + callback err setup: (callback) -> async.series [ (next) => @eventCollection.remove next (next) => - @eventCollection.ensureIndex {"entityUid": 1}, next + @eventCollection.ensureIndex {entityUid: 1}, next + (next) => + @eventCollection.ensureIndex {entityUid: 1, version: 1}, { unique: true }, next (next) => - @eventCollection.ensureIndex {"entityUid": 1, "version": 1}, { unique: true }, next + @eventCollection.ensureIndex {timestamp: 1, uid: 1}, next (next) => @snapshotCollection.remove next (next) => - @snapshotCollection.ensureIndex {"entityUid": 1}, next + @snapshotCollection.ensureIndex {entityUid: 1}, next ], callback createNewUid: (callback) -> uid = uuid.v4() callback null, uid - findAllEvents: (callback) -> - p = new Profiler "MongoDbEventStore#_find(db request)", @logger - p.start() - @eventCollection.find({}).sort("timestamp":1).toArray (err, items) => - p.end() + iterateOverAllEvents: (options, eventHandler, callback) -> + [options, eventHandler, callback] = [{}, options, eventHandler] unless callback? - if err? - callback err - else if not items? - callback null, [] - else - @_instantiateEventsFromRows items, callback + query = {} + sortQuery = + timestamp: 1 + uid: 1 + + startUid = options.startUid + if startUid? + @eventCollection.findOne uid: startUid, (err, event) => + return callback err if err? + + query = + timestamp: + $gte: event.timestamp + uid: + $ne: startUid + + @_iterateOverEvents query, sortQuery, eventHandler, callback + else + @_iterateOverEvents query, sortQuery, eventHandler, callback + + iterateOverEntityEventsAfterVersion: (entityUid, version, eventHandler, callback) -> + @_iterateOverEvents {entityUid: entityUid, version: {$gt: version}}, {version:1}, eventHandler, callback + + iterateOverEntityEvents: (entityUid, eventHandler, callback) -> + @_iterateOverEvents {entityUid: entityUid}, {version:1}, eventHandler, callback + + _iterateOverEvents: (params, order, eventHandler, callback) -> + p = new Profiler "MongoDbEventStore#_iterateOverEvents(db request)", @logger + p.start() + cursor = @eventCollection.find(params).batchSize(1000).sort(order) + retrieve = => + cursor.nextObject (err, item) => + return callback err if err? + if item? + @_instantiateEventFromRow item, (err, event) -> + return callback err if err? + eventHandler event, (err) -> + return callback err if err? + defer retrieve + else + p.end() + callback null + retrieve() findAllEventsByEntityUid: (entityUid, order, callback) -> [order, callback] = [null, order] unless callback? @@ -94,9 +139,6 @@ class MongoDbEventStore extends Base findSomeEventsByEntityUidBeforeVersion: (entityUid, version, eventCount, callback) -> @_findLimited { entityUid: entityUid, version: { "$lte": versionĀ } }, eventCount, callback - findAllEventsByEntityUidAfterVersion: (entityUid, version, callback) -> - @_find entityUid: entityUid, version: { $gt: version }, callback - saveEvent: (event, callback) => @createNewUid (err, eventUid) => return callback err if err? @@ -171,7 +213,7 @@ class MongoDbEventStore extends Base @logger.alert "MongoDbEventStore#saveSnapshot", "failed to save snapshot of entity \"#{snapshot.entityUid}\": #{err}" else @logger.log "MongoDbEventStore#saveSnapshot", "saved snapshot for entity \"#{snapshot.entityUid}\"" - callback? err + callback err _findLimited: (params, eventCount, callback) -> p = new Profiler "MongoDbEventStore#_findLimited(db request)", @logger @@ -211,27 +253,8 @@ class MongoDbEventStore extends Base return callback null, events if rows.length is 0 rowsQueue = async.queue (row, rowCallback) => - uid = row.uid - name = row.name - entityUid = row.entityUid - data = row.data - timestamp = row.timestamp - version = row.version - - @_loadAttachmentsFromRow row, (err, attachments) -> - return rowCallback err if err? - - for attachmentName, attachmentBody of attachments - data[attachmentName] = attachmentBody - - event = new Event - name: name - data: data - uid: uid - entityUid: entityUid - timestamp: timestamp - version: version - + @_instantiateEventFromRow row, (err, event) -> + return callback err if err? events.push event defer rowCallback , 1 @@ -241,6 +264,30 @@ class MongoDbEventStore extends Base rowsQueue.push rows + _instantiateEventFromRow: (row, callback) -> + uid = row.uid + name = row.name + entityUid = row.entityUid + data = row.data + timestamp = row.timestamp + version = row.version + + @_loadAttachmentsFromRow row, (err, attachments) -> + return rowCallback err if err? + + for attachmentName, attachmentBody of attachments + data[attachmentName] = attachmentBody + + event = new Event + name: name + data: data + uid: uid + entityUid: entityUid + timestamp: timestamp + version: version + + callback null, event + _loadAttachmentsFromRow: (row, callback) -> attachments = {} for attachmentName, attachmentBody of row._attachments diff --git a/lib/event_store/postgresql.coffee b/lib/event_store/postgresql.coffee new file mode 100644 index 0000000..f406c32 --- /dev/null +++ b/lib/event_store/postgresql.coffee @@ -0,0 +1,422 @@ +url = require "url" +async = require "async" +uuid = require "node-uuid" +Event = require "../event" +Snapshot = require "../snapshot" +Base = require "./base" +Profiler = require "../profiler" +pg = require("pg").native +defer = require "../defer" +format = require("util").format + +class PostgresqlEventStore extends Base + + constructor: ({@uri, @logger}) -> + throw new Error "Missing URI" unless @uri + throw new Error "Missing logger" unless @logger + + @eventTableName = "events" + @snapshotTableName = "snapshots" + + createNewUid: (callback) -> + uid = uuid.v4() + callback null, uid + + initialize: (callback) -> + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + done() + callback null + + _handleError: (err, pgClient, pgCallback, callback) -> + pgCallback(pgClient) + callback(err) + + _dropEventTable: (callback) -> + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + query = "DROP TABLE IF EXISTS %s;" + query = format query, @eventTableName + + client.query query, (err) => + return @_handleError(err, client, done, callback) if err? + done() + callback() + + _createEventTable: (callback) -> + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + query = "CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + version integer, + name varchar(255), + data text, + entity_uid varchar(255), + timestamp bigint, + attachments text);" + query = format query, @eventTableName + + client.query query, (err) => + return @_handleError(err, client, done, callback) if err? + done() + callback() + + _createIndexOnEventTable: (callback) -> + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + indexName = "#{@eventTableName}_entity_uid_idx" + + query = "DROP INDEX IF EXISTS %s; + CREATE INDEX ON %s (entity_uid);" + query = format query, indexName, @eventTableName + + client.query query, (err) => + return @_handleError(err, client, done, callback) if err? + done() + callback() + + _dropSnapshotTable: (callback) -> + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + query = "DROP TABLE IF EXISTS %s;" + query = format query, @snapshotTableName + + client.query query, (err) => + return @_handleError(err, client, done, callback) if err? + done() + callback() + + _createSnapshotTable: (callback) -> + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + query = "CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + contents text, + entity_uid varchar(255), + version integer);" + query = format query, @snapshotTableName + + client.query query, (err) => + return @_handleError(err, client, done, callback) if err? + done() + callback null + + _createIndexOnSnapshotTable: (callback) -> + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + indexName = "#{@snapshotTableName}_entity_uid_idx" + + query = "DROP INDEX IF EXISTS %s; + CREATE INDEX ON %s (entity_uid);" + query = format query, indexName, @snapshotTableName + + client.query query, (err) => + return @_handleError(err, client, done, callback) if err? + done() + callback null + + _addAutoIncrementOnEventVersion: (callback) -> + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + query = "CREATE OR REPLACE FUNCTION events_version_auto_increment() + RETURNS trigger AS $$ + DECLARE + _rel_id constant int := 'events'::regclass::int; + BEGIN + PERFORM pg_advisory_xact_lock(_rel_id); + + SELECT COALESCE(MAX(version) + 1, 1) + INTO NEW.version + FROM events + WHERE entity_uid = NEW.entity_uid; + + RETURN NEW; + END; + $$ LANGUAGE plpgsql STRICT; + + DROP TRIGGER IF EXISTS events_version_auto_increment on events; + + CREATE TRIGGER events_version_auto_increment + BEFORE INSERT ON events + FOR EACH ROW WHEN (NEW.version IS NULL) + EXECUTE PROCEDURE events_version_auto_increment();" + + client.query query, (err) => + return @_handleError(err, client, done, callback) if err? + done() + callback null + + destroy: (callback) -> + callback null + + setup: (callback) -> + async.series [ + (next) => + @logger.info "Postgres event store", "Dropping event table" + @_dropEventTable next + (next) => + @logger.info "Postgres event store", "Creating event table" + @_createEventTable next + (next) => + @logger.info "Postgres event store", "Creating index on event table" + @_createIndexOnEventTable next + (next) => + @logger.info "Postgres event store", "Adding auto increment on event table" + @_addAutoIncrementOnEventVersion next + (next) => + @logger.info "Postgres event store", "Dropping snapshot table" + @_dropSnapshotTable next + (next) => + @logger.info "Postgres event store", "Creating snapshot table" + @_createSnapshotTable next + (next) => + @logger.info "Postgres event store", "Creating index on snapshot table" + @_createIndexOnSnapshotTable next + ], callback + + iterateOverAllEvents: (eventHandler, callback) -> + @_iterateOverEvents null, eventHandler, callback + + iterateOverEntityEventsAfterVersion: (entityUid, version, eventHandler, callback) -> + @_iterateOverEvents "entity_uid='#{entityUid}' AND version > #{version}", eventHandler, callback + + iterateOverEntityEvents: (entityUid, eventHandler, callback) -> + @_iterateOverEvents "entity_uid='#{entityUid}'", eventHandler, callback + + _iterateOverEvents: (params, eventHandler, callback) -> + p = new Profiler "PostgresqlEventStore#_iterateOverEvents (db request)", @logger + p.start() + + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + query = "SELECT * FROM %s" + query += " WHERE %s" if params? + query += " ORDER BY id ASC;" + + if params? + query = format query, @eventTableName, params + else + query = format query, @eventTableName + + clientReceiver = client.query query + + clientReceiver.on "error", (err) => + p.end() + return @_handleError(err, client, done, callback) + + clientReceiver.on "row", (row) => + @_instantiateEventFromRow row, (err, event) -> + return @_handleError(err, client, done, callback) if err? + + eventHandler event, (err) -> + return @_handleError(err, client, done, callback) if err? + + clientReceiver.on "end", (results) => + p.end() + done() + callback() + + findAllEvents: (options, callback) -> + params = true + order = "ASC" + + @_find params, order, callback + + _find: (params, order, limit, callback) -> + [limit, callback] = [null, limit] unless callback? + + p = new Profiler "PostgresqlEventStore#_find(db request)", @logger + p.start() + + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + query = "SELECT * FROM %s WHERE %s ORDER BY id %s" + query += " LIMIT %s" if limit? + query += ";" + + if limit? + query = format query, @eventTableName, params, order, limit + else + query = format query, @eventTableName, params, order + + clientReceiver = client.query query, (err, results) => + p.end() + return @_handleError(err, client, done, callback) if err? + done() + @_instantiateEventsFromRows results.rows, (err, events) -> + callback err, events + + _count: (params, callback) -> + p = new Profiler "PostgresqlEventStore#_count(db request)", @logger + p.start() + + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + query = "SELECT COUNT(*) FROM %s WHERE %s;" + query = format query, @eventTableName, params + + clientReceiver = client.query query, (err, results) => + p.end() + return @_handleError(err, client, done, callback) if err? + done() + callback null, results.count + + findAllEventsByEntityUid: (entityUid, order, callback) -> + [order, callback] = [null, order] unless callback? + + params = "entity_uid='#{entityUid}'" + order ?= "ASC" + + @_find params, order, callback + + countAllEventsByEntityUid: (entityUid, callback) -> + params = "entity_uid='#{entityUid}'" + + @_count params, callback + + findSomeEventsByEntityUidBeforeVersion: (entityUid, version, eventCount, callback) -> + params = "entity_uid='#{{entityUid}}' AND version <= #{version}" + order = "ASC" + + @_find params, order, eventCount, callback + + escapeString: (string) -> + return "NULL" if string is null + hasBackSlash = ~string.indexOf "\\" + prefix = if hasBackSlash then "E" else "" + string = string.replace /'/g, "''" + string = string.replace /\\/g, "\\\\" + prefix + "'" + string + "'" + + saveEvent: (event, callback) => + p = new Profiler "PostgresqlEventStore#saveEvent (db request)", @logger + p.start() + + data = {} + attachments = {} + + for key, value of event.data + if value instanceof Buffer + attachments[key] = value + else + data[key] = value + + data = JSON.stringify data + attachments = JSON.stringify attachments + + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + query = "INSERT INTO %s (name, entity_uid, timestamp, data, attachments) VALUES (%s, %s, %d, %s, %s);" + query = format query, @eventTableName, @escapeString(event.name), @escapeString(event.entityUid), event.timestamp, @escapeString(data), @escapeString(attachments) + + clientReceiver = client.query query, (err, results) => + p.end() + return @_handleError(err, client, done, callback) if err? + done() + callback null, event + + loadSnapshotForEntityUid: (uid, callback) -> + p = new Profiler "PostgresqlEventStore#loadSnapshotForEntityUid (db request)", @logger + p.start() + + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + query = "SELECT * FROM %s WHERE %s;" + params = "entity_uid='#{uid}'" + + query = format query, @snapshotTableName, params + + clientReceiver = client.query query, (err, results) => + p.end() + return @_handleError(err, client, done, callback) if err? + done() + + rawSnapshot = results.rows[0] + snapshot = null + + if rawSnapshot? + snapshotAttributes = + version : rawSnapshot.version + entityUid : rawSnapshot.entity_uid + contents : JSON.parse(rawSnapshot.contents) + + snapshot = new Snapshot snapshotAttributes + + callback null, snapshot + + saveSnapshot: (snapshot, callback) -> + p = new Profiler "PostgresqlEventStore#saveSnapshot (db request)", @logger + p.start() + + pg.connect @uri, (err, client, done) => + return @_handleError(err, client, done, callback) if err? + + version = snapshot.version + contents = JSON.stringify(snapshot.contents) + entityUid = snapshot.entityUid + + query = "WITH upsert AS (UPDATE %s SET version=%d, contents=%s WHERE entity_uid=%s RETURNING *) INSERT INTO %s (version, contents, entity_uid) SELECT %d, %s, %s WHERE NOT EXISTS (SELECT * FROM upsert);" + query = format query, @snapshotTableName, version, @escapeString(contents), @escapeString(entityUid), @snapshotTableName, version, @escapeString(contents), @escapeString(entityUid) + + clientReceiver = client.query query, (err, results) => + p.end() + return @_handleError(err, client, done, callback) if err? + done() + callback null + + _instantiateEventsFromRows: (rows, callback) -> + events = [] + return callback null, events if rows.length is 0 + + rowsQueue = async.queue (row, rowCallback) => + @_instantiateEventFromRow row, (err, event) -> + return callback err if err? + events.push event + defer rowCallback + , 1 + + rowsQueue.drain = -> + callback null, events + + rowsQueue.push rows + + _instantiateEventFromRow: (row, callback) -> + data = + uid : row.id + name : row.name + entityUid : row.entity_uid + data : JSON.parse(row.data) + timestamp : row.timestamp + version : row.version + + @_loadAttachmentsFromRow row, (err, attachments) -> + return rowCallback err if err? + + for attachmentName, attachmentBody of attachments + data[attachmentName] = attachmentBody + + event = new Event data + + callback null, event + + _loadAttachmentsFromRow: (row, callback) -> + attachments = {} + for attachmentName, attachmentBody of JSON.parse(row.attachments) + attachments[attachmentName] = attachmentBody.buffer + + callback null, attachments + +module.exports = PostgresqlEventStore diff --git a/lib/logger.js b/lib/logger.js index 0d25dc2..7df2d6b 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -2,7 +2,7 @@ var Logger = require("devnull"); var logLevel = process.env.LOG_LEVEL != null ? process.env.LOG_LEVEL : 6; var logger = new Logger({ - timestamp: false, + timestamp: true, level: logLevel }); @@ -11,13 +11,13 @@ logger.http = function(req, res, next) { req._startTime = process.hrtime(); res.end = function(chunk, encoding) { - var status = res.statusCode; - var len = parseInt(res.getHeader("Content-Length"), 10); - len = isNaN(len) ? "" : "- " + len; + var status = res.statusCode; + var len = parseInt(res.getHeader("Content-Length"), 10); + len = isNaN(len) ? "" : "- " + len; - var diff = process.hrtime(req._startTime); - var duration = (diff[0] * 1e9 + diff[1]) / 1000000; - var line = "" + req.method + " " + req.originalUrl + " " + res.statusCode + " " + duration + "ms " + len; + var diff = process.hrtime(req._startTime); + var duration = (diff[0] * 1e9 + diff[1]) / 1000000; + var line = "" + req.method + " " + req.originalUrl + " " + res.statusCode + " " + duration + "ms " + len; logger.log("http", line); res.end = end; diff --git a/lib/main.js b/lib/main.js index b56cdbd..fc14563 100644 --- a/lib/main.js +++ b/lib/main.js @@ -1,4 +1,4 @@ -require("coffee-script"); +require("coffee-script/register"); module.exports = { Assembler: require("./assembler"), diff --git a/lib/profiler.js b/lib/profiler.js index 642f3a0..f84a322 100644 --- a/lib/profiler.js +++ b/lib/profiler.js @@ -26,4 +26,4 @@ function _padLeft(str, length) { return str; }; -module.exports = Profiler; \ No newline at end of file +module.exports = Profiler; diff --git a/lib/queue.js b/lib/queue.js new file mode 100644 index 0000000..379ac48 --- /dev/null +++ b/lib/queue.js @@ -0,0 +1,37 @@ +function Queue(worker, concurrency) { + var workers = 0; + var q = { + tasks : [], + concurrency : concurrency, + drain : null, + push : function(data) { + if (!(data instanceof Array)) + data = [data]; + if (data.length == 0 && q.drain) + setImmediate(q.drain); + for (i = 0; i < data.length; i++) { + q.tasks.push(data[i]); + setImmediate(q.process); + } + }, + process : function() { + if (workers < q.concurrency && q.tasks.length) { + var task = q.tasks.shift(); + workers++; + var next = function () { + workers--; + if (q.drain && q.tasks.length + workers === 0) + q.drain(); + setImmediate(q.process); + }; + worker(task, next); + } + }, + length: function () { + return q.tasks.length; + } + }; + return q; +} + +module.exports = Queue; diff --git a/lib/reporter.js b/lib/reporter.js index 9915eec..fcadfbb 100644 --- a/lib/reporter.js +++ b/lib/reporter.js @@ -5,6 +5,7 @@ function Reporter(attributes) { this.app = attributes.app; this.logger = attributes.logger; this.eventBusReceiver = attributes.eventBusReceiver; + this.replaying = !!attributes.replaying; } Reporter.prototype.destroy = function(callback) { diff --git a/lib/snapshot.js b/lib/snapshot.js index f3787be..d7c6738 100644 --- a/lib/snapshot.js +++ b/lib/snapshot.js @@ -17,4 +17,4 @@ Snapshot.makeFromEntity = function (entity) { return snapshot; } -module.exports = Snapshot; \ No newline at end of file +module.exports = Snapshot; diff --git a/package.json b/package.json index b50e3e7..aa7e80c 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "author": "Julien Biezemans ", "name": "plutonium", - "version": "0.0.0-alpha", + "version": "0.0.1", "description": "Infrastructure framework based on DDD, Event Sourcing and CQRS principles", "main": "lib/main", "scripts": { @@ -18,14 +18,15 @@ "framework" ], "dependencies": { - "coffee-script": "1.3.3", - "async": "0.1.22", + "coffee-script": "1.7.1", + "async": "0.9.0", "node-uuid": "1.3.3", "redis": "0.9.0", - "hiredis": "~0.1.14", + "hiredis": "0.1.16", "nano": "~3.3.6", "devnull": "0.0.10", "mongodb": "~1.2.12", + "pg": "~3.4.2", "bson": "~0.1.7", "multipart": "0.1.5", "crypto": "0.0.3",