Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cmd/config/samples/mock-orderer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ server:
# Path to the server's TLS private key file.
key-path: /root/artifacts/ordererOrganizations/orderer-org-0/orderers/orderer-0-org-0/tls/server.key
# List of paths to CA certificate files for verifying client certificates (required for mtls).
# TODO: Remove peer's CA-Certs as these will be loaded from the config block.
ca-cert-paths:
- /root/artifacts/ordererOrganizations/orderer-org-0/msp/tlscacerts/tlsorderer-org-0-CA-cert.pem
- /root/artifacts/peerOrganizations/peer-org-0/msp/tlscacerts/tlspeer-org-0-CA-cert.pem
- /root/artifacts/peerOrganizations/peer-org-1/msp/tlscacerts/tlspeer-org-1-CA-cert.pem
# Peer organization CAs are loaded automatically from config block

# Maximum number of transactions to include in each block. Blocks are cut when
# this size is reached or when the block-timeout expires, whichever comes first.
Expand Down
4 changes: 3 additions & 1 deletion cmd/mock/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func startMockOrderer() *cobra.Command {
if conf.Server != nil && !conf.Server.Endpoint.Empty() {
serverConfigs = append(serverConfigs, conf.Server)
}
return grpcservice.StartAndServe(cmd.Context(), service, serverConfigs...)
return grpcservice.StartAndServeWithAdditionalCAs(
cmd.Context(), service, serverConfigs, service.GetConfigBlockCAs()...,
)
},
}
cliutil.SetDefaultFlags(cmd, &configPath)
Expand Down
64 changes: 58 additions & 6 deletions mock/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ type (
cache *blockCache
healthcheck *health.Server

// applicationCAs holds CA certificate bytes loaded from config blocks at initialization.
// These are merged with static CAs from ServerConfig.TLS.CACertPaths when creating servers.
applicationCAs [][]byte

// config uses atomic.Pointer to allow safe concurrent reads by the Run() goroutine
// while supporting runtime updates (e.g., BlockTimeout changes in tests).
// Always use Load() to read and Store() to update the entire config struct.
Expand Down Expand Up @@ -163,6 +167,7 @@ func NewMockOrderer(config *OrdererConfig) (*Orderer, error) {
config.TestServerParameters.NumService = defaultConfig.TestServerParameters.NumService
}
genesisBlock := BlockWithConsenters{Block: defaultConfigBlock}
var applicationCAs [][]byte
if len(config.ArtifactsPath) > 0 {
configBlockPath := path.Join(config.ArtifactsPath, cryptogen.ConfigBlockFileName)
configMaterial, err := channelconfig.LoadConfigBlockMaterialFromFile(configBlockPath)
Expand All @@ -177,15 +182,19 @@ func NewMockOrderer(config *OrdererConfig) (*Orderer, error) {
Block: configMaterial.ConfigBlock,
ConsenterSigners: consenters,
}
// Load application root CAs from the config block.
// These will be used when creating the gRPC servers.
applicationCAs = getApplicationCAsFromConfigBlock(configMaterial.ConfigBlock)
}
numServices := max(1, config.TestServerParameters.NumService, len(config.ServerConfigs))
o := &Orderer{
genesisBlock: genesisBlock,
inEnvs: make(chan envelopeEntry, numServices*config.BlockSize*config.OutBlockCapacity),
inBlocks: make(chan *BlockWithConsenters, config.BlockSize*config.OutBlockCapacity),
cutBlock: make(chan any),
cache: newBlockCache(config.OutBlockCapacity),
healthcheck: connection.DefaultHealthCheckService(),
genesisBlock: genesisBlock,
applicationCAs: applicationCAs,
inEnvs: make(chan envelopeEntry, numServices*config.BlockSize*config.OutBlockCapacity),
inBlocks: make(chan *BlockWithConsenters, config.BlockSize*config.OutBlockCapacity),
cutBlock: make(chan any),
cache: newBlockCache(config.OutBlockCapacity),
healthcheck: connection.DefaultHealthCheckService(),
}
o.config.Store(config)
return o, nil
Expand Down Expand Up @@ -505,6 +514,13 @@ func newBlockCache(size int) *blockCache {
}
}

// GetConfigBlockCAs returns the application root CAs loaded from the config block at initialization.
// We do not export the field directly to avoid unintended modifications,
// as these CAs are used when creating the gRPC servers.
func (o *Orderer) GetConfigBlockCAs() [][]byte {
return o.applicationCAs
}

func (c *blockCache) releaseAfter(ctx context.Context) (stop func() bool) {
return context.AfterFunc(ctx, func() {
c.mu.L.Lock()
Expand Down Expand Up @@ -569,3 +585,39 @@ func addEnvelope(c *fifoCache[any], e *common.Envelope) bool {
digest := base64.StdEncoding.EncodeToString(digestRaw[:])
return c.addIfNotExist(digest, nil)
}

// getApplicationCAsFromConfigBlock retrieves the root CA certificates of all organizations from a config block.
func getApplicationCAsFromConfigBlock(configBlock *common.Block) [][]byte {
if configBlock == nil {
logger.Warn("Failed to extract application CAs from config block: config block is nil")
return nil
}

logger.Debug("Loading config block material to extract application root CAs")
configMaterial, err := channelconfig.LoadConfigBlockMaterial(configBlock)
if err != nil {
// static config fallback
logger.Warnf("Failed to extract application CAs from config block: %v", err)
return nil
}

if len(configMaterial.ApplicationOrganizations) == 0 {
logger.Debug("No application organizations found in config block")
return nil
}

var allCAs [][]byte
for _, appOrg := range configMaterial.ApplicationOrganizations {
allCAs = append(allCAs, appOrg.CACerts...)
}

if len(allCAs) == 0 {
logger.Debug("No application CAs found in config block")
return nil
}

logger.Infof("Loaded %d application CA certificate(s) from %d application organization(s)",
len(allCAs), len(configMaterial.ApplicationOrganizations))

return allCAs
}
69 changes: 69 additions & 0 deletions mock/orderer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,3 +618,72 @@ func makeEnvelopePayload(i int) *common.Envelope {
Payload: fmt.Appendf(nil, "%d", i),
}
}

func TestNewMockOrdererWithApplicationCAs(t *testing.T) {
t.Parallel()

peerOrgCount := uint32(2)
serverTLSConfig, clientTLSConfig := test.CreateServerAndClientTLSConfig(t, connection.MutualTLSMode)
// Create test environment with peer organizations
env := NewOrdererTestEnv(t, &OrdererTestParameters{
ChanID: "test-channel",
NumIDs: 2,
ServerPerID: 2,
PeerOrganizationCount: peerOrgCount,
ServerTLSConfig: serverTLSConfig,
OrdererConfig: &OrdererConfig{
BlockSize: 10,
BlockTimeout: time.Second,
OutBlockCapacity: 100,
PayloadCacheSize: 100,
},
})

// Verify that application CAs were loaded from config block
require.Len(t, env.Orderer.applicationCAs, int(peerOrgCount),
"Should have loaded %d CA certificates from config block", peerOrgCount)

// Build client TLS configs for each peer organization with mutual TLS.
clientTLSConfigs, err := testcrypto.BuildClientTLSConfigsPerOrg(env.ArtifactsPath)
require.NoError(t, err)

// Verify we can actually connect from each peer organization using the loaded CAs
for orgName, tlsConfig := range clientTLSConfigs.Peer {
t.Run(fmt.Sprintf("connect_from_%s", orgName), func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(t.Context())
defer cancel()

// Add the root CA of the orderer's certificates.
tlsConfig.CACertPaths = append(tlsConfig.CACertPaths, clientTLSConfig.CACertPaths...)

// Create connection with peer org's TLS config
conn, err := connection.NewSingleConnection(&connection.ClientConfig{
Endpoint: &env.AllServerConfig[0].Endpoint,
TLS: tlsConfig,
})
require.NoError(t, err, "Should create connection from peer org %s", orgName)
defer conn.Close() //nolint:errcheck

// Establish a deliver stream to force the TLS handshake
client := ab.NewAtomicBroadcastClient(conn)
stream, err := client.Deliver(ctx)
require.NoError(t, err, "Should create deliver stream from peer org %s", orgName)

// Send a seek request to actually establish the connection
seekEnv, err := protoutil.CreateSignedEnvelope(
common.HeaderType_DELIVER_SEEK_INFO, env.ChanID, nil, &ab.SeekInfo{
Start: &ab.SeekPosition{
Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}},
},
Stop: &ab.SeekPosition{
Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}},
},
Behavior: ab.SeekInfo_BLOCK_UNTIL_READY,
}, 0, 0,
)
require.NoError(t, err)
require.NoError(t, stream.Send(seekEnv), "Should send seek request from peer org %s", orgName)
})
}
}
14 changes: 9 additions & 5 deletions mock/test_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,15 @@ func NewOrdererTestEnv(t *testing.T, p *OrdererTestParameters) *OrdererTestEnv {
// StartServers starts the servers for the orderer and maps them to their respective IDs.
func (e *OrdererTestEnv) StartServers(t *testing.T) {
t.Helper()
allServers := test.StartGrpcServersWithConfigForTest(
t.Context(), t, e.Orderer.RegisterService, e.AllServerConfig...,
)
require.Len(t, allServers.Servers, len(e.AllServerConfig))
e.AllServers = allServers.Servers
// Create servers with application CAs merged in
grpcServers := make([]*grpc.Server, len(e.AllServerConfig))
for i, sc := range e.AllServerConfig {
grpcServers[i] = test.RunGrpcServerForTest(
t.Context(), t, sc, e.Orderer.RegisterService, e.Orderer.applicationCAs...,
)
}
require.Len(t, grpcServers, len(e.AllServerConfig))
e.AllServers = grpcServers
}

// StopServers stops the servers and closes their pre allocated listeners.
Expand Down
5 changes: 4 additions & 1 deletion utils/connection/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,14 @@ func (c TLSConfig) ClientCredentials() (credentials.TransportCredentials, error)
}

// ServerCredentials converts TLSConfig into a TLSMaterials struct and generates server creds.
func (c TLSConfig) ServerCredentials() (credentials.TransportCredentials, error) {
func (c TLSConfig) ServerCredentials(additionalCAs ...[]byte) (credentials.TransportCredentials, error) {
tlsMaterials, err := NewServerTLSMaterials(c)
if err != nil {
return nil, err
}
if len(additionalCAs) > 0 && tlsMaterials.Mode == MutualTLSMode {
tlsMaterials.CACerts = append(tlsMaterials.CACerts, additionalCAs...)
}
return NewServerCredentialsFromMaterial(tlsMaterials)
}

Expand Down
9 changes: 6 additions & 3 deletions utils/connection/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ var (
portConflictRegex = regexp.MustCompile(`(?i)(address\s+already\s+in\s+use|port\s+is\s+already\s+allocated)`)
)

// GrpcServer instantiate a [grpc.Server].
func (c *ServerConfig) GrpcServer() (*grpc.Server, error) {
// GrpcServer instantiates a [grpc.Server] with additional CA certificate bytes
// merged with those from the TLS configuration.
// This is useful when CA certificates come from
// multiple sources (e.g., YAML config + config blocks).
func (c *ServerConfig) GrpcServer(additionalCAs ...[]byte) (*grpc.Server, error) {
opts := []grpc.ServerOption{grpc.MaxRecvMsgSize(maxMsgSize), grpc.MaxSendMsgSize(maxMsgSize)}
serverGrpcTransportCreds, err := c.TLS.ServerCredentials()
serverGrpcTransportCreds, err := c.TLS.ServerCredentials(additionalCAs...)
if err != nil {
return nil, errors.Wrapf(err, "failed loading the server's grpc credentials")
}
Expand Down
17 changes: 14 additions & 3 deletions utils/grpcservice/start_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ type Registerer interface {
// to be ready, then creates and serves gRPC server(s). Stops everything
// if either the service or any server exits.
func StartAndServe(ctx context.Context, service Service, serverConfigs ...*connection.ServerConfig) error {
return StartAndServeWithAdditionalCAs(ctx, service, serverConfigs)
}

// StartAndServeWithAdditionalCAs runs a full lifecycle service: starts the service, waits for it
// to be ready, then creates and serves gRPC server(s). Stops everything
// if either the service or any server exits.
func StartAndServeWithAdditionalCAs(
ctx context.Context, service Service, serverConfigs []*connection.ServerConfig, additionalCAs ...[]byte,
) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -61,7 +70,7 @@ func StartAndServe(ctx context.Context, service Service, serverConfigs ...*conne
g.Go(func() error {
// If the GRPC servers stop, there is no reason to continue the service.
defer cancel()
return Serve(gCtx, service, sc)
return Serve(gCtx, service, sc, additionalCAs...)
})
}
return g.Wait()
Expand All @@ -70,12 +79,14 @@ func StartAndServe(ctx context.Context, service Service, serverConfigs ...*conne
// Serve creates a gRPC server and listener from the config, registers the
// service, and serves until the context is done. For services that only
// implement Registerer (e.g., mock services without Run/WaitForReady).
func Serve(ctx context.Context, service Registerer, serverConfig *connection.ServerConfig) error {
func Serve(
ctx context.Context, service Registerer, serverConfig *connection.ServerConfig, additionalCAs ...[]byte,
) error {
listener, err := serverConfig.Listener(ctx)
if err != nil {
return err
}
server, err := serverConfig.GrpcServer()
server, err := serverConfig.GrpcServer(additionalCAs...)
if err != nil {
return errors.Wrapf(err, "failed creating grpc server")
}
Expand Down
8 changes: 6 additions & 2 deletions utils/test/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,16 @@ func ServerToMultiClientConfig(
// did not specify a port.
// The method asserts that the GRPC server did not end with failure.
func RunGrpcServerForTest(
ctx context.Context, tb testing.TB, serverConfig *connection.ServerConfig, register func(server *grpc.Server),
ctx context.Context,
tb testing.TB,
serverConfig *connection.ServerConfig,
register func(server *grpc.Server),
additionalCAs ...[]byte,
) *grpc.Server {
tb.Helper()
listener, err := serverConfig.Listener(ctx)
require.NoError(tb, err)
server, err := serverConfig.GrpcServer()
server, err := serverConfig.GrpcServer(additionalCAs...)
require.NoError(tb, err)

if register != nil {
Expand Down
70 changes: 70 additions & 0 deletions utils/testcrypto/load_crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package testcrypto
import (
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -113,3 +115,71 @@ func GetOrdererConnConfig(artifactsPath string, clientTLSConfig connection.TLSCo
Identity: id,
}
}

// PerOrgTLSConfig holds each organization's representative TLS config for future client creation.
// We use this struct for the TestWithDynamicRootCAs.
type PerOrgTLSConfig struct {
Peer map[string]connection.TLSConfig
}

// BuildClientTLSConfigsPerOrg builds TLS configs using only the "client" user.
func BuildClientTLSConfigsPerOrg(root string) (*PerOrgTLSConfig, error) {
peerConfigs := make(map[string]connection.TLSConfig)
peerRoot := filepath.Join(root, cryptogen.PeerOrganizationsDir)

orgEntries, err := os.ReadDir(peerRoot)
// If the path doesn't exist, return empty maps to avoid nil pointer issues
if err != nil {
return nil, errors.Newf("failed to read peer organizations dir: %v", peerRoot)
}

// go over all peer organizations
for _, orgEntry := range orgEntries {
if !orgEntry.IsDir() {
continue
}

orgName := orgEntry.Name()
usersDir := filepath.Join(peerRoot, orgName, cryptogen.UsersDir)

// Extracted the inner loop logic to a helper function
clientUser, err := getClientUser(usersDir)
if err != nil {
return nil, errors.Newf("org %s: %w", orgName, err)
}

// Define paths relative to the selected client user
clientTLSDir := filepath.Join(usersDir, clientUser, cryptogen.TLSDir)
peerConfigs[orgName] = connection.TLSConfig{
Mode: connection.MutualTLSMode,
CertPath: filepath.Join(clientTLSDir, "client.crt"),
KeyPath: filepath.Join(clientTLSDir, "client.key"),
CACertPaths: []string{filepath.Join(clientTLSDir, "ca.crt")},
}
}

return &PerOrgTLSConfig{
Peer: peerConfigs,
}, nil
}

// getClientUser scans the directory and returns the first client user directory name.
func getClientUser(usersDir string) (string, error) {
userEntries, err := os.ReadDir(usersDir)
if err != nil {
return "", errors.Newf("missing or failed to read users dir: %e", err)
}

for _, u := range userEntries {
if !u.IsDir() {
continue
}

name := u.Name()
if len(name) >= 6 && strings.EqualFold(name[:6], "client") {
return name, nil
}
}

return "", errors.Newf("no 'client' user directory found under %s", usersDir)
}
Loading