Skip to content

Commit b36ce00

Browse files
Merge pull request #584 from shutter-network/feat/api-sync-monitor
Updated the sync monitor so that it does not throw an error while DKG is in progress
2 parents eae7aa2 + 77973dc commit b36ce00

File tree

8 files changed

+325
-53
lines changed

8 files changed

+325
-53
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package shutterservice
2+
3+
import "github.com/prometheus/client_golang/prometheus"
4+
5+
var metricsRegistryEventsSyncedUntil = prometheus.NewGauge(
6+
prometheus.GaugeOpts{
7+
Namespace: "shutter",
8+
Subsystem: "shutter_api",
9+
Name: "registry_events_syned_until",
10+
Help: "Current value of the latest block fetched",
11+
},
12+
)
13+
14+
func init() {
15+
prometheus.MustRegister(metricsRegistryEventsSyncedUntil)
16+
}

rolling-shutter/keyperimpl/shutterservice/registrysyncer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ func (s *RegistrySyncer) syncRange(
182182
Int("num-inserted-events", len(filteredEvents)).
183183
Int("num-discarded-events", len(events)-len(filteredEvents)).
184184
Msg("synced registry contract")
185+
186+
metricsRegistryEventsSyncedUntil.Set(float64(end))
185187
return nil
186188
}
187189

rolling-shutter/keyperimpl/shutterservice/syncmonitor.go

Lines changed: 76 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ package shutterservice
22

33
import (
44
"context"
5+
"fmt"
56
"time"
67

78
"github.com/jackc/pgx/v4"
89
"github.com/jackc/pgx/v4/pgxpool"
910
"github.com/pkg/errors"
1011
"github.com/rs/zerolog/log"
1112

13+
keyperDB "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database"
1214
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database"
1315
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
1416
)
@@ -29,36 +31,91 @@ func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error {
2931
// runMonitor is the sync monitor's main loop. It is shown here as a rendered
// diff: the bare numbers are old/new file line numbers, a trailing "-" marks
// a line removed by this commit and a trailing "+" marks a line added by it.
//
// After the change, the loop waits s.CheckInterval between iterations and
// calls runCheck with a pointer to lastBlockNumber so the last observed block
// number persists across iterations. If runCheck returns
// ErrBlockNotIncreasing, that error is returned and the monitor stops. Any
// other error is logged at debug level and the check is attempted again
// after the next interval. When ctx is cancelled, the loop logs the shutdown
// and returns ctx.Err().
func (s *SyncMonitor) runMonitor(ctx context.Context) error {
3032
var lastBlockNumber int64
3133
db := database.New(s.DBPool)
34+
keyperdb := keyperDB.New(s.DBPool)
3235

3336
log.Debug().Msg("starting the sync monitor")
3437

3538
for {
3639
select {
3740
case <-time.After(s.CheckInterval):
38-
record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
39-
if err != nil {
40-
if errors.Is(err, pgx.ErrNoRows) {
41-
log.Warn().Err(err).Msg("no rows found in table identity_registered_events_synced_until")
42-
continue
41+
if err := s.runCheck(ctx, db, keyperdb, &lastBlockNumber); err != nil {
42+
if errors.Is(err, ErrBlockNotIncreasing) {
43+
return err
4344
}
44-
return errors.Wrap(err, "error getting identity_registered_events_synced_until")
45-
}
46-
47-
currentBlockNumber := record.BlockNumber
48-
log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number")
49-
50-
if currentBlockNumber > lastBlockNumber {
51-
lastBlockNumber = currentBlockNumber
52-
} else {
53-
log.Error().
54-
Int64("last-block-number", lastBlockNumber).
55-
Int64("current-block-number", currentBlockNumber).
56-
Msg("block number has not increased between checks")
57-
return errors.New("block number has not increased between checks")
45+
log.Debug().Err(err).Msg("skipping sync check due to error")
5846
}
5947
case <-ctx.Done():
6048
log.Info().Msg("stopping syncMonitor due to context cancellation")
6149
return ctx.Err()
6250
}
6351
}
6452
}
53+
54+
// ErrBlockNotIncreasing is the sentinel error returned by runCheck when the
// synced block number is not greater than the value seen on the previous
// check. runMonitor matches it with errors.Is and stops the monitor.
var ErrBlockNotIncreasing = errors.New("block number has not increased between checks")
55+
56+
func (s *SyncMonitor) runCheck(
57+
ctx context.Context,
58+
db *database.Queries,
59+
keyperdb *keyperDB.Queries,
60+
lastBlockNumber *int64,
61+
) error {
62+
isRunning, err := s.isDKGRunning(ctx, keyperdb)
63+
if err != nil {
64+
return fmt.Errorf("syncMonitor | error in dkgIsRunning: %w", err)
65+
}
66+
if isRunning {
67+
log.Debug().Msg("dkg is running, skipping sync monitor checks")
68+
return nil
69+
}
70+
71+
record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
72+
if err != nil {
73+
if errors.Is(err, pgx.ErrNoRows) {
74+
log.Warn().Err(err).Msg("no rows found in table identity_registered_events_synced_until")
75+
return nil // This is not an error condition that should stop monitoring
76+
}
77+
return fmt.Errorf("error getting identity_registered_events_synced_until: %w", err)
78+
}
79+
80+
currentBlockNumber := record.BlockNumber
81+
log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number")
82+
83+
if currentBlockNumber > *lastBlockNumber {
84+
*lastBlockNumber = currentBlockNumber
85+
return nil
86+
}
87+
88+
log.Error().
89+
Int64("last-block-number", *lastBlockNumber).
90+
Int64("current-block-number", currentBlockNumber).
91+
Msg("block number has not increased between checks")
92+
return ErrBlockNotIncreasing
93+
}
94+
95+
func (s *SyncMonitor) isDKGRunning(ctx context.Context, keyperdb *keyperDB.Queries) (bool, error) {
96+
// if latest eon is registered then EonStarted event has triggered, which means the dkg can start
97+
eons, err := keyperdb.GetAllEons(ctx)
98+
if errors.Is(err, pgx.ErrNoRows) {
99+
return false, nil
100+
}
101+
if err != nil {
102+
log.Error().
103+
Err(err).
104+
Msg("syncMonitor | error getting all eons")
105+
return false, err
106+
}
107+
108+
if len(eons) == 0 {
109+
return false, nil
110+
}
111+
112+
// if we get no rows in getting dkg result then dkg is not completed for that eon
113+
_, err = keyperdb.GetDKGResult(ctx, eons[len(eons)-1].Eon)
114+
if errors.Is(err, pgx.ErrNoRows) {
115+
return true, nil
116+
} else if err != nil {
117+
log.Error().Err(err).Msg("syncMonitor | error getting dkg result")
118+
return false, err
119+
}
120+
return false, nil
121+
}

0 commit comments

Comments
 (0)