-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathevent_end.go
More file actions
92 lines (78 loc) · 2.34 KB
/
event_end.go
File metadata and controls
92 lines (78 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
/*
Copyright 2023 The bpmn Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bpmn
import (
"context"
"sync"
"sync/atomic"
"github.com/olive-io/bpmn/schema"
"github.com/olive-io/bpmn/v2/pkg/event"
"github.com/olive-io/bpmn/v2/pkg/tracing"
)
// endEvent is the flow-node implementation backing a BPMN end event.
// It embeds *wiring and wraps the schema element it represents.
type endEvent struct {
	*wiring

	// element is the schema definition this node was created for.
	element *schema.EndEvent

	// activated is set by run when the first nextActionMessage is handled.
	activated atomic.Bool
	// completed is set by run once the end event has been consumed
	// without error.
	completed atomic.Bool

	// once guards the start of the run goroutine in NextAction.
	once sync.Once
	// mch carries messages from NextAction to the run goroutine.
	mch chan imessage

	// startEventsActivated is initialised empty by newEndEvent and is not
	// otherwise referenced in this file.
	startEventsActivated []*schema.StartEvent
}
// newEndEvent builds an endEvent for element on top of the given wiring.
//
// The message channel is buffered with two slots per incoming flow of wr
// plus one extra. The returned error is always nil.
func newEndEvent(wr *wiring, element *schema.EndEvent) (evt *endEvent, err error) {
	mailboxSize := 2*len(wr.incoming) + 1

	evt = &endEvent{
		wiring:               wr,
		element:              element,
		mch:                  make(chan imessage, mailboxSize),
		startEventsActivated: []*schema.StartEvent{},
	}
	return evt, nil
}
// run is the node's message loop. It serves messages from evt.mch until ctx
// is cancelled, at which point it emits a CancellationFlowNodeTrace and
// returns. sender.Done is called on exit.
//
// The first nextActionMessage marks the node as activated and consumes an
// end event through eventIngress; on success the node is marked completed
// and a completeAction is written to the message's response channel. Every
// later nextActionMessage is answered with a completeAction immediately.
func (evt *endEvent) run(ctx context.Context, sender tracing.ISenderHandle) {
	defer sender.Done()

	for {
		select {
		case <-ctx.Done():
			evt.tracer.Send(CancellationFlowNodeTrace{Node: evt.element})
			return

		case incoming := <-evt.mch:
			request, ok := incoming.(nextActionMessage)
			if !ok {
				// Only next-action requests are handled here.
				continue
			}

			// The node fires once: if it was already activated, fuse it
			// and just acknowledge the request.
			if !evt.activated.CompareAndSwap(false, true) {
				request.response <- completeAction{}
				continue
			}

			_, err := evt.eventIngress.ConsumeEvent(event.MakeEndEvent(evt.element))
			if err != nil {
				// NOTE(review): nothing is written to request.response on
				// this path, so the requester gets no action for this call.
				evt.tracer.Send(ErrorTrace{Error: err})
				continue
			}

			evt.completed.Store(true)
			request.response <- completeAction{}
		}
	}
}
// NextAction queues a next-action request for the node's run loop and
// returns the channel on which the resulting action will be delivered.
//
// The first call registers a tracer sender and starts the run goroutine,
// bound to that call's ctx. The flow argument is not used by end events.
//
// If ctx is cancelled before the request can be queued, the request is
// dropped and the returned channel never receives a value; callers should
// also watch ctx.Done() while waiting on it.
func (evt *endEvent) NextAction(ctx context.Context, flow Flow) chan IAction {
	evt.once.Do(func() {
		sender := evt.tracer.RegisterSender()
		go evt.run(ctx, sender)
	})

	// Buffered so the run loop can reply without waiting for the receiver.
	response := make(chan IAction, 1)

	// The run loop exits when its context is cancelled and stops draining
	// evt.mch. An unconditional send would then block forever once the
	// buffer fills, so give up on cancellation instead.
	select {
	case evt.mch <- nextActionMessage{response: response}:
	case <-ctx.Done():
	}
	return response
}
// Element returns the schema end-event element wrapped by this node.
func (evt *endEvent) Element() schema.FlowNodeInterface {
	var node schema.FlowNodeInterface = evt.element
	return node
}