2
2
package processor
3
3
4
4
import (
5
- "context"
6
5
"encoding/hex"
7
6
"fmt"
8
7
"time"
69
68
)
70
69
71
70
// handleCleanup handles periodic retransmissions and cleanup of observations
72
- func (p * Processor ) handleCleanup (ctx context. Context ) {
71
+ func (p * Processor ) handleCleanup () {
73
72
p .cleanupState ()
74
73
p .cleanupPythnetVaas ()
75
74
}
@@ -129,37 +128,36 @@ func (p *Processor) cleanUpStateEntry(hash string, s *state) bool {
129
128
gs := s .gs
130
129
if gs == nil {
131
130
gs = p .gst .Get ()
132
- if gs == nil {
133
- return false
134
- }
135
131
}
136
132
137
- hasSigs := len (s .signatures )
138
- wantSigs := vaa .CalculateQuorum (len (gs .Keys ))
139
- quorum := hasSigs >= wantSigs
133
+ if gs != nil {
134
+ if p .logger .Level ().Enabled (zapcore .DebugLevel ) {
135
+ hasSigs := len (s .signatures )
136
+ wantSigs := vaa .CalculateQuorum (len (gs .Keys ))
137
+ quorum := hasSigs >= wantSigs
140
138
141
- var chain vaa.ChainID
142
- if s .ourObservation != nil {
143
- chain = s .ourObservation .GetEmitterChain ()
144
- }
139
+ var chain vaa.ChainID
140
+ if s .ourObservation != nil {
141
+ chain = s .ourObservation .GetEmitterChain ()
142
+ }
145
143
146
- if p .logger .Level ().Enabled (zapcore .DebugLevel ) {
147
- p .logger .Debug ("observation considered settled" ,
148
- zap .String ("message_id" , s .LoggingID ()),
149
- zap .String ("digest" , hash ),
150
- zap .Duration ("delta" , delta ),
151
- zap .Int ("have_sigs" , hasSigs ),
152
- zap .Int ("required_sigs" , wantSigs ),
153
- zap .Bool ("quorum" , quorum ),
154
- zap .Stringer ("emitter_chain" , chain ),
155
- )
156
- }
144
+ p .logger .Debug ("observation considered settled" ,
145
+ zap .String ("message_id" , s .LoggingID ()),
146
+ zap .String ("digest" , hash ),
147
+ zap .Duration ("delta" , delta ),
148
+ zap .Int ("have_sigs" , hasSigs ),
149
+ zap .Int ("required_sigs" , wantSigs ),
150
+ zap .Bool ("quorum" , quorum ),
151
+ zap .Stringer ("emitter_chain" , chain ),
152
+ )
153
+ }
157
154
158
- for _ , k := range gs .Keys {
159
- if _ , ok := s .signatures [k ]; ok {
160
- aggregationStateFulfillment .WithLabelValues (k .Hex (), s .source , "present" ).Inc ()
161
- } else {
162
- aggregationStateFulfillment .WithLabelValues (k .Hex (), s .source , "missing" ).Inc ()
155
+ for _ , k := range gs .Keys {
156
+ if _ , ok := s .signatures [k ]; ok {
157
+ aggregationStateFulfillment .WithLabelValues (k .Hex (), s .source , "present" ).Inc ()
158
+ } else {
159
+ aggregationStateFulfillment .WithLabelValues (k .Hex (), s .source , "missing" ).Inc ()
160
+ }
163
161
}
164
162
}
165
163
case s .submitted && delta .Hours () >= 1 :
@@ -176,13 +174,13 @@ func (p *Processor) cleanUpStateEntry(hash string, s *state) bool {
176
174
}
177
175
aggregationStateExpiration .Inc ()
178
176
return true
179
- case ! s .submitted && ((s .ourMsg != nil && delta > retryLimitOurs ) || (s .ourMsg == nil && delta > retryLimitNotOurs )):
177
+ case ! s .submitted && ((s .ourObs != nil && delta > retryLimitOurs ) || (s .ourObs == nil && delta > retryLimitNotOurs )):
180
178
// Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
181
179
p .logger .Info ("expiring unsubmitted observation after exhausting retries" ,
182
180
zap .String ("message_id" , s .LoggingID ()),
183
181
zap .String ("digest" , hash ),
184
182
zap .Duration ("delta" , delta ),
185
- zap .Bool ("weObserved" , s .ourMsg != nil ),
183
+ zap .Bool ("weObserved" , s .ourObs != nil ),
186
184
)
187
185
aggregationStateTimeout .Inc ()
188
186
return true
@@ -193,7 +191,7 @@ func (p *Processor) cleanUpStateEntry(hash string, s *state) bool {
193
191
// sig. If we do not have an observation, it means we either never observed it, or it got
194
192
// revived by a malfunctioning guardian node, in which case, we can't do anything about it
195
193
// and just delete it to keep our state nice and lean.
196
- if s .ourMsg != nil {
194
+ if s .ourObs != nil {
197
195
// Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes
198
196
if ! s .ourObservation .IsReliable () {
199
197
p .logger .Info ("expiring unsubmitted unreliable observation" ,
@@ -247,7 +245,12 @@ func (p *Processor) cleanUpStateEntry(hash string, s *state) bool {
247
245
if err := common .PostObservationRequest (p .obsvReqSendC , req ); err != nil {
248
246
p .logger .Warn ("failed to broadcast re-observation request" , zap .String ("message_id" , s .LoggingID ()), zap .Error (err ))
249
247
}
250
- p .gossipSendC <- s .ourMsg
248
+ if s .ourMsg != nil {
249
+ // This is the case for immediately published messages (as well as anything still pending from before the cutover).
250
+ p .gossipAttestationSendC <- s .ourMsg
251
+ } else {
252
+ p .postObservationToBatch (s .ourObs )
253
+ }
251
254
s .retryCtr ++
252
255
s .nextRetry = time .Now ().Add (nextRetryDuration (s .retryCtr ))
253
256
aggregationStateRetries .Inc ()
@@ -256,23 +259,19 @@ func (p *Processor) cleanUpStateEntry(hash string, s *state) bool {
256
259
// For nil state entries, we log the quorum to determine whether the
257
260
// network reached consensus without us. We don't know the correct guardian
258
261
// set, so we simply use the most recent one.
259
- hasSigs := len (s .signatures )
260
- gs := p .gst .Get ()
261
- if gs != nil {
262
- wantSigs := vaa .CalculateQuorum (len (gs .Keys ))
263
-
264
- if p .logger .Level ().Enabled (zapcore .DebugLevel ) {
262
+ if p .logger .Level ().Enabled (zapcore .DebugLevel ) {
263
+ hasSigs := len (s .signatures )
264
+ gs := p .gst .Get ()
265
+ if gs != nil {
265
266
p .logger .Debug ("expiring unsubmitted nil observation" ,
266
267
zap .String ("message_id" , s .LoggingID ()),
267
268
zap .String ("digest" , hash ),
268
269
zap .Duration ("delta" , delta ),
269
270
zap .Int ("have_sigs" , hasSigs ),
270
- zap .Int ("required_sigs" , p . gs .Quorum ()),
271
- zap .Bool ("quorum" , hasSigs >= p . gs .Quorum ()),
271
+ zap .Int ("required_sigs" , gs .Quorum ()),
272
+ zap .Bool ("quorum" , hasSigs >= gs .Quorum ()),
272
273
)
273
- }
274
- } else {
275
- if p .logger .Level ().Enabled (zapcore .DebugLevel ) {
274
+ } else {
276
275
p .logger .Debug ("expiring unsubmitted nil observation, gs is nil" ,
277
276
zap .String ("message_id" , s .LoggingID ()),
278
277
zap .String ("digest" , hash ),
0 commit comments