Skip to content

Commit 494cb8e

Browse files
committed
Node: Multithreaded processor
1 parent abd0b33 commit 494cb8e

File tree

9 files changed

+349
-240
lines changed

9 files changed

+349
-240
lines changed

node/cmd/guardiand/node.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ var (
240240
chainGovernorEnabled *bool
241241
governorFlowCancelEnabled *bool
242242

243+
processorWorkerFactor *float64
244+
243245
ccqEnabled *bool
244246
ccqAllowedRequesters *string
245247
ccqP2pPort *uint
@@ -450,6 +452,7 @@ func init() {
450452

451453
chainGovernorEnabled = NodeCmd.Flags().Bool("chainGovernorEnabled", false, "Run the chain governor")
452454
governorFlowCancelEnabled = NodeCmd.Flags().Bool("governorFlowCancelEnabled", false, "Enable flow cancel on the governor")
455+
processorWorkerFactor = NodeCmd.Flags().Float64("processorWorkerFactor", 0.0, "Multiplied by the number of available CPUs on the system to determine the number of workers that the processor uses. 0.0 means single worker")
453456

454457
ccqEnabled = NodeCmd.Flags().Bool("ccqEnabled", false, "Enable cross chain query support")
455458
ccqAllowedRequesters = NodeCmd.Flags().String("ccqAllowedRequesters", "", "Comma separated list of signers allowed to submit cross chain queries")
@@ -1639,7 +1642,7 @@ func runNode(cmd *cobra.Command, args []string) {
16391642
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
16401643
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures),
16411644
node.GuardianOptionStatusServer(*statusAddr),
1642-
node.GuardianOptionProcessor(*p2pNetworkID),
1645+
node.GuardianOptionProcessor(*p2pNetworkID, *processorWorkerFactor),
16431646
}
16441647

16451648
if shouldStart(publicGRPCSocketPath) {

node/pkg/node/node_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net/http"
1515
"os"
1616
"regexp"
17+
"runtime"
1718
"strconv"
1819
"strings"
1920
"testing"
@@ -196,7 +197,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui
196197
GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""),
197198
GuardianOptionAdminService(cfg.adminSocket, nil, nil, rpcMap),
198199
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", cfg.statusPort)),
199-
GuardianOptionProcessor(networkID),
200+
GuardianOptionProcessor(networkID, 3.0/float64(runtime.NumCPU())), // Create three workers.
200201
}
201202

202203
guardianNode := NewGuardianNode(

node/pkg/node/options.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ func GuardianOptionDatabase(db *db.Database) *GuardianOption {
575575

576576
// GuardianOptionProcessor enables the default processor, which is required to make consensus on messages.
577577
// Dependencies: db, governor, accountant
578-
func GuardianOptionProcessor(networkId string) *GuardianOption {
578+
func GuardianOptionProcessor(networkId string, workerFactor float64) *GuardianOption {
579579
return &GuardianOption{
580580
name: "processor",
581581
// governor and accountant may be set to nil, but that choice needs to be made before the processor is configured
@@ -600,6 +600,7 @@ func GuardianOptionProcessor(networkId string) *GuardianOption {
600600
g.acctC.readC,
601601
g.gatewayRelayer,
602602
networkId,
603+
workerFactor,
603604
).Run
604605

605606
return nil

node/pkg/processor/benchmark_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,10 @@ func createProcessorForTest(b *testing.B, numVAAs int, ctx context.Context, db *
174174
gossipAttestationSendC: pd.gossipAttestationSendC,
175175
gossipVaaSendC: pd.gossipVaaSendC,
176176
guardianSigner: ourSigner,
177-
gs: gs,
178177
gst: gst,
179178
db: db,
180179
logger: logger,
181-
state: &aggregationState{observationMap{}},
180+
state: &aggregationState{signatures: observationMap{}},
182181
ourAddr: crypto.PubkeyToAddress(ourSigner.PublicKey()),
183182
pythnetVaas: make(map[string]PythNetVaaEntry),
184183
updatedVAAs: make(map[string]*updateVaaEntry),

0 commit comments

Comments
 (0)