Skip to content
This repository was archived by the owner on Oct 3, 2024. It is now read-only.
Open

Beta #58

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
8f613ff
Sort by timestamp when finding all events not by version, should find…
thibaultponcelet May 14, 2013
d1668b7
Handle large binary data in an appropriate manner
thibaultponcelet May 29, 2013
8c55a5e
Handle undefined argument in command bus client
thibaultponcelet May 30, 2013
b9f9a49
Do not overrride 0 command arguments with null
thibaultponcelet May 30, 2013
0781b3f
Add possibility to choose the order when getting aggreate events
djeusette Jul 12, 2013
571ebcb
Merge pull request #2 from Djengo/new-admin
thibaultponcelet Jul 16, 2013
f547a28
Optimize redis event bus by publishing listened events only
thibaultponcelet Jul 26, 2013
a1f6384
Add CommonEventBusReceiver#onEvents method in order to specify the same
thibaultponcelet Sep 10, 2013
dbbcd4f
Inject app in the reporters
thibaultponcelet Sep 12, 2013
d001c9d
Store app at reporter instantiation
thibaultponcelet Sep 12, 2013
93f94aa
Merge pull request #4 from Djengo/test
djeusette Sep 25, 2013
75c7913
Merge branch 'dev' of github.com:Djengo/plutonium into dev
djeusette Oct 25, 2013
15023cb
Use a cursor to fetch all events from mongodb database, remove async …
thibaultponcelet Feb 18, 2014
5802868
Refactor replay mechanism
thibaultponcelet Feb 18, 2014
3f13432
Update coffee script
thibaultponcelet Feb 20, 2014
faf9561
Bump hiredis
thibaultponcelet Feb 20, 2014
71b0681
Callback executeCommand even when a validation exists
thibaultponcelet Feb 21, 2014
80eb628
Properly handle errors in async.queue and fix closeConnectionAndRetur…
djeusette Feb 21, 2014
d92c181
Set replayed flag on events during replay
thibaultponcelet Feb 24, 2014
f212ec7
Use Entity instead of finalCtor to initialize
thibaultponcelet Feb 24, 2014
17f3fb6
Initialize domain objects after app instantiation
thibaultponcelet Feb 25, 2014
e21b038
Merge pull request #5 from Djengo/improve-replay-from-alpha
djeusette Mar 5, 2014
7bb15ae
Merge branch 'dev' of github.com:Djengo/plutonium into dev
djeusette Mar 6, 2014
620a40c
Add logger in CommandHandler
djeusette Mar 6, 2014
a7a8792
Avoid returning an error when entity could not be found
djeusette Mar 15, 2014
5769cdc
Refactor condition statement
djeusette Mar 15, 2014
f7b9cd7
Use a cursor to fetch all events from mongodb database, remove async …
thibaultponcelet Feb 18, 2014
291ba70
Refactor replay mechanism
thibaultponcelet Apr 1, 2014
c8cadbe
Bump coffee-script
thibaultponcelet Apr 1, 2014
897a44d
Bump hiredis
thibaultponcelet Feb 20, 2014
2df27bb
Callback executeCommand even when a validation exists
thibaultponcelet Feb 21, 2014
8c43181
Handle errors appropriately in mongoDB event store closeConnectionAnd…
thibaultponcelet Apr 1, 2014
5fd0201
Handle erors appropriately in async queues for Assembler
thibaultponcelet Apr 1, 2014
c9605f2
Set replayed flag on events during replay
thibaultponcelet Feb 24, 2014
aaa20bd
Use Entity instead of finalCtor to initialize
thibaultponcelet Feb 24, 2014
7a4ca99
Initialize domain objects after app instantiation
thibaultponcelet Feb 25, 2014
6264ee5
Add logger in CommandHandler
djeusette Mar 6, 2014
9fbad9b
Avoid returning an error when entity could not be found
djeusette Mar 15, 2014
16e6774
Refactor condition statement
djeusette Mar 15, 2014
0a9eb4a
Add timestamp to logs
thibaultponcelet Apr 28, 2014
001c17c
Add replaying flag to assembler and event bus
thibaultponcelet May 9, 2014
563ee34
Handle two-steps replay in domain repository
thibaultponcelet May 9, 2014
6d1f4b8
Do not start command bus server when replaying
thibaultponcelet May 12, 2014
2cdf625
Merge branch 'dev' of github.com:Djengo/plutonium into dev
djeusette May 17, 2014
5f2aa45
Add information about entity name
djeusette May 17, 2014
c3c7937
Add index on mongo event store to support two step replays (non backw…
thibaultponcelet May 20, 2014
e735a34
Pass replaying mode to reports
thibaultponcelet May 20, 2014
8fa1b96
Pass replaying option to reporters
thibaultponcelet May 20, 2014
d2045ba
Merge pull request #6 from Djengo/new-replay
djeusette May 28, 2014
4c6a8bf
Do not erroneous sort after findOne in findAllEventsOneByOne
thibaultponcelet Jun 5, 2014
3720227
Fix commandName var in command bus
thibaultponcelet Jun 6, 2014
fe83893
Postgres EventStore: WIP
djeusette May 21, 2014
1ebf98b
Re add createNewUid method in mongodb event store
djeusette May 21, 2014
4ae6c1b
Fix postgres event store setup.
djeusette May 21, 2014
7a7349b
Escape strings before inserting them into the event store
djeusette May 22, 2014
8f3c848
Refactor event store interface
djeusette May 23, 2014
f0ca471
Use iterators in entity instantiator where possible
djeusette May 23, 2014
514fb41
Ensure inner callback have returned in iterateOverX methods
djeusette May 23, 2014
26f78f4
Remove useless log in entity instantiator
djeusette May 23, 2014
1a602a6
Handle errors
djeusette May 30, 2014
6b9262d
Re-add log on error
djeusette May 30, 2014
628711d
Use logger warn level
djeusette May 30, 2014
db2fcf2
Remove useless logs
djeusette May 30, 2014
fb21a76
Re add options arguments in replayAllEvents
djeusette May 30, 2014
7ade2fb
Snap entity asynchronously
djeusette May 30, 2014
0e5b08f
Snap entity asynchronously in instantiateFromEvents
djeusette May 30, 2014
77667a0
Fix interface in base event store
djeusette Jun 4, 2014
037e7da
Refactor entity instantiator and remove console.log
djeusette Jun 5, 2014
c83173f
Fix fat arrow
thibaultponcelet Jun 16, 2014
b37e841
Merge pull request #7 from Djengo/postgres
thibaultponcelet Jun 16, 2014
cdafea6
Defer recursive call
djeusette Jun 18, 2014
6af442f
Merge pull request #8 from Djengo/recursion
djeusette Jun 18, 2014
10d256a
Align lines
djeusette Sep 4, 2014
87ea7e1
Add warning to debug command stacking
djeusette Sep 4, 2014
354bc1a
Add logs
djeusette Sep 4, 2014
5a9501b
Add logger info for debug purpose
djeusette Sep 4, 2014
f0d8f4b
Remove logs used for debugging purpose
djeusette Sep 4, 2014
55586c5
Add readOnly option in assembler to avoid having the commandBus serve…
djeusette Sep 5, 2014
a31d46b
Update async version
djeusette Sep 5, 2014
de7a63b
Change postgres event store
djeusette Sep 5, 2014
6ebc026
Return in case of error
djeusette Sep 6, 2014
377627b
Improve publish to direct listener
djeusette Sep 6, 2014
c04fa97
Remove defer where useless
djeusette Sep 7, 2014
8623527
Change version
djeusette Sep 7, 2014
397aeb0
Remove unused constants
djeusette Sep 7, 2014
b42cbbc
Personal implementation of a queue system
djeusette Sep 7, 2014
83b4217
Use own queue implementation that's faster than async.queue
djeusette Sep 9, 2014
71ed00e
Refactor call to callback
djeusette Sep 29, 2014
9e0d7dc
Do not use defer in Queue worker anymore
djeusette Sep 29, 2014
d86e832
Fix readOnly assembler option
djeusette Oct 2, 2014
ae264ad
Fix logger scope and use batchSize on cursor for replay
djeusette Sep 25, 2014
8f23849
Only log the publishing to directlisteners information when an event …
djeusette Sep 25, 2014
10e6ff4
Refactor publish event method
djeusette Sep 25, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 56 additions & 38 deletions lib/assembler.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ var AggregateRoot = require("./aggregate_root");
var Service = require("./service");
var logger = require("./logger");

function Assembler(App, configurationPath) {
function Assembler(App, configurationPath, options) {
if (!App)
throw new Error("Missing app constructor");
if (!configurationPath)
throw new Error("Missing configuration file path");

if (!options)
options = {};

this.App = App;
this.configurationPath = configurationPath;
this.logger = logger;
Expand All @@ -22,6 +25,7 @@ function Assembler(App, configurationPath) {
this.reportStores = [];
this.eventBusReceivers = [];
this.env = process.env.NODE_ENV || "development";
this.replaying = !!options.replaying;
}

// Assemble an app, load the reporters.
Expand All @@ -30,8 +34,9 @@ Assembler.prototype.loadCompleteApp = function (options, callback) {

if (!callback) {
callback = options;
options = {}
options = {};
}

if (typeof options.clearQueues == "undefined")
options.clearQueues = false;
if (typeof options.setupEventStore == "undefined")
Expand All @@ -40,10 +45,12 @@ Assembler.prototype.loadCompleteApp = function (options, callback) {
options.setupReportStores = false;
if (typeof options.startReporters == "undefined")
options.startReporters = true;
if (typeof options.readOnly == "undefined")
options.readOnly = false;

async.series([
function (next) {
self.assembleApp(next);
self.assembleApp(options, next);
},
function (next) {
if (options.clearQueues)
Expand Down Expand Up @@ -81,7 +88,7 @@ Assembler.prototype.loadCompleteApp = function (options, callback) {
};

// Assemble an app only, no reporters
Assembler.prototype.assembleApp = function (callback) {
Assembler.prototype.assembleApp = function (options, callback) {
var self = this;

if (self.app)
Expand All @@ -100,21 +107,24 @@ Assembler.prototype.assembleApp = function (callback) {
}
}

var createApp = function createApp() {
var createApp = function createApp(callback) {
self.app = new self.App(appOptions);
callback();
};

if (self.configuration.domain) {
self._assembleDomainInfrastructure(function(err){
self._assembleDomainInfrastructure(options, function(err){
if (err)
return callback(err);
appOptions.domainRepository = self.domainRepository;
appOptions.commandBus = self.commandBus;
createApp();
createApp(function(){
DomainObject.initializeConstructors();
callback();
});
});
} else {
createApp();
createApp(callback);
}
};

Expand All @@ -141,7 +151,8 @@ Assembler.prototype.loadReporters = function (options, callback) {
var eventBusReceiverConfig = {
queueName: reporterSettings.eventBusReceiver.queue,
logger: self.logger,
scope: self.configuration.scope
scope: self.configuration.scope,
replaying: self.replaying
};

if (reporterSettings.eventBusReceiver.options) {
Expand All @@ -154,7 +165,8 @@ Assembler.prototype.loadReporters = function (options, callback) {
var reporterOptions = {
eventBusReceiver: eventBusReceiver,
logger: logger,
app: self.app
app: self.app,
replaying: self.replaying
};

var reporter = new reporterSettings.ctor(reporterOptions);
Expand Down Expand Up @@ -189,7 +201,8 @@ Assembler.prototype.initializeReports = function (callback) {
var ReportStore = reportSettings.store.ctor;
var reportStore = new ReportStore({
uri: reportSettings.store.uri,
logger: self.logger
logger: self.logger,
replaying: self.replaying
});
Report.initialize({store: reportStore, logger: self.logger});
self.reports.push(Report);
Expand Down Expand Up @@ -241,7 +254,6 @@ Assembler.prototype.tearDownApp = function (callback) {
var self = this;

// TODO: review this, anything else to unload/destroy?

async.series([
function (next) {
self.destroyEventBusEmitter(next);
Expand Down Expand Up @@ -277,19 +289,19 @@ Assembler.prototype.destroyReporters = function (callback) {
if (self.reporters.length == 0)
return callback();

var queue = async.queue(function (reporter, callback) {
var queue = async.queue(function (reporter, taskCallback) {
reporter.destroy(function (err) {
if (err)
if (err) {
self.logger.error("Assembler#unloadReporters", "destroying reporter failed: " + err);
callback(err);
return callback(err);
}
taskCallback();
});
}, Infinity);

queue.drain = function (err) {
if (err)
self.logger.error("Assembler#unloadReporters", "error: " + err);
queue.drain = function() {
self.reporters = [];
callback(err);
callback();
};
queue.push(self.reporters);
};
Expand All @@ -300,23 +312,23 @@ Assembler.prototype.destroyReportStores = function (callback) {
if (self.reportStores.length == 0)
return callback();

var queue = async.queue(function (reportStore, callback) {
var queue = async.queue(function (reportStore, taskCallback) {
if (reportStore.destroy) {
reportStore.destroy(function (err) {
if (err)
if (err) {
self.logger.error("Assembler#destroyReportStores", "destroying report store failed: " + err);
callback(err);
return callback(err);
}
taskCallback();
});
} else {
callback();
taskCallback();
}
}, Infinity);

queue.drain = function (err) {
if (err)
self.logger.error("Assembler#destroyReportStores", "error: " + err);
queue.drain = function() {
self.reportStores = [];
callback(err);
callback();
};
queue.push(self.reportStores);
};
Expand Down Expand Up @@ -351,7 +363,7 @@ Assembler.prototype.unloadEventBusReceivers = function (callback) {
queue.push(self.eventBusReceivers);
};

Assembler.prototype._assembleDomainInfrastructure = function (callback) {
Assembler.prototype._assembleDomainInfrastructure = function (options, callback) {
var self = this;

var EventStore = self.configuration.domain.eventStore.ctor;
Expand All @@ -360,12 +372,13 @@ Assembler.prototype._assembleDomainInfrastructure = function (callback) {
logger: self.logger
});

function initializeDomainInfrastructure(callback) {
function initializeDomainInfrastructure(options, callback) {
var EventBusEmitter = self.configuration.domain.eventBusEmitter.ctor;

var eventBusEmitterOptions = {
logger: self.logger,
scope: self.configuration.scope
scope: self.configuration.scope,
replaying: self.replaying
};

if (self.configuration.domain.eventBusEmitter.options) {
Expand All @@ -379,12 +392,19 @@ Assembler.prototype._assembleDomainInfrastructure = function (callback) {
var domainRepository = new DomainRepository({
store: eventStore,
emitter: eventBusEmitter,
logger: self.logger
logger: self.logger,
replaying: self.replaying
});

var port;
if (!options.readOnly)
port = self.configuration.domain.commandBus && self.configuration.domain.commandBus.port;

var commandBus = new CommandBus({
domainRepository: domainRepository,
port: self.configuration.domain.commandBus && self.configuration.domain.commandBus.port,
logger: self.logger
domainRepository : domainRepository,
port : port,
logger : self.logger,
replaying : self.replaying
});

// These initializations take place on the constructor, not instances.
Expand All @@ -411,8 +431,6 @@ Assembler.prototype._assembleDomainInfrastructure = function (callback) {
logger: self.logger
});

DomainObject.initializeConstructors();

self.eventBusEmitter = eventBusEmitter;
self.domainRepository = domainRepository;
self.commandBus = commandBus;
Expand All @@ -425,10 +443,10 @@ Assembler.prototype._assembleDomainInfrastructure = function (callback) {
eventStore.initialize(function(err){
if (err)
return callback(err);
initializeDomainInfrastructure(callback);
initializeDomainInfrastructure(options, callback);
});
} else {
initializeDomainInfrastructure(callback);
initializeDomainInfrastructure(options, callback);
}
};

Expand Down
8 changes: 5 additions & 3 deletions lib/command_bus.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ Profiler = require "./profiler"

class CommandBus

constructor: ({@domainRepository, @logger, @port}) ->
constructor: ({@domainRepository, @logger, @port, @replaying}) ->
throw new Error "Missing domain repository" unless @domainRepository?
throw new Error "Missing logger" unless @logger?

if @port
if @port && !@replaying
@server = new CommandBusServer commandBus: @, port: @port, logger: @logger
@server.listen @port

@commandHandlers = {}

registerCommandHandler: (commandHandler) ->
commandName = commandHandler.getCommandName()
throw new Error "A command and its handler for command named \"#{command}\" were already registered" if @commandHandlers[commandName]?
throw new Error "A command and its handler for command named \"#{commandName}\" were already registered" if @commandHandlers[commandName]?
@commandHandlers[commandName] = commandHandler

createNewUid: (callback) ->
Expand Down Expand Up @@ -48,6 +48,7 @@ class CommandBus
commandHandler.validate (err, result) ->
p1.end()
if err?
logger.warning "executeCommand", "validation error", err
callback err
done err
return
Expand All @@ -57,6 +58,7 @@ class CommandBus
p2.end()
p.end()
done args...
transaction.callback = callback
else
transaction = (done) ->
p.start()
Expand Down
5 changes: 2 additions & 3 deletions lib/command_bus_client.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class CommandBusClient
request = @_makeRequest path: "/commands"
stream = request.stream

request.on "error", (err) ->
callback err
request.on "error", callback

request.on "response", (response) =>
@_processResponse response, callback
Expand Down Expand Up @@ -96,4 +95,4 @@ class CommandBusClient
logger.error "CommandBusClient", "command failed remotely: #{error.stack || error.message || error}"
callback error

module.exports = CommandBusClient
module.exports = CommandBusClient
7 changes: 5 additions & 2 deletions lib/command_bus_server.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ class CommandBusServer

logger.log "CommandBusServer", "deserialize command \"#{commandName}\""
@commandBus.deserializeCommand commandName, payload, (err, command) =>
logger.log "CommandBusServer", "start command \"#{commandName}\""
if err?
logger.error "CommandBusServer", "error while deserializing command", err
return djump res, 500, error: err

@commandBus.executeCommand command, (err) ->
if err?
logger.alert "CommandBusServer", "error while executing command (#{err})"
Expand All @@ -104,4 +107,4 @@ djump = (res, code, obj) ->
else
res.end()

module.exports = CommandBusServer
module.exports = CommandBusServer
1 change: 1 addition & 0 deletions lib/command_handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ CommandHandler.registerAllInDirectoryOnBus = function (path, options) {
var logger = options.logger;
fs.readdirSync(path).forEach(function (fileName) {
var CommandHandler = require(path + "/" + fileName);
CommandHandler.logger = logger;
logger.log("CommandHandler", "loading command handler for command \"" + CommandHandler.getCommandName() + "\" (" + fileName + ")");
commandBus.registerCommandHandler(CommandHandler);
});
Expand Down
Loading