Skip to content

Commit 0b61ac3

Browse files
committed
integrating request verifier in router
Signed-off-by: Dor.Katzelnick <Dor.Katzelnick@ibm.com>
1 parent 96134d8 commit 0b61ac3

File tree

11 files changed

+158
-41
lines changed

11 files changed

+158
-41
lines changed

common/requestfilter/rulesverifier_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package requestfilter_test
99
import (
1010
"errors"
1111
"sync"
12+
"sync/atomic"
1213
"testing"
1314
"time"
1415

@@ -51,16 +52,24 @@ func TestRulesVerifier(t *testing.T) {
5152
})
5253
}
5354

54-
// scenario: test that verifiers can run in parallel.
55+
// scenario: test that verifiers will execute verify in parallel.
5556
func TestConcurrentVeirfy(t *testing.T) {
5657
v := requestfilter.NewRulesVerifier(nil)
57-
fc := &mocks.FakeFilterConfig{}
58-
fc.GetMaxSizeBytesReturns(1000, nil)
59-
v.AddRule(requestfilter.NewMaxSizeFilter(fc))
58+
fr := &mocks.FakeRule{}
59+
var activeVerifiers int64 = 0
60+
var maxVerifiers int64 = 0
61+
fr.VerifyStub = func(r *comm.Request) error {
62+
atomic.AddInt64(&activeVerifiers, 1)
63+
time.Sleep(50 * time.Millisecond)
64+
atomic.StoreInt64(&maxVerifiers, max(atomic.LoadInt64(&activeVerifiers), atomic.LoadInt64(&maxVerifiers)))
65+
time.Sleep(50 * time.Millisecond)
66+
atomic.AddInt64(&activeVerifiers, -1)
67+
return nil
68+
}
69+
v.AddRule(fr)
6070

6171
var wg sync.WaitGroup
6272
start := make(chan struct{})
63-
end := make(chan struct{})
6473
verifiers := 8
6574

6675
for i := 0; i < verifiers; i++ {
@@ -70,21 +79,12 @@ func TestConcurrentVeirfy(t *testing.T) {
7079
<-start
7180
err := v.Verify(&comm.Request{})
7281
require.NoError(t, err)
73-
time.Sleep(1 * time.Second)
7482
}()
7583
}
7684

77-
go func() {
78-
close(start)
79-
wg.Wait()
80-
close(end)
81-
}()
82-
83-
select {
84-
case <-end:
85-
case <-time.After(7 * time.Second):
86-
t.Error("concurrent verify took too long")
87-
}
85+
close(start)
86+
wg.Wait()
87+
require.True(t, maxVerifiers > 1)
8888
}
8989

9090
// scenario: multiple goroutines call verify and update. chech with -race flag.

config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func (config *Configuration) ExtractRouterConfig() *nodeconfig.RouterNodeConfig
155155
NumOfgRPCStreamsPerConnection: config.LocalConfig.NodeLocalConfig.RouterParams.NumberOfStreamsPerConnection,
156156
UseTLS: config.LocalConfig.TLSConfig.Enabled,
157157
ClientAuthRequired: config.LocalConfig.TLSConfig.ClientAuthRequired,
158+
RequestMaxBytes: config.SharedConfig.BatchingConfig.RequestMaxBytes,
158159
}
159160
return routerConfig
160161
}

node/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ type RouterNodeConfig struct {
7777
NumOfgRPCStreamsPerConnection int
7878
UseTLS bool
7979
ClientAuthRequired bool
80+
RequestMaxBytes uint64
8081
}
8182

8283
type AssemblerNodeConfig struct {

node/config/utils.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,7 @@ func (c *BatcherNodeConfig) GetShardsIDs() []types.ShardID {
2222
})
2323
return ids
2424
}
25+
26+
func (rc *RouterNodeConfig) GetMaxSizeBytes() (uint64, error) {
27+
return rc.RequestMaxBytes, nil
28+
}

node/router/router.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/hyperledger/fabric-protos-go-apiv2/common"
2222
"github.com/hyperledger/fabric-protos-go-apiv2/orderer"
23+
"github.com/hyperledger/fabric-x-orderer/common/requestfilter"
2324
"github.com/hyperledger/fabric-x-orderer/common/types"
2425
"github.com/hyperledger/fabric-x-orderer/config"
2526
"github.com/hyperledger/fabric-x-orderer/node"
@@ -40,6 +41,7 @@ type Router struct {
4041
shardIDs []types.ShardID
4142
incoming uint64
4243
routerNodeConfig *nodeconfig.RouterNodeConfig
44+
verifier *requestfilter.RulesVerifier
4345
}
4446

4547
func NewRouter(config *nodeconfig.RouterNodeConfig, logger types.Logger) *Router {
@@ -68,7 +70,9 @@ func NewRouter(config *nodeconfig.RouterNodeConfig, logger types.Logger) *Router
6870
return int(shardIDs[i]) < int(shardIDs[j])
6971
})
7072

71-
r := createRouter(shardIDs, batcherEndpoints, tlsCAsOfBatchers, config, logger)
73+
verifier := createVerifier(config)
74+
75+
r := createRouter(shardIDs, batcherEndpoints, tlsCAsOfBatchers, config, logger, verifier)
7276
r.init()
7377
return r
7478
}
@@ -169,7 +173,7 @@ func (r *Router) Deliver(server orderer.AtomicBroadcast_DeliverServer) error {
169173
return fmt.Errorf("not implemented")
170174
}
171175

172-
func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]string, batcherRootCAs map[types.ShardID][][]byte, rconfig *nodeconfig.RouterNodeConfig, logger types.Logger) *Router {
176+
func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]string, batcherRootCAs map[types.ShardID][][]byte, rconfig *nodeconfig.RouterNodeConfig, logger types.Logger, verifier *requestfilter.RulesVerifier) *Router {
173177
if rconfig.NumOfConnectionsForBatcher == 0 {
174178
rconfig.NumOfConnectionsForBatcher = config.DefaultRouterParams.NumberOfConnectionsPerBatcher
175179
}
@@ -187,10 +191,11 @@ func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]s
187191
logger: logger,
188192
shardIDs: shardIDs,
189193
routerNodeConfig: rconfig,
194+
verifier: verifier,
190195
}
191196

192197
for _, shardId := range shardIDs {
193-
r.shardRouters[shardId] = NewShardRouter(logger, batcherEndpoints[shardId], batcherRootCAs[shardId], rconfig.TLSCertificateFile, rconfig.TLSPrivateKeyFile, rconfig.NumOfConnectionsForBatcher, rconfig.NumOfgRPCStreamsPerConnection)
198+
r.shardRouters[shardId] = NewShardRouter(logger, batcherEndpoints[shardId], batcherRootCAs[shardId], rconfig.TLSCertificateFile, rconfig.TLSPrivateKeyFile, rconfig.NumOfConnectionsForBatcher, rconfig.NumOfgRPCStreamsPerConnection, verifier)
194199
}
195200

196201
go func() {
@@ -326,6 +331,13 @@ func createTraceID(rand *rand2.Rand) []byte {
326331
return trace
327332
}
328333

334+
func createVerifier(config *nodeconfig.RouterNodeConfig) *requestfilter.RulesVerifier {
335+
rv := requestfilter.NewRulesVerifier(nil)
336+
rv.AddRule(requestfilter.PayloadNotEmptyRule{})
337+
rv.AddRule(requestfilter.NewMaxSizeFilter(config))
338+
return rv
339+
}
340+
329341
// IsAllStreamsOK checks that all the streams accross all shard-routers are non-faulty.
330342
// Use for testing only.
331343
func (r *Router) IsAllStreamsOK() bool {

node/router/router_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,54 @@ func TestClientRouterBroadcastRequestsAgainstMultipleBatchers(t *testing.T) {
380380
}, 60*time.Second, 10*time.Millisecond)
381381
}
382382

383+
// test request filters
384+
// 1) Start a client, router and stub batcher
385+
// 2) Send valid request, expect no error.
386+
// 3) Send request with empty payload, expect error.
387+
// 4) Send request that exceed the maximal size, expect error.
388+
// 5) ** Not implemented ** send request with bad signature, expect error.
389+
func TestRequestFilters(t *testing.T) {
390+
// 1) Start a client, router and stub batcher
391+
testSetup := createRouterTestSetup(t, types.PartyID(1), 1, true, false)
392+
err := createServerTLSClientConnection(testSetup, testSetup.ca)
393+
require.NoError(t, err)
394+
require.NotNil(t, testSetup.clientConn)
395+
defer testSetup.Close()
396+
conn := testSetup.clientConn
397+
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
398+
cl := protos.NewRequestTransmitClient(conn)
399+
defer cancel()
400+
401+
// 2) send a valid request.
402+
buff := make([]byte, 300)
403+
binary.BigEndian.PutUint32(buff, uint32(12345))
404+
req := &protos.Request{
405+
Payload: buff,
406+
}
407+
resp, err := cl.Submit(ctx, req)
408+
require.NoError(t, err)
409+
require.Equal(t, "", resp.Error)
410+
411+
// 3) send request with empty payload.
412+
req = &protos.Request{
413+
Payload: nil,
414+
}
415+
resp, err = cl.Submit(ctx, req)
416+
require.NoError(t, err)
417+
require.Equal(t, "request verification error: empty payload field", resp.Error)
418+
// 4) send request with payload too big. (3000 is more than 1 << 10, the maximal request size in bytes)
419+
buff = make([]byte, 3000)
420+
binary.BigEndian.PutUint32(buff, uint32(12345))
421+
req = &protos.Request{
422+
Payload: buff,
423+
}
424+
resp, err = cl.Submit(ctx, req)
425+
require.NoError(t, err)
426+
require.Equal(t, "request verification error: the request's size exceeds the maximum size: actual = 3000, limit = 1024", resp.Error)
427+
428+
// 5) send request with invalid signature. Not implemented
429+
}
430+
383431
func createServerTLSClientConnection(testSetup *routerTestSetup, ca tlsgen.CA) error {
384432
cc := comm.ClientConfig{
385433
SecOpts: comm.SecureOptions{
@@ -558,6 +606,7 @@ func createAndStartRouter(t *testing.T, partyID types.PartyID, ca tlsgen.CA, bat
558606
ListenAddress: "127.0.0.1:0",
559607
ClientAuthRequired: clientAuthRequired,
560608
Shards: shards,
609+
RequestMaxBytes: 1 << 10,
561610
}
562611

563612
r := router.NewRouter(conf, logger)

node/router/shard_router.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"google.golang.org/grpc/connectivity"
1717

18+
"github.com/hyperledger/fabric-x-orderer/common/requestfilter"
1819
"github.com/hyperledger/fabric-x-orderer/common/types"
1920
"github.com/hyperledger/fabric-x-orderer/node/comm"
2021
protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm"
@@ -69,6 +70,7 @@ type ShardRouter struct {
6970
closeReconnectOnce sync.Once
7071
reconnectRequests chan reconnectReq
7172
closeReconnect chan bool
73+
verifier *requestfilter.RulesVerifier
7274
}
7375

7476
func NewShardRouter(l types.Logger,
@@ -78,6 +80,7 @@ func NewShardRouter(l types.Logger,
7880
tlsKey []byte,
7981
numOfConnectionsForBatcher int,
8082
numOfgRPCStreamsPerConnection int,
83+
verifier *requestfilter.RulesVerifier,
8184
) *ShardRouter {
8285
cc := comm.ClientConfig{
8386
AsyncConnect: false,
@@ -106,6 +109,7 @@ func NewShardRouter(l types.Logger,
106109
clientConfig: cc,
107110
reconnectRequests: make(chan reconnectReq, 2*numOfgRPCStreamsPerConnection*numOfConnectionsForBatcher),
108111
closeReconnect: make(chan bool),
112+
verifier: verifier,
109113
}
110114

111115
return sr
@@ -332,6 +336,7 @@ func (sr *ShardRouter) initStream(i int, j int) error {
332336
streamNum: j,
333337
srReconnectChan: sr.reconnectRequests,
334338
notifiedReconnect: false,
339+
verifier: sr.verifier,
335340
}
336341
go s.sendRequests()
337342
go s.readResponses()

node/router/shard_router_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"google.golang.org/grpc/grpclog"
1717

18+
"github.com/hyperledger/fabric-x-orderer/common/requestfilter"
1819
"github.com/hyperledger/fabric-x-orderer/common/types"
1920
"github.com/hyperledger/fabric-x-orderer/node/comm/tlsgen"
2021
"github.com/hyperledger/fabric-x-orderer/node/router"
@@ -157,11 +158,14 @@ func createTestSetup(t *testing.T, partyID types.PartyID) *TestSetup {
157158
ckp, err := ca.NewServerCertKeyPair("127.0.0.1")
158159
require.NoError(t, err)
159160

161+
verifier := requestfilter.NewRulesVerifier(nil)
162+
verifier.AddRule(requestfilter.AcceptRule{})
163+
160164
// create stub batcher
161165
batcher := NewStubBatcher(t, ca, partyID, types.ShardID(1))
162166

163167
// create shard router
164-
shardRouter := router.NewShardRouter(logger, batcher.GetBatcherEndpoint(), [][]byte{ca.CertBytes()}, ckp.Cert, ckp.Key, 10, 20)
168+
shardRouter := router.NewShardRouter(logger, batcher.GetBatcherEndpoint(), [][]byte{ca.CertBytes()}, ckp.Cert, ckp.Key, 10, 20, verifier)
165169

166170
// start the batcher
167171
batcher.Start()

node/router/stream.go

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"maps"
1313
"sync"
1414

15+
"github.com/hyperledger/fabric-x-orderer/common/requestfilter"
1516
"github.com/hyperledger/fabric-x-orderer/common/types"
1617
protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm"
1718
)
@@ -32,6 +33,7 @@ type stream struct {
3233
streamNum int
3334
srReconnectChan chan reconnectReq
3435
notifiedReconnect bool
36+
verifier *requestfilter.RulesVerifier
3537
}
3638

3739
// readResponses listens for responses from the batcher.
@@ -50,18 +52,9 @@ func (s *stream) readResponses() {
5052
s.cancelOnServerError()
5153
return
5254
}
53-
54-
s.lock.Lock()
55-
ch, exists := s.requestTraceIdToResponseChannel[string(resp.TraceId)]
56-
delete(s.requestTraceIdToResponseChannel, string(resp.TraceId))
57-
s.lock.Unlock()
58-
if exists {
59-
s.logger.Debugf("read response from batcher %s on request with trace id %x", s.endpoint, resp.TraceId)
60-
s.logger.Debugf("registration for request with trace id %x was removed upon receiving a response", resp.TraceId)
61-
ch <- Response{
62-
SubmitResponse: resp,
63-
}
64-
} else {
55+
s.logger.Debugf("read response from batcher %s on request with trace id %x", s.endpoint, resp.TraceId)
56+
err = s.sendResponseToClient(resp)
57+
if err != nil {
6558
s.logger.Debugf("received a response from batcher %s for a request with trace id %x, which does not exist in the map, dropping response", s.endpoint, resp.TraceId)
6659
}
6760
}
@@ -81,17 +74,45 @@ func (s *stream) sendRequests() {
8174
s.cancelOnServerError()
8275
return
8376
}
84-
s.logger.Debugf("send request with trace id %x to batcher %s", msg.TraceId, s.endpoint)
85-
err := s.requestTransmitSubmitStreamClient.Send(msg)
86-
if err != nil {
87-
s.logger.Errorf("Failed sending request to batcher %s", s.endpoint)
88-
s.cancelOnServerError()
89-
return
77+
// verify the request
78+
if err := s.verifier.Verify(msg); err != nil {
79+
s.logger.Debugf("request is invalid: %s", err)
80+
// send a response to the client
81+
resp := protos.SubmitResponse{Error: fmt.Sprintf("request verification error: %s", err), TraceId: msg.TraceId}
82+
err = s.sendResponseToClient(&resp)
83+
if err != nil {
84+
s.logger.Debugf("error sending response to client: %s", err)
85+
}
86+
} else {
87+
s.logger.Debugf("send request with trace id %x to batcher %s", msg.TraceId, s.endpoint)
88+
err := s.requestTransmitSubmitStreamClient.Send(msg)
89+
if err != nil {
90+
s.logger.Errorf("Failed sending request to batcher %s", s.endpoint)
91+
s.cancelOnServerError()
92+
return
93+
}
9094
}
9195
}
9296
}
9397
}
9498

99+
func (s *stream) sendResponseToClient(response *protos.SubmitResponse) error {
100+
traceID := response.TraceId
101+
s.lock.Lock()
102+
ch, exists := s.requestTraceIdToResponseChannel[string(traceID)]
103+
delete(s.requestTraceIdToResponseChannel, string(traceID))
104+
s.lock.Unlock()
105+
if exists {
106+
s.logger.Debugf("registration for request with trace id %x was removed upon receiving a response", traceID)
107+
ch <- Response{
108+
SubmitResponse: response,
109+
}
110+
return nil
111+
} else {
112+
return fmt.Errorf("request with traceID %x is not in map", traceID)
113+
}
114+
}
115+
95116
func (s *stream) cancelOnServerError() {
96117
s.cancel()
97118
s.sendResponseToAllClientsOnError(fmt.Errorf("server error: could not establish connection between router and batcher %s", s.endpoint))
@@ -201,6 +222,7 @@ CopyChannelLoop:
201222
streamNum: s.streamNum,
202223
srReconnectChan: s.srReconnectChan,
203224
notifiedReconnect: false,
225+
verifier: s.verifier,
204226
}
205227
s.lock.Unlock()
206228

0 commit comments

Comments
 (0)