Skip to content

Commit d50fcc8

Browse files
alexandraoberaigneraepflitoddbaert
authored
feat: add ssl support to sync service (#1479) (#1501)
Adds SSL support to the flagd sync service --------- Signed-off-by: Alexandra Oberaigner <alexandra.oberaigner@dynatrace.com> Co-authored-by: Simon Schrottner <simon.schrottner@dynatrace.com> Co-authored-by: Todd Baert <todd.baert@dynatrace.com>
1 parent 9891df2 commit d50fcc8

File tree

9 files changed

+355
-128
lines changed

9 files changed

+355
-128
lines changed

core/pkg/telemetry/builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func buildTransportCredentials(_ context.Context, cfg CollectorConfig) (credenti
145145

146146
tlsConfig := &tls.Config{
147147
RootCAs: capool,
148-
MinVersion: tls.VersionTLS13,
148+
MinVersion: tls.VersionTLS12,
149149
GetCertificate: func(chi *tls.ClientHelloInfo) (*tls.Certificate, error) {
150150
certs, err := reloader.GetCertificate()
151151
if err != nil {

flagd/pkg/service/flag-sync/sync_service.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sync
22

33
import (
44
"context"
5+
"crypto/tls"
56
"fmt"
67
"net"
78
"slices"
@@ -12,6 +13,7 @@ import (
1213
"github.com/open-feature/flagd/core/pkg/store"
1314
"golang.org/x/sync/errgroup"
1415
"google.golang.org/grpc"
16+
"google.golang.org/grpc/credentials"
1517
)
1618

1719
type ISyncService interface {
@@ -28,6 +30,8 @@ type SvcConfigurations struct {
2830
Sources []string
2931
Store *store.Flags
3032
ContextValues map[string]any
33+
CertPath string
34+
KeyPath string
3135
}
3236

3337
type Service struct {
@@ -39,14 +43,41 @@ type Service struct {
3943
startupTracker syncTracker
4044
}
4145

46+
func loadTLSCredentials(certPath string, keyPath string) (credentials.TransportCredentials, error) {
47+
// Load server's certificate and private key
48+
serverCert, err := tls.LoadX509KeyPair(certPath, keyPath)
49+
if err != nil {
50+
return nil, fmt.Errorf("failed to load key pair from certificate paths '%s' and '%s': %w", certPath, keyPath, err)
51+
}
52+
53+
// Create the credentials and return it
54+
config := &tls.Config{
55+
Certificates: []tls.Certificate{serverCert},
56+
ClientAuth: tls.NoClientCert,
57+
MinVersion: tls.VersionTLS12,
58+
}
59+
60+
return credentials.NewTLS(config), nil
61+
}
62+
4263
func NewSyncService(cfg SvcConfigurations) (*Service, error) {
4364
l := cfg.Logger
4465
mux, err := NewMux(cfg.Store, cfg.Sources)
4566
if err != nil {
4667
return nil, fmt.Errorf("error initializing multiplexer: %w", err)
4768
}
4869

49-
server := grpc.NewServer()
70+
var server *grpc.Server
71+
if cfg.CertPath != "" && cfg.KeyPath != "" {
72+
tlsCredentials, err := loadTLSCredentials(cfg.CertPath, cfg.KeyPath)
73+
if err != nil {
74+
return nil, fmt.Errorf("failed to load TLS cert and key: %w", err)
75+
}
76+
server = grpc.NewServer(grpc.Creds(tlsCredentials))
77+
} else {
78+
server = grpc.NewServer()
79+
}
80+
5081
syncv1grpc.RegisterFlagSyncServiceServer(server, &syncHandler{
5182
mux: mux,
5283
log: l,

flagd/pkg/service/flag-sync/sync_service_test.go

Lines changed: 166 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sync
33
import (
44
"context"
55
"fmt"
6+
"log"
67
"testing"
78
"time"
89

@@ -14,134 +15,173 @@ import (
1415
)
1516

1617
func TestSyncServiceEndToEnd(t *testing.T) {
17-
// given
18-
port := 18016
19-
store, sources := getSimpleFlagStore()
20-
21-
service, err := NewSyncService(SvcConfigurations{
22-
Logger: logger.NewLogger(nil, false),
23-
Port: uint16(port),
24-
Sources: sources,
25-
Store: store,
26-
})
27-
if err != nil {
28-
t.Fatal("error creating the service: %w", err)
29-
return
18+
testCases := []struct {
19+
certPath string
20+
keyPath string
21+
clientCertPath string
22+
tls bool
23+
wantErr bool
24+
}{
25+
{"./test-cert/server-cert.pem", "./test-cert/server-key.pem", "./test-cert/ca-cert.pem", true, false},
26+
{"", "", "", false, false},
27+
{"./lol/not/a/cert", "./test-cert/server-key.pem", "./test-cert/ca-cert.pem", true, true},
3028
}
3129

32-
ctx, cancelFunc := context.WithCancel(context.Background())
33-
doneChan := make(chan interface{})
34-
35-
go func() {
36-
// error ignored, tests will fail if start is not successful
37-
_ = service.Start(ctx)
38-
close(doneChan)
39-
}()
40-
41-
// trigger manual emits matching sources, so that service can start
42-
for _, source := range sources {
43-
service.Emit(false, source)
44-
}
45-
46-
// when - derive a client for sync service
47-
con, err := grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
48-
if err != nil {
49-
t.Fatal(fmt.Printf("error creating grpc dial ctx: %v", err))
50-
return
51-
}
52-
53-
serviceClient := syncv1grpc.NewFlagSyncServiceClient(con)
54-
55-
// then
56-
57-
// sync flags request
58-
flags, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{})
59-
if err != nil {
60-
t.Fatal(fmt.Printf("error from sync request: %v", err))
61-
return
62-
}
63-
64-
syncRsp, err := flags.Recv()
65-
if err != nil {
66-
t.Fatal(fmt.Printf("stream error: %v", err))
67-
return
68-
}
69-
70-
if len(syncRsp.GetFlagConfiguration()) == 0 {
71-
t.Error("expected non empty sync response, but got empty")
72-
}
73-
74-
// validate emits
75-
dataReceived := make(chan interface{})
76-
go func() {
77-
_, err := flags.Recv()
78-
if err != nil {
79-
return
30+
for _, tc := range testCases {
31+
var testTitle string
32+
if tc.tls {
33+
testTitle = "Testing Sync Service with TLS Connection"
34+
} else {
35+
testTitle = "Testing Sync Service without TLS Connection"
8036
}
81-
82-
dataReceived <- nil
83-
}()
84-
85-
// Emit as a resync
86-
service.Emit(true, "A")
87-
88-
select {
89-
case <-dataReceived:
90-
t.Fatal("expected no data as this is a resync")
91-
case <-time.After(1 * time.Second):
92-
break
93-
}
94-
95-
// Emit as a resync
96-
service.Emit(false, "A")
97-
98-
select {
99-
case <-dataReceived:
100-
break
101-
case <-time.After(1 * time.Second):
102-
t.Fatal("expected data but timeout waiting for sync")
103-
}
104-
105-
// fetch all flags
106-
allRsp, err := serviceClient.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{})
107-
if err != nil {
108-
t.Fatal(fmt.Printf("fetch all error: %v", err))
109-
return
110-
}
111-
112-
if allRsp.GetFlagConfiguration() != syncRsp.GetFlagConfiguration() {
113-
t.Errorf("expected both sync and fetch all responses to be same, but got %s from sync & %s from fetch all",
114-
syncRsp.GetFlagConfiguration(), allRsp.GetFlagConfiguration())
115-
}
116-
117-
// metadata request
118-
metadataRsp, err := serviceClient.GetMetadata(ctx, &v1.GetMetadataRequest{})
119-
if err != nil {
120-
t.Fatal(fmt.Printf("metadata error: %v", err))
121-
return
122-
}
123-
124-
asMap := metadataRsp.GetMetadata().AsMap()
125-
126-
// expect `sources` to be present
127-
if asMap["sources"] == nil {
128-
t.Fatal("expected sources entry in the metadata, but got nil")
129-
}
130-
131-
if asMap["sources"] != "A,B,C" {
132-
t.Fatal("incorrect sources entry in metadata")
133-
}
134-
135-
// validate shutdown from context cancellation
136-
go func() {
137-
cancelFunc()
138-
}()
139-
140-
select {
141-
case <-doneChan:
142-
// exit successful
143-
return
144-
case <-time.After(2 * time.Second):
145-
t.Fatal("service did not exist within sufficient timeframe")
37+
t.Run(testTitle, func(t *testing.T) {
38+
// given
39+
port := 18016
40+
store, sources := getSimpleFlagStore()
41+
42+
service, err := NewSyncService(SvcConfigurations{
43+
Logger: logger.NewLogger(nil, false),
44+
Port: uint16(port),
45+
Sources: sources,
46+
Store: store,
47+
CertPath: tc.certPath,
48+
KeyPath: tc.keyPath,
49+
})
50+
51+
if tc.wantErr {
52+
if err == nil {
53+
t.Fatal("expected error creating the service!")
54+
}
55+
return
56+
} else if err != nil {
57+
t.Fatal("unexpected error creating the service: %w", err)
58+
return
59+
}
60+
61+
ctx, cancelFunc := context.WithCancel(context.Background())
62+
doneChan := make(chan interface{})
63+
64+
go func() {
65+
// error ignored, tests will fail if start is not successful
66+
_ = service.Start(ctx)
67+
close(doneChan)
68+
}()
69+
70+
// trigger manual emits matching sources, so that service can start
71+
for _, source := range sources {
72+
service.Emit(false, source)
73+
}
74+
75+
// when - derive a client for sync service
76+
var con *grpc.ClientConn
77+
if tc.tls {
78+
tlsCredentials, e := loadTLSClientCredentials(tc.clientCertPath)
79+
if e != nil {
80+
log.Fatal("cannot load TLS credentials: ", e)
81+
}
82+
con, err = grpc.Dial(fmt.Sprintf("0.0.0.0:%d", port), grpc.WithTransportCredentials(tlsCredentials))
83+
} else {
84+
con, err = grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
85+
}
86+
if err != nil {
87+
t.Fatal(fmt.Printf("error creating grpc dial ctx: %v", err))
88+
return
89+
}
90+
91+
serviceClient := syncv1grpc.NewFlagSyncServiceClient(con)
92+
93+
// then
94+
95+
// sync flags request
96+
flags, err := serviceClient.SyncFlags(ctx, &v1.SyncFlagsRequest{})
97+
if err != nil {
98+
t.Fatal(fmt.Printf("error from sync request: %v", err))
99+
return
100+
}
101+
102+
syncRsp, err := flags.Recv()
103+
if err != nil {
104+
t.Fatal(fmt.Printf("stream error: %v", err))
105+
return
106+
}
107+
108+
if len(syncRsp.GetFlagConfiguration()) == 0 {
109+
t.Error("expected non empty sync response, but got empty")
110+
}
111+
112+
// validate emits
113+
dataReceived := make(chan interface{})
114+
go func() {
115+
_, err := flags.Recv()
116+
if err != nil {
117+
return
118+
}
119+
120+
dataReceived <- nil
121+
}()
122+
123+
// Emit as a resync
124+
service.Emit(true, "A")
125+
126+
select {
127+
case <-dataReceived:
128+
t.Fatal("expected no data as this is a resync")
129+
case <-time.After(1 * time.Second):
130+
break
131+
}
132+
133+
// Emit as a resync
134+
service.Emit(false, "A")
135+
136+
select {
137+
case <-dataReceived:
138+
break
139+
case <-time.After(1 * time.Second):
140+
t.Fatal("expected data but timeout waiting for sync")
141+
}
142+
143+
// fetch all flags
144+
allRsp, err := serviceClient.FetchAllFlags(ctx, &v1.FetchAllFlagsRequest{})
145+
if err != nil {
146+
t.Fatal(fmt.Printf("fetch all error: %v", err))
147+
return
148+
}
149+
150+
if allRsp.GetFlagConfiguration() != syncRsp.GetFlagConfiguration() {
151+
t.Errorf("expected both sync and fetch all responses to be same, but got %s from sync & %s from fetch all",
152+
syncRsp.GetFlagConfiguration(), allRsp.GetFlagConfiguration())
153+
}
154+
155+
// metadata request
156+
metadataRsp, err := serviceClient.GetMetadata(ctx, &v1.GetMetadataRequest{})
157+
if err != nil {
158+
t.Fatal(fmt.Printf("metadata error: %v", err))
159+
return
160+
}
161+
162+
asMap := metadataRsp.GetMetadata().AsMap()
163+
164+
// expect `sources` to be present
165+
if asMap["sources"] == nil {
166+
t.Fatal("expected sources entry in the metadata, but got nil")
167+
}
168+
169+
if asMap["sources"] != "A,B,C" {
170+
t.Fatal("incorrect sources entry in metadata")
171+
}
172+
173+
// validate shutdown from context cancellation
174+
go func() {
175+
cancelFunc()
176+
}()
177+
178+
select {
179+
case <-doneChan:
180+
// exit successful
181+
return
182+
case <-time.After(2 * time.Second):
183+
t.Fatal("service did not exist within sufficient timeframe")
184+
}
185+
})
146186
}
147187
}

0 commit comments

Comments
 (0)