Skip to content

Commit 35c3beb

Browse files
authored
Refactor pipelineResource into a separate package (#639)
* Refactor pipelineResource into its own package This helps the type to be reused by other packages * ToJSON -> MarshalJSON * Remove unnecessary pointer argument to Marshal * FileName -> Filename * Rename pipeline.Resource to ingest.Pipeline * Reorder imports
1 parent a0bea64 commit 35c3beb

File tree

4 files changed

+168
-62
lines changed

4 files changed

+168
-62
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package ingest
6+
7+
import (
8+
"encoding/json"
9+
"strings"
10+
11+
"github.com/pkg/errors"
12+
"gopkg.in/yaml.v3"
13+
)
14+
15+
// Pipeline represents a pipeline resource loaded from a file
16+
type Pipeline struct {
17+
Name string // Name of the pipeline
18+
Format string // Format (extension) of the pipeline
19+
Content []byte // Content is the original file contents.
20+
}
21+
22+
// Filename returns the original filename associated with the pipeline.
23+
func (p *Pipeline) Filename() string {
24+
pos := strings.LastIndexByte(p.Name, '-')
25+
if pos == -1 {
26+
pos = len(p.Name)
27+
}
28+
return p.Name[:pos] + "." + p.Format
29+
}
30+
31+
// MarshalJSON returns the pipeline contents in JSON format.
32+
func (p *Pipeline) MarshalJSON() (asJSON []byte, err error) {
33+
switch p.Format {
34+
case "json":
35+
asJSON = p.Content
36+
case "yaml", "yml":
37+
var node map[string]interface{}
38+
err = yaml.Unmarshal(p.Content, &node)
39+
if err != nil {
40+
return nil, errors.Wrapf(err, "unmarshalling pipeline content failed (pipeline: %s)", p.Name)
41+
}
42+
if asJSON, err = json.Marshal(node); err != nil {
43+
return nil, errors.Wrapf(err, "marshalling pipeline content failed (pipeline: %s)", p.Name)
44+
}
45+
default:
46+
return nil, errors.Errorf("unsupported pipeline format '%s' (pipeline: %s)", p.Format, p.Name)
47+
}
48+
return asJSON, nil
49+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package ingest
6+
7+
import (
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func TestPipelineFileName(t *testing.T) {
14+
for _, tt := range []struct {
15+
title string
16+
pipeline Pipeline
17+
expected string
18+
}{
19+
{
20+
title: "name with nonce",
21+
pipeline: Pipeline{
22+
Name: "default-1234",
23+
Format: "yml",
24+
},
25+
expected: "default.yml",
26+
},
27+
{
28+
title: "name without nonce",
29+
pipeline: Pipeline{
30+
Name: "mypipeline",
31+
Format: "json",
32+
},
33+
expected: "mypipeline.json",
34+
},
35+
{
36+
title: "empty resource",
37+
expected: ".",
38+
},
39+
} {
40+
t.Run(tt.title, func(t *testing.T) {
41+
assert.Equal(t, tt.expected, tt.pipeline.Filename())
42+
})
43+
}
44+
}
45+
46+
func TestPipelineMarshalJSON(t *testing.T) {
47+
for _, tt := range []struct {
48+
title string
49+
pipeline Pipeline
50+
expected string
51+
isErr bool
52+
}{
53+
{
54+
title: "JSON source",
55+
pipeline: Pipeline{
56+
Format: "json",
57+
Content: []byte(`{"foo":["bar"]}`),
58+
},
59+
expected: `{"foo":["bar"]}`,
60+
},
61+
{
62+
title: "Yaml source",
63+
pipeline: Pipeline{
64+
Format: "yaml",
65+
Content: []byte(`foo: ["bar"]`),
66+
},
67+
expected: `{"foo":["bar"]}`,
68+
},
69+
{
70+
title: "bad Yaml",
71+
pipeline: Pipeline{
72+
Format: "yaml",
73+
Content: []byte(`broken"`),
74+
},
75+
isErr: true,
76+
},
77+
} {
78+
t.Run(tt.title, func(t *testing.T) {
79+
got, err := tt.pipeline.MarshalJSON()
80+
if tt.isErr {
81+
assert.Error(t, err)
82+
} else {
83+
assert.NoError(t, err)
84+
assert.Equal(t, []byte(tt.expected), got)
85+
}
86+
})
87+
}
88+
}

internal/testrunner/runners/pipeline/ingest_pipeline.go

Lines changed: 29 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,14 @@ import (
1818
"time"
1919

2020
"github.com/pkg/errors"
21-
"gopkg.in/yaml.v3"
2221

2322
"github.com/elastic/elastic-package/internal/elasticsearch"
23+
"github.com/elastic/elastic-package/internal/elasticsearch/ingest"
2424
"github.com/elastic/elastic-package/internal/packages"
2525
)
2626

2727
var ingestPipelineTag = regexp.MustCompile(`{{\s*IngestPipeline.+}}`)
2828

29-
type pipelineResource struct {
30-
name string
31-
format string
32-
content []byte
33-
}
34-
3529
type simulatePipelineRequest struct {
3630
Docs []pipelineDocument `json:"docs"`
3731
}
@@ -48,7 +42,7 @@ type pipelineIngestedDocument struct {
4842
Doc pipelineDocument `json:"doc"`
4943
}
5044

51-
func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (string, []pipelineResource, error) {
45+
func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (string, []ingest.Pipeline, error) {
5246
dataStreamManifest, err := packages.ReadDataStreamManifest(filepath.Join(dataStreamPath, packages.DataStreamManifestFile))
5347
if err != nil {
5448
return "", nil, errors.Wrap(err, "reading data stream manifest failed")
@@ -62,19 +56,15 @@ func installIngestPipelines(api *elasticsearch.API, dataStreamPath string) (stri
6256
return "", nil, errors.Wrap(err, "loading ingest pipeline files failed")
6357
}
6458

65-
jsonPipelines, err := convertPipelineToJSON(pipelines)
66-
if err != nil {
67-
return "", nil, errors.Wrap(err, "converting pipelines failed")
68-
}
59+
err = installPipelinesInElasticsearch(api, pipelines)
6960

70-
err = installPipelinesInElasticsearch(api, jsonPipelines)
7161
if err != nil {
7262
return "", nil, errors.Wrap(err, "installing pipelines failed")
7363
}
74-
return mainPipeline, jsonPipelines, nil
64+
return mainPipeline, pipelines, nil
7565
}
7666

77-
func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]pipelineResource, error) {
67+
func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]ingest.Pipeline, error) {
7868
elasticsearchPath := filepath.Join(dataStreamPath, "elasticsearch", "ingest_pipeline")
7969

8070
var pipelineFiles []string
@@ -86,7 +76,7 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]pipelineReso
8676
pipelineFiles = append(pipelineFiles, files...)
8777
}
8878

89-
var pipelines []pipelineResource
79+
var pipelines []ingest.Pipeline
9080
for _, path := range pipelineFiles {
9181
c, err := os.ReadFile(path)
9282
if err != nil {
@@ -102,75 +92,52 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]pipelineReso
10292
return []byte(getWithPipelineNameWithNonce(pipelineTag, nonce))
10393
})
10494
name := filepath.Base(path)
105-
pipelines = append(pipelines, pipelineResource{
106-
name: getWithPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
107-
format: filepath.Ext(path)[1:],
108-
content: c,
95+
pipelines = append(pipelines, ingest.Pipeline{
96+
Name: getWithPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
97+
Format: filepath.Ext(path)[1:],
98+
Content: c,
10999
})
110100
}
111101
return pipelines, nil
112102
}
113103

114-
func convertPipelineToJSON(pipelines []pipelineResource) ([]pipelineResource, error) {
115-
var jsonPipelines []pipelineResource
116-
for _, pipeline := range pipelines {
117-
if pipeline.format == "json" {
118-
jsonPipelines = append(jsonPipelines, pipeline)
119-
continue
120-
}
121-
122-
var node map[string]interface{}
123-
err := yaml.Unmarshal(pipeline.content, &node)
124-
if err != nil {
125-
return nil, errors.Wrapf(err, "unmarshalling pipeline content failed (pipeline: %s)", pipeline.name)
126-
}
127-
128-
c, err := json.Marshal(&node)
129-
if err != nil {
130-
return nil, errors.Wrapf(err, "marshalling pipeline content failed (pipeline: %s)", pipeline.name)
131-
}
132-
133-
jsonPipelines = append(jsonPipelines, pipelineResource{
134-
name: pipeline.name,
135-
format: "json",
136-
content: c,
137-
})
138-
}
139-
return jsonPipelines, nil
140-
}
141-
142-
func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []pipelineResource) error {
143-
for _, pipeline := range pipelines {
144-
if err := installPipeline(api, pipeline); err != nil {
104+
func installPipelinesInElasticsearch(api *elasticsearch.API, pipelines []ingest.Pipeline) error {
105+
for _, p := range pipelines {
106+
if err := installPipeline(api, p); err != nil {
145107
return err
146108
}
147109
}
148110
return nil
149111
}
150112

151-
func installPipeline(api *elasticsearch.API, pipeline pipelineResource) error {
113+
func installPipeline(api *elasticsearch.API, pipeline ingest.Pipeline) error {
152114
if err := putIngestPipeline(api, pipeline); err != nil {
153115
return err
154116
}
155117
// Just to be sure the pipeline has been uploaded.
156-
return getIngestPipeline(api, pipeline.name)
118+
return getIngestPipeline(api, pipeline.Name)
157119
}
158120

159-
func putIngestPipeline(api *elasticsearch.API, pipeline pipelineResource) error {
160-
r, err := api.Ingest.PutPipeline(pipeline.name, bytes.NewReader(pipeline.content))
121+
func putIngestPipeline(api *elasticsearch.API, pipeline ingest.Pipeline) error {
122+
source, err := pipeline.MarshalJSON()
161123
if err != nil {
162-
return errors.Wrapf(err, "PutPipeline API call failed (pipelineName: %s)", pipeline.name)
124+
return err
125+
}
126+
r, err := api.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(source))
127+
if err != nil {
128+
return errors.Wrapf(err, "PutPipeline API call failed (pipelineName: %s)", pipeline.Name)
163129
}
164130
defer r.Body.Close()
165131

166132
body, err := io.ReadAll(r.Body)
167133
if err != nil {
168-
return errors.Wrapf(err, "failed to read PutPipeline API response body (pipelineName: %s)", pipeline.name)
134+
return errors.Wrapf(err, "failed to read PutPipeline API response body (pipelineName: %s)", pipeline.Name)
169135
}
170136

171137
if r.StatusCode != http.StatusOK {
138+
172139
return errors.Wrapf(elasticsearch.NewError(body), "unexpected response status for PutPipeline (%d): %s (pipelineName: %s)",
173-
r.StatusCode, r.Status(), pipeline.name)
140+
r.StatusCode, r.Status(), pipeline.Name)
174141
}
175142
return nil
176143
}
@@ -196,12 +163,13 @@ func getIngestPipeline(api *elasticsearch.API, pipelineName string) error {
196163
return nil
197164
}
198165

199-
func uninstallIngestPipelines(api *elasticsearch.API, pipelines []pipelineResource) error {
166+
func uninstallIngestPipelines(api *elasticsearch.API, pipelines []ingest.Pipeline) error {
200167
for _, pipeline := range pipelines {
201-
_, err := api.Ingest.DeletePipeline(pipeline.name)
168+
resp, err := api.Ingest.DeletePipeline(pipeline.Name)
202169
if err != nil {
203-
return errors.Wrapf(err, "DeletePipeline API call failed (pipelineName: %s)", pipeline.name)
170+
return errors.Wrapf(err, "DeletePipeline API call failed (pipelineName: %s)", pipeline.Name)
204171
}
172+
resp.Body.Close()
205173
}
206174
return nil
207175
}

internal/testrunner/runners/pipeline/runner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/pkg/errors"
1717

1818
"github.com/elastic/elastic-package/internal/common"
19+
"github.com/elastic/elastic-package/internal/elasticsearch/ingest"
1920
"github.com/elastic/elastic-package/internal/fields"
2021
"github.com/elastic/elastic-package/internal/logger"
2122
"github.com/elastic/elastic-package/internal/multierror"
@@ -30,7 +31,7 @@ const (
3031

3132
type runner struct {
3233
options testrunner.TestOptions
33-
pipelines []pipelineResource
34+
pipelines []ingest.Pipeline
3435
}
3536

3637
func (r *runner) TestFolderRequired() bool {

0 commit comments

Comments
 (0)