Skip to content

Commit 55f5403

Browse files
kaisechengbelimawr
andauthored
[OTel] Add TLS test to Logstash Exporter (#47199)
This commit verifies that key TLS features in Logstash Exporter still work as expected Add TLS tests - TLS connections - Self-signed certificates - Keypass support Add E2E TestLogstashExporterProxyURL to ensure proxy_url config is respected --------- Co-authored-by: Tiago Queiroz <[email protected]>
1 parent f6f56c8 commit 55f5403

File tree

5 files changed

+441
-65
lines changed

5 files changed

+441
-65
lines changed

libbeat/otelbeat/oteltest/beatsauth_test.go

Lines changed: 15 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@ import (
5353
"go.opentelemetry.io/otel/sdk/metric/metricdata"
5454
"go.uber.org/zap/zaptest"
5555

56-
"github.com/elastic/pkcs8"
57-
5856
"gopkg.in/yaml.v2"
5957

6058
"github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter"
@@ -101,7 +99,7 @@ func TestMTLS(t *testing.T) {
10199
}
102100

103101
// get client certificates paths
104-
clientCertificate, clientKey := getClientCerts(t, caCert, "")
102+
clientCertificate, clientKey := GetClientCerts(t, caCert, "")
105103

106104
// start test server with given server and root certs
107105
certPool := x509.NewCertPool()
@@ -117,11 +115,11 @@ func TestMTLS(t *testing.T) {
117115

118116
inputConfig := `
119117
receivers:
120-
filebeatreceiver:
118+
filebeatreceiver:
121119
output:
122120
elasticsearch:
123121
hosts: {{ .Host }}
124-
ssl:
122+
ssl:
125123
enabled: true
126124
certificate_authorities:
127125
- {{ .CACertificate }}
@@ -175,7 +173,7 @@ func TestKeyPassPhrase(t *testing.T) {
175173
}
176174

177175
// get client certificates paths with key file encrypted in PKCS#8 format
178-
clientCertificate, clientKey := getClientCerts(t, caCert, "your-password")
176+
clientCertificate, clientKey := GetClientCerts(t, caCert, "your-password")
179177

180178
// start test server with given server and root certs
181179
certPool := x509.NewCertPool()
@@ -191,17 +189,17 @@ func TestKeyPassPhrase(t *testing.T) {
191189

192190
inputConfig := `
193191
receivers:
194-
filebeatreceiver:
192+
filebeatreceiver:
195193
output:
196194
elasticsearch:
197195
hosts: {{ .Host }}
198-
ssl:
196+
ssl:
199197
enabled: true
200198
certificate_authorities:
201199
- {{ .CACertificate }}
202200
certificate: {{ .ClientCert }}
203201
key: {{ .ClientKey }}
204-
key_passphrase: your-password
202+
key_passphrase: your-password
205203
`
206204

207205
var otelConfigBuffer bytes.Buffer
@@ -262,13 +260,13 @@ func TestCATrustedFingerPrint(t *testing.T) {
262260

263261
inputConfig := `
264262
receivers:
265-
filebeatreceiver:
263+
filebeatreceiver:
266264
output:
267265
elasticsearch:
268266
hosts: {{ .Host }}
269-
ssl:
267+
ssl:
270268
enabled: true
271-
ca_trusted_fingerprint: {{ .CATrustedFingerPrint }}
269+
ca_trusted_fingerprint: {{ .CATrustedFingerPrint }}
272270
`
273271

274272
var otelConfigBuffer bytes.Buffer
@@ -440,11 +438,11 @@ func TestVerificationMode(t *testing.T) {
440438

441439
inputConfig := `
442440
receivers:
443-
filebeatreceiver:
441+
filebeatreceiver:
444442
output:
445443
elasticsearch:
446444
hosts: {{ .Host }}
447-
ssl:
445+
ssl:
448446
enabled: true
449447
certificate_authorities:
450448
- {{ .CACertificate }}
@@ -536,7 +534,7 @@ func TestProxyHTTP(t *testing.T) {
536534
proxytest.WithRequestLog("https", t.Logf)},
537535
inputConfig: `
538536
receivers:
539-
filebeatreceiver:
537+
filebeatreceiver:
540538
output:
541539
elasticsearch:
542540
hosts: {{ .Host }}
@@ -569,7 +567,7 @@ receivers:
569567
})},
570568
inputConfig: `
571569
receivers:
572-
filebeatreceiver:
570+
filebeatreceiver:
573571
output:
574572
elasticsearch:
575573
hosts: {{ .Host }}
@@ -587,7 +585,7 @@ receivers:
587585
proxytest.WithRequestLog("https", t.Logf)},
588586
inputConfig: `
589587
receivers:
590-
filebeatreceiver:
588+
filebeatreceiver:
591589
output:
592590
elasticsearch:
593591
hosts: {{ .Host }}
@@ -762,54 +760,6 @@ func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) error {
762760
return err
763761
}
764762

765-
// getClientCerts creates client certificates, writes them to a file and return the path of certificate and key
766-
// if passphrase is passed, it is used to encrypt the key file
767-
func getClientCerts(t *testing.T, caCert tls.Certificate, passphrase string) (certificate string, key string) {
768-
// create client certificates
769-
clientCerts, err := tlscommontest.GenSignedCert(caCert, x509.KeyUsageCertSign, false, "client", []string{"localhost"}, []net.IP{net.IPv4(127, 0, 0, 1)}, false)
770-
if err != nil {
771-
t.Fatalf("could not generate certificates: %s", err)
772-
}
773-
774-
tempDir := t.TempDir()
775-
clientCertPath := filepath.Join(tempDir, "client-cert.pem")
776-
clientKeyPath := filepath.Join(tempDir, "client-key.pem")
777-
778-
if passphrase != "" {
779-
clientKey, err := pkcs8.MarshalPrivateKey(clientCerts.PrivateKey, []byte(passphrase), pkcs8.DefaultOpts)
780-
if err != nil {
781-
t.Fatalf("could not marshal private key: %v", err)
782-
}
783-
784-
if err = os.WriteFile(clientKeyPath, pem.EncodeToMemory(&pem.Block{
785-
Type: "ENCRYPTED PRIVATE KEY",
786-
Bytes: clientKey,
787-
}), 0400); err != nil {
788-
t.Fatalf("could not write client key to file")
789-
}
790-
} else {
791-
clientKey, err := x509.MarshalPKCS8PrivateKey(clientCerts.PrivateKey)
792-
if err != nil {
793-
t.Fatalf("could not marshal private key: %v", err)
794-
}
795-
if err = os.WriteFile(clientKeyPath, pem.EncodeToMemory(&pem.Block{
796-
Type: "RSA PRIVATE KEY",
797-
Bytes: clientKey,
798-
}), 0400); err != nil {
799-
t.Fatalf("could not write client key to file")
800-
}
801-
}
802-
803-
if err = os.WriteFile(clientCertPath, pem.EncodeToMemory(&pem.Block{
804-
Type: "CERTIFICATE",
805-
Bytes: clientCerts.Leaf.Raw,
806-
}), 0400); err != nil {
807-
t.Fatalf("could not write client certificate to file")
808-
}
809-
810-
return clientCertPath, clientKeyPath
811-
}
812-
813763
func getTranslatedConf(t *testing.T, input []byte) *confmap.Conf {
814764
c := beatconverter.Converter{}
815765

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package oteltest
19+
20+
import (
21+
"crypto/tls"
22+
"crypto/x509"
23+
"encoding/pem"
24+
"net"
25+
"os"
26+
"path/filepath"
27+
"testing"
28+
29+
"github.com/elastic/elastic-agent-libs/transport/tlscommontest"
30+
"github.com/elastic/pkcs8"
31+
)
32+
33+
// GetClientCerts creates client certificates, writes them to a file and return the path of certificate and key
34+
// if passphrase is passed, it is used to encrypt the key file
35+
func GetClientCerts(t *testing.T, caCert tls.Certificate, passphrase string) (certificate string, key string) {
36+
// create client certificates
37+
clientCerts, err := tlscommontest.GenSignedCert(caCert, x509.KeyUsageCertSign, false, "client", []string{"localhost"}, []net.IP{net.IPv4(127, 0, 0, 1)}, false)
38+
if err != nil {
39+
t.Fatalf("could not generate certificates: %s", err)
40+
}
41+
42+
tempDir := t.TempDir()
43+
clientCertPath := filepath.Join(tempDir, "client-cert.pem")
44+
clientKeyPath := filepath.Join(tempDir, "client-key.pem")
45+
46+
if passphrase != "" {
47+
clientKey, err := pkcs8.MarshalPrivateKey(clientCerts.PrivateKey, []byte(passphrase), pkcs8.DefaultOpts)
48+
if err != nil {
49+
t.Fatalf("could not marshal private key: %v", err)
50+
}
51+
52+
if err = os.WriteFile(clientKeyPath, pem.EncodeToMemory(&pem.Block{
53+
Type: "ENCRYPTED PRIVATE KEY",
54+
Bytes: clientKey,
55+
}), 0400); err != nil {
56+
t.Fatalf("could not write client key to file")
57+
}
58+
} else {
59+
clientKey, err := x509.MarshalPKCS8PrivateKey(clientCerts.PrivateKey)
60+
if err != nil {
61+
t.Fatalf("could not marshal private key: %v", err)
62+
}
63+
if err = os.WriteFile(clientKeyPath, pem.EncodeToMemory(&pem.Block{
64+
Type: "RSA PRIVATE KEY",
65+
Bytes: clientKey,
66+
}), 0400); err != nil {
67+
t.Fatalf("could not write client key to file")
68+
}
69+
}
70+
71+
if err = os.WriteFile(clientCertPath, pem.EncodeToMemory(&pem.Block{
72+
Type: "CERTIFICATE",
73+
Bytes: clientCerts.Leaf.Raw,
74+
}), 0400); err != nil {
75+
t.Fatalf("could not write client certificate to file")
76+
}
77+
78+
return clientCertPath, clientKeyPath
79+
}

x-pack/filebeat/docker-compose.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ services:
5858
- --addr=:8090
5959
- --config=/files/config.yaml
6060

61+
socks5-proxy:
62+
image: serjs/go-socks5-proxy:latest
63+
container_name: socks5-proxy
64+
ports:
65+
- "1080:1080"
66+
environment:
67+
PROXY_USER: elastic
68+
PROXY_PASSWORD: changeme
6169
init-logstash:
6270
image: busybox:latest
6371
user: "0:0"
@@ -68,6 +76,7 @@ services:
6876
logstash:
6977
depends_on:
7078
- init-logstash
79+
- socks5-proxy
7180
extends:
7281
file: ${ES_BEATS}/testing/environments/${STACK_ENVIRONMENT}.yml
7382
service: logstash

x-pack/filebeat/tests/integration/otel_lsexporter_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,81 @@ processors:
149149
compareOutputFiles(t, fbFilePath, otelFilePath, ignoredFields)
150150
}
151151

152+
// TestLogstashExporterProxyURL verifies that Filebeat OTel mode can send data to Logstash via a SOCKS5 proxy.
153+
// Filebeat otel mode sends events to "logstash" via a socks5-proxy container running on localhost:1080
154+
func TestLogstashExporterProxyURL(t *testing.T) {
155+
// ensure the size of events is big enough
156+
numEvents := 3
157+
158+
var beatsCfgFile = `
159+
filebeat.inputs:
160+
- type: filestream
161+
id: filestream-input-id
162+
enabled: true
163+
paths:
164+
- %s
165+
output.logstash:
166+
hosts:
167+
- "logstash:%s"
168+
pipelining: 0
169+
worker: 1
170+
proxy_url: "socks5://elastic:changeme@localhost:1080"
171+
queue.mem.flush.timeout: 0s
172+
processors:
173+
- add_host_metadata: ~
174+
- add_fields:
175+
target: ""
176+
fields:
177+
testcase: %s
178+
`
179+
testCaseName := uuid.Must(uuid.NewV4()).String()
180+
events := generateEvents(numEvents)
181+
182+
// start filebeat in otel mode
183+
fbOTel := integration.NewBeat(
184+
t,
185+
"filebeat-otel",
186+
"../../filebeat.test",
187+
"otel",
188+
)
189+
190+
inputFilePath := filepath.Join(fbOTel.TempDir(), "event.json")
191+
writeEvents(t, inputFilePath, events)
192+
193+
fbOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, inputFilePath, "5055", testCaseName))
194+
fbOTel.Start()
195+
defer fbOTel.Stop()
196+
197+
// Nginx endpoint URLs
198+
baseURL := "http://localhost:8082"
199+
outOTelFileURL := fmt.Sprintf("%s/%s_otel.json", baseURL, testCaseName)
200+
201+
// Logstash is outputting to a file inside its container, to access
202+
// this file we use Nginx to serve the output folder via HTTP
203+
// (see docker-compose.yml for Logstash and Nginx configuration).
204+
// Wait to ensure the file can be downloaded via HTTP, the file
205+
// being available indicates that Filebeat successfully sent data
206+
// to Logstash
207+
require.EventuallyWithTf(t,
208+
func(ct *assert.CollectT) {
209+
for _, url := range []string{outOTelFileURL} {
210+
checkURLHasContent(ct, url)
211+
}
212+
},
213+
30*time.Second,
214+
1*time.Second,
215+
"did not find Logstash output file served via Nginx")
216+
217+
// download files from Nginx
218+
otelFilePath := downloadToTestData(t, outOTelFileURL, fmt.Sprintf("%s_otel.json", testCaseName))
219+
otelEvents, err := readAllEvents(otelFilePath)
220+
221+
require.NoError(t, err, "failed to read otel events")
222+
require.Equal(t, numEvents, len(otelEvents),
223+
"different number of events: sent=%d, get=%d", numEvents, len(otelEvents))
224+
225+
}
226+
152227
func generateEvents(numEvents int) []string {
153228
gofakeit.Seed(time.Now().UnixNano())
154229

0 commit comments

Comments
 (0)