Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/abstractions/persistence-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export interface IPersistenceProvider {
getRunnableInstances(): Promise<Array<string>>;

createEventSubscription(subscription: EventSubscription): Promise<void>;
getSubscriptions(eventName: string, eventKey: string, asOf: Date): Promise<Array<EventSubscription>>;
getSubscriptions(eventName: string, eventKey: string, asOf?: Date): Promise<Array<EventSubscription>>;
terminateSubscription(id: string): Promise<void>;

createEvent(event: Event): Promise<string>;
Expand Down
2 changes: 1 addition & 1 deletion core/src/abstractions/workflow-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export interface IWorkflowHost {
stop();
startWorkflow(id: string, version: number, data: any): Promise<string>;
registerWorkflow<TData>(workflow: new () => WorkflowBase<TData>);
publishEvent(eventName: string, eventKey: string, eventData: any, eventTime: Date): Promise<void>;
publishEvent(eventName: string, eventKey: string, eventData: any, eventTime?: Date): Promise<void>;
suspendWorkflow(id: string): Promise<boolean>;
resumeWorkflow(id: string): Promise<boolean>;
terminateWorkflow(id: string): Promise<boolean>;
Expand Down
2 changes: 1 addition & 1 deletion core/src/models/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ export class Event {
public eventName: string;
public eventKey: string;
public eventData: any;
public eventTime: Date;
public eventTime?: Date;
public isProcessed: boolean;
}
2 changes: 1 addition & 1 deletion core/src/services/event-queue-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class EventQueueWorker implements IBackgroundWorker {
if (gotLock) {
try {
let evt = await self.persistence.getEvent(eventId);
if (evt.eventTime <= new Date())
if (evt.eventTime === undefined || evt.eventTime <= new Date())
{
let subs = await self.persistence.getSubscriptions(evt.eventName, evt.eventKey, evt.eventTime);
let success = true;
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/memory-persistence-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ export class MemoryPersistenceProvider implements IPersistenceProvider {
wfes_subscriptions.push(subscription);
}

public async getSubscriptions(eventName: string, eventKey: string, asOf: Date): Promise<Array<EventSubscription>> {
return wfes_subscriptions.filter(x => x.eventName == eventName && x.eventKey == eventKey && x.subscribeAsOf <= asOf);
public async getSubscriptions(eventName: string, eventKey: string, asOf?: Date): Promise<Array<EventSubscription>> {
return wfes_subscriptions.filter(x => x.eventName == eventName && x.eventKey == eventKey && (asOf ? x.subscribeAsOf <= asOf : true));
}

public async terminateSubscription(id: string): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/workflow-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class WorkflowHost implements IWorkflowHost {
this.registry.registerWorkflow<TData>(new workflow());
}

public async publishEvent(eventName: string, eventKey: string, eventData: any, eventTime: Date): Promise<void> {
public async publishEvent(eventName: string, eventKey: string, eventData: any, eventTime?: Date): Promise<void> {
//todo: check host status

this.logger.info("Publishing event %s %s", eventName, eventKey);
Expand Down
15 changes: 13 additions & 2 deletions providers/workflow-es-mongodb/src/mongodb-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,28 @@ export class MongoDBPersistence implements IPersistenceProvider {
return deferred;
}

public async getSubscriptions(eventName: string, eventKey: string, asOf: Date): Promise<Array<EventSubscription>> {
public async getSubscriptions(eventName: string, eventKey: string, asOf?: Date): Promise<Array<EventSubscription>> {
var self = this;
var deferred = new Promise<Array<EventSubscription>>((resolve, reject) => {
self.subscriptionCollection.find({ eventName: eventName, eventKey: eventKey, subscribeAsOf: { $lt: asOf } })
if (asOf === undefined) {
self.subscriptionCollection.find({ eventName: eventName, eventKey: eventKey })
.toArray((err, data) => {
if (err)
reject(err);
for (let item of data)
item.id = item["_id"].toString();
resolve(data);
});
} else {
self.subscriptionCollection.find({ eventName: eventName, eventKey: eventKey, subscribeAsOf: { $lt: asOf } })
.toArray((err, data) => {
if (err)
reject(err);
for (let item of data)
item.id = item["_id"].toString();
resolve(data);
});
}
});
return deferred;
}
Expand Down
10 changes: 8 additions & 2 deletions providers/workflow-es-mysql/src/mysql-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,17 @@ export class MySqlPersistence implements IPersistenceProvider {
});
return deferred;
}
public async getSubscriptions(eventName: string, eventKey: string, asOf: Date): Promise<Array<EventSubscription>> {
public async getSubscriptions(eventName: string, eventKey: string, asOf?: Date): Promise<Array<EventSubscription>> {
var deferred = new Promise<Array<EventSubscription>>(
async (resolve, reject) => {
try {
var instances = await subscriptionCollection.findAll({
var instances = asOf === undefined ? await subscriptionCollection.findAll({
where: {
eventName: eventName,
eventKey: eventKey
},
include: [Workflow]
}) : await subscriptionCollection.findAll({
where: {
eventName: eventName,
eventKey: eventKey,
Expand Down