Skip to content

Commit dec43f6

Browse files
committed
feat: advanced sync
1 parent 741ab0e commit dec43f6

File tree

4 files changed

+108
-9
lines changed

4 files changed

+108
-9
lines changed

pkg/apis/api-rules/violation_exceptions.list

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/
1515
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,Inputs,Parameters
1616
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,LabelKeys,Items
1717
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,LabelValues,Items
18+
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,Mutex,Selectors
1819
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,Metrics,Prometheus
1920
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,NodeStatus,Children
2021
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,NodeStatus,OutboundNodes
@@ -24,6 +25,7 @@ API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/
2425
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,ParallelSteps,Steps
2526
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,Parameter,Enum
2627
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,Prometheus,Labels
28+
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,SemaphoreRef,Selectors
2729
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,ResourceTemplate,Flags
2830
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,SemaphoreStatus,Holding
2931
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,SemaphoreStatus,Waiting

pkg/apis/workflow/v1alpha1/workflow_types.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1665,12 +1665,23 @@ func (s *Synchronization) getSemaphoreConfigMapRef() *apiv1.ConfigMapKeySelector
16651665
return nil
16661666
}
16671667

1668+
// Synchronization selector
1669+
type SyncSelector struct {
1670+
// Name of the selector
1671+
Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
1672+
1673+
// Template replaced with global variables
1674+
Template string `json:"template,omitempty" protobuf:"bytes,2,opt,name=template"`
1675+
}
1676+
16681677
// SemaphoreRef is a reference of Semaphore
16691678
type SemaphoreRef struct {
16701679
// ConfigMapKeyRef is configmap selector for Semaphore configuration
16711680
ConfigMapKeyRef *apiv1.ConfigMapKeySelector `json:"configMapKeyRef,omitempty" protobuf:"bytes,1,opt,name=configMapKeyRef"`
16721681
// Namespace is the namespace of the configmap, default: [namespace of workflow]
16731682
Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"`
1683+
// Selectors is a list of references to dynamic values (like parameters, labels, annotations) that can be added to semaphore key to make concurrency more customizable
1684+
Selectors []SyncSelector `json:"selectors,omitempty" protobuf:"bytes,3,opt,name=selectors"`
16741685
}
16751686

16761687
// Mutex holds Mutex configuration
@@ -1679,6 +1690,8 @@ type Mutex struct {
16791690
Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
16801691
// Namespace is the namespace of the mutex, default: [namespace of workflow]
16811692
Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"`
1693+
// Selectors is a list of references to dynamic values (like parameters, labels, annotations) that can be added to mutex key to make concurrency more customizable
1694+
Selectors []SyncSelector `json:"selectors,omitempty" protobuf:"bytes,3,opt,name=selectors"`
16821695
}
16831696

16841697
// WorkflowTemplateRef is a reference to a WorkflowTemplate resource.

workflow/sync/lock_name.go

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sync
22

33
import (
44
"fmt"
5+
"regexp"
56
"strings"
67

78
"github.com/argoproj/argo-workflows/v3/errors"
@@ -20,14 +21,16 @@ type lockName struct {
2021
ResourceName string
2122
Key string
2223
Kind lockKind
24+
Selectors []v1alpha1.SyncSelector
2325
}
2426

25-
func newLockName(namespace, resourceName, lockKey string, kind lockKind) *lockName {
27+
func newLockName(namespace, resourceName, lockKey string, kind lockKind, selectors []v1alpha1.SyncSelector) *lockName {
2628
return &lockName{
2729
Namespace: namespace,
2830
ResourceName: resourceName,
2931
Key: lockKey,
3032
Kind: kind,
33+
Selectors: selectors,
3134
}
3235
}
3336

@@ -37,7 +40,7 @@ func getSemaphoreLockName(sem *v1alpha1.SemaphoreRef, wfNamespace string) (*lock
3740
if namespace == "" {
3841
namespace = wfNamespace
3942
}
40-
return newLockName(namespace, sem.ConfigMapKeyRef.Name, sem.ConfigMapKeyRef.Key, lockKindConfigMap), nil
43+
return newLockName(namespace, sem.ConfigMapKeyRef.Name, sem.ConfigMapKeyRef.Key, lockKindConfigMap, sem.Selectors), nil
4144
}
4245
return nil, fmt.Errorf("cannot get LockName for a Semaphore without a ConfigMapRef")
4346
}
@@ -47,7 +50,7 @@ func getMutexLockName(mtx *v1alpha1.Mutex, wfNamespace string) *lockName {
4750
if namespace == "" {
4851
namespace = wfNamespace
4952
}
50-
return newLockName(namespace, mtx.Name, "", lockKindMutex)
53+
return newLockName(namespace, mtx.Name, "", lockKindMutex, mtx.Selectors)
5154
}
5255

5356
func getLockName(item *syncItem, wfNamespace string) (*lockName, error) {
@@ -62,7 +65,10 @@ func getLockName(item *syncItem, wfNamespace string) (*lockName, error) {
6265
}
6366

6467
func DecodeLockName(name string) (*lockName, error) {
65-
items := strings.SplitN(name, "/", 3)
68+
splittedLockName := strings.Split(name, "?")
69+
lockNameTrimedSelectors := splittedLockName[0]
70+
selectors := ParseSelectors(strings.Join(splittedLockName[1:], "?"))
71+
items := strings.SplitN(lockNameTrimedSelectors, "/", 3)
6672
if len(items) < 3 {
6773
return nil, errors.New(errors.CodeBadRequest, "Invalid lock key: unknown format")
6874
}
@@ -73,15 +79,15 @@ func DecodeLockName(name string) (*lockName, error) {
7379

7480
switch lockKind {
7581
case lockKindMutex:
76-
lock = lockName{Namespace: namespace, Kind: lockKind, ResourceName: items[2]}
82+
lock = lockName{Namespace: namespace, Kind: lockKind, ResourceName: items[2], Selectors: selectors}
7783
case lockKindConfigMap:
7884
components := strings.Split(items[2], "/")
7985

8086
if len(components) != 2 {
8187
return nil, errors.New(errors.CodeBadRequest, "Invalid ConfigMap lock key: unknown format")
8288
}
8389

84-
lock = lockName{Namespace: namespace, Kind: lockKind, ResourceName: components[0], Key: components[1]}
90+
lock = lockName{Namespace: namespace, Kind: lockKind, ResourceName: components[0], Key: components[1], Selectors: selectors}
8591
default:
8692
return nil, errors.New(errors.CodeBadRequest, fmt.Sprintf("Invalid lock key, unexpected kind: %s", lockKind))
8793
}
@@ -93,11 +99,50 @@ func DecodeLockName(name string) (*lockName, error) {
9399
return &lock, nil
94100
}
95101

102+
func StringifySelectors(selectors []v1alpha1.SyncSelector) string {
103+
joinedSelectors := ""
104+
for _, selector := range selectors {
105+
// at this point template should be already replaced
106+
if selector.Template != "" {
107+
// escape & and = chars to decode easily later
108+
re := regexp.MustCompile("&|=")
109+
escapedSelectorName := re.ReplaceAllString(selector.Name, "-")
110+
escapedSelectorValue := re.ReplaceAllString(selector.Template, "-")
111+
112+
joinedSelectors = joinedSelectors + fmt.Sprintf("%s=%s&", escapedSelectorName, escapedSelectorValue)
113+
}
114+
}
115+
return strings.TrimRight(joinedSelectors, "&")
116+
}
117+
118+
func ParseSelectors(selectors string) []v1alpha1.SyncSelector {
119+
parsedSelectors := []v1alpha1.SyncSelector{}
120+
splittedSelectors := strings.Split(selectors, "&")
121+
122+
for _, selectorStr := range splittedSelectors {
123+
keyValPair := strings.Split(selectorStr, "=")
124+
if len(keyValPair) == 2 {
125+
parsedSelectors = append(parsedSelectors, v1alpha1.SyncSelector{
126+
Name: keyValPair[0],
127+
Template: keyValPair[1],
128+
})
129+
}
130+
// otherwise consider invalid, do nothing
131+
}
132+
return parsedSelectors
133+
}
134+
96135
func (ln *lockName) encodeName() string {
97-
if ln.Kind == lockKindMutex {
98-
return ln.validateEncoding(fmt.Sprintf("%s/%s/%s", ln.Namespace, ln.Kind, ln.ResourceName))
136+
encodingBuilder := &strings.Builder{}
137+
138+
encodingBuilder.WriteString(fmt.Sprintf("%s/%s/%s", ln.Namespace, ln.Kind, ln.ResourceName))
139+
if ln.Kind == lockKindConfigMap {
140+
encodingBuilder.WriteString(fmt.Sprintf("/%s", ln.Key))
141+
}
142+
if selectors := StringifySelectors(ln.Selectors); len(selectors) > 0 {
143+
encodingBuilder.WriteString(fmt.Sprintf("?%s", selectors))
99144
}
100-
return ln.validateEncoding(fmt.Sprintf("%s/%s/%s/%s", ln.Namespace, ln.Kind, ln.ResourceName, ln.Key))
145+
return ln.validateEncoding(encodingBuilder.String())
101146
}
102147

103148
func (ln *lockName) validate() error {

workflow/sync/lock_name_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"testing"
66

77
"github.com/stretchr/testify/assert"
8+
9+
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
810
)
911

1012
func TestDecodeLockName(t *testing.T) {
@@ -25,6 +27,7 @@ func TestDecodeLockName(t *testing.T) {
2527
ResourceName: "test",
2628
Key: "",
2729
Kind: lockKindMutex,
30+
Selectors: []v1alpha1.SyncSelector{},
2831
},
2932
func(t assert.TestingT, err error, i ...interface{}) bool {
3033
return true
@@ -38,6 +41,7 @@ func TestDecodeLockName(t *testing.T) {
3841
ResourceName: "test/foo/bar/baz",
3942
Key: "",
4043
Kind: lockKindMutex,
44+
Selectors: []v1alpha1.SyncSelector{},
4145
},
4246
func(t assert.TestingT, err error, i ...interface{}) bool {
4347
return true
@@ -51,6 +55,7 @@ func TestDecodeLockName(t *testing.T) {
5155
ResourceName: "foo",
5256
Key: "bar",
5357
Kind: lockKindConfigMap,
58+
Selectors: []v1alpha1.SyncSelector{},
5459
},
5560
func(t assert.TestingT, err error, i ...interface{}) bool {
5661
return true
@@ -64,6 +69,40 @@ func TestDecodeLockName(t *testing.T) {
6469
return err == nil // this should error
6570
},
6671
},
72+
{
73+
"TestConfigMapSelectorsParsedCorrectly",
74+
args{"default/ConfigMap/foo/bar?selector1=selector1-value&selector2=selector2-value"},
75+
&lockName{
76+
Namespace: "default",
77+
ResourceName: "foo",
78+
Key: "bar",
79+
Kind: lockKindConfigMap,
80+
Selectors: []v1alpha1.SyncSelector{
81+
{Name: "selector1", Template: "selector1-value"},
82+
{Name: "selector2", Template: "selector2-value"},
83+
},
84+
},
85+
func(t assert.TestingT, err error, i ...interface{}) bool {
86+
return true
87+
},
88+
},
89+
{
90+
"TestMutexSelectorsParsedCorrectly",
91+
args{"default/Mutex/test?selector1=selector1-value&selector2=selector2-value"},
92+
&lockName{
93+
Namespace: "default",
94+
ResourceName: "test",
95+
Key: "",
96+
Kind: lockKindMutex,
97+
Selectors: []v1alpha1.SyncSelector{
98+
{Name: "selector1", Template: "selector1-value"},
99+
{Name: "selector2", Template: "selector2-value"},
100+
},
101+
},
102+
func(t assert.TestingT, err error, i ...interface{}) bool {
103+
return true
104+
},
105+
},
67106
}
68107
for _, tt := range tests {
69108
t.Run(tt.name, func(t *testing.T) {

0 commit comments

Comments
 (0)