Skip to content

Commit b6c646f

Browse files
update sync monitor to use DKGStartBlockDelta to determine if dkg started
1 parent a4c82dc commit b6c646f

File tree

3 files changed

+124
-19
lines changed

3 files changed

+124
-19
lines changed

rolling-shutter/keyperimpl/shutterservice/keyper.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,9 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error {
114114
}
115115

116116
kpr.syncMonitor = &SyncMonitor{
117-
DBPool: kpr.dbpool,
118-
CheckInterval: time.Duration(kpr.config.Chain.SyncMonitorCheckInterval) * time.Second,
117+
DBPool: kpr.dbpool,
118+
CheckInterval: time.Duration(kpr.config.Chain.SyncMonitorCheckInterval) * time.Second,
119+
DKGStartBlockDelta: kpr.config.Shuttermint.DKGStartBlockDelta,
119120
}
120121
runner.Go(func() error { return kpr.processInputs(ctx) })
121122
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.eonKeyPublisher, kpr.syncMonitor)

rolling-shutter/keyperimpl/shutterservice/syncmonitor.go

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ import (
1616
)
1717

1818
type SyncMonitor struct {
19-
DBPool *pgxpool.Pool
20-
CheckInterval time.Duration
19+
DBPool *pgxpool.Pool
20+
CheckInterval time.Duration
21+
DKGStartBlockDelta uint64
2122
}
2223

2324
func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error {
@@ -59,14 +60,9 @@ func (s *SyncMonitor) runCheck(
5960
keyperdb *keyperDB.Queries,
6061
lastBlockNumber *int64,
6162
) error {
62-
batchConfig, err := keyperdb.GetLatestBatchConfig(ctx)
63-
if err != nil {
64-
return fmt.Errorf("error getting batchconfig: %w", err)
65-
}
66-
67-
dkgResult, err := keyperdb.GetDKGResult(ctx, int64(batchConfig.KeyperConfigIndex))
68-
if err != nil {
69-
return fmt.Errorf("error getting dkgresult: %w", err)
63+
if s.dkgIsRunning(ctx, keyperdb) {
64+
log.Debug().Msg("dkg is running, skipping sync monitor checks")
65+
return nil
7066
}
7167

7268
record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
@@ -75,7 +71,7 @@ func (s *SyncMonitor) runCheck(
7571
log.Warn().Err(err).Msg("no rows found in table identity_registered_events_synced_until")
7672
return nil // This is not an error condition that should stop monitoring
7773
}
78-
return errors.Wrap(err, "error getting identity_registered_events_synced_until")
74+
return fmt.Errorf("error getting identity_registered_events_synced_until: %w", err)
7975
}
8076

8177
currentBlockNumber := record.BlockNumber
@@ -86,13 +82,37 @@ func (s *SyncMonitor) runCheck(
8682
return nil
8783
}
8884

89-
if dkgResult.Success {
85+
log.Error().
86+
Int64("last-block-number", *lastBlockNumber).
87+
Int64("current-block-number", currentBlockNumber).
88+
Msg("block number has not increased between checks")
89+
return ErrBlockNotIncreasing
90+
}
91+
92+
func (s *SyncMonitor) dkgIsRunning(ctx context.Context, keyperdb *keyperDB.Queries) bool {
93+
batchConfig, err := keyperdb.GetLatestBatchConfig(ctx)
94+
if err != nil {
9095
log.Error().
91-
Int64("last-block-number", *lastBlockNumber).
92-
Int64("current-block-number", currentBlockNumber).
93-
Msg("block number has not increased between checks")
94-
return ErrBlockNotIncreasing
96+
Err(err).
97+
Msg("syncMonitor | error getting latest batchconfig")
98+
return true
9599
}
96100

97-
return nil
101+
tmSyncData, err := keyperdb.TMGetSyncMeta(ctx)
102+
if err != nil {
103+
log.Error().
104+
Err(err).
105+
Msg("syncMonitor | error getting TMSyncMeta")
106+
return true
107+
}
108+
109+
// if batchConfig submission height + DKGStartBlockDelta is greater than shuttermint height then dkg has not started yet
110+
if batchConfig.Height+int64(s.DKGStartBlockDelta) < tmSyncData.LastCommittedHeight {
111+
// if we get an error in getting dkg result then dkg is not completed
112+
_, err := keyperdb.GetDKGResult(ctx, int64(batchConfig.KeyperConfigIndex))
113+
if err != nil {
114+
return true
115+
}
116+
}
117+
return false
98118
}

rolling-shutter/keyperimpl/shutterservice/syncmonitor_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ func setupTestData(ctx context.Context, t *testing.T, dbpool *pgxpool.Pool, bloc
2424
err := keyperdb.InsertBatchConfig(ctx, keyperDB.InsertBatchConfigParams{
2525
KeyperConfigIndex: 1,
2626
Keypers: []string{},
27+
Height: 50,
2728
})
2829
assert.NilError(t, err)
2930

@@ -34,6 +35,12 @@ func setupTestData(ctx context.Context, t *testing.T, dbpool *pgxpool.Pool, bloc
3435
})
3536
assert.NilError(t, err)
3637

38+
// Set up TMSyncMeta
39+
err = keyperdb.TMSetSyncMeta(ctx, keyperDB.TMSetSyncMetaParams{
40+
LastCommittedHeight: 100,
41+
})
42+
assert.NilError(t, err)
43+
3744
// Set up initial block
3845
err = db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
3946
BlockHash: []byte{0x01, 0x02, 0x03},
@@ -183,6 +190,13 @@ func TestAPISyncMonitor_ContinuesWhenNoDKGResult(t *testing.T) {
183190
err := keyperdb.InsertBatchConfig(ctx, keyperDB.InsertBatchConfigParams{
184191
KeyperConfigIndex: 1,
185192
Keypers: []string{},
193+
Height: 50,
194+
})
195+
assert.NilError(t, err)
196+
197+
// Set up TMSyncMeta
198+
err = keyperdb.TMSetSyncMeta(ctx, keyperDB.TMSetSyncMetaParams{
199+
LastCommittedHeight: 100,
186200
})
187201
assert.NilError(t, err)
188202

@@ -275,3 +289,73 @@ func TestAPISyncMonitor_ContinuesWhenNoBatchConfig(t *testing.T) {
275289
assert.NilError(t, err)
276290
assert.Equal(t, initialBlockNumber, syncedData.BlockNumber, "block number should remain unchanged")
277291
}
292+
293+
func TestAPISyncMonitor_ContinuesWhenNoTMSyncMeta(t *testing.T) {
294+
ctx, cancel := context.WithCancel(context.Background())
295+
defer cancel()
296+
297+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
298+
defer closeDB()
299+
db := database.New(dbpool)
300+
keyperdb := keyperDB.New(dbpool)
301+
302+
batchConfigHeight := int64(50)
303+
// Set TMSyncData height to be less than batchConfigHeight + DKGStartBlockDelta
304+
// This simulates a scenario where DKG hasn't started yet
305+
tmSyncHeight := int64(60)
306+
307+
// Set up batch config
308+
err := keyperdb.InsertBatchConfig(ctx, keyperDB.InsertBatchConfigParams{
309+
KeyperConfigIndex: 1,
310+
Keypers: []string{},
311+
Height: batchConfigHeight,
312+
})
313+
assert.NilError(t, err)
314+
315+
// Set up TMSyncMeta with lower height
316+
err = keyperdb.TMSetSyncMeta(ctx, keyperDB.TMSetSyncMetaParams{
317+
LastCommittedHeight: tmSyncHeight,
318+
})
319+
assert.NilError(t, err)
320+
321+
// Set up initial block data
322+
initialBlockNumber := int64(100)
323+
err = db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
324+
BlockHash: []byte{0x01, 0x02, 0x03},
325+
BlockNumber: initialBlockNumber,
326+
})
327+
assert.NilError(t, err)
328+
329+
monitor := &shutterservice.SyncMonitor{
330+
DBPool: dbpool,
331+
CheckInterval: 5 * time.Second,
332+
DKGStartBlockDelta: 5,
333+
}
334+
335+
monitorCtx, cancelMonitor := context.WithCancel(ctx)
336+
defer cancelMonitor()
337+
338+
errCh := make(chan error, 1)
339+
go func() {
340+
err := service.RunWithSighandler(monitorCtx, monitor)
341+
if err != nil {
342+
errCh <- err
343+
}
344+
}()
345+
346+
// Let it run for a while
347+
time.Sleep(15 * time.Second)
348+
cancelMonitor()
349+
350+
select {
351+
case err := <-errCh:
352+
t.Fatalf("expected monitor to continue without error, but got: %v", err)
353+
case <-time.After(1 * time.Second):
354+
// Test passes if no error is received
355+
}
356+
357+
// Verify the block number hasn't changed
358+
syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
359+
assert.NilError(t, err)
360+
assert.Equal(t, initialBlockNumber, syncedData.BlockNumber, "block number should remain unchanged")
361+
}

0 commit comments

Comments
 (0)