Skip to content

Commit 905d634

Browse files
committed
Node: Multithreaded processor
1 parent 6294969 commit 905d634

File tree

6 files changed

+381
-252
lines changed

6 files changed

+381
-252
lines changed

node/pkg/processor/broadcast.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ package processor
22

33
import (
44
"encoding/hex"
5-
"time"
65

76
"github.com/prometheus/client_golang/prometheus"
87
"github.com/prometheus/client_golang/prometheus/promauto"
98

10-
ethcommon "github.com/ethereum/go-ethereum/common"
119
"github.com/ethereum/go-ethereum/crypto"
1210
"google.golang.org/protobuf/proto"
1311

@@ -50,20 +48,19 @@ func (p *Processor) broadcastSignature(
5048
// Store our VAA in case we're going to submit it to Solana
5149
hash := hex.EncodeToString(digest.Bytes())
5250

53-
if p.state.signatures[hash] == nil {
54-
p.state.signatures[hash] = &state{
55-
firstObserved: time.Now(),
56-
nextRetry: time.Now().Add(nextRetryDuration(0)),
57-
signatures: map[ethcommon.Address][]byte{},
58-
source: "loopback",
59-
}
51+
obsState, created := p.state.getOrCreateState(hash)
52+
obsState.lock.Lock()
53+
defer obsState.lock.Unlock()
54+
55+
if created {
56+
obsState.source = "loopback"
6057
}
6158

62-
p.state.signatures[hash].ourObservation = o
63-
p.state.signatures[hash].ourMsg = msg
64-
p.state.signatures[hash].txHash = txhash
65-
p.state.signatures[hash].source = o.GetEmitterChain().String()
66-
p.state.signatures[hash].gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
59+
obsState.ourObservation = o
60+
obsState.ourMsg = msg
61+
obsState.txHash = txhash
62+
obsState.source = o.GetEmitterChain().String()
63+
obsState.gs = p.gst.Get()
6764

6865
// Fast path for our own signature
6966
// send to obsvC directly if there is capacity, otherwise do it in a go routine.

0 commit comments

Comments
 (0)