Skip to content

Commit fdcb9fe

Browse files
authored
allow for different args with conduit > 12 (#53)
* allow for different args with conduit > 12 * update to using options pattern * fix pkg import * add support for conduit version 13 * subcommand pass through does not work for v13 unless command is set * fix tests * throw error on unsupported conduit versions during container creation * fix linter * fix linter * adding a test for version patch levels * add error checking when creating constraint
1 parent cfadbe5 commit fdcb9fe

File tree

5 files changed

+276
-29
lines changed

5 files changed

+276
-29
lines changed

internal/conduit/version.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package conduit
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"github.com/Masterminds/semver/v3"
8+
)
9+
10+
type Flags struct {
11+
args *Args
12+
}
13+
14+
type Args struct {
15+
PipelineFile string
16+
ConnectorsPath string
17+
DBPath string
18+
ProcessorsPath string
19+
}
20+
21+
func NewFlags(fns ...func(*Args)) *Flags {
22+
var args Args
23+
for _, fn := range fns {
24+
fn(&args)
25+
}
26+
return &Flags{args: &args}
27+
}
28+
29+
func (f *Flags) ForVersion(ver string) ([]string, error) {
30+
constraints := map[string]string{
31+
"v011": "~0.11.1",
32+
"v012": "~0.12.x",
33+
"v013": "~0.13.x",
34+
}
35+
36+
sanitized, _ := strings.CutPrefix(ver, "v")
37+
v, _ := semver.NewVersion(sanitized)
38+
39+
for key, rule := range constraints {
40+
c, err := semver.NewConstraint(rule)
41+
if err != nil {
42+
return nil, fmt.Errorf("parse error occured while creating constraint: %w", err)
43+
}
44+
if c.Check(v) {
45+
switch key {
46+
case "v011":
47+
return f.v011(), nil
48+
case "v012":
49+
return f.v012(), nil
50+
case "v013":
51+
return f.v013(), nil
52+
}
53+
}
54+
}
55+
return nil, fmt.Errorf("version %s not supported", ver)
56+
}
57+
58+
func (f *Flags) v011() []string {
59+
return []string{
60+
"-pipelines.path", f.args.PipelineFile,
61+
"-connectors.path", f.args.ConnectorsPath,
62+
"-db.type", "sqlite",
63+
"-db.sqlite.path", f.args.DBPath,
64+
"-pipelines.exit-on-error",
65+
"-processors.path", f.args.ProcessorsPath,
66+
}
67+
}
68+
69+
func (f *Flags) v012() []string {
70+
return []string{
71+
"--pipelines.path", f.args.PipelineFile,
72+
"--connectors.path", f.args.ConnectorsPath,
73+
"--db.type", "sqlite",
74+
"--db.sqlite.path", f.args.DBPath,
75+
"--pipelines.exit-on-degraded",
76+
"--processors.path", f.args.ProcessorsPath,
77+
}
78+
}
79+
80+
func (f *Flags) v013() []string {
81+
return []string{
82+
"run",
83+
"--pipelines.path", f.args.PipelineFile,
84+
"--connectors.path", f.args.ConnectorsPath,
85+
"--db.type", "sqlite",
86+
"--db.sqlite.path", f.args.DBPath,
87+
"--pipelines.exit-on-degraded",
88+
"--processors.path", f.args.ProcessorsPath,
89+
}
90+
}
91+
92+
func WithPipelineFile(file string) func(*Args) {
93+
return func(a *Args) {
94+
a.PipelineFile = file
95+
}
96+
}
97+
98+
func WithConnectorsPath(path string) func(*Args) {
99+
return func(a *Args) {
100+
a.ConnectorsPath = path
101+
}
102+
}
103+
104+
func WithDBPath(path string) func(*Args) {
105+
return func(a *Args) {
106+
a.DBPath = path
107+
}
108+
}
109+
110+
func WithProcessorsPath(path string) func(*Args) {
111+
return func(a *Args) {
112+
a.ProcessorsPath = path
113+
}
114+
}

internal/conduit/version_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package conduit_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/conduitio/conduit-operator/api/v1alpha"
7+
"github.com/conduitio/conduit-operator/internal/conduit"
8+
"github.com/matryer/is"
9+
"github.com/pkg/errors"
10+
)
11+
12+
func Test_ForVersion(t *testing.T) {
13+
tests := []struct {
14+
name string
15+
version string
16+
want []string
17+
wantErr error
18+
}{
19+
{
20+
name: "with version less than 0.12",
21+
version: "v0.11.1",
22+
want: []string{
23+
"-pipelines.path", "/conduit.pipelines/pipeline.yaml",
24+
"-connectors.path", "/conduit.storage/connectors",
25+
"-db.type", "sqlite",
26+
"-db.sqlite.path", "/conduit.storage/db",
27+
"-pipelines.exit-on-error",
28+
"-processors.path", "/conduit.storage/processors",
29+
},
30+
},
31+
{
32+
name: "with version of 0.12",
33+
version: "v0.12.0",
34+
want: []string{
35+
"--pipelines.path", "/conduit.pipelines/pipeline.yaml",
36+
"--connectors.path", "/conduit.storage/connectors",
37+
"--db.type", "sqlite",
38+
"--db.sqlite.path", "/conduit.storage/db",
39+
"--pipelines.exit-on-degraded",
40+
"--processors.path", "/conduit.storage/processors",
41+
},
42+
},
43+
{
44+
name: "with a patched version of 0.12",
45+
version: "v0.12.4",
46+
want: []string{
47+
"--pipelines.path", "/conduit.pipelines/pipeline.yaml",
48+
"--connectors.path", "/conduit.storage/connectors",
49+
"--db.type", "sqlite",
50+
"--db.sqlite.path", "/conduit.storage/db",
51+
"--pipelines.exit-on-degraded",
52+
"--processors.path", "/conduit.storage/processors",
53+
},
54+
},
55+
{
56+
name: "with version greater than 0.12",
57+
version: "v0.13.0",
58+
want: []string{
59+
"run",
60+
"--pipelines.path", "/conduit.pipelines/pipeline.yaml",
61+
"--connectors.path", "/conduit.storage/connectors",
62+
"--db.type", "sqlite",
63+
"--db.sqlite.path", "/conduit.storage/db",
64+
"--pipelines.exit-on-degraded",
65+
"--processors.path", "/conduit.storage/processors",
66+
},
67+
},
68+
{
69+
name: "with an unsupported version",
70+
version: "v0.14.0",
71+
want: nil,
72+
wantErr: errors.Errorf("version v0.14.0 not supported"),
73+
},
74+
}
75+
76+
for _, tc := range tests {
77+
t.Run(tc.name, func(t *testing.T) {
78+
is := is.New(t)
79+
80+
flags := conduit.NewFlags(
81+
conduit.WithPipelineFile(v1alpha.ConduitPipelineFile),
82+
conduit.WithConnectorsPath(v1alpha.ConduitConnectorsPath),
83+
conduit.WithDBPath(v1alpha.ConduitDBPath),
84+
conduit.WithProcessorsPath(v1alpha.ConduitProcessorsPath),
85+
)
86+
args, err := flags.ForVersion(tc.version)
87+
if err != nil {
88+
is.Equal(tc.wantErr.Error(), err.Error())
89+
}
90+
is.Equal(args, tc.want)
91+
})
92+
}
93+
}

internal/controller/conduit_containers.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99

1010
v1alpha "github.com/conduitio/conduit-operator/api/v1alpha"
11+
"github.com/conduitio/conduit-operator/internal/conduit"
1112
corev1 "k8s.io/api/core/v1"
1213
"k8s.io/apimachinery/pkg/util/intstr"
1314
)
@@ -130,22 +131,23 @@ func ConduitInitContainers(cc []*v1alpha.ConduitConnector) []corev1.Container {
130131
}
131132

132133
// ConduitRuntimeContainer returns a Kubernetes container definition
133-
// todo is the pipelineName supposed to be used?
134-
func ConduitRuntimeContainer(image, version string, envVars []corev1.EnvVar) corev1.Container {
135-
args := []string{
136-
"/app/conduit",
137-
"-pipelines.path", v1alpha.ConduitPipelineFile,
138-
"-connectors.path", v1alpha.ConduitConnectorsPath,
139-
"-db.type", "sqlite",
140-
"-db.sqlite.path", v1alpha.ConduitDBPath,
141-
"-pipelines.exit-on-error",
142-
"-processors.path", v1alpha.ConduitProcessorsPath,
134+
func ConduitRuntimeContainer(image, version string, envVars []corev1.EnvVar) (corev1.Container, error) {
135+
flags := conduit.NewFlags(
136+
conduit.WithPipelineFile(v1alpha.ConduitPipelineFile),
137+
conduit.WithConnectorsPath(v1alpha.ConduitConnectorsPath),
138+
conduit.WithDBPath(v1alpha.ConduitDBPath),
139+
conduit.WithProcessorsPath(v1alpha.ConduitProcessorsPath),
140+
)
141+
args, err := flags.ForVersion(version)
142+
if err != nil {
143+
return corev1.Container{}, err
143144
}
144145

145146
return corev1.Container{
146147
Name: v1alpha.ConduitContainerName,
147148
Image: fmt.Sprint(image, ":", version),
148149
ImagePullPolicy: corev1.PullAlways,
150+
Command: []string{"/app/conduit"},
149151
Args: args,
150152
Ports: []corev1.ContainerPort{
151153
{
@@ -184,5 +186,5 @@ func ConduitRuntimeContainer(image, version string, envVars []corev1.EnvVar) cor
184186
},
185187
},
186188
Env: envVars,
187-
}
189+
}, nil
188190
}

internal/controller/conduit_containers_test.go

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"testing"
55

66
"github.com/google/go-cmp/cmp"
7+
"github.com/matryer/is"
8+
"github.com/pkg/errors"
79

810
corev1 "k8s.io/api/core/v1"
911
"k8s.io/apimachinery/pkg/util/intstr"
@@ -190,12 +192,12 @@ func Test_ConduitInitContainers(t *testing.T) {
190192
}
191193

192194
func Test_ConduitRuntimeContainer(t *testing.T) {
193-
want := corev1.Container{
195+
runtimeContainer := corev1.Container{
194196
Name: "conduit-server",
195197
Image: "my-image:v0.11.1",
196198
ImagePullPolicy: corev1.PullAlways,
199+
Command: []string{"/app/conduit"},
197200
Args: []string{
198-
"/app/conduit",
199201
"-pipelines.path", "/conduit.pipelines/pipeline.yaml",
200202
"-connectors.path", "/conduit.storage/connectors",
201203
"-db.type", "sqlite",
@@ -252,21 +254,51 @@ func Test_ConduitRuntimeContainer(t *testing.T) {
252254
},
253255
}
254256

255-
got := ConduitRuntimeContainer(
256-
"my-image",
257-
"v0.11.1",
258-
[]corev1.EnvVar{
259-
{
260-
Name: "var-1",
261-
Value: "val-1",
262-
},
263-
{
264-
Name: "var-2",
265-
Value: "val-2",
266-
},
257+
tests := []struct {
258+
name string
259+
version string
260+
want corev1.Container
261+
wantErr error
262+
}{
263+
{
264+
name: "runtime container is created",
265+
version: "v0.11.1",
266+
want: runtimeContainer,
267+
wantErr: nil,
267268
},
268-
)
269-
if diff := cmp.Diff(want, got); diff != "" {
270-
t.Fatalf("container mismatch (-want +got): %v", diff)
269+
{
270+
name: "error occurs creating runtime container",
271+
version: "v0.14.0",
272+
want: corev1.Container{},
273+
wantErr: errors.Errorf("version v0.14.0 not supported"),
274+
},
275+
}
276+
277+
for _, tc := range tests {
278+
t.Run(tc.name, func(t *testing.T) {
279+
is := is.New(t)
280+
281+
got, err := ConduitRuntimeContainer(
282+
"my-image",
283+
tc.version,
284+
[]corev1.EnvVar{
285+
{
286+
Name: "var-1",
287+
Value: "val-1",
288+
},
289+
{
290+
Name: "var-2",
291+
Value: "val-2",
292+
},
293+
},
294+
)
295+
if err != nil {
296+
is.Equal(tc.wantErr.Error(), err.Error())
297+
}
298+
299+
if diff := cmp.Diff(tc.want, got); diff != "" {
300+
t.Fatalf("container mismatch (-want +got): %v", diff)
301+
}
302+
})
271303
}
272304
}

internal/controller/conduit_controller.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,12 @@ func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1.
339339
})
340340
}
341341

342+
// fmt.Printf("version %s", c.Spec.Version)
343+
container, err := ConduitRuntimeContainer(c.Spec.Image, c.Spec.Version, envVars)
344+
if err != nil {
345+
return err
346+
}
347+
342348
spec := appsv1.DeploymentSpec{
343349
Strategy: appsv1.DeploymentStrategy{
344350
Type: appsv1.RecreateDeploymentStrategyType,
@@ -351,7 +357,7 @@ func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1.
351357
RestartPolicy: corev1.RestartPolicyAlways,
352358
InitContainers: ConduitInitContainers(c.Spec.Connectors),
353359
Containers: []corev1.Container{
354-
ConduitRuntimeContainer(c.Spec.Image, c.Spec.Version, envVars),
360+
container,
355361
},
356362
Volumes: []corev1.Volume{
357363
ConduitVolume(nn.Name),

0 commit comments

Comments
 (0)