Skip to content

Commit 4f7d752

Browse files
committed
Fix merge issues
1 parent f4dce70 commit 4f7d752

File tree

8 files changed

+153
-225
lines changed

8 files changed

+153
-225
lines changed

node/cmd/guardiand/node.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ var (
240240
chainGovernorEnabled *bool
241241
governorFlowCancelEnabled *bool
242242

243+
processorWorkerFactor *float64
244+
243245
ccqEnabled *bool
244246
ccqAllowedRequesters *string
245247
ccqP2pPort *uint
@@ -450,6 +452,7 @@ func init() {
450452

451453
chainGovernorEnabled = NodeCmd.Flags().Bool("chainGovernorEnabled", false, "Run the chain governor")
452454
governorFlowCancelEnabled = NodeCmd.Flags().Bool("governorFlowCancelEnabled", false, "Enable flow cancel on the governor")
455+
processorWorkerFactor = NodeCmd.Flags().Float64("processorWorkerFactor", 0.0, "Multiplied by the number of available CPUs on the system to determine the number of workers that the processor uses. 0.0 means single worker")
453456

454457
ccqEnabled = NodeCmd.Flags().Bool("ccqEnabled", false, "Enable cross chain query support")
455458
ccqAllowedRequesters = NodeCmd.Flags().String("ccqAllowedRequesters", "", "Comma separated list of signers allowed to submit cross chain queries")
@@ -1639,7 +1642,7 @@ func runNode(cmd *cobra.Command, args []string) {
16391642
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
16401643
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *subscribeToVAAs, *disableHeartbeatVerify, *p2pPort, *ccqP2pBootstrap, *ccqP2pPort, *ccqAllowedPeers, *gossipAdvertiseAddress, ibc.GetFeatures),
16411644
node.GuardianOptionStatusServer(*statusAddr),
1642-
node.GuardianOptionProcessor(*p2pNetworkID),
1645+
node.GuardianOptionProcessor(*p2pNetworkID, *processorWorkerFactor),
16431646
}
16441647

16451648
if shouldStart(publicGRPCSocketPath) {

node/pkg/node/node_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net/http"
1515
"os"
1616
"regexp"
17+
"runtime"
1718
"strconv"
1819
"strings"
1920
"testing"
@@ -196,7 +197,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui
196197
GuardianOptionPublicWeb(cfg.publicWeb, cfg.publicSocket, "", false, ""),
197198
GuardianOptionAdminService(cfg.adminSocket, nil, nil, rpcMap),
198199
GuardianOptionStatusServer(fmt.Sprintf("[::]:%d", cfg.statusPort)),
199-
GuardianOptionProcessor(networkID),
200+
GuardianOptionProcessor(networkID, 3.0/float64(runtime.NumCPU())), // Create three workers.
200201
}
201202

202203
guardianNode := NewGuardianNode(

node/pkg/node/options.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ func GuardianOptionDatabase(db *db.Database) *GuardianOption {
575575

576576
// GuardianOptionProcessor enables the default processor, which is required to make consensus on messages.
577577
// Dependencies: db, governor, accountant
578-
func GuardianOptionProcessor(networkId string) *GuardianOption {
578+
func GuardianOptionProcessor(networkId string, workerFactor float64) *GuardianOption {
579579
return &GuardianOption{
580580
name: "processor",
581581
// governor and accountant may be set to nil, but that choice needs to be made before the processor is configured
@@ -600,6 +600,7 @@ func GuardianOptionProcessor(networkId string) *GuardianOption {
600600
g.acctC.readC,
601601
g.gatewayRelayer,
602602
networkId,
603+
workerFactor,
603604
).Run
604605

605606
return nil

node/pkg/processor/benchmark_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,10 @@ func createProcessorForTest(b *testing.B, numVAAs int, ctx context.Context, db *
174174
gossipAttestationSendC: pd.gossipAttestationSendC,
175175
gossipVaaSendC: pd.gossipVaaSendC,
176176
guardianSigner: ourSigner,
177-
gs: gs,
178177
gst: gst,
179178
db: db,
180179
logger: logger,
181-
state: &aggregationState{observationMap{}},
180+
state: &aggregationState{signatures: observationMap{}},
182181
ourAddr: crypto.PubkeyToAddress(ourSigner.PublicKey()),
183182
pythnetVaas: make(map[string]PythNetVaaEntry),
184183
updatedVAAs: make(map[string]*updateVaaEntry),

node/pkg/processor/cleanup.go

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
package processor
33

44
import (
5-
"context"
65
"encoding/hex"
76
"fmt"
87
"time"
@@ -69,7 +68,7 @@ var (
6968
)
7069

7170
// handleCleanup handles periodic retransmissions and cleanup of observations
72-
func (p *Processor) handleCleanup(ctx context.Context) {
71+
func (p *Processor) handleCleanup() {
7372
p.cleanupState()
7473
p.cleanupPythnetVaas()
7574
}
@@ -129,37 +128,36 @@ func (p *Processor) cleanUpStateEntry(hash string, s *state) bool {
129128
gs := s.gs
130129
if gs == nil {
131130
gs = p.gst.Get()
132-
if gs == nil {
133-
return false
134-
}
135131
}
136132

137-
hasSigs := len(s.signatures)
138-
wantSigs := vaa.CalculateQuorum(len(gs.Keys))
139-
quorum := hasSigs >= wantSigs
133+
if gs != nil {
134+
if p.logger.Level().Enabled(zapcore.DebugLevel) {
135+
hasSigs := len(s.signatures)
136+
wantSigs := vaa.CalculateQuorum(len(gs.Keys))
137+
quorum := hasSigs >= wantSigs
140138

141-
var chain vaa.ChainID
142-
if s.ourObservation != nil {
143-
chain = s.ourObservation.GetEmitterChain()
144-
}
139+
var chain vaa.ChainID
140+
if s.ourObservation != nil {
141+
chain = s.ourObservation.GetEmitterChain()
142+
}
145143

146-
if p.logger.Level().Enabled(zapcore.DebugLevel) {
147-
p.logger.Debug("observation considered settled",
148-
zap.String("message_id", s.LoggingID()),
149-
zap.String("digest", hash),
150-
zap.Duration("delta", delta),
151-
zap.Int("have_sigs", hasSigs),
152-
zap.Int("required_sigs", wantSigs),
153-
zap.Bool("quorum", quorum),
154-
zap.Stringer("emitter_chain", chain),
155-
)
156-
}
144+
p.logger.Debug("observation considered settled",
145+
zap.String("message_id", s.LoggingID()),
146+
zap.String("digest", hash),
147+
zap.Duration("delta", delta),
148+
zap.Int("have_sigs", hasSigs),
149+
zap.Int("required_sigs", wantSigs),
150+
zap.Bool("quorum", quorum),
151+
zap.Stringer("emitter_chain", chain),
152+
)
153+
}
157154

158-
for _, k := range gs.Keys {
159-
if _, ok := s.signatures[k]; ok {
160-
aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc()
161-
} else {
162-
aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc()
155+
for _, k := range gs.Keys {
156+
if _, ok := s.signatures[k]; ok {
157+
aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc()
158+
} else {
159+
aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc()
160+
}
163161
}
164162
}
165163
case s.submitted && delta.Hours() >= 1:
@@ -247,7 +245,12 @@ func (p *Processor) cleanUpStateEntry(hash string, s *state) bool {
247245
if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
248246
p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err))
249247
}
250-
p.gossipSendC <- s.ourMsg
248+
if s.ourMsg != nil {
249+
// This is the case for immediately published messages (as well as anything still pending from before the cutover).
250+
p.gossipAttestationSendC <- s.ourMsg
251+
} else {
252+
p.postObservationToBatch(s.ourObs)
253+
}
251254
s.retryCtr++
252255
s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr))
253256
aggregationStateRetries.Inc()
@@ -256,23 +259,19 @@ func (p *Processor) cleanUpStateEntry(hash string, s *state) bool {
256259
// For nil state entries, we log the quorum to determine whether the
257260
// network reached consensus without us. We don't know the correct guardian
258261
// set, so we simply use the most recent one.
259-
hasSigs := len(s.signatures)
260-
gs := p.gst.Get()
261-
if gs != nil {
262-
wantSigs := vaa.CalculateQuorum(len(gs.Keys))
263-
264-
if p.logger.Level().Enabled(zapcore.DebugLevel) {
262+
if p.logger.Level().Enabled(zapcore.DebugLevel) {
263+
hasSigs := len(s.signatures)
264+
gs := p.gst.Get()
265+
if gs != nil {
265266
p.logger.Debug("expiring unsubmitted nil observation",
266267
zap.String("message_id", s.LoggingID()),
267268
zap.String("digest", hash),
268269
zap.Duration("delta", delta),
269270
zap.Int("have_sigs", hasSigs),
270-
zap.Int("required_sigs", p.gs.Quorum()),
271-
zap.Bool("quorum", hasSigs >= p.gs.Quorum()),
271+
zap.Int("required_sigs", gs.Quorum()),
272+
zap.Bool("quorum", hasSigs >= gs.Quorum()),
272273
)
273-
}
274-
} else {
275-
if p.logger.Level().Enabled(zapcore.DebugLevel) {
274+
} else {
276275
p.logger.Debug("expiring unsubmitted nil observation, gs is nil",
277276
zap.String("message_id", s.LoggingID()),
278277
zap.String("digest", hash),

node/pkg/processor/message.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/prometheus/client_golang/prometheus"
1010
"github.com/prometheus/client_golang/prometheus/promauto"
1111

12-
ethCommon "github.com/ethereum/go-ethereum/common"
1312
"go.uber.org/zap"
1413
"go.uber.org/zap/zapcore"
1514

@@ -100,31 +99,27 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
10099
observationsReceivedByGuardianAddressTotal.WithLabelValues(p.ourAddr.Hex()).Inc()
101100

102101
// Get / create our state entry.
103-
s := p.state.signatures[hash]
104-
if s == nil {
105-
s = &state{
106-
firstObserved: time.Now(),
107-
nextRetry: time.Now().Add(nextRetryDuration(0)),
108-
signatures: map[ethCommon.Address][]byte{},
109-
source: "loopback",
110-
}
111-
112-
p.state.signatures[hash] = s
102+
s, created := p.state.getOrCreateState(hash)
103+
s.lock.Lock()
104+
defer s.lock.Unlock()
105+
106+
if created {
107+
s.source = "loopback"
113108
}
114109

115110
// Update our state.
116111
s.ourObservation = v
117112
s.txHash = k.TxHash.Bytes()
118113
s.source = v.GetEmitterChain().String()
119-
s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
114+
s.gs = p.gst.Get() // guaranteed to match ourObservation - there's no concurrent access to p.gs
120115
s.signatures[p.ourAddr] = signature
121116
s.ourObs = ourObs
122117
s.ourMsg = msg
123118

124119
// Fast path for our own signature.
125120
if !s.submitted {
126121
start := time.Now()
127-
p.checkForQuorum(ourObs, s, s.gs, hash)
122+
p.checkForQuorumAlreadyLocked(ourObs, s, hash)
128123
timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
129124
}
130125
}

0 commit comments

Comments
 (0)