Commit f4dce70

Node: Multithreaded processor

1 parent abd0b33 · commit f4dce70
5 files changed: +391 -210 lines
node/pkg/processor/cleanup.go

Lines changed: 177 additions & 145 deletions
@@ -70,179 +70,196 @@ var (
 
 // handleCleanup handles periodic retransmissions and cleanup of observations
 func (p *Processor) handleCleanup(ctx context.Context) {
+    p.cleanupState()
+    p.cleanupPythnetVaas()
+}
+
+// cleanupState walks through the aggregation state map and cleans up entries that are no longer needed. It grabs the state lock.
+func (p *Processor) cleanupState() {
+    p.state.signaturesLock.Lock()
+    defer p.state.signaturesLock.Unlock()
+
     p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.signatures)))
     aggregationStateEntries.Set(float64(len(p.state.signatures)))
 
     for hash, s := range p.state.signatures {
-        delta := time.Since(s.firstObserved)
-
-        if !s.submitted && s.ourObservation != nil && delta > settlementTime {
-            // Expire pending VAAs post settlement time if we have a stored quorum VAA.
-            //
-            // This occurs when we observed a message after the cluster has already reached
-            // consensus on it, causing us to never achieve quorum.
-            if ourVaa, ok := s.ourObservation.(*VAA); ok {
-                if p.haveSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)) {
-                    // If we have a stored quorum VAA, we can safely expire the state.
-                    //
-                    // This is a rare case, and we can safely expire the state, since we
-                    // have a quorum VAA.
-                    p.logger.Info("Expiring late VAA",
-                        zap.String("message_id", ourVaa.VAA.MessageID()),
-                        zap.String("digest", hash),
-                        zap.Duration("delta", delta),
-                    )
-                    aggregationStateLate.Inc()
-                    delete(p.state.signatures, hash)
-                    continue
-                }
-            }
+        if shouldDelete := p.cleanUpStateEntry(hash, s); shouldDelete {
+            delete(p.state.signatures, hash) // Can't use p.state.delete() because we're holding the lock.
         }
+    }
+}
 
-        switch {
-        case !s.settled && delta > settlementTime:
-            // After 30 seconds, the observation is considered settled - it's unlikely that more observations will
-            // arrive, barring special circumstances. This is a better time to count misses than submission,
-            // because we submit right when we quorum rather than waiting for all observations to arrive.
-            s.settled = true
-
-            // Use either the most recent (in case of a observation we haven't seen) or stored gs, if available.
-            var gs *common.GuardianSet
-            if s.gs != nil {
-                gs = s.gs
-            } else {
-                gs = p.gs
-            }
-
-            hasSigs := len(s.signatures)
-            quorum := hasSigs >= gs.Quorum()
+// cleanUpStateEntry cleans up a single aggregation state entry. It grabs the lock for that entry. Returns true if the entry should be deleted.
+func (p *Processor) cleanUpStateEntry(hash string, s *state) bool {
+    s.lock.Lock()
+    defer s.lock.Unlock()
 
-            var chain vaa.ChainID
-            if s.ourObservation != nil {
-                chain = s.ourObservation.GetEmitterChain()
-            }
+    delta := time.Since(s.firstObserved)
 
-            if p.logger.Level().Enabled(zapcore.DebugLevel) {
-                p.logger.Debug("observation considered settled",
-                    zap.String("message_id", s.LoggingID()),
+    if !s.submitted && s.ourObservation != nil && delta > settlementTime {
+        // Expire pending VAAs post settlement time if we have a stored quorum VAA.
+        //
+        // This occurs when we observed a message after the cluster has already reached
+        // consensus on it, causing us to never achieve quorum.
+        if ourVaa, ok := s.ourObservation.(*VAA); ok {
+            if p.haveSignedVAA(*db.VaaIDFromVAA(&ourVaa.VAA)) {
+                // If we have a stored quorum VAA, we can safely expire the state.
+                //
+                // This is a rare case, and we can safely expire the state, since we
+                // have a quorum VAA.
+                p.logger.Info("Expiring late VAA",
+                    zap.String("message_id", ourVaa.VAA.MessageID()),
                     zap.String("digest", hash),
                     zap.Duration("delta", delta),
-                    zap.Int("have_sigs", hasSigs),
-                    zap.Int("required_sigs", gs.Quorum()),
-                    zap.Bool("quorum", quorum),
-                    zap.Stringer("emitter_chain", chain),
                 )
+                aggregationStateLate.Inc()
+                return true
             }
+        }
+    }
 
-            for _, k := range gs.Keys {
-                if _, ok := s.signatures[k]; ok {
-                    aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc()
-                } else {
-                    aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc()
-                }
+    switch {
+    case !s.settled && delta > settlementTime:
+        // After 30 seconds, the observation is considered settled - it's unlikely that more observations will
+        // arrive, barring special circumstances. This is a better time to count misses than submission,
+        // because we submit right when we quorum rather than waiting for all observations to arrive.
+        s.settled = true
+
+        // Peg the appropriate settlement metric using the current guardian set. If we don't have a guardian set (extremely unlikely), we just won't peg the metric.
+        gs := s.gs
+        if gs == nil {
+            gs = p.gst.Get()
+            if gs == nil {
+                return false
            }
-        case s.submitted && delta.Hours() >= 1:
-            // We could delete submitted observations right away, but then we'd lose context about additional (late)
-            // observation that come in. Therefore, keep it for a reasonable amount of time.
-            // If a very late observation arrives after cleanup, a nil aggregation state will be created
-            // and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian).
-            if p.logger.Level().Enabled(zapcore.DebugLevel) {
-                p.logger.Debug("expiring submitted observation",
-                    zap.String("message_id", s.LoggingID()),
-                    zap.String("digest", hash),
-                    zap.Duration("delta", delta),
-                )
+        }
+
+        hasSigs := len(s.signatures)
+        wantSigs := vaa.CalculateQuorum(len(gs.Keys))
+        quorum := hasSigs >= wantSigs
+
+        var chain vaa.ChainID
+        if s.ourObservation != nil {
+            chain = s.ourObservation.GetEmitterChain()
+        }
+
+        if p.logger.Level().Enabled(zapcore.DebugLevel) {
+            p.logger.Debug("observation considered settled",
+                zap.String("message_id", s.LoggingID()),
+                zap.String("digest", hash),
+                zap.Duration("delta", delta),
+                zap.Int("have_sigs", hasSigs),
+                zap.Int("required_sigs", wantSigs),
+                zap.Bool("quorum", quorum),
+                zap.Stringer("emitter_chain", chain),
+            )
+        }
+
+        for _, k := range gs.Keys {
+            if _, ok := s.signatures[k]; ok {
+                aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "present").Inc()
+            } else {
+                aggregationStateFulfillment.WithLabelValues(k.Hex(), s.source, "missing").Inc()
            }
-            delete(p.state.signatures, hash)
-            aggregationStateExpiration.Inc()
-        case !s.submitted && ((s.ourObs != nil && delta > retryLimitOurs) || (s.ourObs == nil && delta > retryLimitNotOurs)):
-            // Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
-            p.logger.Info("expiring unsubmitted observation after exhausting retries",
+        }
+    case s.submitted && delta.Hours() >= 1:
+        // We could delete submitted observations right away, but then we'd lose context about additional (late)
+        // observation that come in. Therefore, keep it for a reasonable amount of time.
+        // If a very late observation arrives after cleanup, a nil aggregation state will be created
+        // and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian).
+        if p.logger.Level().Enabled(zapcore.DebugLevel) {
+            p.logger.Debug("expiring submitted observation",
                zap.String("message_id", s.LoggingID()),
                zap.String("digest", hash),
                zap.Duration("delta", delta),
-                zap.Bool("weObserved", s.ourObs != nil),
            )
-            delete(p.state.signatures, hash)
-            aggregationStateTimeout.Inc()
-        case !s.submitted && delta >= FirstRetryMinWait && time.Since(s.nextRetry) >= 0:
-            // Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
-            // If we have previously submitted an observation, and it was reliable, we can make another attempt to get
-            // it over the finish line by sending a re-observation request to the network and rebroadcasting our
-            // sig. If we do not have an observation, it means we either never observed it, or it got
-            // revived by a malfunctioning guardian node, in which case, we can't do anything about it
-            // and just delete it to keep our state nice and lean.
-            if s.ourObs != nil {
-                // Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes
-                if !s.ourObservation.IsReliable() {
-                    p.logger.Info("expiring unsubmitted unreliable observation",
+        }
+        aggregationStateExpiration.Inc()
+        return true
+    case !s.submitted && ((s.ourMsg != nil && delta > retryLimitOurs) || (s.ourMsg == nil && delta > retryLimitNotOurs)):
+        // Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
+        p.logger.Info("expiring unsubmitted observation after exhausting retries",
+            zap.String("message_id", s.LoggingID()),
+            zap.String("digest", hash),
+            zap.Duration("delta", delta),
+            zap.Bool("weObserved", s.ourMsg != nil),
+        )
+        aggregationStateTimeout.Inc()
+        return true
+    case !s.submitted && delta >= FirstRetryMinWait && time.Since(s.nextRetry) >= 0:
+        // Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
+        // If we have previously submitted an observation, and it was reliable, we can make another attempt to get
+        // it over the finish line by sending a re-observation request to the network and rebroadcasting our
+        // sig. If we do not have an observation, it means we either never observed it, or it got
+        // revived by a malfunctioning guardian node, in which case, we can't do anything about it
+        // and just delete it to keep our state nice and lean.
+        if s.ourMsg != nil {
+            // Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes
+            if !s.ourObservation.IsReliable() {
+                p.logger.Info("expiring unsubmitted unreliable observation",
+                    zap.String("message_id", s.LoggingID()),
+                    zap.String("digest", hash),
+                    zap.Duration("delta", delta),
+                )
+                aggregationStateTimeout.Inc()
+                return true
+            }
+
+            // Reobservation requests should not be resubmitted but we will keep waiting for more observations.
+            if s.ourObservation.IsReobservation() {
+                if p.logger.Level().Enabled(zapcore.DebugLevel) {
+                    p.logger.Debug("not submitting reobservation request for reobservation",
                        zap.String("message_id", s.LoggingID()),
                        zap.String("digest", hash),
                        zap.Duration("delta", delta),
                    )
-                    delete(p.state.signatures, hash)
-                    aggregationStateTimeout.Inc()
-                    break
-                }
-
-                // Reobservation requests should not be resubmitted but we will keep waiting for more observations.
-                if s.ourObservation.IsReobservation() {
-                    if p.logger.Level().Enabled(zapcore.DebugLevel) {
-                        p.logger.Debug("not submitting reobservation request for reobservation",
-                            zap.String("message_id", s.LoggingID()),
-                            zap.String("digest", hash),
-                            zap.Duration("delta", delta),
-                        )
-                    }
-                    break
                }
+                return false
+            }
 
-                // If we have already stored this VAA, there is no reason for us to request reobservation.
-                alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s)
-                if err != nil {
-                    p.logger.Error("failed to check if observation is already in DB, requesting reobservation",
-                        zap.String("message_id", s.LoggingID()),
-                        zap.String("hash", hash),
-                        zap.Error(err))
-                }
+            // If we have already stored this VAA, there is no reason for us to request reobservation.
+            alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s)
+            if err != nil {
+                p.logger.Error("failed to check if observation is already in DB, requesting reobservation",
+                    zap.String("message_id", s.LoggingID()),
+                    zap.String("hash", hash),
+                    zap.Error(err))
+            }
 
-                if alreadyInDB {
-                    if p.logger.Level().Enabled(zapcore.DebugLevel) {
-                        p.logger.Debug("observation already in DB, not requesting reobservation",
-                            zap.String("message_id", s.LoggingID()),
-                            zap.String("digest", hash),
-                        )
-                    }
-                } else {
-                    p.logger.Info("resubmitting observation",
+            if alreadyInDB {
+                if p.logger.Level().Enabled(zapcore.DebugLevel) {
+                    p.logger.Debug("observation already in DB, not requesting reobservation",
                        zap.String("message_id", s.LoggingID()),
                        zap.String("digest", hash),
-                        zap.Duration("delta", delta),
-                        zap.String("firstObserved", s.firstObserved.String()),
-                        zap.Int("numSignatures", len(s.signatures)),
                    )
-                    req := &gossipv1.ObservationRequest{
-                        ChainId: uint32(s.ourObservation.GetEmitterChain()),
-                        TxHash:  s.txHash,
-                    }
-                    if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
-                        p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err))
-                    }
-                    if s.ourMsg != nil {
-                        // This is the case for immediately published messages (as well as anything still pending from before the cutover).
-                        p.gossipAttestationSendC <- s.ourMsg
-                    } else {
-                        p.postObservationToBatch(s.ourObs)
-                    }
-                    s.retryCtr++
-                    s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr))
-                    aggregationStateRetries.Inc()
                }
            } else {
-                // For nil state entries, we log the quorum to determine whether the
-                // network reached consensus without us. We don't know the correct guardian
-                // set, so we simply use the most recent one.
-                hasSigs := len(s.signatures)
+                p.logger.Info("resubmitting observation",
+                    zap.String("message_id", s.LoggingID()),
+                    zap.String("digest", hash),
+                    zap.Duration("delta", delta),
+                    zap.String("firstObserved", s.firstObserved.String()),
+                )
+                req := &gossipv1.ObservationRequest{
+                    ChainId: uint32(s.ourObservation.GetEmitterChain()),
+                    TxHash:  s.txHash,
+                }
+                if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
+                    p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err))
+                }
+                p.gossipSendC <- s.ourMsg
+                s.retryCtr++
+                s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr))
+                aggregationStateRetries.Inc()
+            }
+        } else {
+            // For nil state entries, we log the quorum to determine whether the
+            // network reached consensus without us. We don't know the correct guardian
+            // set, so we simply use the most recent one.
+            hasSigs := len(s.signatures)
+            gs := p.gst.Get()
+            if gs != nil {
+                wantSigs := vaa.CalculateQuorum(len(gs.Keys))
 
                if p.logger.Level().Enabled(zapcore.DebugLevel) {
                    p.logger.Debug("expiring unsubmitted nil observation",
@@ -254,13 +271,28 @@ func (p *Processor) handleCleanup(ctx context.Context) {
                        zap.Bool("quorum", hasSigs >= p.gs.Quorum()),
                    )
                }
-            delete(p.state.signatures, hash)
-            aggregationStateUnobserved.Inc()
+            } else {
+                if p.logger.Level().Enabled(zapcore.DebugLevel) {
+                    p.logger.Debug("expiring unsubmitted nil observation, gs is nil",
+                        zap.String("message_id", s.LoggingID()),
+                        zap.String("digest", hash),
+                        zap.Duration("delta", delta),
+                        zap.Int("have_sigs", hasSigs),
+                    )
+                }
            }
+            aggregationStateUnobserved.Inc()
+            return true
        }
    }
 
-    // Clean up old pythnet VAAs.
+    return false
+}
+
+// cleanupPythnetVaas deletes expired pythnet vaas.
+func (p *Processor) cleanupPythnetVaas() {
+    p.pythnetVaaLock.Lock()
+    defer p.pythnetVaaLock.Unlock()
    oldestTime := time.Now().Add(-time.Hour)
    for key, pe := range p.pythnetVaas {
        if pe.updateTime.Before(oldestTime) {
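
The restructuring above replaces one monolithic cleanup pass with two lock scopes: cleanupState holds the map-level signaturesLock while it walks the map, and cleanUpStateEntry takes each entry's own lock before inspecting it. Below is a minimal, self-contained sketch of that two-level pattern; the names (aggregationMap, entry) are illustrative stand-ins, not the actual processor types:

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry mirrors the idea of a per-entry lock: each aggregation state guards
// its own fields, so writers touching one entry don't contend on the map lock.
type entry struct {
	lock          sync.Mutex
	firstObserved time.Time
	submitted     bool
}

// aggregationMap mirrors the map-level lock (signaturesLock in the diff).
type aggregationMap struct {
	lock    sync.Mutex
	entries map[string]*entry
}

// cleanup walks the map while holding the map lock and delegates the
// per-entry decision to cleanupEntry, which takes the entry lock.
func (m *aggregationMap) cleanup(maxAge time.Duration) {
	m.lock.Lock()
	defer m.lock.Unlock()
	for hash, e := range m.entries {
		if m.cleanupEntry(e, maxAge) {
			// Plain delete is safe here because we already hold the map lock.
			delete(m.entries, hash)
		}
	}
}

// cleanupEntry returns true if the entry should be deleted. It never touches
// the map itself, which is why the delete stays in the caller.
func (m *aggregationMap) cleanupEntry(e *entry, maxAge time.Duration) bool {
	e.lock.Lock()
	defer e.lock.Unlock()
	return e.submitted && time.Since(e.firstObserved) > maxAge
}

func main() {
	m := &aggregationMap{entries: map[string]*entry{
		"abc": {firstObserved: time.Now().Add(-2 * time.Hour), submitted: true},
	}}
	m.cleanup(time.Hour)
	fmt.Println("entries left:", len(m.entries)) // 0
}

Keeping the delete in the caller matters: the per-entry function cannot remove itself from the map without re-acquiring the map lock, which is why the diff has cleanUpStateEntry return shouldDelete instead of deleting directly.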

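When a reliable observation is still unsubmitted after FirstRetryMinWait, the code above requests reobservation, rebroadcasts, and reschedules itself via s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr)). nextRetryDuration is not part of this diff; the sketch below assumes a plain exponential backoff (5m, 10m, 20m, ...) purely to illustrate how retryCtr feeds the schedule; the real function may cap or jitter the delay:

package main

import (
	"fmt"
	"time"
)

// FirstRetryMinWait mirrors the constant referenced in the diff; the
// five-minute value matches the "unsubmitted for five minutes" comment.
const FirstRetryMinWait = 5 * time.Minute

// nextRetryDuration sketches an exponential backoff: each retry doubles
// the wait (5m, 10m, 20m, ...). Assumed shape only, not the node's
// actual implementation.
func nextRetryDuration(retryCtr uint) time.Duration {
	return FirstRetryMinWait * time.Duration(1<<retryCtr)
}

func main() {
	for ctr := uint(0); ctr < 4; ctr++ {
		fmt.Printf("retry %d: wait %v\n", ctr, nextRetryDuration(ctr))
	}
}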
node/pkg/processor/message.go

Lines changed: 3 additions & 2 deletions
@@ -32,7 +32,8 @@ var (
 // handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An
 // event may be received multiple times and must be handled in an idempotent fashion.
 func (p *Processor) handleMessage(k *common.MessagePublication) {
-    if p.gs == nil {
+    gs := p.gst.Get()
+    if gs == nil {
        p.logger.Warn("dropping observation since we haven't initialized our guardian set yet",
            zap.String("message_id", k.MessageIDString()),
            zap.Uint32("nonce", k.Nonce),
@@ -50,7 +51,7 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
    v := &VAA{
        VAA: vaa.VAA{
            Version:          vaa.SupportedVAAVersion,
-            GuardianSetIndex: p.gs.Index,
+            GuardianSetIndex: gs.Index,
            Signatures:       nil,
            Timestamp:        k.Timestamp,
            Nonce:            k.Nonce,
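
handleMessage now reads the guardian set through p.gst.Get() and keeps the returned snapshot in a local gs instead of touching a shared p.gs field twice; with multiple processor goroutines, the field could otherwise change (or still be nil) between the nil check and the use of gs.Index. A minimal sketch of a tracker with that Get/Set shape, using stand-in types rather than the node's actual common.GuardianSetState:

package main

import (
	"fmt"
	"sync"
)

// GuardianSet is a trimmed-down stand-in for the node's guardian set type.
type GuardianSet struct {
	Index uint32
	Keys  []string
}

// GuardianSetState serializes access to the current guardian set so that
// handleMessage, cleanup, and the set-update path can share it safely.
type GuardianSetState struct {
	mu      sync.Mutex
	current *GuardianSet
}

// Set installs a new guardian set.
func (st *GuardianSetState) Set(gs *GuardianSet) {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.current = gs
}

// Get returns the current guardian set, or nil if none was set yet,
// which is exactly the case handleMessage guards against.
func (st *GuardianSetState) Get() *GuardianSet {
	st.mu.Lock()
	defer st.mu.Unlock()
	return st.current
}

func main() {
	var gst GuardianSetState
	fmt.Println(gst.Get() == nil) // true: drop observations until initialized
	gst.Set(&GuardianSet{Index: 3, Keys: []string{"0xabc"}})
	fmt.Println(gst.Get().Index) // 3
}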

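cleanup.go above also swaps gs.Quorum() for vaa.CalculateQuorum(len(gs.Keys)) when counting required signatures. Wormhole quorum is a supermajority of the guardian set; the sketch below assumes the common floor(2n/3)+1 rounding (the exact formula should be checked against the vaa package):

package main

import "fmt"

// calculateQuorum returns the number of signatures needed for quorum,
// assuming the "more than two-thirds" rule: for n guardians,
// quorum = floor(2n/3) + 1. For n = 19 this yields 13.
func calculateQuorum(numGuardians int) int {
	return (numGuardians*2)/3 + 1
}

func main() {
	for _, n := range []int{1, 3, 19} {
		fmt.Printf("guardians=%d quorum=%d\n", n, calculateQuorum(n))
	}
}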