Skip to content

Commit 90da25e

Browse files
committed
feat: add 'Metadata' field in schema.TaskDefinition
1 parent 7482a02 commit 90da25e

File tree

6 files changed

+56
-27
lines changed

6 files changed

+56
-27
lines changed

gateway_event_based.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package bpmn
1818

1919
import (
2020
"context"
21-
"fmt"
2221
"sync"
2322
"sync/atomic"
2423

@@ -61,7 +60,7 @@ func (gw *eventBasedGateway) run(ctx context.Context, sender tracing.ISenderHand
6160
terminationChannels[*idPtr] = make(chan bool)
6261
} else {
6362
err := errors.NotFoundError{
64-
Expected: fmt.Sprintf("id for %#v", sequenceFlow),
63+
Expected: sequenceFlow,
6564
}
6665
gw.tracer.Send(ErrorTrace{Error: err})
6766
}

gateway_exclusive.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ func (gw *exclusiveGateway) run(ctx context.Context, sender tracing.ISenderHandl
148148
case nextActionMessage:
149149
if _, ok := gw.probing[m.flow.Id()]; ok {
150150
gw.probing[m.flow.Id()] = &m.response
151-
// and now we wait until the probe has returned
152151
} else {
153152
gw.probing[m.flow.Id()] = nil
154153
m.response <- probeAction{

process_set.go

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type ProcessSet struct {
4444

4545
definitions *schema.Definitions
4646

47-
sourceMessageFlows map[string]*schema.MessageFlow
47+
messageFlows map[string]*schema.MessageFlow
4848

4949
cmu sync.RWMutex
5050
catchCh map[string]chan struct{}
@@ -94,25 +94,23 @@ func NewProcessSet(executeProcesses, waitingProcesses []*schema.Process, definit
9494
executes = append(executes, process)
9595
}
9696

97-
sourceMessageFlows := make(map[string]*schema.MessageFlow)
97+
messageFlows := make(map[string]*schema.MessageFlow)
9898
for _, collaboration := range *definitions.Collaborations() {
9999
for _, msg := range *collaboration.MessageFlows() {
100-
sourceMessageFlows[string(msg.SourceRefField)] = &msg
100+
messageFlows[string(msg.SourceRefField)] = &msg
101101
}
102102
}
103103

104104
ps := &ProcessSet{
105-
Options: options,
106-
sourceOptions: opts,
107-
executes: executes,
108-
waitings: waitingProcesses,
109-
definitions: definitions,
110-
sourceMessageFlows: sourceMessageFlows,
111-
112-
catchCh: make(map[string]chan struct{}),
113-
114-
mch: make(chan imessage, len(executes)+1),
115-
done: make(chan struct{}, 1),
105+
Options: options,
106+
sourceOptions: opts,
107+
executes: executes,
108+
waitings: waitingProcesses,
109+
definitions: definitions,
110+
messageFlows: messageFlows,
111+
catchCh: make(map[string]chan struct{}),
112+
mch: make(chan imessage, len(executes)+1),
113+
done: make(chan struct{}, 1),
116114
}
117115

118116
return ps, nil
@@ -160,7 +158,7 @@ func (ps *ProcessSet) run(ctx context.Context) {
160158
case ch := <-ps.mch:
161159
switch msg := ch.(type) {
162160
case throwMessage:
163-
sourceRef, ok := ps.sourceMessageFlows[msg.Id]
161+
sourceRef, ok := ps.messageFlows[msg.Id]
164162
if ok {
165163
startFlowNode, waitingProcess, found := ps.resolveWaitingProcessAndEvent(string(sourceRef.TargetRefField))
166164
if found {
@@ -184,9 +182,9 @@ func (ps *ProcessSet) run(ctx context.Context) {
184182
ps.wg.Add(1)
185183
go ps.tracerProcess(ctx, process, &ps.wg)
186184
}
187-
trigger, found := ps.triggerCatch(string(sourceRef.TargetRefField))
185+
cancel, found := ps.triggerCatch(string(sourceRef.TargetRefField))
188186
if found {
189-
trigger()
187+
cancel()
190188
}
191189
}
192190
}

schema/schema.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ const (
3131
BpmnDINS = "bpmndi:"
3232
DINS = "di:"
3333
DCNS = "dc:"
34-
OliveNS = "olive:"
34+
OliveNS = "olive:" // olive consumer namespace
3535
)
3636

3737
var mapping = map[string]string{
@@ -356,6 +356,39 @@ type TaskDefinition struct {
356356
Timeout string `xml:"timeout,attr"`
357357
Retries int32 `xml:"retries,attr"`
358358
Target string `xml:"target,attr"`
359+
// more task definition data (json format)
360+
Metadata string `xml:"metadata,attr"`
361+
}
362+
363+
func (t *TaskDefinition) SetMetadata(name string, value any) {
364+
if t.Metadata == "" {
365+
t.Metadata = "{}"
366+
}
367+
metadata := map[string]any{}
368+
_ = json.Unmarshal([]byte(t.Metadata), &metadata)
369+
metadata[name] = value
370+
data, _ := json.Marshal(metadata)
371+
t.Metadata = string(data)
372+
}
373+
374+
func (t *TaskDefinition) GetMetadata(name string) (any, bool) {
375+
metadata := t.GetMetadatas()
376+
value, ok := metadata[name]
377+
return value, ok
378+
}
379+
380+
func (t *TaskDefinition) SetMetadatas(metadata map[string]any) {
381+
data, _ := json.Marshal(metadata)
382+
t.Metadata = string(data)
383+
}
384+
385+
func (t *TaskDefinition) GetMetadatas() map[string]any {
386+
if t.Metadata == "" {
387+
return map[string]any{}
388+
}
389+
metadata := map[string]any{}
390+
_ = json.Unmarshal([]byte(t.Metadata), &metadata)
391+
return metadata
359392
}
360393

361394
func (t *TaskDefinition) MarshalXML(e *xml.Encoder, start xml.StartElement) error {

schema/schema_parser_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ func TestParseSample(t *testing.T) {
7171
if !assert.True(t, ok) {
7272
return
7373
}
74-
75-
_ = extension
76-
t.Log(extension.TaskHeaderField.Header[0])
77-
t.Log(extension.TaskDefinitionField.Type)
74+
metadata := extension.TaskDefinitionField.GetMetadatas()
75+
assert.Equal(t, map[string]any{"a": "b"}, metadata)
76+
assert.Equal(t, "application/json", extension.TaskHeaderField.Header[0].Value)
77+
assert.Equal(t, "service", extension.TaskDefinitionField.Type)
7878
}
7979

8080
func TestParseSampleNs(t *testing.T) {

schema/testdata/sample.bpmn

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
<bpmn:incoming>x3</bpmn:incoming>
3131
<bpmn:outgoing>x5</bpmn:outgoing>
3232
<bpmn:extensionElements>
33-
<olive:taskDefinition type="service"/>
33+
<olive:taskDefinition type="service" metadata="{&#34;a&#34;: &#34;b&#34;}"/>
3434
<olive:taskHeaders>
35-
<olive:header name="contentType" value="aplication/json"/>
35+
<olive:header name="contentType" value="application/json"/>
3636
</olive:taskHeaders>
3737
<olive:properties>
3838
<olive:property name="a" value="1" type="integer"/>

0 commit comments

Comments
 (0)