Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions common/requestfilter/rulesverifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package requestfilter_test
import (
"errors"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -51,16 +52,24 @@ func TestRulesVerifier(t *testing.T) {
})
}

// scenario: test that verifiers can run in parallel.
// scenario: test that verifiers will execute verify in parallel.
func TestConcurrentVeirfy(t *testing.T) {
v := requestfilter.NewRulesVerifier(nil)
fc := &mocks.FakeFilterConfig{}
fc.GetMaxSizeBytesReturns(1000, nil)
v.AddRule(requestfilter.NewMaxSizeFilter(fc))
fr := &mocks.FakeRule{}
var activeVerifiers int64 = 0
var maxVerifiers int64 = 0
fr.VerifyStub = func(r *comm.Request) error {
atomic.AddInt64(&activeVerifiers, 1)
time.Sleep(50 * time.Millisecond)
atomic.StoreInt64(&maxVerifiers, max(atomic.LoadInt64(&activeVerifiers), atomic.LoadInt64(&maxVerifiers)))
time.Sleep(50 * time.Millisecond)
atomic.AddInt64(&activeVerifiers, -1)
return nil
}
v.AddRule(fr)

var wg sync.WaitGroup
start := make(chan struct{})
end := make(chan struct{})
verifiers := 8

for i := 0; i < verifiers; i++ {
Expand All @@ -70,21 +79,12 @@ func TestConcurrentVeirfy(t *testing.T) {
<-start
err := v.Verify(&comm.Request{})
require.NoError(t, err)
time.Sleep(1 * time.Second)
}()
}

go func() {
close(start)
wg.Wait()
close(end)
}()

select {
case <-end:
case <-time.After(7 * time.Second):
t.Error("concurrent verify took too long")
}
close(start)
wg.Wait()
require.True(t, maxVerifiers > 1)
}

// scenario: multiple goroutines call verify and update. check with -race flag.
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (config *Configuration) ExtractRouterConfig() *nodeconfig.RouterNodeConfig
NumOfgRPCStreamsPerConnection: config.LocalConfig.NodeLocalConfig.RouterParams.NumberOfStreamsPerConnection,
UseTLS: config.LocalConfig.TLSConfig.Enabled,
ClientAuthRequired: config.LocalConfig.TLSConfig.ClientAuthRequired,
RequestMaxBytes: config.SharedConfig.BatchingConfig.RequestMaxBytes,
}
return routerConfig
}
Expand Down
1 change: 1 addition & 0 deletions node/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type RouterNodeConfig struct {
NumOfgRPCStreamsPerConnection int
UseTLS bool
ClientAuthRequired bool
RequestMaxBytes uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a getter to implement the FilterConfig interface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}

type AssemblerNodeConfig struct {
Expand Down
4 changes: 4 additions & 0 deletions node/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ func (c *BatcherNodeConfig) GetShardsIDs() []types.ShardID {
})
return ids
}

// GetMaxSizeBytes returns the maximum allowed request size in bytes, as
// configured by RequestMaxBytes. The error is always nil here; the
// (uint64, error) signature presumably satisfies the requestfilter
// FilterConfig interface, whose other implementations may fail to load the
// value — TODO confirm against the interface definition.
func (rc *RouterNodeConfig) GetMaxSizeBytes() (uint64, error) {
	return rc.RequestMaxBytes, nil
}
18 changes: 15 additions & 3 deletions node/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/hyperledger/fabric-protos-go-apiv2/common"
"github.com/hyperledger/fabric-protos-go-apiv2/orderer"
"github.com/hyperledger/fabric-x-orderer/common/requestfilter"
"github.com/hyperledger/fabric-x-orderer/common/types"
"github.com/hyperledger/fabric-x-orderer/config"
"github.com/hyperledger/fabric-x-orderer/node"
Expand All @@ -40,6 +41,7 @@ type Router struct {
shardIDs []types.ShardID
incoming uint64
routerNodeConfig *nodeconfig.RouterNodeConfig
verifier *requestfilter.RulesVerifier
}

func NewRouter(config *nodeconfig.RouterNodeConfig, logger types.Logger) *Router {
Expand Down Expand Up @@ -68,7 +70,9 @@ func NewRouter(config *nodeconfig.RouterNodeConfig, logger types.Logger) *Router
return int(shardIDs[i]) < int(shardIDs[j])
})

r := createRouter(shardIDs, batcherEndpoints, tlsCAsOfBatchers, config, logger)
verifier := createVerifier(config)

r := createRouter(shardIDs, batcherEndpoints, tlsCAsOfBatchers, config, logger, verifier)
r.init()
return r
}
Expand Down Expand Up @@ -169,7 +173,7 @@ func (r *Router) Deliver(server orderer.AtomicBroadcast_DeliverServer) error {
return fmt.Errorf("not implemented")
}

func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]string, batcherRootCAs map[types.ShardID][][]byte, rconfig *nodeconfig.RouterNodeConfig, logger types.Logger) *Router {
func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]string, batcherRootCAs map[types.ShardID][][]byte, rconfig *nodeconfig.RouterNodeConfig, logger types.Logger, verifier *requestfilter.RulesVerifier) *Router {
if rconfig.NumOfConnectionsForBatcher == 0 {
rconfig.NumOfConnectionsForBatcher = config.DefaultRouterParams.NumberOfConnectionsPerBatcher
}
Expand All @@ -187,10 +191,11 @@ func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]s
logger: logger,
shardIDs: shardIDs,
routerNodeConfig: rconfig,
verifier: verifier,
}

for _, shardId := range shardIDs {
r.shardRouters[shardId] = NewShardRouter(logger, batcherEndpoints[shardId], batcherRootCAs[shardId], rconfig.TLSCertificateFile, rconfig.TLSPrivateKeyFile, rconfig.NumOfConnectionsForBatcher, rconfig.NumOfgRPCStreamsPerConnection)
r.shardRouters[shardId] = NewShardRouter(logger, batcherEndpoints[shardId], batcherRootCAs[shardId], rconfig.TLSCertificateFile, rconfig.TLSPrivateKeyFile, rconfig.NumOfConnectionsForBatcher, rconfig.NumOfgRPCStreamsPerConnection, verifier)
}

go func() {
Expand Down Expand Up @@ -326,6 +331,13 @@ func createTraceID(rand *rand2.Rand) []byte {
return trace
}

// createVerifier assembles the router's request rule set: a request must
// carry a non-empty payload and must not exceed the configured maximum
// size (read from config via GetMaxSizeBytes). The returned RulesVerifier
// applies these rules to every incoming request.
func createVerifier(config *nodeconfig.RouterNodeConfig) *requestfilter.RulesVerifier {
	verifier := requestfilter.NewRulesVerifier(nil)
	verifier.AddRule(requestfilter.PayloadNotEmptyRule{})
	verifier.AddRule(requestfilter.NewMaxSizeFilter(config))
	return verifier
}

// IsAllStreamsOK checks that all the streams across all shard-routers are non-faulty.
// Use for testing only.
func (r *Router) IsAllStreamsOK() bool {
Expand Down
49 changes: 49 additions & 0 deletions node/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,54 @@ func TestClientRouterBroadcastRequestsAgainstMultipleBatchers(t *testing.T) {
}, 60*time.Second, 10*time.Millisecond)
}

// test request filters
// 1) Start a client, router and stub batcher
// 2) Send valid request, expect no error.
// 3) Send request with empty payload, expect error.
// 4) Send request that exceeds the maximal size, expect error.
// 5) ** Not implemented ** send request with bad signature, expect error.
func TestRequestFilters(t *testing.T) {
	// 1) Start a client, router and stub batcher
	// NOTE(review): the router is configured with RequestMaxBytes = 1 << 10
	// in createAndStartRouter, which is what step 4 relies on.
	testSetup := createRouterTestSetup(t, types.PartyID(1), 1, true, false)
	err := createServerTLSClientConnection(testSetup, testSetup.ca)
	require.NoError(t, err)
	require.NotNil(t, testSetup.clientConn)
	defer testSetup.Close()
	conn := testSetup.clientConn
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	cl := protos.NewRequestTransmitClient(conn)
	defer cancel()

	// 2) send a valid request.
	// Filter rejections are reported in resp.Error rather than as a gRPC
	// transport error, so every Submit below is expected to succeed at the
	// transport level (require.NoError) regardless of filter outcome.
	buff := make([]byte, 300)
	binary.BigEndian.PutUint32(buff, uint32(12345))
	req := &protos.Request{
		Payload: buff,
	}
	resp, err := cl.Submit(ctx, req)
	require.NoError(t, err)
	require.Equal(t, "", resp.Error)

	// 3) send request with empty payload.
	// NOTE(review): these assertions couple the test to the exact filter
	// error strings; update them together with the filter messages.
	req = &protos.Request{
		Payload: nil,
	}
	resp, err = cl.Submit(ctx, req)
	require.NoError(t, err)
	require.Equal(t, "request verification error: empty payload field", resp.Error)
	// 4) send request with payload too big. (3000 is more than 1 << 10, the maximal request size in bytes)
	buff = make([]byte, 3000)
	binary.BigEndian.PutUint32(buff, uint32(12345))
	req = &protos.Request{
		Payload: buff,
	}
	resp, err = cl.Submit(ctx, req)
	require.NoError(t, err)
	require.Equal(t, "request verification error: the request's size exceeds the maximum size: actual = 3000, limit = 1024", resp.Error)

	// 5) send request with invalid signature. Not implemented
}

func createServerTLSClientConnection(testSetup *routerTestSetup, ca tlsgen.CA) error {
cc := comm.ClientConfig{
SecOpts: comm.SecureOptions{
Expand Down Expand Up @@ -558,6 +606,7 @@ func createAndStartRouter(t *testing.T, partyID types.PartyID, ca tlsgen.CA, bat
ListenAddress: "127.0.0.1:0",
ClientAuthRequired: clientAuthRequired,
Shards: shards,
RequestMaxBytes: 1 << 10,
}

r := router.NewRouter(conf, logger)
Expand Down
5 changes: 5 additions & 0 deletions node/router/shard_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"google.golang.org/grpc/connectivity"

"github.com/hyperledger/fabric-x-orderer/common/requestfilter"
"github.com/hyperledger/fabric-x-orderer/common/types"
"github.com/hyperledger/fabric-x-orderer/node/comm"
protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm"
Expand Down Expand Up @@ -69,6 +70,7 @@ type ShardRouter struct {
closeReconnectOnce sync.Once
reconnectRequests chan reconnectReq
closeReconnect chan bool
verifier *requestfilter.RulesVerifier
}

func NewShardRouter(l types.Logger,
Expand All @@ -78,6 +80,7 @@ func NewShardRouter(l types.Logger,
tlsKey []byte,
numOfConnectionsForBatcher int,
numOfgRPCStreamsPerConnection int,
verifier *requestfilter.RulesVerifier,
) *ShardRouter {
cc := comm.ClientConfig{
AsyncConnect: false,
Expand Down Expand Up @@ -106,6 +109,7 @@ func NewShardRouter(l types.Logger,
clientConfig: cc,
reconnectRequests: make(chan reconnectReq, 2*numOfgRPCStreamsPerConnection*numOfConnectionsForBatcher),
closeReconnect: make(chan bool),
verifier: verifier,
}

return sr
Expand Down Expand Up @@ -332,6 +336,7 @@ func (sr *ShardRouter) initStream(i int, j int) error {
streamNum: j,
srReconnectChan: sr.reconnectRequests,
notifiedReconnect: false,
verifier: sr.verifier,
}
go s.sendRequests()
go s.readResponses()
Expand Down
6 changes: 5 additions & 1 deletion node/router/shard_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"google.golang.org/grpc/grpclog"

"github.com/hyperledger/fabric-x-orderer/common/requestfilter"
"github.com/hyperledger/fabric-x-orderer/common/types"
"github.com/hyperledger/fabric-x-orderer/node/comm/tlsgen"
"github.com/hyperledger/fabric-x-orderer/node/router"
Expand Down Expand Up @@ -157,11 +158,14 @@ func createTestSetup(t *testing.T, partyID types.PartyID) *TestSetup {
ckp, err := ca.NewServerCertKeyPair("127.0.0.1")
require.NoError(t, err)

verifier := requestfilter.NewRulesVerifier(nil)
verifier.AddRule(requestfilter.AcceptRule{})

// create stub batcher
batcher := NewStubBatcher(t, ca, partyID, types.ShardID(1))

// create shard router
shardRouter := router.NewShardRouter(logger, batcher.GetBatcherEndpoint(), [][]byte{ca.CertBytes()}, ckp.Cert, ckp.Key, 10, 20)
shardRouter := router.NewShardRouter(logger, batcher.GetBatcherEndpoint(), [][]byte{ca.CertBytes()}, ckp.Cert, ckp.Key, 10, 20, verifier)

// start the batcher
batcher.Start()
Expand Down
58 changes: 40 additions & 18 deletions node/router/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"maps"
"sync"

"github.com/hyperledger/fabric-x-orderer/common/requestfilter"
"github.com/hyperledger/fabric-x-orderer/common/types"
protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm"
)
Expand All @@ -32,6 +33,7 @@ type stream struct {
streamNum int
srReconnectChan chan reconnectReq
notifiedReconnect bool
verifier *requestfilter.RulesVerifier
}

// readResponses listens for responses from the batcher.
Expand All @@ -50,18 +52,9 @@ func (s *stream) readResponses() {
s.cancelOnServerError()
return
}

s.lock.Lock()
ch, exists := s.requestTraceIdToResponseChannel[string(resp.TraceId)]
delete(s.requestTraceIdToResponseChannel, string(resp.TraceId))
s.lock.Unlock()
if exists {
s.logger.Debugf("read response from batcher %s on request with trace id %x", s.endpoint, resp.TraceId)
s.logger.Debugf("registration for request with trace id %x was removed upon receiving a response", resp.TraceId)
ch <- Response{
SubmitResponse: resp,
}
} else {
s.logger.Debugf("read response from batcher %s on request with trace id %x", s.endpoint, resp.TraceId)
err = s.sendResponseToClient(resp)
if err != nil {
s.logger.Debugf("received a response from batcher %s for a request with trace id %x, which does not exist in the map, dropping response", s.endpoint, resp.TraceId)
}
}
Expand All @@ -81,17 +74,45 @@ func (s *stream) sendRequests() {
s.cancelOnServerError()
return
}
s.logger.Debugf("send request with trace id %x to batcher %s", msg.TraceId, s.endpoint)
err := s.requestTransmitSubmitStreamClient.Send(msg)
if err != nil {
s.logger.Errorf("Failed sending request to batcher %s", s.endpoint)
s.cancelOnServerError()
return
// verify the request
if err := s.verifier.Verify(msg); err != nil {
s.logger.Debugf("request is invalid: %s", err)
// send a response to the client
resp := protos.SubmitResponse{Error: fmt.Sprintf("request verification error: %s", err), TraceId: msg.TraceId}
err = s.sendResponseToClient(&resp)
if err != nil {
s.logger.Debugf("error sending response to client: %s", err)
}
} else {
s.logger.Debugf("send request with trace id %x to batcher %s", msg.TraceId, s.endpoint)
err := s.requestTransmitSubmitStreamClient.Send(msg)
if err != nil {
s.logger.Errorf("Failed sending request to batcher %s", s.endpoint)
s.cancelOnServerError()
return
}
}
}
}
}

// sendResponseToClient routes a SubmitResponse to the client goroutine that
// registered the response's trace ID, and removes the registration so each
// trace ID is answered at most once. It returns an error when no
// registration exists for the trace ID (e.g. a duplicate or late response),
// letting the caller decide whether to drop or log it.
func (s *stream) sendResponseToClient(response *protos.SubmitResponse) error {
	traceID := response.TraceId
	// Hoist the map key so the []byte→string conversion happens once.
	key := string(traceID)

	// Look up and delete under the lock so a concurrent delivery for the
	// same trace ID cannot send to the channel twice.
	s.lock.Lock()
	ch, exists := s.requestTraceIdToResponseChannel[key]
	delete(s.requestTraceIdToResponseChannel, key)
	s.lock.Unlock()

	if !exists {
		return fmt.Errorf("request with traceID %x is not in map", traceID)
	}

	s.logger.Debugf("registration for request with trace id %x was removed upon receiving a response", traceID)
	ch <- Response{
		SubmitResponse: response,
	}
	return nil
}

func (s *stream) cancelOnServerError() {
s.cancel()
s.sendResponseToAllClientsOnError(fmt.Errorf("server error: could not establish connection between router and batcher %s", s.endpoint))
Expand Down Expand Up @@ -201,6 +222,7 @@ CopyChannelLoop:
streamNum: s.streamNum,
srReconnectChan: s.srReconnectChan,
notifiedReconnect: false,
verifier: s.verifier,
}
s.lock.Unlock()

Expand Down
Loading