Skip to content
This repository was archived by the owner on Oct 3, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions lib/assembler.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Assembler.prototype.assembleApp = function (callback) {
}
}

var createApp = function createApp() {
var createApp = function createApp(callback) {
self.app = new self.App(appOptions);
callback();
};
Expand All @@ -111,10 +111,13 @@ Assembler.prototype.assembleApp = function (callback) {
return callback(err);
appOptions.domainRepository = self.domainRepository;
appOptions.commandBus = self.commandBus;
createApp();
createApp(function(){
DomainObject.initializeConstructors();
callback();
});
});
} else {
createApp();
createApp(callback);
}
};

Expand Down Expand Up @@ -241,7 +244,6 @@ Assembler.prototype.tearDownApp = function (callback) {
var self = this;

// TODO: review this, anything else to unload/destroy?

async.series([
function (next) {
self.destroyEventBusEmitter(next);
Expand Down Expand Up @@ -277,19 +279,19 @@ Assembler.prototype.destroyReporters = function (callback) {
if (self.reporters.length == 0)
return callback();

var queue = async.queue(function (reporter, callback) {
var queue = async.queue(function (reporter, taskCallback) {
reporter.destroy(function (err) {
if (err)
if (err) {
self.logger.error("Assembler#unloadReporters", "destroying reporter failed: " + err);
callback(err);
return callback(err);
}
taskCallback();
});
}, Infinity);

queue.drain = function (err) {
if (err)
self.logger.error("Assembler#unloadReporters", "error: " + err);
queue.drain = function() {
self.reporters = [];
callback(err);
callback();
};
queue.push(self.reporters);
};
Expand All @@ -300,23 +302,23 @@ Assembler.prototype.destroyReportStores = function (callback) {
if (self.reportStores.length == 0)
return callback();

var queue = async.queue(function (reportStore, callback) {
var queue = async.queue(function (reportStore, taskCallback) {
if (reportStore.destroy) {
reportStore.destroy(function (err) {
if (err)
if (err) {
self.logger.error("Assembler#destroyReportStores", "destroying report store failed: " + err);
callback(err);
return callback(err);
}
taskCallback();
});
} else {
callback();
taskCallback();
}
}, Infinity);

queue.drain = function (err) {
if (err)
self.logger.error("Assembler#destroyReportStores", "error: " + err);
queue.drain = function() {
self.reportStores = [];
callback(err);
callback();
};
queue.push(self.reportStores);
};
Expand Down Expand Up @@ -411,8 +413,6 @@ Assembler.prototype._assembleDomainInfrastructure = function (callback) {
logger: self.logger
});

DomainObject.initializeConstructors();

self.eventBusEmitter = eventBusEmitter;
self.domainRepository = domainRepository;
self.commandBus = commandBus;
Expand Down
1 change: 1 addition & 0 deletions lib/command_bus.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class CommandBus
p2.end()
p.end()
done args...
transaction.callback = callback
else
transaction = (done) ->
p.start()
Expand Down
5 changes: 2 additions & 3 deletions lib/command_bus_client.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class CommandBusClient
request = @_makeRequest path: "/commands"
stream = request.stream

request.on "error", (err) ->
callback err
request.on "error", callback

request.on "response", (response) =>
@_processResponse response, callback
Expand Down Expand Up @@ -96,4 +95,4 @@ class CommandBusClient
logger.error "CommandBusClient", "command failed remotely: #{error.stack || error.message || error}"
callback error

module.exports = CommandBusClient
module.exports = CommandBusClient
1 change: 0 additions & 1 deletion lib/command_bus_server.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ class CommandBusServer

logger.log "CommandBusServer", "deserialize command \"#{commandName}\""
@commandBus.deserializeCommand commandName, payload, (err, command) =>
logger.log "CommandBusServer", "start command \"#{commandName}\""
@commandBus.executeCommand command, (err) ->
if err?
logger.alert "CommandBusServer", "error while executing command (#{err})"
Expand Down
1 change: 1 addition & 0 deletions lib/command_handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ CommandHandler.registerAllInDirectoryOnBus = function (path, options) {
var logger = options.logger;
fs.readdirSync(path).forEach(function (fileName) {
var CommandHandler = require(path + "/" + fileName);
CommandHandler.logger = logger;
logger.log("CommandHandler", "loading command handler for command \"" + CommandHandler.getCommandName() + "\" (" + fileName + ")");
commandBus.registerCommandHandler(CommandHandler);
});
Expand Down
15 changes: 5 additions & 10 deletions lib/domain_repository.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class DomainRepository
@transactionQueue = async.queue (transaction, done) =>
if @halted
@logger.warning "transaction", "skipped (#{@transactionQueue.length()} more transaction(s) in queue)"
transaction.callback() if transaction.callback?
done()
else
@transacting = true
Expand Down Expand Up @@ -75,16 +76,10 @@ class DomainRepository
entityInstantiator.findByUid uid, callback

replayAllEvents: (callback) ->
@store.findAllEvents (err, events) =>
if events.length > 0
eventQueue = async.queue (event, eventTaskCallback) =>
event.replayed = true
@_publishEvent event, eventTaskCallback
, 1
eventQueue.drain = callback
eventQueue.push events
else
callback()
@store.findAllEventsOneByOne (err, event, eventHandlerCallback) =>
event.replayed = true
@_publishEvent event, eventHandlerCallback
, callback

getLastPublishedEvents: () ->
@emitter.lastEmittedEvents
Expand Down
11 changes: 5 additions & 6 deletions lib/entity.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ Entity = (name, finalCtor, Ctor) ->
Base::_initializeAtVersion = (version) ->
@$version = version
@$appliedEvents = []
@$domainRepository = finalCtor.$domainRepository
@$commandBus = finalCtor.$commandBus
@$logger = finalCtor.$logger
@$domainRepository = Entity.domainRepository
@$commandBus = Entity.commandBus
@$logger = Entity.logger

Base::_serialize = (contained) ->
throw new Error "References between entity are forbidden" if contained
Expand Down Expand Up @@ -96,12 +96,11 @@ Entity = (name, finalCtor, Ctor) ->
deferred = Q.defer()
deferred.promise.nodeify callback
if uid?
@$domainRepository.findEntityByUid @, uid, (err, entity) ->
@$domainRepository.findEntityByUid @, uid, (err, entity) =>
if err?
deferred.reject err
else if not entity
deferred.reject new Error "Could not find entity with UID " + uid
else
@$logger.error "findByUid", "Could not find entity with UID #{uid}" if not entity
deferred.resolve entity
else
deferred.reject(new Error 'Please provide a UID to find')
Expand Down
37 changes: 19 additions & 18 deletions lib/event_bus/common/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,25 +69,26 @@ CommonEventBusReceiver.prototype._handleEvent = function (event, callback) {
return callback();
}

var parallelHandling = async.queue(function (eventHandler, callback) {
eventHandler(event, function (err) {
if (err) {
self.logger.warning("CommonEventBusReceiver#_handleEvent", "an error occurred in an event handler: " + (err.stack || err.message || err));
errors.push(err);
var pendingHandlers = 0;
eventHandlers.forEach(function(eventHandler){
pendingHandlers++
eventHandler(event, function (err) {
if (err) {
self.logger.warning("CommonEventBusReceiver#_handleEvent", "an error occurred in an event handler: " + (err.stack || err.message || err));
errors.push(err);
}
pendingHandlers--;
if (pendingHandlers == 0) {
if (errors.length > 0) {
var error = new Error("Some errors occured when handling the event: [" + errors.join(" | ") + "]");
callback(error);
} else {
self.lastEvent = event;
callback(null);
}
callback(err);
});
}, Infinity);

parallelHandling.drain = function () {
if (errors.length > 0) {
var err = new Error("Some errors occured when handling the event: [" + errors.join(" | ") + "]");
return callback(err);
}
self.lastEvent = event;
callback();
};
parallelHandling.push(eventHandlers);
}
});
});
};

CommonEventBusReceiver.prototype.stop = function (callback) {
Expand Down
5 changes: 4 additions & 1 deletion lib/event_store/base.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ class BaseEventStore
findAllEvents: (options, callback) ->
throw new Error "implement me"

findAllEventsOneByOne: (options, callback) ->
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this, move the behaviour to findAllEvents

throw new Error "implement me"

findAllEventsByEntityUid: (entityUid, options, callback) ->
throw new Error "Implement me"

Expand All @@ -23,4 +26,4 @@ class BaseEventStore
# implement me if you want snapshots in your store
callback? null

module.exports = BaseEventStore
module.exports = BaseEventStore
80 changes: 47 additions & 33 deletions lib/event_store/mongodb.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,19 @@ class MongoDbEventStore extends Base

destroy: (callback) ->
@_closeConnectionAndReturn @db, null, (err) =>
return callback err if err?
@db = null
@eventCollection = null
@snapshotCollection = null
callback null

_closeConnectionAndReturn: (db, err, callback) ->
db.close() if db?
callback err
if db?
db.close (closeErr) ->
return callback closeErr if closeErr?
callback err
else
callback err

setup: (callback) ->
async.series [
Expand All @@ -70,18 +75,22 @@ class MongoDbEventStore extends Base
uid = uuid.v4()
callback null, uid

findAllEvents: (callback) ->
findAllEventsOneByOne: (eventHandler, callback) ->
p = new Profiler "MongoDbEventStore#_find(db request)", @logger
p.start()
@eventCollection.find({}).sort("timestamp":1).toArray (err, items) =>
p.end()

if err?
callback err
else if not items?
callback null, []
else
@_instantiateEventsFromRows items, callback
cursor = @eventCollection.find({}).sort("timestamp":1)
retrieve = =>
cursor.nextObject (err, item) =>
return callback err if err?
if item?
@_instantiateEventFromRow item, (err, event) ->
eventHandler err, event, (err) ->
return callback err if err?
retrieve()
else
p.end()
callback null
retrieve()

findAllEventsByEntityUid: (entityUid, order, callback) ->
[order, callback] = [null, order] unless callback?
Expand Down Expand Up @@ -211,27 +220,8 @@ class MongoDbEventStore extends Base
return callback null, events if rows.length is 0

rowsQueue = async.queue (row, rowCallback) =>
uid = row.uid
name = row.name
entityUid = row.entityUid
data = row.data
timestamp = row.timestamp
version = row.version

@_loadAttachmentsFromRow row, (err, attachments) ->
return rowCallback err if err?

for attachmentName, attachmentBody of attachments
data[attachmentName] = attachmentBody

event = new Event
name: name
data: data
uid: uid
entityUid: entityUid
timestamp: timestamp
version: version

@_instantiateEventFromRow row, (err, event) ->
return callback err if err?
events.push event
defer rowCallback
, 1
Expand All @@ -241,6 +231,30 @@ class MongoDbEventStore extends Base

rowsQueue.push rows

_instantiateEventFromRow: (row, callback) ->
uid = row.uid
name = row.name
entityUid = row.entityUid
data = row.data
timestamp = row.timestamp
version = row.version

@_loadAttachmentsFromRow row, (err, attachments) ->
return rowCallback err if err?

for attachmentName, attachmentBody of attachments
data[attachmentName] = attachmentBody

event = new Event
name: name
data: data
uid: uid
entityUid: entityUid
timestamp: timestamp
version: version

callback null, event

_loadAttachmentsFromRow: (row, callback) ->
attachments = {}
for attachmentName, attachmentBody of row._attachments
Expand Down
2 changes: 1 addition & 1 deletion lib/main.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require("coffee-script");
require("coffee-script/register");

module.exports = {
Assembler: require("./assembler"),
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
"framework"
],
"dependencies": {
"coffee-script": "1.3.3",
"coffee-script": "1.7.1",
"async": "0.1.22",
"node-uuid": "1.3.3",
"redis": "0.9.0",
"hiredis": "~0.1.14",
"hiredis": "0.1.16",
"nano": "~3.3.6",
"devnull": "0.0.10",
"mongodb": "~1.2.12",
Expand Down