Skip to content

Commit df05e1f

Browse files
authored
Merge pull request #264 from nitram509/feature/add-subprocess
add feature: add subprocess element support (continued from #247)
2 parents 05317be + da2f569 commit df05e1f

30 files changed

+1859
-120
lines changed

docs/images/sub_process.png

1.36 KB
Loading

docs/supported-elements.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,16 @@ There are some comments as well, which describe the level of support per each el
2828
* get & set variables from/to context (of the instance) is possible
2929
* variable mapping is supported (for input and output, see [Variables](#variables))
3030

31+
## Sub Process
32+
![](images/sub_process.png){: .width-60pt }
33+
34+
* sub-processes can encapsulate a series of tasks and events.
35+
* supports both embedded and reusable sub-processes.
36+
* allows for better organization and modularization of complex workflows.
37+
* sub-processes can have their own start and end events.
38+
* supports variable mapping for input and output, similar to tasks.
39+
* can be used to handle repetitive or complex logic within a process.
40+
3141
## Gateways
3242

3343
The Parallel Gateway and the Exclusive Gateway do allow fork and join logic,

pkg/bpmn_engine/actiivty.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ const (
6565
type activity interface {
6666
Key() int64
6767
State() ActivityState
68+
SetState(state ActivityState)
6869
Element() *BPMN20.BaseElement
6970
}
7071

@@ -82,6 +83,10 @@ func (a elementActivity) State() ActivityState {
8283
return a.state
8384
}
8485

86+
func (a *elementActivity) SetState(state ActivityState) {
87+
a.state = state
88+
}
89+
8590
func (a elementActivity) Element() *BPMN20.BaseElement {
8691
return a.element
8792
}
@@ -142,6 +147,10 @@ func (ebg *eventBasedGatewayActivity) State() ActivityState {
142147
return ebg.state
143148
}
144149

150+
func (ebg *eventBasedGatewayActivity) SetState(state ActivityState) {
151+
ebg.state = state
152+
}
153+
145154
func (ebg *eventBasedGatewayActivity) Element() *BPMN20.BaseElement {
146155
return ebg.element
147156
}

pkg/bpmn_engine/actiivty_test.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
package bpmn_engine
22

3-
import "testing"
3+
import (
4+
"fmt"
5+
"github.com/corbym/gocrest/is"
6+
"github.com/corbym/gocrest/then"
7+
"reflect"
8+
"testing"
9+
)
410

511
func Test_Activity_interfaces_implemented(t *testing.T) {
612
var _ activity = &elementActivity{}
@@ -25,3 +31,26 @@ func Test_Job_implements_Activity(t *testing.T) {
2531
func Test_MessageSubscription_implements_Activity(t *testing.T) {
2632
var _ activity = &MessageSubscription{}
2733
}
34+
35+
func Test_SetState_is_working(t *testing.T) {
36+
// to avoid errors, when anyone forgets the pointer on the receiver type
37+
tests := []struct {
38+
a activity
39+
}{
40+
{&MessageSubscription{}},
41+
{&Timer{}},
42+
{&elementActivity{}},
43+
{&eventBasedGatewayActivity{}},
44+
{&gatewayActivity{}},
45+
{&job{}},
46+
{&processInstanceInfo{}},
47+
{&subProcessInfo{}},
48+
}
49+
for _, test := range tests {
50+
51+
t.Run(fmt.Sprintf("%s", reflect.TypeOf(test.a)), func(t *testing.T) {
52+
test.a.SetState(Completed)
53+
then.AssertThat(t, test.a.State(), is.EqualTo(Completed))
54+
})
55+
}
56+
}

pkg/bpmn_engine/conditions_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func Test_evaluation_error_percolates_up(t *testing.T) {
163163
instance, err := bpmnEngine.CreateAndRunInstance(process.ProcessKey, nil)
164164

165165
// then
166-
then.AssertThat(t, instance.State, is.EqualTo(Failed))
166+
then.AssertThat(t, instance.ActivityState, is.EqualTo(Failed))
167167
then.AssertThat(t, err, is.Not(is.Nil()))
168168
then.AssertThat(t, err.Error(), has.Prefix("Error evaluating expression in flow element id="))
169169
}

pkg/bpmn_engine/engine.go

Lines changed: 69 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (state *BpmnEngineState) CreateInstance(processKey int64, variableContext m
7474
InstanceKey: state.generateKey(),
7575
VariableHolder: NewVarHolder(nil, variableContext),
7676
CreatedAt: time.Now(),
77-
State: Ready,
77+
ActivityState: Ready,
7878
}
7979
state.processInstances = append(state.processInstances, &processInstanceInfo)
8080
state.exportProcessInstanceEvent(*process, processInstanceInfo)
@@ -93,7 +93,7 @@ func (state *BpmnEngineState) CreateAndRunInstanceById(processId string, variabl
9393
if err != nil {
9494
return nil, err
9595
}
96-
return instance, state.run(instance)
96+
return instance, state.run(instance.ProcessInfo.definitions.Process, instance, instance)
9797
}
9898

9999
// CreateAndRunInstance creates a new instance and executes it immediately.
@@ -105,7 +105,7 @@ func (state *BpmnEngineState) CreateAndRunInstance(processKey int64, variableCon
105105
if err != nil {
106106
return nil, err
107107
}
108-
return instance, state.run(instance)
108+
return instance, state.run(instance.ProcessInfo.definitions.Process, instance, instance)
109109
}
110110

111111
// RunOrContinueInstance runs or continues a process instance by a given processInstanceKey.
@@ -116,26 +116,25 @@ func (state *BpmnEngineState) CreateAndRunInstance(processKey int64, variableCon
116116
func (state *BpmnEngineState) RunOrContinueInstance(processInstanceKey int64) (*processInstanceInfo, error) {
117117
for _, pi := range state.processInstances {
118118
if processInstanceKey == pi.InstanceKey {
119-
return pi, state.run(pi)
119+
return pi, state.run(pi.ProcessInfo.definitions.Process, pi, pi)
120120
}
121121
}
122122
return nil, nil
123123
}
124124

125-
func (state *BpmnEngineState) run(instance *processInstanceInfo) (err error) {
126-
process := instance.ProcessInfo
125+
func (state *BpmnEngineState) run(process BPMN20.ProcessElement, instance *processInstanceInfo, currentActivity activity) (err error) {
127126
var commandQueue []command
128127

129-
switch instance.State {
128+
switch currentActivity.State() {
130129
case Ready:
131130
// use start events to start the instance
132-
for _, startEvent := range process.definitions.Process.StartEvents {
131+
for _, startEvent := range process.GetStartEvents() {
133132
var be BPMN20.BaseElement = startEvent
134133
commandQueue = append(commandQueue, activityCommand{
135134
element: &be,
136135
})
137136
}
138-
instance.State = Active
137+
currentActivity.SetState(Active)
139138
// TODO: check? export process EVENT
140139
case Active:
141140
jobs := state.findActiveJobsForContinuation(instance)
@@ -169,17 +168,17 @@ func (state *BpmnEngineState) run(instance *processInstanceInfo) (err error) {
169168
case flowTransitionType:
170169
sourceActivity := cmd.(flowTransitionCommand).sourceActivity
171170
flowId := cmd.(flowTransitionCommand).sequenceFlowId
172-
nextFlows := BPMN20.FindSequenceFlows(&process.definitions.Process.SequenceFlows, []string{flowId})
171+
nextFlows := BPMN20.FindSequenceFlows(process, []string{flowId})
173172
if BPMN20.ExclusiveGateway == (*sourceActivity.Element()).GetType() {
174173
nextFlows, err = exclusivelyFilterByConditionExpression(nextFlows, instance.VariableHolder.Variables())
175174
if err != nil {
176-
instance.State = Failed
175+
instance.ActivityState = Failed
177176
return err
178177
}
179178
}
180179
for _, flow := range nextFlows {
181-
state.exportSequenceFlowEvent(*process, *instance, flow)
182-
baseElements := BPMN20.FindBaseElementsById(&process.definitions, flow.TargetRef)
180+
state.exportSequenceFlowEvent(*instance.ProcessInfo, *instance, flow)
181+
baseElements := BPMN20.FindBaseElementsById(process, flow.TargetRef)
183182
targetBaseElement := baseElements[0]
184183
aCmd := activityCommand{
185184
sourceId: flowId,
@@ -191,17 +190,18 @@ func (state *BpmnEngineState) run(instance *processInstanceInfo) (err error) {
191190
case activityType:
192191
element := cmd.(activityCommand).element
193192
originActivity := cmd.(activityCommand).originActivity
194-
nextCommands := state.handleElement(process, instance, element, originActivity)
195-
state.exportElementEvent(*process, *instance, *element, exporter.ElementCompleted)
193+
nextCommands := state.handleElement(process, currentActivity, instance, element, originActivity)
194+
state.exportElementEvent(process, *instance, *element, exporter.ElementCompleted)
196195
commandQueue = append(commandQueue, nextCommands...)
197196
case continueActivityType:
198197
element := cmd.(continueActivityCommand).activity.Element()
199198
originActivity := cmd.(continueActivityCommand).originActivity
200-
nextCommands := state.handleElement(process, instance, element, originActivity)
199+
nextCommands := state.handleElement(process, currentActivity, instance, element, originActivity)
201200
commandQueue = append(commandQueue, nextCommands...)
202201
case errorType:
203202
err = cmd.(errorCommand).err
204-
instance.State = Failed
203+
instance.ActivityState = Failed
204+
// *activityState = Failed // TODO: check if meaningful
205205
break
206206
case checkExclusiveGatewayDoneType:
207207
activity := cmd.(checkExclusiveGatewayDoneCommand).gatewayActivity
@@ -211,16 +211,16 @@ func (state *BpmnEngineState) run(instance *processInstanceInfo) (err error) {
211211
}
212212
}
213213

214-
if instance.State == Completed || instance.State == Failed {
214+
if instance.ActivityState == Completed || instance.ActivityState == Failed {
215215
// TODO need to send failed State
216-
state.exportEndProcessEvent(*process, *instance)
216+
state.exportEndProcessEvent(*instance.ProcessInfo, *instance)
217217
}
218218

219219
return err
220220
}
221221

222-
func (state *BpmnEngineState) handleElement(process *ProcessInfo, instance *processInstanceInfo, element *BPMN20.BaseElement, originActivity activity) []command {
223-
state.exportElementEvent(*process, *instance, *element, exporter.ElementActivated) // FIXME: don't create event on continuation ?!?!
222+
func (state *BpmnEngineState) handleElement(process BPMN20.ProcessElement, act activity, instance *processInstanceInfo, element *BPMN20.BaseElement, originActivity activity) []command {
223+
state.exportElementEvent(process, *instance, *element, exporter.ElementActivated) // FIXME: don't create event on continuation ?!?!
224224
createFlowTransitions := true
225225
var activity activity
226226
var nextCommands []command
@@ -234,14 +234,9 @@ func (state *BpmnEngineState) handleElement(process *ProcessInfo, instance *proc
234234
element: element,
235235
}
236236
case BPMN20.EndEvent:
237-
state.handleEndEvent(process, instance)
238-
state.exportElementEvent(*process, *instance, *element, exporter.ElementCompleted) // special case here, to end the instance
239-
createFlowTransitions = false
240-
activity = &elementActivity{
241-
key: state.generateKey(),
242-
state: Completed,
243-
element: element,
244-
}
237+
createFlowTransitions = state.handleEndEvent(process, act, instance)
238+
activity = act
239+
state.exportElementEvent(process, *instance, *element, exporter.ElementCompleted) // special case here, to end the instance
245240
case BPMN20.ServiceTask:
246241
taskElement := (*element).(BPMN20.TaskElement)
247242
_, activity = state.handleServiceTask(process, instance, &taskElement)
@@ -274,7 +269,7 @@ func (state *BpmnEngineState) handleElement(process *ProcessInfo, instance *proc
274269
case BPMN20.ParallelGateway:
275270
createFlowTransitions, activity = state.handleParallelGateway(process, instance, (*element).(BPMN20.TParallelGateway), originActivity)
276271
case BPMN20.ExclusiveGateway:
277-
activity = elementActivity{
272+
activity = &elementActivity{
278273
key: state.generateKey(),
279274
state: Active,
280275
element: element,
@@ -289,12 +284,23 @@ func (state *BpmnEngineState) handleElement(process *ProcessInfo, instance *proc
289284
instance.appendActivity(activity)
290285
createFlowTransitions = true
291286
case BPMN20.InclusiveGateway:
292-
activity = elementActivity{
287+
activity = &elementActivity{
293288
key: state.generateKey(),
294289
state: Active,
295290
element: element,
296291
}
297292
createFlowTransitions = true
293+
case BPMN20.SubProcess:
294+
subProcessElement := (*element).(BPMN20.TSubProcess)
295+
activity, err = state.handleSubProcess(instance, &subProcessElement)
296+
if err != nil {
297+
nextCommands = append(nextCommands, errorCommand{
298+
err: err,
299+
elementId: (*element).GetId(),
300+
elementName: (*element).GetName(),
301+
})
302+
}
303+
createFlowTransitions = activity.State() == Completed
298304
default:
299305
panic(fmt.Sprintf("[invariant check] unsupported element: id=%s, type=%s", (*element).GetId(), (*element).GetType()))
300306
}
@@ -314,14 +320,14 @@ func createCheckExclusiveGatewayDoneCommand(originActivity activity) (cmds []com
314320
return cmds
315321
}
316322

317-
func createNextCommands(process *ProcessInfo, instance *processInstanceInfo, element *BPMN20.BaseElement, activity activity) (cmds []command) {
318-
nextFlows := BPMN20.FindSequenceFlows(&process.definitions.Process.SequenceFlows, (*element).GetOutgoingAssociation())
323+
func createNextCommands(process BPMN20.ProcessElement, instance *processInstanceInfo, element *BPMN20.BaseElement, activity activity) (cmds []command) {
324+
nextFlows := BPMN20.FindSequenceFlows(process, (*element).GetOutgoingAssociation())
319325
var err error
320326
switch (*element).GetType() {
321327
case BPMN20.ExclusiveGateway:
322328
nextFlows, err = exclusivelyFilterByConditionExpression(nextFlows, instance.VariableHolder.Variables())
323329
if err != nil {
324-
instance.State = Failed
330+
instance.ActivityState = Failed
325331
cmds = append(cmds, errorCommand{
326332
err: err,
327333
elementId: (*element).GetId(),
@@ -332,7 +338,7 @@ func createNextCommands(process *ProcessInfo, instance *processInstanceInfo, ele
332338
case BPMN20.InclusiveGateway:
333339
nextFlows, err = inclusivelyFilterByConditionExpression(nextFlows, instance.VariableHolder.Variables())
334340
if err != nil {
335-
instance.State = Failed
341+
instance.ActivityState = Failed
336342
return []command{
337343
errorCommand{
338344
elementId: (*element).GetId(),
@@ -352,7 +358,7 @@ func createNextCommands(process *ProcessInfo, instance *processInstanceInfo, ele
352358
return cmds
353359
}
354360

355-
func (state *BpmnEngineState) handleIntermediateCatchEvent(process *ProcessInfo, instance *processInstanceInfo, ice BPMN20.TIntermediateCatchEvent, originActivity activity) (continueFlow bool, activity activity, err error) {
361+
func (state *BpmnEngineState) handleIntermediateCatchEvent(process BPMN20.ProcessElement, instance *processInstanceInfo, ice BPMN20.TIntermediateCatchEvent, originActivity activity) (continueFlow bool, activity activity, err error) {
356362
continueFlow = false
357363
if ice.MessageEventDefinition.Id != "" {
358364
continueFlow, activity, err = state.handleIntermediateMessageCatchEvent(process, instance, ice, originActivity)
@@ -378,7 +384,7 @@ func (state *BpmnEngineState) handleIntermediateCatchEvent(process *ProcessInfo,
378384
return continueFlow, activity, err
379385
}
380386

381-
func (state *BpmnEngineState) handleEndEvent(process *ProcessInfo, instance *processInstanceInfo) {
387+
func (state *BpmnEngineState) handleEndEvent(process BPMN20.ProcessElement, act activity, instance *processInstanceInfo) bool {
382388
activeMessageSubscriptions := false
383389
for _, ms := range state.messageSubscriptions {
384390
if ms.ProcessInstanceKey == instance.InstanceKey {
@@ -389,11 +395,19 @@ func (state *BpmnEngineState) handleEndEvent(process *ProcessInfo, instance *pro
389395
}
390396
}
391397
if !activeMessageSubscriptions {
392-
instance.State = Completed
398+
act.SetState(Completed)
399+
}
400+
switch process.(type) {
401+
case *BPMN20.TProcess:
402+
return false
403+
case *BPMN20.TSubProcess:
404+
act.SetState(Completed)
405+
return true
393406
}
407+
return false
394408
}
395409

396-
func (state *BpmnEngineState) handleParallelGateway(process *ProcessInfo, instance *processInstanceInfo, element BPMN20.TParallelGateway, originActivity activity) (continueFlow bool, resultActivity activity) {
410+
func (state *BpmnEngineState) handleParallelGateway(process BPMN20.ProcessElement, instance *processInstanceInfo, element BPMN20.TParallelGateway, originActivity activity) (continueFlow bool, resultActivity activity) {
397411
resultActivity = instance.findActiveActivityByElementId(element.Id)
398412
if resultActivity == nil {
399413
var be BPMN20.BaseElement = element
@@ -405,7 +419,7 @@ func (state *BpmnEngineState) handleParallelGateway(process *ProcessInfo, instan
405419
}
406420
instance.appendActivity(resultActivity)
407421
}
408-
sourceFlow := BPMN20.FindFirstSequenceFlow(&process.definitions.Process.SequenceFlows, (*originActivity.Element()).GetId(), element.GetId())
422+
sourceFlow := BPMN20.FindFirstSequenceFlow(process, (*originActivity.Element()).GetId(), element.GetId())
409423
resultActivity.(*gatewayActivity).SetInboundFlowCompleted(sourceFlow.Id)
410424
continueFlow = resultActivity.(*gatewayActivity).parallel && resultActivity.(*gatewayActivity).AreInboundFlowsCompleted()
411425
if continueFlow {
@@ -414,6 +428,21 @@ func (state *BpmnEngineState) handleParallelGateway(process *ProcessInfo, instan
414428
return continueFlow, resultActivity
415429
}
416430

431+
func (state *BpmnEngineState) handleSubProcess(instance *processInstanceInfo, subProcessElement *BPMN20.TSubProcess) (subProcessActivity activity, err error) {
432+
var be BPMN20.BaseElement = subProcessElement
433+
subProcessActivity = &subProcessInfo{
434+
ElementId: subProcessElement.GetId(),
435+
ProcessInstance: instance,
436+
ProcessId: state.generateKey(),
437+
CreatedAt: time.Now(),
438+
processState: Ready,
439+
variableHolder: NewVarHolder(&instance.VariableHolder, nil),
440+
baseElement: &be,
441+
}
442+
err = state.run(subProcessElement, instance, subProcessActivity)
443+
return subProcessActivity, err
444+
}
445+
417446
func (state *BpmnEngineState) findActiveJobsForContinuation(instance *processInstanceInfo) (ret []*job) {
418447
for _, job := range state.jobs {
419448
if job.ProcessInstanceKey == instance.InstanceKey && job.JobState == Active {

0 commit comments

Comments
 (0)