Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added docs/images/sub_process.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 10 additions & 0 deletions docs/supported-elements.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ There are some comments as well, which describe the level of support per each el
* get & set variables from/to context (of the instance) is possible
* variable mapping is supported (for input and output, see [Variables](#variables))

## Sub Process
![](images/sub_process.png){: .width-60pt }

* sub-processes can encapsulate a series of tasks and events.
* supports both embedded and reusable sub-processes.
* allows for better organization and modularization of complex workflows.
* sub-processes can have their own start and end events.
* supports variable mapping for input and output, similar to tasks.
* can be used to handle repetitive or complex logic within a process.

## Gateways

The Parallel Gateway and the Exclusive Gateway do allow fork and join logic,
Expand Down
9 changes: 9 additions & 0 deletions pkg/bpmn_engine/actiivty.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
type activity interface {
Key() int64
State() ActivityState
SetState(state ActivityState)
Element() *BPMN20.BaseElement
}

Expand All @@ -82,6 +83,10 @@ func (a elementActivity) State() ActivityState {
return a.state
}

func (a *elementActivity) SetState(state ActivityState) {
a.state = state
}

func (a elementActivity) Element() *BPMN20.BaseElement {
return a.element
}
Expand Down Expand Up @@ -142,6 +147,10 @@ func (ebg *eventBasedGatewayActivity) State() ActivityState {
return ebg.state
}

func (ebg *eventBasedGatewayActivity) SetState(state ActivityState) {
ebg.state = state
}

func (ebg *eventBasedGatewayActivity) Element() *BPMN20.BaseElement {
return ebg.element
}
Expand Down
31 changes: 30 additions & 1 deletion pkg/bpmn_engine/actiivty_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package bpmn_engine

import "testing"
import (
"fmt"
"github.com/corbym/gocrest/is"
"github.com/corbym/gocrest/then"
"reflect"
"testing"
)

func Test_Activity_interfaces_implemented(t *testing.T) {
var _ activity = &elementActivity{}
Expand All @@ -25,3 +31,26 @@ func Test_Job_implements_Activity(t *testing.T) {
func Test_MessageSubscription_implements_Activity(t *testing.T) {
var _ activity = &MessageSubscription{}
}

func Test_SetState_is_working(t *testing.T) {
// to avoid errors, when anyone forgets the pointer on the receiver type
tests := []struct {
a activity
}{
{&MessageSubscription{}},
{&Timer{}},
{&elementActivity{}},
{&eventBasedGatewayActivity{}},
{&gatewayActivity{}},
{&job{}},
{&processInstanceInfo{}},
{&subProcessInfo{}},
}
for _, test := range tests {

t.Run(fmt.Sprintf("%s", reflect.TypeOf(test.a)), func(t *testing.T) {
test.a.SetState(Completed)
then.AssertThat(t, test.a.State(), is.EqualTo(Completed))
})
}
}
2 changes: 1 addition & 1 deletion pkg/bpmn_engine/conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func Test_evaluation_error_percolates_up(t *testing.T) {
instance, err := bpmnEngine.CreateAndRunInstance(process.ProcessKey, nil)

// then
then.AssertThat(t, instance.State, is.EqualTo(Failed))
then.AssertThat(t, instance.ActivityState, is.EqualTo(Failed))
then.AssertThat(t, err, is.Not(is.Nil()))
then.AssertThat(t, err.Error(), has.Prefix("Error evaluating expression in flow element id="))
}
Expand Down
109 changes: 69 additions & 40 deletions pkg/bpmn_engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
InstanceKey: state.generateKey(),
VariableHolder: NewVarHolder(nil, variableContext),
CreatedAt: time.Now(),
State: Ready,
ActivityState: Ready,
}
state.processInstances = append(state.processInstances, &processInstanceInfo)
state.exportProcessInstanceEvent(*process, processInstanceInfo)
Expand All @@ -93,7 +93,7 @@
if err != nil {
return nil, err
}
return instance, state.run(instance)
return instance, state.run(instance.ProcessInfo.definitions.Process, instance, instance)
}

// CreateAndRunInstance creates a new instance and executes it immediately.
Expand All @@ -105,7 +105,7 @@
if err != nil {
return nil, err
}
return instance, state.run(instance)
return instance, state.run(instance.ProcessInfo.definitions.Process, instance, instance)
}

// RunOrContinueInstance runs or continues a process instance by a given processInstanceKey.
Expand All @@ -116,26 +116,25 @@
func (state *BpmnEngineState) RunOrContinueInstance(processInstanceKey int64) (*processInstanceInfo, error) {
for _, pi := range state.processInstances {
if processInstanceKey == pi.InstanceKey {
return pi, state.run(pi)
return pi, state.run(pi.ProcessInfo.definitions.Process, pi, pi)
}
}
return nil, nil
}

func (state *BpmnEngineState) run(instance *processInstanceInfo) (err error) {
process := instance.ProcessInfo
func (state *BpmnEngineState) run(process BPMN20.ProcessElement, instance *processInstanceInfo, currentActivity activity) (err error) {
var commandQueue []command

switch instance.State {
switch currentActivity.State() {
case Ready:
// use start events to start the instance
for _, startEvent := range process.definitions.Process.StartEvents {
for _, startEvent := range process.GetStartEvents() {
var be BPMN20.BaseElement = startEvent
commandQueue = append(commandQueue, activityCommand{
element: &be,
})
}
instance.State = Active
currentActivity.SetState(Active)
// TODO: check? export process EVENT
case Active:
jobs := state.findActiveJobsForContinuation(instance)
Expand Down Expand Up @@ -169,17 +168,17 @@
case flowTransitionType:
sourceActivity := cmd.(flowTransitionCommand).sourceActivity
flowId := cmd.(flowTransitionCommand).sequenceFlowId
nextFlows := BPMN20.FindSequenceFlows(&process.definitions.Process.SequenceFlows, []string{flowId})
nextFlows := BPMN20.FindSequenceFlows(process, []string{flowId})
if BPMN20.ExclusiveGateway == (*sourceActivity.Element()).GetType() {
nextFlows, err = exclusivelyFilterByConditionExpression(nextFlows, instance.VariableHolder.Variables())
if err != nil {
instance.State = Failed
instance.ActivityState = Failed

Check warning on line 175 in pkg/bpmn_engine/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/bpmn_engine/engine.go#L175

Added line #L175 was not covered by tests
return err
}
}
for _, flow := range nextFlows {
state.exportSequenceFlowEvent(*process, *instance, flow)
baseElements := BPMN20.FindBaseElementsById(&process.definitions, flow.TargetRef)
state.exportSequenceFlowEvent(*instance.ProcessInfo, *instance, flow)
baseElements := BPMN20.FindBaseElementsById(process, flow.TargetRef)
targetBaseElement := baseElements[0]
aCmd := activityCommand{
sourceId: flowId,
Expand All @@ -191,17 +190,18 @@
case activityType:
element := cmd.(activityCommand).element
originActivity := cmd.(activityCommand).originActivity
nextCommands := state.handleElement(process, instance, element, originActivity)
state.exportElementEvent(*process, *instance, *element, exporter.ElementCompleted)
nextCommands := state.handleElement(process, currentActivity, instance, element, originActivity)
state.exportElementEvent(process, *instance, *element, exporter.ElementCompleted)
commandQueue = append(commandQueue, nextCommands...)
case continueActivityType:
element := cmd.(continueActivityCommand).activity.Element()
originActivity := cmd.(continueActivityCommand).originActivity
nextCommands := state.handleElement(process, instance, element, originActivity)
nextCommands := state.handleElement(process, currentActivity, instance, element, originActivity)
commandQueue = append(commandQueue, nextCommands...)
case errorType:
err = cmd.(errorCommand).err
instance.State = Failed
instance.ActivityState = Failed
// *activityState = Failed // TODO: check if meaningful
break
case checkExclusiveGatewayDoneType:
activity := cmd.(checkExclusiveGatewayDoneCommand).gatewayActivity
Expand All @@ -211,16 +211,16 @@
}
}

if instance.State == Completed || instance.State == Failed {
if instance.ActivityState == Completed || instance.ActivityState == Failed {
// TODO need to send failed State
state.exportEndProcessEvent(*process, *instance)
state.exportEndProcessEvent(*instance.ProcessInfo, *instance)
}

return err
}

func (state *BpmnEngineState) handleElement(process *ProcessInfo, instance *processInstanceInfo, element *BPMN20.BaseElement, originActivity activity) []command {
state.exportElementEvent(*process, *instance, *element, exporter.ElementActivated) // FIXME: don't create event on continuation ?!?!
func (state *BpmnEngineState) handleElement(process BPMN20.ProcessElement, act activity, instance *processInstanceInfo, element *BPMN20.BaseElement, originActivity activity) []command {
state.exportElementEvent(process, *instance, *element, exporter.ElementActivated) // FIXME: don't create event on continuation ?!?!
createFlowTransitions := true
var activity activity
var nextCommands []command
Expand All @@ -234,14 +234,9 @@
element: element,
}
case BPMN20.EndEvent:
state.handleEndEvent(process, instance)
state.exportElementEvent(*process, *instance, *element, exporter.ElementCompleted) // special case here, to end the instance
createFlowTransitions = false
activity = &elementActivity{
key: state.generateKey(),
state: Completed,
element: element,
}
createFlowTransitions = state.handleEndEvent(process, act, instance)
activity = act
state.exportElementEvent(process, *instance, *element, exporter.ElementCompleted) // special case here, to end the instance
case BPMN20.ServiceTask:
taskElement := (*element).(BPMN20.TaskElement)
_, activity = state.handleServiceTask(process, instance, &taskElement)
Expand Down Expand Up @@ -274,7 +269,7 @@
case BPMN20.ParallelGateway:
createFlowTransitions, activity = state.handleParallelGateway(process, instance, (*element).(BPMN20.TParallelGateway), originActivity)
case BPMN20.ExclusiveGateway:
activity = elementActivity{
activity = &elementActivity{
key: state.generateKey(),
state: Active,
element: element,
Expand All @@ -289,12 +284,23 @@
instance.appendActivity(activity)
createFlowTransitions = true
case BPMN20.InclusiveGateway:
activity = elementActivity{
activity = &elementActivity{
key: state.generateKey(),
state: Active,
element: element,
}
createFlowTransitions = true
case BPMN20.SubProcess:
subProcessElement := (*element).(BPMN20.TSubProcess)
activity, err = state.handleSubProcess(instance, &subProcessElement)
if err != nil {
nextCommands = append(nextCommands, errorCommand{
err: err,
elementId: (*element).GetId(),
elementName: (*element).GetName(),
})
}

Check warning on line 302 in pkg/bpmn_engine/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/bpmn_engine/engine.go#L297-L302

Added lines #L297 - L302 were not covered by tests
createFlowTransitions = activity.State() == Completed
default:
panic(fmt.Sprintf("[invariant check] unsupported element: id=%s, type=%s", (*element).GetId(), (*element).GetType()))
}
Expand All @@ -314,14 +320,14 @@
return cmds
}

func createNextCommands(process *ProcessInfo, instance *processInstanceInfo, element *BPMN20.BaseElement, activity activity) (cmds []command) {
nextFlows := BPMN20.FindSequenceFlows(&process.definitions.Process.SequenceFlows, (*element).GetOutgoingAssociation())
func createNextCommands(process BPMN20.ProcessElement, instance *processInstanceInfo, element *BPMN20.BaseElement, activity activity) (cmds []command) {
nextFlows := BPMN20.FindSequenceFlows(process, (*element).GetOutgoingAssociation())
var err error
switch (*element).GetType() {
case BPMN20.ExclusiveGateway:
nextFlows, err = exclusivelyFilterByConditionExpression(nextFlows, instance.VariableHolder.Variables())
if err != nil {
instance.State = Failed
instance.ActivityState = Failed
cmds = append(cmds, errorCommand{
err: err,
elementId: (*element).GetId(),
Expand All @@ -332,7 +338,7 @@
case BPMN20.InclusiveGateway:
nextFlows, err = inclusivelyFilterByConditionExpression(nextFlows, instance.VariableHolder.Variables())
if err != nil {
instance.State = Failed
instance.ActivityState = Failed

Check warning on line 341 in pkg/bpmn_engine/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/bpmn_engine/engine.go#L341

Added line #L341 was not covered by tests
return []command{
errorCommand{
elementId: (*element).GetId(),
Expand All @@ -352,7 +358,7 @@
return cmds
}

func (state *BpmnEngineState) handleIntermediateCatchEvent(process *ProcessInfo, instance *processInstanceInfo, ice BPMN20.TIntermediateCatchEvent, originActivity activity) (continueFlow bool, activity activity, err error) {
func (state *BpmnEngineState) handleIntermediateCatchEvent(process BPMN20.ProcessElement, instance *processInstanceInfo, ice BPMN20.TIntermediateCatchEvent, originActivity activity) (continueFlow bool, activity activity, err error) {
continueFlow = false
if ice.MessageEventDefinition.Id != "" {
continueFlow, activity, err = state.handleIntermediateMessageCatchEvent(process, instance, ice, originActivity)
Expand All @@ -378,7 +384,7 @@
return continueFlow, activity, err
}

func (state *BpmnEngineState) handleEndEvent(process *ProcessInfo, instance *processInstanceInfo) {
func (state *BpmnEngineState) handleEndEvent(process BPMN20.ProcessElement, act activity, instance *processInstanceInfo) bool {
activeMessageSubscriptions := false
for _, ms := range state.messageSubscriptions {
if ms.ProcessInstanceKey == instance.InstanceKey {
Expand All @@ -389,11 +395,19 @@
}
}
if !activeMessageSubscriptions {
instance.State = Completed
act.SetState(Completed)
}
switch process.(type) {
case *BPMN20.TProcess:
return false

Check warning on line 402 in pkg/bpmn_engine/engine.go

View check run for this annotation

Codecov / codecov/patch

pkg/bpmn_engine/engine.go#L401-L402

Added lines #L401 - L402 were not covered by tests
case *BPMN20.TSubProcess:
act.SetState(Completed)
return true
}
return false
}

func (state *BpmnEngineState) handleParallelGateway(process *ProcessInfo, instance *processInstanceInfo, element BPMN20.TParallelGateway, originActivity activity) (continueFlow bool, resultActivity activity) {
func (state *BpmnEngineState) handleParallelGateway(process BPMN20.ProcessElement, instance *processInstanceInfo, element BPMN20.TParallelGateway, originActivity activity) (continueFlow bool, resultActivity activity) {
resultActivity = instance.findActiveActivityByElementId(element.Id)
if resultActivity == nil {
var be BPMN20.BaseElement = element
Expand All @@ -405,7 +419,7 @@
}
instance.appendActivity(resultActivity)
}
sourceFlow := BPMN20.FindFirstSequenceFlow(&process.definitions.Process.SequenceFlows, (*originActivity.Element()).GetId(), element.GetId())
sourceFlow := BPMN20.FindFirstSequenceFlow(process, (*originActivity.Element()).GetId(), element.GetId())
resultActivity.(*gatewayActivity).SetInboundFlowCompleted(sourceFlow.Id)
continueFlow = resultActivity.(*gatewayActivity).parallel && resultActivity.(*gatewayActivity).AreInboundFlowsCompleted()
if continueFlow {
Expand All @@ -414,6 +428,21 @@
return continueFlow, resultActivity
}

func (state *BpmnEngineState) handleSubProcess(instance *processInstanceInfo, subProcessElement *BPMN20.TSubProcess) (subProcessActivity activity, err error) {
var be BPMN20.BaseElement = subProcessElement
subProcessActivity = &subProcessInfo{
ElementId: subProcessElement.GetId(),
ProcessInstance: instance,
ProcessId: state.generateKey(),
CreatedAt: time.Now(),
processState: Ready,
variableHolder: NewVarHolder(&instance.VariableHolder, nil),
baseElement: &be,
}
err = state.run(subProcessElement, instance, subProcessActivity)
return subProcessActivity, err
}

func (state *BpmnEngineState) findActiveJobsForContinuation(instance *processInstanceInfo) (ret []*job) {
for _, job := range state.jobs {
if job.ProcessInstanceKey == instance.InstanceKey && job.JobState == Active {
Expand Down
Loading
Loading