Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added docs/images/error_boundary_event.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/event_sub_process.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
13 changes: 13 additions & 0 deletions docs/supported-elements.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ There are some comments as well, which describe the level of support per each el
* get & set variables from/to context (of the instance) is possible
* variable mapping is supported (for input and output, see [Variables](#variables))

## Error Boundary Event
![](images/error_boundary_event.png){: .width-60pt }

* business errors raised by a task which requires modeling.
* multiple end events are supported as well.

## Sub Process
![](images/sub_process.png){: .width-60pt }

Expand All @@ -37,6 +43,13 @@ There are some comments as well, which describe the level of support per each el
* sub-processes can have their own start and end events.
* supports variable mapping for input and output, similar to tasks.
* can be used to handle repetitive or complex logic within a process.
*
## Event Sub Process
![](images/event_sub_process.png){: .width-60pt }

* Only Error Event Sub Process is supported
* event subprocess is a subprocess triggered by an event
* it must have an event based start event

## Gateways

Expand Down
17 changes: 14 additions & 3 deletions pkg/bpmn_engine/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import "github.com/nitram509/lib-bpmn-engine/pkg/spec/BPMN20"
type commandType string

const (
flowTransitionType commandType = "flowTransition"
activityType commandType = "activity"
continueActivityType commandType = "continueActivity"
flowTransitionType commandType = "flowTransition"
activityType commandType = "activity"
continueActivityType commandType = "continueActivity"
// A command that there is a technical error and the engine should fail the process instance
errorType commandType = "error"
eventSubProcessCompletedType commandType = "eventSubProcessCompletedType"
checkExclusiveGatewayDoneType commandType = "checkExclusiveGatewayDone"
)

Expand Down Expand Up @@ -72,3 +74,12 @@ type checkExclusiveGatewayDoneCommand struct {
func (t checkExclusiveGatewayDoneCommand) Type() commandType {
return checkExclusiveGatewayDoneType
}

type eventSubProcessCompletedCommand struct {
// activity reference to the event sub-process, which has completed
activity activity
}

func (t eventSubProcessCompletedCommand) Type() commandType {
return eventSubProcessCompletedType
}
187 changes: 180 additions & 7 deletions pkg/bpmn_engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,16 @@ func (state *BpmnEngineState) run(process BPMN20.ProcessElement, instance *proce
instance.ActivityState = Failed
// *activityState = Failed // TODO: check if meaningful
break
case eventSubProcessCompletedType:
subProcessActivity := cmd.(eventSubProcessCompletedCommand).activity
instance.SetState(subProcessActivity.State())
state.exportElementEvent(process, *instance, process, exporter.ElementCompleted)
break
case checkExclusiveGatewayDoneType:
activity := cmd.(checkExclusiveGatewayDoneCommand).gatewayActivity
state.checkExclusiveGatewayDone(activity)
default:
panic("[invariant check] command type check not fully implemented")
return newEngineErrorf("[invariant check] command type check not fully implemented")
}
}

Expand Down Expand Up @@ -239,12 +244,41 @@ func (state *BpmnEngineState) handleElement(process BPMN20.ProcessElement, act a
state.exportElementEvent(process, *instance, *element, exporter.ElementCompleted) // special case here, to end the instance
case BPMN20.ServiceTask:
taskElement := (*element).(BPMN20.TaskElement)
_, activity = state.handleServiceTask(process, instance, &taskElement)
createFlowTransitions = activity.State() == Completed
_, job, jobErr := state.handleServiceTask(process, instance, &taskElement)
err = jobErr
activity = job
if err != nil {
nextCommands = append(nextCommands, errorCommand{
err: err,
elementId: (*element).GetId(),
elementName: (*element).GetName(),
})
} else if job.ErrorCode != "" {
// The current process will remain ACTIVE until the event sub-processes have completed.
nextCommands = handleErrorEvent(process, instance, element, job.ErrorCode)
createFlowTransitions = false // TODO confirm
} else {
// Only follow sequence flow if there are no Technical or Business Errors
createFlowTransitions = activity.State() == Completed
}
case BPMN20.UserTask:
taskElement := (*element).(BPMN20.TaskElement)
activity = state.handleUserTask(process, instance, &taskElement)
createFlowTransitions = activity.State() == Completed
job, jobErr := state.handleUserTask(process, instance, &taskElement)
err = jobErr
activity = job
if err != nil {
nextCommands = append(nextCommands, errorCommand{
err: err,
elementId: (*element).GetId(),
elementName: (*element).GetName(),
})
} else if job.ErrorCode != "" {
nextCommands = handleErrorEvent(process, instance, element, job.ErrorCode)
createFlowTransitions = false
} else {
// Only follow sequence flow if there are no Technical or Business Errors
createFlowTransitions = activity.State() == Completed
}
case BPMN20.IntermediateCatchEvent:
ice := (*element).(BPMN20.TIntermediateCatchEvent)
createFlowTransitions, activity, err = state.handleIntermediateCatchEvent(process, instance, ice, originActivity)
Expand Down Expand Up @@ -292,24 +326,147 @@ func (state *BpmnEngineState) handleElement(process BPMN20.ProcessElement, act a
createFlowTransitions = true
case BPMN20.SubProcess:
subProcessElement := (*element).(BPMN20.TSubProcess)
activity, err = state.handleSubProcess(instance, &subProcessElement)
subProcess, subProcessErr := state.handleSubProcess(instance, &subProcessElement)
activity = subProcess
err = subProcessErr
if err != nil {
nextCommands = append(nextCommands, errorCommand{
err: err,
elementId: (*element).GetId(),
elementName: (*element).GetName(),
})
} else if subProcessElement.TriggeredByEvent {
// We need to complete the parent process when an event sub-process has completed. but we cant do it here
nextCommands = append(nextCommands, eventSubProcessCompletedCommand{
activity: subProcess,
})
}
createFlowTransitions = activity.State() == Completed
case BPMN20.BoundaryEvent:
boundary := (*element).(BPMN20.TBoundaryEvent)
activity, err = state.handleBoundaryEvent(&boundary, instance)
default:
panic(fmt.Sprintf("[invariant check] unsupported element: id=%s, type=%s", (*element).GetId(), (*element).GetType()))
nextCommands = append(nextCommands, errorCommand{
err: newEngineErrorf("[invariant check] unsupported element: id=%s, type=%s", (*element).GetId(), (*element).GetType()),
elementId: (*element).GetId(),
elementName: (*element).GetName(),
})
}
if createFlowTransitions && err == nil {
nextCommands = append(nextCommands, createNextCommands(process, instance, element, activity)...)
}
return nextCommands
}

func handleErrorEvent(process BPMN20.ProcessElement, instance *processInstanceInfo, element *BPMN20.BaseElement, errorCode string) []command {
// Find the error by code on the process
if errT, found := findErrorDefinition(instance.ProcessInfo.definitions, errorCode); found {

// Find the boundary events for the task
boundaryEvents := findBoundaryEventsForTypeAndReference(instance.ProcessInfo.definitions, BPMN20.ErrorBoundary, (*element).GetId())
if boundaryEvent, foundBoundary := findBoundaryEventForError(boundaryEvents, errT.Id); foundBoundary {
return []command{
activityCommand{element: BPMN20.Ptr[BPMN20.BaseElement](boundaryEvent)},
}
}

// If we still haven't found a command then we should look to see if there is an event sub process we can follow
if subProcess, subFound := findEventSubprocessForError(process, errT.Id); subFound {
return []command{
activityCommand{element: BPMN20.Ptr[BPMN20.BaseElement](subProcess)},
}
}

// If not see if there is a catch-all boundary event
if boundaryEvent, foundBoundary := findBoundaryEventForError(boundaryEvents, ""); foundBoundary {
return []command{
activityCommand{element: BPMN20.Ptr[BPMN20.BaseElement](boundaryEvent)},
}
}

// If not find an event sub process matching catchall
if subProcess, subFound := findEventSubprocessForError(process, ""); subFound {
return []command{
activityCommand{element: BPMN20.Ptr[BPMN20.BaseElement](subProcess)},
}
}

// TODO continue lookup up to the parent process if this is a sub process - not supported yet

return []command{
errorCommand{
err: newEngineErrorf("Could not find suitable handler for ErrorCode event id=%s, code=%s", errT.Id, errT.ErrorCode),
elementId: (*element).GetId(),
elementName: (*element).GetName(),
},
}
} else {
return []command{
errorCommand{
err: newEngineErrorf("Could not find error definition \"%s\"", errorCode),
elementId: (*element).GetId(),
elementName: (*element).GetName(),
},
}
}
}

func findEventSubprocessForError(process BPMN20.ProcessElement, errorReferenceID string) (BPMN20.TSubProcess, bool) {
// Look for event sub-processes in the process
for _, subProcess := range process.GetSubProcess() {
// Check if this is an event sub-process (triggered by event)
if subProcess.TriggeredByEvent {
// Look for start events in the sub-process
for _, startEvent := range subProcess.StartEvents {
// Check if this start event has an error event definition
if startEvent.ErrorEventDefinition.ErrorRef == errorReferenceID {
// We found an event sub-process with an error start event
return subProcess, true
}
}
}
}

// No matching event sub-process found
return BPMN20.TSubProcess{}, false
}

// findBoundaryEventsForReference finds all boundary events attached to the provided element
func findBoundaryEventsForTypeAndReference(definitions BPMN20.TDefinitions, boundaryType BPMN20.BoundaryType, referenceID string) []BPMN20.TBoundaryEvent {
boundaryEvents := make([]BPMN20.TBoundaryEvent, 0)
for _, boundary := range definitions.Process.BoundaryEvent {
if boundary.AttachedToRef == referenceID && boundary.GetBoundaryType() == boundaryType {
boundaryEvents = append(boundaryEvents, boundary)
}
}
return boundaryEvents
}

func findBoundaryEventForError(boundaryEvents []BPMN20.TBoundaryEvent, errorID string) (BPMN20.TBoundaryEvent, bool) {
for _, boundaryEvent := range boundaryEvents {
// Check if this boundary event has an error event definition
if boundaryEvent.ErrorEventDefinition.ErrorRef == errorID {
return boundaryEvent, true
}
}
return BPMN20.TBoundaryEvent{}, false
}

func findErrorDefinition(definitions BPMN20.TDefinitions, errorCode string) (BPMN20.TError, bool) {

// Iterate through all errors in the definitions
for _, err := range definitions.Errors {
// Check if the error code matches the requested code
if err.ErrorCode == errorCode {
return err, true
}
}

// Return empty error if not found
return BPMN20.TError{}, false

}

func createCheckExclusiveGatewayDoneCommand(originActivity activity) (cmds []command) {
if (*originActivity.Element()).GetType() == BPMN20.EventBasedGateway {
evtBasedGatewayActivity := originActivity.(*eventBasedGatewayActivity)
Expand Down Expand Up @@ -473,3 +630,19 @@ func (state *BpmnEngineState) findCreatedTimers(instance *processInstanceInfo) (
}
return result
}

func (state *BpmnEngineState) handleBoundaryEvent(element *BPMN20.TBoundaryEvent, instance *processInstanceInfo) (activity, error) {
var be BPMN20.BaseElement = element
activity := &elementActivity{
key: state.generateKey(),
state: Completed,
element: &be,
}
variableHolder := NewVarHolder(&instance.VariableHolder, nil)
err := propagateProcessInstanceVariables(&variableHolder, element.GetOutputMapping())
if err != nil {
instance.ActivityState = Failed
}

return activity, err
}
39 changes: 39 additions & 0 deletions pkg/bpmn_engine/engine_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package bpmn_engine

import (
"fmt"
"github.com/nitram509/lib-bpmn-engine/pkg/bpmn_engine/exporter"
"testing"
"time"

Expand All @@ -22,6 +24,41 @@ func (callPath *CallPath) TaskHandler(job ActivatedJob) {
job.Complete()
}

func pathString(paths []string) string {
path := ""
for _, p := range paths {
path += p + "\n"
}
return path
}

// PathRecordingEventExporter records the paths taken during the execution of a process
// into a string.
type PathRecordingEventExporter struct {
paths []string
}

func (e *PathRecordingEventExporter) String() string {
return pathString(e.paths)
}

// NewEventLogExporter creates a new instance of a PathRecordingEventExporter
func NewPathRecordingEventExporter() *PathRecordingEventExporter {
return &PathRecordingEventExporter{
paths: make([]string, 0),
}
}

func (*PathRecordingEventExporter) NewProcessEvent(_ *exporter.ProcessEvent) {}

func (*PathRecordingEventExporter) EndProcessEvent(_ *exporter.ProcessInstanceEvent) {}

func (*PathRecordingEventExporter) NewProcessInstanceEvent(_ *exporter.ProcessInstanceEvent) {}

func (e *PathRecordingEventExporter) NewElementEvent(_ *exporter.ProcessInstanceEvent, elementInfo *exporter.ElementInfo) {
e.paths = append(e.paths, fmt.Sprintf("%s(%s)", elementInfo.ElementId, elementInfo.Intent))
}

func Test_BpmnEngine_interfaces_implemented(t *testing.T) {
var _ BpmnEngine = &BpmnEngineState{}
}
Expand Down Expand Up @@ -215,6 +252,8 @@ func Test_CreateInstanceById_uses_latest_process_version(t *testing.T) {
func Test_CreateAndRunInstanceById_uses_latest_process_version(t *testing.T) {
// setup
engine := New()
engine.NewTaskHandler().Id("id").Handler(jobCompleteHandler)
engine.NewTaskHandler().Id("test-2").Handler(jobCompleteHandler)

// when
v1, err := engine.LoadFromFile("../../test-cases/simple_task.bpmn")
Expand Down
30 changes: 30 additions & 0 deletions pkg/bpmn_engine/exporter/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package exporter

import "fmt"

// LoggingEventExported writes all events to a log file
type LoggingEventExported struct {
}

// NewEventLogExporter creates a new instance of a LoggingEventExported
func NewEventLogExporter() *LoggingEventExported {
return &LoggingEventExported{}
}

func (*LoggingEventExported) NewProcessEvent(event *ProcessEvent) {
fmt.Printf("New Process event version: %d, processKey: %d, processID: %s\n", event.Version, event.ProcessKey, event.ProcessId)
}

func (*LoggingEventExported) EndProcessEvent(event *ProcessInstanceEvent) {
fmt.Printf("End Process event version: %d, processKey: %d, processID: %s, processInstanceKey: %d\n", event.Version, event.ProcessKey, event.ProcessId, event.ProcessInstanceKey)
}

func (*LoggingEventExported) NewProcessInstanceEvent(event *ProcessInstanceEvent) {
fmt.Printf("New Process Instance version: %d, processKey: %d, processID: %s, processInstanceKey: %d\n", event.Version, event.ProcessKey, event.ProcessId, event.ProcessInstanceKey)
}

func (*LoggingEventExported) NewElementEvent(event *ProcessInstanceEvent, elementInfo *ElementInfo) {
fmt.Printf("New Element event version: %d, processKey: %d, processID: %s, processInstanceKey: %d, elementType: %s, elementId: %s, intent: %s\n",
event.Version, event.ProcessKey, event.ProcessId, event.ProcessInstanceKey,
elementInfo.BpmnElementType, elementInfo.ElementId, elementInfo.Intent)
}
4 changes: 4 additions & 0 deletions pkg/bpmn_engine/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ type job struct {
JobState ActivityState `json:"s"`
CreatedAt time.Time `json:"c"`
baseElement *BPMN20.BaseElement
// Failure returned by a handler with job.Fail(string)
Failure string `json:"f,omitempty"`
// ErrorCode event thrown by a handler with job.ThrowError(string)
ErrorCode string `json:"ec,omitempty"`
}

func (j job) Key() int64 {
Expand Down
Loading
Loading