Skip to content

Commit df72f6a

Browse files
authored
Move kibana client (#32)
1 parent 9d85625 commit df72f6a

File tree

10 files changed

+1537
-0
lines changed

10 files changed

+1537
-0
lines changed

kibana/client.go

Lines changed: 386 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,386 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package kibana
19+
20+
import (
21+
"bytes"
22+
"context"
23+
"encoding/base64"
24+
"encoding/json"
25+
"fmt"
26+
"io"
27+
"io/ioutil"
28+
"mime"
29+
"mime/multipart"
30+
"net/http"
31+
"net/textproto"
32+
"net/url"
33+
"path"
34+
"strings"
35+
36+
"github.com/joeshaw/multierror"
37+
38+
"github.com/elastic/elastic-agent-libs/config"
39+
"github.com/elastic/elastic-agent-libs/logp"
40+
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
41+
"github.com/elastic/elastic-agent-libs/useragent"
42+
"github.com/elastic/elastic-agent-libs/version"
43+
)
44+
45+
const statusAPI = "/api/status"
46+
47+
type Connection struct {
48+
URL string
49+
Username string
50+
Password string
51+
APIKey string
52+
ServiceToken string
53+
Headers http.Header
54+
55+
HTTP *http.Client
56+
Version version.V
57+
}
58+
59+
type Client struct {
60+
Connection
61+
log *logp.Logger
62+
}
63+
64+
func addToURL(_url, _path string, params url.Values) string {
65+
if len(params) == 0 {
66+
return _url + _path
67+
}
68+
69+
return strings.Join([]string{_url, _path, "?", params.Encode()}, "")
70+
}
71+
72+
func extractError(result []byte) error {
73+
var kibanaResult struct {
74+
Message string
75+
Attributes struct {
76+
Objects []struct {
77+
ID string
78+
Error struct {
79+
Message string
80+
}
81+
}
82+
}
83+
}
84+
if err := json.Unmarshal(result, &kibanaResult); err != nil {
85+
return err
86+
}
87+
var errs multierror.Errors
88+
if kibanaResult.Message != "" {
89+
for _, err := range kibanaResult.Attributes.Objects {
90+
errs = append(errs, fmt.Errorf("id: %s, message: %s", err.ID, err.Error.Message))
91+
}
92+
return fmt.Errorf("%s: %w", kibanaResult.Message, errs.Err())
93+
}
94+
return nil
95+
}
96+
97+
func extractMessage(result []byte) error {
98+
var kibanaResult struct {
99+
Success bool
100+
Errors []struct {
101+
ID string
102+
Type string
103+
Error struct {
104+
Type string
105+
References []struct {
106+
Type string
107+
ID string
108+
}
109+
}
110+
}
111+
}
112+
if err := json.Unmarshal(result, &kibanaResult); err != nil {
113+
return nil // nolint: nilerr // we suppress some malformed errors on purpose
114+
}
115+
116+
if !kibanaResult.Success {
117+
var errs multierror.Errors
118+
for _, err := range kibanaResult.Errors {
119+
errs = append(errs, fmt.Errorf("error: %s, asset ID=%s; asset type=%s; references=%+v", err.Error.Type, err.ID, err.Type, err.Error.References))
120+
}
121+
return errs.Err()
122+
}
123+
124+
return nil
125+
}
126+
127+
// NewKibanaClient builds and returns a new Kibana client
128+
func NewKibanaClient(cfg *config.C, binaryName, version, commit, buildtime string) (*Client, error) {
129+
config := DefaultClientConfig()
130+
if err := cfg.Unpack(&config); err != nil {
131+
return nil, err
132+
}
133+
134+
return NewClientWithConfig(&config, binaryName, version, commit, buildtime)
135+
}
136+
137+
// NewClientWithConfig creates and returns a kibana client using the given config
138+
func NewClientWithConfig(config *ClientConfig, binaryName, version, commit, buildtime string) (*Client, error) {
139+
return NewClientWithConfigDefault(config, 5601, binaryName, version, commit, buildtime)
140+
}
141+
142+
// NewClientWithConfig creates and returns a kibana client using the given config
143+
func NewClientWithConfigDefault(config *ClientConfig, defaultPort int, binaryName, version, commit, buildtime string) (*Client, error) {
144+
if err := config.Validate(); err != nil {
145+
return nil, err
146+
}
147+
148+
p := config.Path
149+
if config.SpaceID != "" {
150+
p = path.Join(p, "s", config.SpaceID)
151+
}
152+
kibanaURL, err := MakeURL(config.Protocol, p, config.Host, defaultPort)
153+
if err != nil {
154+
return nil, fmt.Errorf("invalid Kibana host: %w", err)
155+
}
156+
157+
u, err := url.Parse(kibanaURL)
158+
if err != nil {
159+
return nil, fmt.Errorf("failed to parse the Kibana URL: %w", err)
160+
}
161+
162+
username := config.Username
163+
password := config.Password
164+
165+
if u.User != nil {
166+
username = u.User.Username()
167+
password, _ = u.User.Password()
168+
u.User = nil
169+
170+
if config.APIKey != "" && (username != "" || password != "") {
171+
return nil, fmt.Errorf("cannot set api_key with username/password in Kibana URL")
172+
}
173+
174+
// Re-write URL without credentials.
175+
kibanaURL = u.String()
176+
}
177+
178+
log := logp.NewLogger("kibana")
179+
log.Infof("Kibana url: %s", kibanaURL)
180+
181+
headers := make(http.Header)
182+
for k, v := range config.Headers {
183+
headers.Set(k, v)
184+
}
185+
186+
if binaryName == "" {
187+
binaryName = "Libbeat"
188+
}
189+
userAgent := useragent.UserAgent(binaryName, version, commit, buildtime)
190+
rt, err := config.Transport.Client(httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}))
191+
if err != nil {
192+
return nil, err
193+
}
194+
195+
client := &Client{
196+
Connection: Connection{
197+
URL: kibanaURL,
198+
Username: username,
199+
Password: password,
200+
APIKey: config.APIKey,
201+
ServiceToken: config.ServiceToken,
202+
Headers: headers,
203+
HTTP: rt,
204+
},
205+
log: log,
206+
}
207+
208+
if !config.IgnoreVersion {
209+
if err = client.readVersion(); err != nil {
210+
return nil, fmt.Errorf("fail to get the Kibana version: %w", err)
211+
}
212+
}
213+
214+
return client, nil
215+
}
216+
217+
func (conn *Connection) Request(method, extraPath string,
218+
params url.Values, headers http.Header, body io.Reader) (int, []byte, error) {
219+
220+
resp, err := conn.Send(method, extraPath, params, headers, body)
221+
if err != nil {
222+
return 0, nil, fmt.Errorf("fail to execute the HTTP %s request: %w", method, err)
223+
}
224+
defer resp.Body.Close()
225+
226+
result, err := ioutil.ReadAll(resp.Body)
227+
if err != nil {
228+
return 0, nil, fmt.Errorf("fail to read response: %w", err)
229+
}
230+
231+
var retError error
232+
if resp.StatusCode >= 300 {
233+
retError = extractError(result)
234+
} else {
235+
retError = extractMessage(result)
236+
}
237+
return resp.StatusCode, result, retError
238+
}
239+
240+
// Sends an application/json request to Kibana with appropriate kbn headers
241+
func (conn *Connection) Send(method, extraPath string,
242+
params url.Values, headers http.Header, body io.Reader) (*http.Response, error) {
243+
244+
return conn.SendWithContext(context.Background(), method, extraPath, params, headers, body)
245+
}
246+
247+
// SendWithContext sends an application/json request to Kibana with appropriate kbn headers and the given context.
248+
func (conn *Connection) SendWithContext(ctx context.Context, method, extraPath string,
249+
params url.Values, headers http.Header, body io.Reader) (*http.Response, error) {
250+
251+
reqURL := addToURL(conn.URL, extraPath, params)
252+
253+
req, err := http.NewRequestWithContext(ctx, method, reqURL, body)
254+
if err != nil {
255+
return nil, fmt.Errorf("fail to create the HTTP %s request: %w", method, err)
256+
}
257+
258+
if conn.Username != "" || conn.Password != "" {
259+
req.SetBasicAuth(conn.Username, conn.Password)
260+
}
261+
if conn.APIKey != "" {
262+
v := "ApiKey " + base64.StdEncoding.EncodeToString([]byte(conn.APIKey))
263+
req.Header.Set("Authorization", v)
264+
}
265+
if conn.ServiceToken != "" {
266+
v := "Bearer " + conn.ServiceToken
267+
req.Header.Set("Authorization", v)
268+
}
269+
270+
addHeaders(req.Header, conn.Headers)
271+
addHeaders(req.Header, headers)
272+
273+
contentType := req.Header.Get("Content-Type")
274+
contentType, _, _ = mime.ParseMediaType(contentType)
275+
if contentType != "multipart/form-data" && contentType != "application/ndjson" {
276+
req.Header.Set("Content-Type", "application/json")
277+
}
278+
req.Header.Set("Accept", "application/json")
279+
req.Header.Set("kbn-xsrf", "1")
280+
281+
return conn.RoundTrip(req)
282+
}
283+
284+
func addHeaders(out, in http.Header) {
285+
for k, vs := range in {
286+
for _, v := range vs {
287+
out.Add(k, v)
288+
}
289+
}
290+
}
291+
292+
// Implements RoundTrip interface
293+
func (conn *Connection) RoundTrip(r *http.Request) (*http.Response, error) {
294+
return conn.HTTP.Do(r)
295+
}
296+
297+
func (client *Client) readVersion() error {
298+
type kibanaVersionResponse struct {
299+
Name string `json:"name"`
300+
Version struct {
301+
Number string `json:"number"`
302+
Snapshot bool `json:"build_snapshot"`
303+
} `json:"version"`
304+
}
305+
306+
code, result, err := client.Connection.Request("GET", statusAPI, nil, nil, nil)
307+
if err != nil || code >= 400 {
308+
return fmt.Errorf("HTTP GET request to %s/api/status fails: %w. Response: %s",
309+
client.Connection.URL, err, truncateString(result))
310+
}
311+
312+
var versionString string
313+
314+
var kibanaVersion kibanaVersionResponse
315+
err = json.Unmarshal(result, &kibanaVersion)
316+
if err != nil {
317+
return fmt.Errorf("fail to unmarshal the response from GET %s/api/status. Response: %s. Kibana status api returns: %w",
318+
client.Connection.URL, truncateString(result), err)
319+
}
320+
321+
versionString = kibanaVersion.Version.Number
322+
323+
if kibanaVersion.Version.Snapshot {
324+
// needed for the tests
325+
versionString += "-SNAPSHOT"
326+
}
327+
328+
version, err := version.New(versionString)
329+
if err != nil {
330+
return fmt.Errorf("fail to parse kibana version (%v): %w", versionString, err)
331+
}
332+
333+
client.Version = *version
334+
return nil
335+
}
336+
337+
// GetVersion returns the version read from kibana. The version is not set if
338+
// IgnoreVersion was set when creating the client.
339+
func (client *Client) GetVersion() version.V { return client.Version }
340+
341+
func (client *Client) ImportMultiPartFormFile(url string, params url.Values, filename string, contents string) error {
342+
buf := &bytes.Buffer{}
343+
w := multipart.NewWriter(buf)
344+
345+
pHeaders := textproto.MIMEHeader{}
346+
pHeaders.Add("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, filename))
347+
pHeaders.Add("Content-Type", "application/ndjson")
348+
349+
p, err := w.CreatePart(pHeaders)
350+
if err != nil {
351+
return fmt.Errorf("failed to create multipart writer for payload: %w", err)
352+
}
353+
_, err = io.Copy(p, strings.NewReader(contents))
354+
if err != nil {
355+
return fmt.Errorf("failed to copy contents of the object: %w", err)
356+
}
357+
w.Close()
358+
359+
headers := http.Header{}
360+
headers.Add("Content-Type", w.FormDataContentType())
361+
statusCode, response, err := client.Connection.Request("POST", url, params, headers, buf)
362+
if err != nil || statusCode >= 300 {
363+
return fmt.Errorf("returned %d to import file: %w. Response: %s", statusCode, err, response)
364+
}
365+
366+
client.log.Debugf("Imported multipart file to %s with params %v", url, params)
367+
return nil
368+
}
369+
370+
func (client *Client) Close() error { return nil }
371+
372+
// truncateString returns a truncated string if the length is greater than 250
373+
// runes. If the string is truncated "... (truncated)" is appended. Newlines are
374+
// replaced by spaces in the returned string.
375+
//
376+
// This function is useful for logging raw HTTP responses with errors when those
377+
// responses can be very large (such as an HTML page with CSS content).
378+
func truncateString(b []byte) string {
379+
const maxLength = 250
380+
runes := bytes.Runes(b)
381+
if len(runes) > maxLength {
382+
runes = append(runes[:maxLength], []rune("... (truncated)")...)
383+
}
384+
385+
return strings.Replace(string(runes), "\n", " ", -1)
386+
}

0 commit comments

Comments
 (0)