Skip to content

Commit b828378

Browse files
authored
BFT synchronizer with noop-verifier and block-puller (#636)
Signed-off-by: Yoav Tock <tock@il.ibm.com>
1 parent 9084bd0 commit b828378

File tree

10 files changed

+425
-311
lines changed

10 files changed

+425
-311
lines changed

node/consensus/synchronizer/block_puller_factory.go

Lines changed: 0 additions & 66 deletions
This file was deleted.
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package synchronizer
8+
9+
import (
10+
"crypto/x509"
11+
"encoding/pem"
12+
"time"
13+
14+
"github.com/hyperledger/fabric-lib-go/bccsp"
15+
"github.com/hyperledger/fabric-lib-go/common/flogging"
16+
cb "github.com/hyperledger/fabric-protos-go-apiv2/common"
17+
"github.com/hyperledger/fabric-x-common/common/channelconfig"
18+
"github.com/hyperledger/fabric-x-orderer/common/deliverclient/orderers"
19+
"github.com/hyperledger/fabric-x-orderer/common/types"
20+
"github.com/hyperledger/fabric-x-orderer/config"
21+
"github.com/hyperledger/fabric-x-orderer/config/protos"
22+
"github.com/hyperledger/fabric-x-orderer/node/comm"
23+
"github.com/pkg/errors"
24+
"google.golang.org/protobuf/proto"
25+
)
26+
27+
//go:generate counterfeiter -o mocks/height_detector.go . HeightDetector
28+
29+
// HeightDetector is used to detect the ledger height of other consenters.
30+
// It is used by the synchronizer in order to establish a target height for pulling blocks.
31+
type HeightDetector interface {
32+
HeightsByEndpoints() (map[string]uint64, string, error)
33+
Close()
34+
}
35+
36+
//go:generate counterfeiter -o mocks/height_detector_factory.go . HeightDetectorFactory
37+
38+
type HeightDetectorFactory interface {
39+
// CreateHeightDetector creates a new height detector.
40+
CreateHeightDetector(
41+
myPartyID types.PartyID,
42+
support ConsenterSupport,
43+
baseDialer *comm.PredicateDialer,
44+
clusterConfig config.Cluster,
45+
bccsp bccsp.BCCSP,
46+
logger *flogging.FabricLogger,
47+
) (HeightDetector, error)
48+
}
49+
50+
type HeightDetectorCreator struct{}
51+
52+
func (*HeightDetectorCreator) CreateHeightDetector(
53+
myPartyID types.PartyID,
54+
support ConsenterSupport,
55+
baseDialer *comm.PredicateDialer,
56+
clusterConfig config.Cluster,
57+
bccsp bccsp.BCCSP,
58+
logger *flogging.FabricLogger,
59+
) (HeightDetector, error) {
60+
return newBlockPuller(myPartyID, support, baseDialer, clusterConfig, bccsp, logger)
61+
}
62+
63+
// newBlockPuller creates a new block puller, which is used for target height detection.
64+
func newBlockPuller(
65+
myPartyID types.PartyID,
66+
support ConsenterSupport,
67+
baseDialer *comm.PredicateDialer,
68+
clusterConfig config.Cluster,
69+
bccsp bccsp.BCCSP,
70+
logger *flogging.FabricLogger,
71+
) (HeightDetector, error) {
72+
// TODO replace this with the actual implementation
73+
verifyBlockSequenceNoOp := func(blocks []*cb.Block, _ string) error {
74+
// TODO
75+
return nil
76+
}
77+
78+
stdDialer := &comm.StandardDialer{
79+
Config: baseDialer.Config.Clone(),
80+
}
81+
stdDialer.Config.AsyncConnect = false
82+
stdDialer.Config.SecOpts.VerifyCertificate = nil
83+
84+
// TODO Extract endpoint and TLS cert from the config.
85+
endpoints, err := extractEndpointCriteriaFromConfig(myPartyID, support)
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
der, _ := pem.Decode(stdDialer.Config.SecOpts.Certificate)
91+
if der == nil {
92+
return nil, errors.Errorf("client certificate isn't in PEM format: %v",
93+
string(stdDialer.Config.SecOpts.Certificate))
94+
}
95+
96+
myCert, err := x509.ParseCertificate(der.Bytes)
97+
if err != nil {
98+
logger.Warnf("Failed parsing my own TLS certificate: %v, therefore we may connect to our own endpoint when pulling blocks", err)
99+
}
100+
101+
// TODO Fabric defaults. Extend the config to have these values, and use the config values instead of hardcoding them here.
102+
// Cluster: Cluster{
103+
// ReplicationMaxRetries: 12,
104+
// RPCTimeout: time.Second * 7,
105+
// DialTimeout: time.Second * 5,
106+
// ReplicationBufferSize: 20971520,
107+
// SendBufferSize: 100,
108+
// ReplicationRetryTimeout: time.Second * 5,
109+
// ReplicationPullTimeout: time.Second * 5,
110+
// CertExpirationWarningThreshold: time.Hour * 24 * 7,
111+
// ReplicationPolicy: "consensus", // BFT default; on etcdraft it is ignored
112+
// },
113+
114+
bp := &comm.BlockPuller{
115+
MyOwnTLSCert: myCert,
116+
VerifyBlockSequence: verifyBlockSequenceNoOp,
117+
Logger: logger,
118+
RetryTimeout: time.Second * 5, // clusterConfig.ReplicationRetryTimeout,
119+
MaxTotalBufferBytes: 20 * 1024 * 1024, // clusterConfig.ReplicationBufferSize,
120+
FetchTimeout: time.Second * 5, // clusterConfig.ReplicationPullTimeout,
121+
Endpoints: endpoints, // TODO the block puller is not party aware yet
122+
Signer: support,
123+
TLSCert: der.Bytes,
124+
Channel: support.ChannelID(),
125+
Dialer: stdDialer,
126+
}
127+
128+
logger.Infof("Built new block puller (target height detector) with cluster config: %+v, endpoints: %+v", clusterConfig, endpoints)
129+
130+
return bp, nil
131+
}
132+
133+
func extractEndpointCriteriaFromConfig(myPartyID types.PartyID, support ConsenterSupport) ([]comm.EndpointCriteria, error) {
134+
party2endpoint, err := extractConsenterAddresses(support.SharedConfig())
135+
if err != nil {
136+
return nil, err
137+
}
138+
139+
var endpoints []comm.EndpointCriteria
140+
for party, ep := range party2endpoint {
141+
if party == myPartyID {
142+
continue
143+
}
144+
endpointCriteria := &comm.EndpointCriteria{
145+
Endpoint: ep.Address,
146+
TLSRootCAs: ep.RootCerts,
147+
}
148+
endpoints = append(endpoints, *endpointCriteria)
149+
}
150+
151+
return endpoints, nil
152+
}
153+
154+
func extractConsenterAddresses(ordererConfig channelconfig.Orderer) (orderers.Party2Endpoint, error) {
155+
consensusMeta := ordererConfig.ConsensusMetadata()
156+
sharedConfig := &protos.SharedConfig{}
157+
if err := proto.Unmarshal(consensusMeta, sharedConfig); err != nil {
158+
return nil, errors.Wrap(err, "failed to unmarshal consensus metadata")
159+
}
160+
161+
conf := &config.Configuration{
162+
SharedConfig: sharedConfig,
163+
}
164+
165+
cInfo := conf.ExtractConsenters()
166+
167+
party2Endpoint := make(orderers.Party2Endpoint)
168+
169+
for _, consenter := range cInfo {
170+
party2Endpoint[consenter.PartyID] = &orderers.Endpoint{
171+
Address: consenter.Endpoint,
172+
}
173+
for _, cert := range consenter.TLSCACerts {
174+
party2Endpoint[consenter.PartyID].RootCerts = append(party2Endpoint[consenter.PartyID].RootCerts, cert)
175+
}
176+
}
177+
178+
return party2Endpoint, nil
179+
}

node/consensus/synchronizer/mocks/block_puller_factory.go

Lines changed: 0 additions & 123 deletions
This file was deleted.

0 commit comments

Comments
 (0)