Skip to content

Commit 956486f

Browse files
author
Sarah Sicard
committed
add unit tests for processor validation
1 parent 0befb56 commit 956486f

File tree

6 files changed

+227
-29
lines changed

6 files changed

+227
-29
lines changed

internal/testutil/conduit.go

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ func SetupSampleConduit(t *testing.T) *v1alpha.Conduit {
2222
Name: "my-pipeline",
2323
Version: "v0.13.2",
2424
Description: "my-description",
25+
Registry: &v1alpha.SchemaRegistry{
26+
URL: "http://apicurio:8080/apis/ccompat/v7",
27+
},
2528
Connectors: []*v1alpha.ConduitConnector{
2629
{
2730
Name: "source-connector",
@@ -60,11 +63,12 @@ func SetupSampleConduit(t *testing.T) *v1alpha.Conduit {
6063
},
6164
Processors: []*v1alpha.ConduitProcessor{
6265
{
63-
ID: "proc1",
64-
Name: "proc1",
65-
Plugin: "builtin:base64.encode",
66-
Workers: 2,
67-
Condition: "{{ eq .Metadata.key \"pipeline\" }}",
66+
ID: "proc1",
67+
Name: "proc1",
68+
Plugin: "builtin:base64.encode",
69+
Workers: 2,
70+
Condition: "{{ eq .Metadata.key \"pipeline\" }}",
71+
ProcessorURL: "http://127.0.0.1:8090/api/files/processors/RECORD_ID/FILENAME",
6872
Settings: []v1alpha.SettingsVar{
6973
{
7074
Name: "setting01",
@@ -103,6 +107,9 @@ func SetupBadNameConduit(t *testing.T) *v1alpha.Conduit {
103107
Name: "my-pipeline",
104108
Description: "my-description",
105109
Version: "v0.13.2",
110+
Registry: &v1alpha.SchemaRegistry{
111+
URL: "http://apicurio:8080/apis/ccompat/v7",
112+
},
106113
Connectors: []*v1alpha.ConduitConnector{
107114
{
108115
Name: "source-connector",
@@ -157,6 +164,9 @@ func SetupBadValidationConduit(t *testing.T) *v1alpha.Conduit {
157164
Image: "ghcr.io/conduitio/conduit",
158165
Description: "my-description",
159166
Version: "v0.13.2",
167+
Registry: &v1alpha.SchemaRegistry{
168+
URL: "http://apicurio:8080/apis/ccompat/v7",
169+
},
160170
Connectors: []*v1alpha.ConduitConnector{
161171
{
162172
Name: "source-connector",
@@ -208,6 +218,9 @@ func SetupSecretConduit(t *testing.T) *v1alpha.Conduit {
208218
Name: "my-pipeline",
209219
Version: "v0.13.2",
210220
Description: "my-description",
221+
Registry: &v1alpha.SchemaRegistry{
222+
URL: "http://apicurio:8080/apis/ccompat/v7",
223+
},
211224
Connectors: []*v1alpha.ConduitConnector{
212225
{
213226
Name: "source-connector",
@@ -258,3 +271,88 @@ func SetupSecretConduit(t *testing.T) *v1alpha.Conduit {
258271

259272
return c
260273
}
274+
275+
func SetupSourceProcConduit(t *testing.T) *v1alpha.Conduit {
276+
t.Helper()
277+
running := true
278+
279+
c := &v1alpha.Conduit{
280+
ObjectMeta: metav1.ObjectMeta{
281+
Name: "sample",
282+
Namespace: "sample",
283+
},
284+
Spec: v1alpha.ConduitSpec{
285+
Running: &running,
286+
Name: "my-pipeline",
287+
Version: "v0.13.2",
288+
Description: "my-description",
289+
Registry: &v1alpha.SchemaRegistry{
290+
URL: "http://apicurio:8080/apis/ccompat/v7",
291+
},
292+
Connectors: []*v1alpha.ConduitConnector{
293+
{
294+
Name: "source-connector",
295+
Type: "source",
296+
Plugin: "builtin:generator",
297+
PluginVersion: "latest",
298+
PluginName: "builtin:generator",
299+
Settings: []v1alpha.SettingsVar{
300+
{
301+
Name: "servers",
302+
Value: "127.0.0.1",
303+
},
304+
{
305+
Name: "topics",
306+
Value: "input-topic",
307+
},
308+
},
309+
Processors: []*v1alpha.ConduitProcessor{
310+
{
311+
ID: "proc1",
312+
Name: "proc1",
313+
Plugin: "builtin:base64.encode",
314+
Workers: 2,
315+
Condition: "{{ eq .Metadata.key \"pipeline\" }}",
316+
ProcessorURL: "http://127.0.0.1:8090/api/files/processors/RECORD_ID/FILENAME",
317+
Settings: []v1alpha.SettingsVar{
318+
{
319+
Name: "setting01",
320+
SecretRef: &corev1.SecretKeySelector{
321+
Key: "setting01-%p-key",
322+
LocalObjectReference: corev1.LocalObjectReference{
323+
Name: "setting01-secret-name",
324+
},
325+
},
326+
},
327+
{
328+
Name: "setting02",
329+
Value: "setting02-val",
330+
},
331+
},
332+
},
333+
},
334+
},
335+
{
336+
Name: "destination-connector",
337+
Type: "destination",
338+
Plugin: "builtin:log",
339+
PluginVersion: "latest",
340+
PluginName: "builtin:log",
341+
Settings: []v1alpha.SettingsVar{
342+
{
343+
Name: "servers",
344+
Value: "127.0.0.1",
345+
},
346+
{
347+
Name: "topic",
348+
Value: "output-topic",
349+
},
350+
},
351+
},
352+
},
353+
Processors: nil,
354+
},
355+
}
356+
357+
return c
358+
}

internal/webhook/v1alpha/conduit_webhook.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ func (v *ConduitCustomValidator) ValidateUpdate(ctx context.Context, _, newObj r
224224
if errs := v.validateConnectors(ctx, conduit.Spec.Connectors, sr); len(errs) > 0 {
225225
return nil, apierrors.NewInvalid(v1alpha.GroupKind, conduit.Name, errs)
226226
}
227+
// why dont we validate processors on update?
227228

228229
return nil, nil
229230
}
@@ -299,6 +300,11 @@ func (*ConduitCustomValidator) validateConduitVersion(ver string) *field.Error {
299300
}
300301

301302
func schemaRegistry(reg *v1alpha.SchemaRegistry, fp *field.Path) (schemaregistry.Registry, *field.Error) {
303+
if reg == nil || reg.URL == "" {
304+
// TO TEST: is registry normally set?
305+
return nil, field.InternalError(fp, fmt.Errorf("registry must be set"))
306+
}
307+
302308
cl, err := schemaregistry.NewClient(conduitlog.Nop(), sr.URLs(reg.URL))
303309
if err != nil {
304310
return nil, field.Invalid(fp, sr.URLs(reg.URL), fmt.Sprintf("failed to create schema registry: %s", err))

internal/webhook/v1alpha/conduit_webhook_test.go

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,21 +68,83 @@ func TestWebhook_ValidateCreate(t *testing.T) {
6868
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]),
6969
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
7070
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
71+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]),
7172
)
7273

7374
return testutil.SetupSampleConduit(t)
7475
},
7576
},
7677
{
77-
name: "error occurs on http call",
78+
name: "validates pipeline with source processors",
7879
setup: func() *v1alpha.Conduit {
7980
webClient := conduit.SetupHTTPMockClient(t)
80-
webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")).Times(4)
81+
httpResps := conduit.GetHTTPResps(t)
82+
gomock.InOrder(
83+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]),
84+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
85+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]),
86+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
87+
)
88+
89+
return testutil.SetupSourceProcConduit(t)
90+
},
91+
},
92+
{
93+
name: "error occurs for connector params",
94+
setup: func() *v1alpha.Conduit {
95+
webClient := conduit.SetupHTTPMockClient(t)
96+
httpResps := conduit.GetHTTPResps(t)
97+
gomock.InOrder(
98+
webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")),
99+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]),
100+
)
81101

82102
return testutil.SetupSampleConduit(t)
83103
},
84104
wantErr: nil,
85105
},
106+
{
107+
name: "error occurs for processor schema",
108+
setup: func() *v1alpha.Conduit {
109+
webClient := conduit.SetupHTTPMockClient(t)
110+
httpResps := conduit.GetHTTPResps(t)
111+
gomock.InOrder(
112+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]),
113+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
114+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
115+
webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")),
116+
)
117+
118+
return testutil.SetupSampleConduit(t)
119+
},
120+
wantErr: apierrors.NewInvalid(v1alpha.GroupKind, "sample", field.ErrorList{
121+
field.InternalError(
122+
field.NewPath("spec", "processors", "schema"),
123+
fmt.Errorf("failed to save wasm to file: BOOM"),
124+
),
125+
}),
126+
},
127+
{
128+
name: "error occurs for source processor schema",
129+
setup: func() *v1alpha.Conduit {
130+
webClient := conduit.SetupHTTPMockClient(t)
131+
httpResps := conduit.GetHTTPResps(t)
132+
gomock.InOrder(
133+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]),
134+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
135+
webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")),
136+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
137+
)
138+
139+
return testutil.SetupSourceProcConduit(t)
140+
},
141+
wantErr: apierrors.NewInvalid(v1alpha.GroupKind, "sample", field.ErrorList{
142+
field.InternalError(
143+
field.NewPath("spec", "connectors", "schema"),
144+
fmt.Errorf("failed to save wasm to file: BOOM"),
145+
),
146+
}),
147+
},
86148
{
87149
name: "error occurs during validation",
88150
setup: func() *v1alpha.Conduit {
@@ -92,6 +154,7 @@ func TestWebhook_ValidateCreate(t *testing.T) {
92154
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]),
93155
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
94156
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
157+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]),
95158
)
96159

97160
return testutil.SetupBadValidationConduit(t)
@@ -158,6 +221,27 @@ func TestWebhook_ValidateUpdate(t *testing.T) {
158221
},
159222
wantErr: nil,
160223
},
224+
{
225+
name: "error occurs for processor schema",
226+
setup: func() *v1alpha.Conduit {
227+
webClient := conduit.SetupHTTPMockClient(t)
228+
httpResps := conduit.GetHTTPResps(t)
229+
gomock.InOrder(
230+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]),
231+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
232+
webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")),
233+
webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]),
234+
)
235+
236+
return testutil.SetupSourceProcConduit(t)
237+
},
238+
wantErr: apierrors.NewInvalid(v1alpha.GroupKind, "sample", field.ErrorList{
239+
field.InternalError(
240+
field.NewPath("spec", "connectors", "schema"),
241+
fmt.Errorf("failed to save wasm to file: BOOM"),
242+
),
243+
}),
244+
},
161245
{
162246
name: "error occurs during validation",
163247
setup: func() *v1alpha.Conduit {

pkg/conduit/conduit_validator.go

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,17 @@ func (v *Validator) ValidateProcessorPlugin(p *v1alpha.ConduitProcessor, fp *fie
108108
}
109109

110110
func (v *Validator) ValidateProcessorSchema(ctx context.Context, p *v1alpha.ConduitProcessor, sr schemaregistry.Registry, fp *field.Path) *field.Error {
111-
processorURL := p.ProcessorURL
112111
// will be getting an incoming URL to pull from pocketbase with the incoming standalone processor
113112
// url is passed in, call to pocketbase to get processor file
114-
file, err := pluginWASM(ctx, processorURL)
113+
file, cleanup, err := pluginWASM(ctx, p.ProcessorURL)
115114
if err != nil {
116-
// TODO how do I want to handle this error
117-
return nil
115+
return field.InternalError(fp.Child("schema"), fmt.Errorf("failed to save wasm to file: %w", err))
116+
}
117+
if cleanup != nil {
118+
defer cleanup()
118119
}
119120

120121
conduitLogger := conduitLog.InitLogger(zerolog.DebugLevel, conduitLog.FormatJSON)
121-
122122
procSchemaService := procutils.NewSchemaService(conduitLogger, sr)
123123
reg, err := standalone.NewRegistry(conduitLogger, "/tmp", procSchemaService)
124124
if err != nil {
@@ -127,8 +127,7 @@ func (v *Validator) ValidateProcessorSchema(ctx context.Context, p *v1alpha.Cond
127127

128128
_, err = reg.Register(ctx, file)
129129
if err != nil {
130-
// TODO handle error
131-
return nil
130+
return field.InternalError(fp.Child("schema"), fmt.Errorf("failed to register: %w", err))
132131
}
133132

134133
return nil
@@ -266,39 +265,38 @@ func formatPluginName(pn string) (string, error) {
266265

267266
// pluginWASM gets the processor WASM from the specified URL and saves to a temp
268267
// file.
269-
// Returns the filename of the created file
270-
func pluginWASM(ctx context.Context, processorURL string) (string, error) {
268+
// Returns the filename of the created file and a cleanup function fo the file
269+
func pluginWASM(ctx context.Context, processorURL string) (string, func(), error) {
271270
req, err := http.NewRequestWithContext(ctx, http.MethodGet, processorURL, nil)
272271
if err != nil {
273-
return "", err
272+
return "", nil, err
274273
}
275274

276275
resp, err := HTTPClient.Do(req)
277276
if err != nil {
278-
return "", err
277+
return "", nil, err
279278
}
280279
if resp.StatusCode != http.StatusOK {
281-
return "", fmt.Errorf("non-sucessful status code while getting processor WASM, status code: %d", resp.StatusCode)
280+
return "", nil, fmt.Errorf("non-sucessful status code while getting processor WASM, status code: %d", resp.StatusCode)
282281
}
283282
defer resp.Body.Close()
284283

285284
// save as tmp file and use for upload
286285
file, err := os.CreateTemp("", "proc-*.wasm")
287286
if err != nil {
288-
return "", err // TODO
287+
return "", nil, err
289288
}
290-
defer func() {
291-
if err := os.Remove(file.Name()); err == nil {
292-
log.Println(err)
293-
} // clean up temp file
294-
file.Close()
295-
}()
296289

297290
if _, err = io.Copy(file, resp.Body); err != nil {
298-
return "", err
291+
return "", nil, err
299292
}
300293

301-
return file.Name(), nil
294+
return file.Name(), func() {
295+
if err := os.Remove(file.Name()); err != nil {
296+
log.Println(err)
297+
}
298+
file.Close()
299+
}, nil
302300
}
303301

304302
// connectorList constructs a dictionary of connectors with information for
@@ -315,7 +313,7 @@ func connectorList(ctx context.Context) (map[string]PluginInfo, error) {
315313
return nil, err
316314
}
317315
if resp.StatusCode != http.StatusOK {
318-
return nil, err // TODO need to fix this to bad status code
316+
return nil, fmt.Errorf("getting connector list, status code: %d", resp.StatusCode)
319317
}
320318
defer resp.Body.Close()
321319

0 commit comments

Comments
 (0)