Skip to content

Commit afb2062

Browse files
committed
dynamic TLS CA rotation for sidecar and query service
Add dynamic TLS support so that peer organization CA certificates are automatically updated when config blocks add or remove organizations.

- DynamicTLSManager in utils/connection/ watches config block updates and rebuilds the trusted CA pool on the fly
- Sidecar updates TLS immediately during config block processing
- Query service polls the DB periodically for config changes
- Integration test covers add/remove/restore of peer orgs with mTLS
- Infrastructure plumbing for QueryTLSRefreshInterval and PeerOrganizationCount in runner config

Signed-off-by: Senthilnathan <cendhu@gmail.com>
1 parent 5d2d7c4 commit afb2062

File tree

27 files changed

+822
-43
lines changed

27 files changed

+822
-43
lines changed

cmd/committer/start_cmd.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/hyperledger/fabric-x-committer/service/sidecar"
2020
"github.com/hyperledger/fabric-x-committer/service/vc"
2121
"github.com/hyperledger/fabric-x-committer/service/verifier"
22+
"github.com/hyperledger/fabric-x-committer/utils/connection"
2223
"github.com/hyperledger/fabric-x-committer/utils/grpcservice"
2324
)
2425

@@ -58,31 +59,59 @@ func startService(ctx context.Context, name, configPath string) error {
5859

5960
switch c := conf.(type) {
6061
case *sidecar.Config:
61-
service, err := sidecar.New(c)
62+
tlsUpdater, tlsProvider, err := newDynamicTLS(c.Server)
63+
if err != nil {
64+
return err
65+
}
66+
service, err := sidecar.New(c, tlsUpdater)
6267
if err != nil {
6368
return errors.Wrap(err, "failed to create sidecar service")
6469
}
6570
defer service.Close()
66-
return grpcservice.StartAndServe(ctx, service, c.Server)
71+
return grpcservice.StartAndServe(ctx, service, tlsProvider, c.Server)
6772

6873
case *coordinator.Config:
69-
return grpcservice.StartAndServe(ctx, coordinator.NewCoordinatorService(c), c.Server)
74+
return grpcservice.StartAndServe(ctx, coordinator.NewCoordinatorService(c), nil, c.Server)
7075

7176
case *vc.Config:
7277
service, err := vc.NewValidatorCommitterService(ctx, c)
7378
if err != nil {
7479
return errors.Wrap(err, "failed to create validator committer service")
7580
}
7681
defer service.Close()
77-
return grpcservice.StartAndServe(ctx, service, c.Server)
82+
return grpcservice.StartAndServe(ctx, service, nil, c.Server)
7883

7984
case *verifier.Config:
80-
return grpcservice.StartAndServe(ctx, verifier.New(c), c.Server)
85+
return grpcservice.StartAndServe(ctx, verifier.New(c), nil, c.Server)
8186

8287
case *query.Config:
83-
return grpcservice.StartAndServe(ctx, query.NewQueryService(c), c.Server)
88+
tlsUpdater, tlsProvider, err := newDynamicTLS(c.Server)
89+
if err != nil {
90+
return err
91+
}
92+
return grpcservice.StartAndServe(ctx, query.NewQueryService(c, tlsUpdater), tlsProvider, c.Server)
8493

8594
default:
8695
return errors.Newf("unknown config type: %T", conf)
8796
}
8897
}
98+
99+
// newDynamicTLS builds the dynamic TLS object for serverConfig and returns it
// twice: once as the certificate updater and once as the TLS config provider.
// It returns (nil, nil, nil) when serverConfig is nil or its TLS mode is not
// mutual TLS, and a wrapped error when the dynamic TLS object cannot be created.
//
100+
// The two interfaces are returned separately to avoid the Go nil-interface
// trap: a nil *DynamicTLS assigned to an interface becomes a
101+
// non-nil interface wrapping a nil pointer, which passes != nil checks but
102+
// panics on method calls. Returning interfaces directly ensures that when
103+
// TLS is disabled, callers receive true nil values.
104+
func newDynamicTLS(
105+
serverConfig *connection.ServerConfig,
106+
) (connection.TLSCertUpdater, connection.TLSConfigProvider, error) {
107+
// Dynamic TLS is only set up for mutual TLS; every other case yields untyped nils.
if serverConfig == nil || serverConfig.TLS.Mode != connection.MutualTLSMode {
108+
return nil, nil, nil
109+
}
110+
111+
dynamicTLS, err := connection.NewDynamicTLSFromConfig(serverConfig.TLS)
112+
if err != nil {
113+
return nil, nil, errors.Wrap(err, "failed to create dynamic TLS config")
114+
}
115+
116+
// The same value serves as both the updater and the provider.
return dynamicTLS, dynamicTLS, nil
117+
}

cmd/config/app_config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ func TestReadConfigQuery(t *testing.T) {
292292
MaxActiveViews: query.DefaultMaxActiveViews,
293293
MaxViewTimeout: query.DefaultMaxViewTimeout,
294294
MaxRequestKeys: query.DefaultMaxRequestKeys,
295+
TLSRefreshInterval: query.DefaultTLSRefreshInterval,
295296
},
296297
}, {
297298
name: "sample",
@@ -307,6 +308,7 @@ func TestReadConfigQuery(t *testing.T) {
307308
MaxActiveViews: query.DefaultMaxActiveViews,
308309
MaxViewTimeout: query.DefaultMaxViewTimeout,
309310
MaxRequestKeys: query.DefaultMaxRequestKeys,
311+
TLSRefreshInterval: query.DefaultTLSRefreshInterval,
310312
},
311313
}}
312314

cmd/config/create_config_file.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,18 @@ type (
4242
DB DatabaseConfig
4343

4444
// Per service configurations.
45-
BlockSize uint64 // orderer, loadgen
46-
BlockTimeout time.Duration // orderer
47-
LedgerPath string // sidecar
48-
Policy *workload.PolicyProfile // loadgen
49-
LoadGenBlockLimit uint64 // loadgen
50-
LoadGenTXLimit uint64 // loadgen
51-
LoadGenWorkers uint64 // loadgen
52-
Logging flogging.Config // for all
53-
RateLimit *connection.RateLimitConfig // query, sidecar
54-
MaxRequestKeys int // query
55-
MaxConcurrentStreams int // sidecar
45+
BlockSize uint64 // orderer, loadgen
46+
BlockTimeout time.Duration // orderer
47+
LedgerPath string // sidecar
48+
Policy *workload.PolicyProfile // loadgen
49+
LoadGenBlockLimit uint64 // loadgen
50+
LoadGenTXLimit uint64 // loadgen
51+
LoadGenWorkers uint64 // loadgen
52+
Logging flogging.Config // for all
53+
RateLimit *connection.RateLimitConfig // query, sidecar
54+
MaxRequestKeys int // query
55+
QueryTLSRefreshInterval time.Duration // query
56+
MaxConcurrentStreams int // sidecar
5657

5758
// VC service batching configuration (for testing).
5859
VCMinTransactionBatchSize int // vc

cmd/config/templates/query.yaml.tmpl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,6 @@
88
{{ include "database" . | indent 0 }}
99

1010
max-request-keys: {{ .MaxRequestKeys }}
11+
{{- if .QueryTLSRefreshInterval }}
12+
tls-refresh-interval: {{ .QueryTLSRefreshInterval }}
13+
{{- end }}

cmd/config/viper.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func NewViperWithQueryDefaults() *viper.Viper {
8282
v.SetDefault("max-active-views", query.DefaultMaxActiveViews)
8383
v.SetDefault("max-view-timeout", query.DefaultMaxViewTimeout)
8484
v.SetDefault("max-request-keys", query.DefaultMaxRequestKeys)
85+
v.SetDefault("tls-refresh-interval", query.DefaultTLSRefreshInterval)
8586
return v
8687
}
8788

cmd/loadgen/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func loadGenCMD() *cobra.Command {
7676
if err != nil {
7777
return err
7878
}
79-
return grpcservice.StartAndServe(cmd.Context(), client, conf.Server)
79+
return grpcservice.StartAndServe(cmd.Context(), client, nil, conf.Server)
8080
},
8181
}
8282
cliutil.SetDefaultFlags(cmd, &configPath)

cmd/mock/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func startMockOrderer() *cobra.Command {
8484
if conf.Server != nil && !conf.Server.Endpoint.Empty() {
8585
serverConfigs = append(serverConfigs, conf.Server)
8686
}
87-
return grpcservice.StartAndServe(cmd.Context(), service, serverConfigs...)
87+
return grpcservice.StartAndServe(cmd.Context(), service, nil, serverConfigs...)
8888
},
8989
}
9090
cliutil.SetDefaultFlags(cmd, &configPath)
@@ -109,7 +109,7 @@ func startMockCoordinator() *cobra.Command {
109109
defer cmd.Printf("%v ended\n", mockVerifierName)
110110

111111
service := mock.NewMockCoordinator()
112-
return grpcservice.Serve(cmd.Context(), service, conf.Server)
112+
return grpcservice.Serve(cmd.Context(), service, conf.Server, nil)
113113
},
114114
}
115115
cliutil.SetDefaultFlags(cmd, &configPath)
@@ -134,7 +134,7 @@ func startMockVerifier() *cobra.Command {
134134
defer cmd.Printf("%v ended\n", mockVerifierName)
135135

136136
sv := mock.NewMockSigVerifier()
137-
return grpcservice.Serve(cmd.Context(), sv, conf.Server)
137+
return grpcservice.Serve(cmd.Context(), sv, conf.Server, nil)
138138
},
139139
}
140140
cliutil.SetDefaultFlags(cmd, &configPath)
@@ -159,7 +159,7 @@ func startMockVC() *cobra.Command {
159159
defer cmd.Printf("%v ended\n", mockVcName)
160160

161161
vcs := mock.NewMockVcService()
162-
return grpcservice.Serve(cmd.Context(), vcs, conf.Server)
162+
return grpcservice.Serve(cmd.Context(), vcs, conf.Server, nil)
163163
},
164164
}
165165
cliutil.SetDefaultFlags(cmd, &configPath)

integration/runner/runtime.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ type (
102102
VerifierBatchTimeCutoff time.Duration
103103
// VerifierBatchSizeCutoff configures the batch size cutoff for verifier service.
104104
VerifierBatchSizeCutoff int
105+
106+
// QueryTLSRefreshInterval configures how often the query service polls the DB
107+
// for config block updates to refresh dynamic TLS CA certificates.
108+
QueryTLSRefreshInterval time.Duration
109+
110+
// PeerOrganizationCount is the number of peer organizations in the genesis config block.
111+
// Defaults to 2 if not set.
112+
PeerOrganizationCount uint32
105113
}
106114
)
107115

@@ -163,7 +171,7 @@ func NewRuntime(t *testing.T, conf *Config) *CommitterRuntime {
163171
ordererEnv := mock.NewOrdererTestEnv(t, &mock.OrdererTestParameters{
164172
NumIDs: 3,
165173
ServerPerID: 2,
166-
PeerOrganizationCount: 2,
174+
PeerOrganizationCount: max(2, conf.PeerOrganizationCount),
167175
ChanID: TestChannelName,
168176
ClientTLSConfig: clientTLS,
169177
ServerTLSConfig: ordererServiceTLS,
@@ -205,6 +213,7 @@ func NewRuntime(t *testing.T, conf *Config) *CommitterRuntime {
205213
VCTimeoutForMinTransactionBatchSize: conf.VCTimeoutForMinTransactionBatchSize,
206214
VerifierBatchTimeCutoff: conf.VerifierBatchTimeCutoff,
207215
VerifierBatchSizeCutoff: conf.VerifierBatchSizeCutoff,
216+
QueryTLSRefreshInterval: conf.QueryTLSRefreshInterval,
208217
},
209218
CommittedBlock: make(chan *common.Block, 100),
210219
SeedForCryptoGen: rand.New(rand.NewSource(10)),
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package test
8+
9+
import (
10+
"context"
11+
"testing"
12+
"time"
13+
14+
"github.com/hyperledger/fabric-x-common/api/committerpb"
15+
"github.com/onsi/gomega"
16+
"github.com/stretchr/testify/require"
17+
"google.golang.org/grpc"
18+
19+
"github.com/hyperledger/fabric-x-committer/integration/runner"
20+
"github.com/hyperledger/fabric-x-committer/utils/connection"
21+
"github.com/hyperledger/fabric-x-committer/utils/grpcerror"
22+
"github.com/hyperledger/fabric-x-committer/utils/test"
23+
"github.com/hyperledger/fabric-x-committer/utils/testcrypto"
24+
)
25+
26+
// TestDynamicTLS verifies that the sidecar and query service dynamically update
27+
// their trusted TLS CA pool when config blocks add or remove peer organizations,
28+
// while preserving static (YAML-configured) root CAs.
//
// The runtime starts with three peer organizations and mutual TLS. One config
// block then reduces the count to two (dropping peer-org-2) and a later one
// restores it to three; connectivity of the per-org clients is checked after
// each change.
29+
func TestDynamicTLS(t *testing.T) {
30+
t.Parallel()
31+
// Register t with gomega so the package-level gomega.Eventually calls below
// (and in eventuallyRPCSucceeds) report failures through this test.
gomega.RegisterTestingT(t)
32+
33+
c := runner.NewRuntime(t, &runner.Config{
34+
TLSMode: connection.MutualTLSMode,
35+
PeerOrganizationCount: 3,
36+
BlockTimeout: 2 * time.Second,
37+
// 5s refresh interval; the Eventually waits below allow up to 15s.
QueryTLSRefreshInterval: 5 * time.Second,
38+
CrashTest: true,
39+
})
40+
// Start orderer servers in-process so SubmitConfigBlock can write directly
41+
// to the orderer's block channel (separate-process orderers don't share memory).
42+
c.OrdererEnv.StartServers(t)
43+
c.Start(t, runner.CommitterTxPath|runner.QueryService)
44+
45+
serverCACertPaths := c.SystemConfig.ClientTLS.CACertPaths
46+
sidecarEndpoint := c.SystemConfig.Services.Sidecar.GrpcEndpoint
47+
queryEndpoint := c.SystemConfig.Services.Query.GrpcEndpoint
48+
49+
// Per-org mTLS configs. Crypto artifacts don't change across config block updates,
50+
// so these remain valid throughout the test.
// The array length matches PeerOrganizationCount above (3).
51+
orgTLS := [3]connection.TLSConfig{}
52+
for i := range orgTLS {
53+
orgTLS[i] = test.OrgClientTLSConfig(c.OrdererEnv.ArtifactsPath, i, serverCACertPaths)
54+
}
55+
56+
// Step 1: Assert all three peer orgs can connect to sidecar and query service.
57+
// The sidecar updates dynamic TLS immediately when processing the genesis config block.
58+
// The query service polls the DB periodically, so we use Eventually for it.
59+
t.Log("Step 1: Initial connection - all three orgs should connect")
60+
for orgIdx, tlsCfg := range orgTLS {
61+
assertRPCSucceeds(t, sidecarEndpoint, tlsCfg, "sidecar", orgIdx)
62+
eventuallyRPCSucceeds(t, queryEndpoint, tlsCfg, "query", orgIdx)
63+
}
64+
65+
// Step 2: Submit config block removing peer-org-2 (keep only peer-org-0 and peer-org-1).
66+
// CreateOrExtendConfigBlockWithCrypto retains existing crypto on disk, so peer-org-2's
67+
// certs remain available for reconnection in Step 4.
68+
t.Log("Step 2: Dynamic removal - submit config with 2 peer orgs")
69+
c.OrdererEnv.SubmitConfigBlock(t, &testcrypto.ConfigBlock{
70+
OrdererEndpoints: c.OrdererEnv.AllEndpoints,
71+
PeerOrganizationCount: 2,
72+
})
73+
c.ValidateExpectedResultsInCommittedBlock(t, &runner.ExpectedStatusInBlock{
74+
Statuses: []committerpb.Status{committerpb.Status_COMMITTED},
75+
})
76+
77+
// Step 3: Verify peer-org-2 is rejected and peer-org-0 remains trusted.
78+
t.Log("Step 3: Negative assertion - peer-org-2 rejected, peer-org-0 accepted")
79+
80+
// Sidecar updates immediately from config block processing.
81+
assertRPCSucceeds(t, sidecarEndpoint, orgTLS[0], "sidecar", 0)
82+
assertRPCFails(t, sidecarEndpoint, orgTLS[2], "sidecar", 2)
83+
84+
// Query service polls the DB. peer-org-0 is checked once, immediately, since it
// is expected to be trusted both before and after the refresh; only the rejection
// of peer-org-2 is awaited (up to 15s, retried every second).
85+
assertRPCSucceeds(t, queryEndpoint, orgTLS[0], "query", 0)
86+
gomega.Eventually(func() error {
87+
return tryRPC(queryEndpoint, orgTLS[2])
88+
}, 15*time.Second, time.Second).Should(gomega.HaveOccurred(),
89+
"query service should reject peer-org-2 after TLS refresh")
90+
91+
// Step 4: Restore peer-org-2.
92+
t.Log("Step 4: Restoration - add peer-org-2 back")
93+
c.OrdererEnv.SubmitConfigBlock(t, &testcrypto.ConfigBlock{
94+
OrdererEndpoints: c.OrdererEnv.AllEndpoints,
95+
PeerOrganizationCount: 3,
96+
})
97+
c.ValidateExpectedResultsInCommittedBlock(t, &runner.ExpectedStatusInBlock{
98+
Statuses: []committerpb.Status{committerpb.Status_COMMITTED},
99+
})
100+
101+
// Sidecar: peer-org-2 connects immediately.
102+
assertRPCSucceeds(t, sidecarEndpoint, orgTLS[2], "sidecar", 2)
103+
104+
// Query service: wait for TLS refresh.
105+
eventuallyRPCSucceeds(t, queryEndpoint, orgTLS[2], "query", 2)
106+
107+
// Step 5: Static persistence - a client using the runtime's static client TLS
// config (c.SystemConfig.ClientTLS) still works after the dynamic updates.
// The org index -1 is a placeholder that only appears in the failure message.
108+
t.Log("Step 5: Static persistence - static TLS client still trusted")
109+
assertRPCSucceeds(t, sidecarEndpoint, c.SystemConfig.ClientTLS, "sidecar (static)", -1)
110+
assertRPCSucceeds(t, queryEndpoint, c.SystemConfig.ClientTLS, "query (static)", -1)
111+
}
112+
113+
// assertRPCSucceeds performs a single tryRPC against endpoint using tlsConfig
// and fails the test immediately if it returns an error. service and orgIdx
// are used only to build the failure message.
func assertRPCSucceeds( //nolint:revive // test helper, readability over argument count
114+
t *testing.T, endpoint connection.WithAddress,
115+
tlsConfig connection.TLSConfig, service string, orgIdx int,
116+
) {
117+
t.Helper()
118+
err := tryRPC(endpoint, tlsConfig)
119+
require.NoErrorf(t, err, "%s: peer-org-%d should connect successfully", service, orgIdx)
120+
}
121+
122+
// assertRPCFails performs a single tryRPC against endpoint using tlsConfig and
// fails the test immediately if no error is returned. Any error from tryRPC
// counts as a rejection. service and orgIdx are used only to build the
// failure message.
func assertRPCFails( //nolint:revive // test helper, readability over argument count
123+
t *testing.T, endpoint connection.WithAddress,
124+
tlsConfig connection.TLSConfig, service string, orgIdx int,
125+
) {
126+
t.Helper()
127+
err := tryRPC(endpoint, tlsConfig)
128+
require.Errorf(t, err, "%s: peer-org-%d should be rejected", service, orgIdx)
129+
}
130+
131+
// eventuallyRPCSucceeds retries tryRPC against endpoint using tlsConfig, once
// per second for up to 15 seconds, until it returns no error. service and
// orgIdx are used only to build the failure message.
//
// It calls the package-level gomega.Eventually and does not pass t to it, so
// the caller must have registered a test with gomega beforehand.
func eventuallyRPCSucceeds( //nolint:revive // test helper, readability over argument count
132+
t *testing.T, endpoint connection.WithAddress,
133+
tlsConfig connection.TLSConfig, service string, orgIdx int,
134+
) {
135+
t.Helper()
136+
gomega.Eventually(func() error {
137+
return tryRPC(endpoint, tlsConfig)
138+
}, 15*time.Second, time.Second).ShouldNot(gomega.HaveOccurred(),
139+
"%s: peer-org-%d should connect successfully", service, orgIdx)
140+
}
141+
142+
// tryRPC attempts a lightweight gRPC call against endpoint, using client
// credentials built from tlsConfig, and reports whether the connection worked.
143+
// Application-level gRPC errors (InvalidArgument, Unimplemented,
144+
// etc.) indicate a successful TLS connection and are treated as success.
//
// It returns a non-nil error when the credentials cannot be built, the client
// cannot be created, or the RPC fails with an error that
// grpcerror.FilterUnavailableErrorCode suppresses (which is how a rejected TLS
// handshake is expected to surface). The probe always issues
// QueryService.GetTransactionStatus with an empty query and a 5-second
// deadline, whichever endpoint is dialed.
145+
func tryRPC(endpoint connection.WithAddress, tlsConfig connection.TLSConfig) error {
146+
creds, err := tlsConfig.ClientCredentials()
147+
if err != nil {
148+
return err
149+
}
150+
// NOTE(review): per the gRPC-Go documentation grpc.NewClient does not connect
// eagerly, so TLS failures are expected to show up on the RPC below - confirm.
conn, err := grpc.NewClient(endpoint.Address(), grpc.WithTransportCredentials(creds))
151+
if err != nil {
152+
return err
153+
}
154+
defer conn.Close() //nolint:errcheck
155+
156+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
157+
defer cancel()
158+
159+
client := committerpb.NewQueryServiceClient(conn)
160+
_, err = client.GetTransactionStatus(ctx, &committerpb.TxStatusQuery{})
161+
// FilterUnavailableErrorCode returns nil for transient connectivity errors
162+
// (Unavailable, DeadlineExceeded) and passes through application-level errors.
163+
// An application-level error means TLS succeeded, so we invert: if the filter
164+
// passes through an error, the connection worked.
// A nil err (the RPC itself succeeded) also results in a nil return.
165+
if grpcerror.FilterUnavailableErrorCode(err) != nil {
166+
return nil
167+
}
168+
return err
169+
}

loadgen/adapters/sidecar.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (c *SidecarAdapter) RunWorkload(ctx context.Context, txStream *workload.Str
5555
g, gCtx := errgroup.WithContext(dCtx)
5656

5757
g.Go(func() error {
58-
return grpcservice.StartAndServe(gCtx, orderer, c.config.OrdererServers...)
58+
return grpcservice.StartAndServe(gCtx, orderer, nil, c.config.OrdererServers...)
5959
})
6060

6161
g.Go(func() error {

0 commit comments

Comments
 (0)