Skip to content

Commit 7db3da8

Browse files
fix: update isDKGRunning for sync monitor to check for EonStarted event
1 parent d64f6a4 commit 7db3da8

File tree

6 files changed

+65
-89
lines changed

6 files changed

+65
-89
lines changed

rolling-shutter/keyperimpl/shutterservice/syncmonitor.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -93,27 +93,29 @@ func (s *SyncMonitor) runCheck(
9393
}
9494

9595
func (s *SyncMonitor) isDKGRunning(ctx context.Context, keyperdb *keyperDB.Queries) (bool, error) {
96-
batchConfig, err := keyperdb.GetLatestBatchConfig(ctx)
96+
// if latest eon is registered then EonStarted event has triggered, which means the dkg can start
97+
eons, err := keyperdb.GetAllEons(ctx)
9798
if errors.Is(err, pgx.ErrNoRows) {
9899
return false, nil
99100
}
100101
if err != nil {
101102
log.Error().
102103
Err(err).
103-
Msg("syncMonitor | error getting latest batchconfig")
104+
Msg("syncMonitor | error getting all eons")
104105
return false, err
105106
}
106107

107-
// if batchconfig.Started is true that means dkg can happen for this keyper config index
108-
if batchConfig.Started {
109-
// if we get no rows in getting dkg result then dkg is not completed for that keyper config index
110-
_, err := keyperdb.GetDKGResult(ctx, int64(batchConfig.KeyperConfigIndex))
111-
if errors.Is(err, pgx.ErrNoRows) {
112-
return true, nil
113-
} else if err != nil {
114-
log.Error().Err(err).Msg("syncMonitor | error getting dkg result")
115-
return false, err
116-
}
108+
if len(eons) == 0 {
109+
return false, nil
110+
}
111+
112+
// if we get no rows in getting dkg result then dkg is not completed for that eon
113+
_, err = keyperdb.GetDKGResult(ctx, eons[len(eons)-1].Eon)
114+
if errors.Is(err, pgx.ErrNoRows) {
115+
return true, nil
116+
} else if err != nil {
117+
log.Error().Err(err).Msg("syncMonitor | error getting dkg result")
118+
return false, err
117119
}
118120
return false, nil
119121
}

rolling-shutter/keyperimpl/shutterservice/syncmonitor_test.go

Lines changed: 43 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,9 @@ func setupTestData(ctx context.Context, t *testing.T, dbpool *pgxpool.Pool, bloc
2020
db := database.New(dbpool)
2121
keyperdb := keyperDB.New(dbpool)
2222

23-
// Set up batch config with Started flag
24-
err := keyperdb.InsertBatchConfig(ctx, keyperDB.InsertBatchConfigParams{
25-
KeyperConfigIndex: 1,
26-
Keypers: []string{},
27-
Height: 50,
28-
Started: true,
23+
// Set up eon
24+
err := keyperdb.InsertEon(ctx, keyperDB.InsertEonParams{
25+
Eon: 1,
2926
})
3027
assert.NilError(t, err)
3128

@@ -124,26 +121,26 @@ func TestAPISyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) {
124121
assert.Equal(t, initialBlockNumber+5, syncedData.BlockNumber, "block number should have been incremented correctly")
125122
}
126123

127-
func TestAPISyncMonitor_ContinuesWhenNoRows(t *testing.T) {
124+
func TestAPISyncMonitor_SkipsWhenDKGIsRunning(t *testing.T) {
128125
ctx, cancel := context.WithCancel(context.Background())
129126
defer cancel()
130127

131128
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
132129
defer closeDB()
133-
134-
// Only set up keyper set and DKG result, but no block data
130+
db := database.New(dbpool)
135131
keyperdb := keyperDB.New(dbpool)
136132

137-
err := keyperdb.InsertBatchConfig(ctx, keyperDB.InsertBatchConfigParams{
138-
KeyperConfigIndex: 1,
139-
Keypers: []string{},
140-
Started: true,
133+
// Set up eon but no DKG result to simulate DKG running
134+
err := keyperdb.InsertEon(ctx, keyperDB.InsertEonParams{
135+
Eon: 1,
141136
})
142137
assert.NilError(t, err)
143138

144-
err = keyperdb.InsertDKGResult(ctx, keyperDB.InsertDKGResultParams{
145-
Eon: 1,
146-
Success: true,
139+
// Set up initial block data
140+
initialBlockNumber := int64(100)
141+
err = db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
142+
BlockHash: []byte{0x01, 0x02, 0x03},
143+
BlockNumber: initialBlockNumber,
147144
})
148145
assert.NilError(t, err)
149146

@@ -163,37 +160,34 @@ func TestAPISyncMonitor_ContinuesWhenNoRows(t *testing.T) {
163160
}
164161
}()
165162

163+
// Let it run for a while without incrementing the block number
166164
time.Sleep(15 * time.Second)
167165
cancelMonitor()
168166

169167
select {
170168
case err := <-errCh:
171169
t.Fatalf("expected monitor to continue without error, but got: %v", err)
172170
case <-time.After(1 * time.Second):
171+
// Test passes if no error is received
173172
}
173+
174+
// Verify the block number hasn't changed
175+
syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
176+
assert.NilError(t, err)
177+
assert.Equal(t, initialBlockNumber, syncedData.BlockNumber, "block number should remain unchanged")
174178
}
175179

176-
func TestAPISyncMonitor_SkipsWhenDKGIsRunning(t *testing.T) {
180+
func TestAPISyncMonitor_RunsNormallyWhenNoEons(t *testing.T) {
177181
ctx, cancel := context.WithCancel(context.Background())
178182
defer cancel()
179183

180184
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
181185
defer closeDB()
182186
db := database.New(dbpool)
183-
keyperdb := keyperDB.New(dbpool)
184-
185-
// Set up batch config, but no DKG result
186-
err := keyperdb.InsertBatchConfig(ctx, keyperDB.InsertBatchConfigParams{
187-
KeyperConfigIndex: 1,
188-
Keypers: []string{},
189-
Height: 50,
190-
Started: true,
191-
})
192-
assert.NilError(t, err)
193187

194-
// Set up initial block data
188+
// Only set up initial block data, no eon entries
195189
initialBlockNumber := int64(100)
196-
err = db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
190+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
197191
BlockHash: []byte{0x01, 0x02, 0x03},
198192
BlockNumber: initialBlockNumber,
199193
})
@@ -221,9 +215,9 @@ func TestAPISyncMonitor_SkipsWhenDKGIsRunning(t *testing.T) {
221215

222216
select {
223217
case err := <-errCh:
224-
t.Fatalf("expected monitor to continue without error, but got: %v", err)
218+
assert.ErrorContains(t, err, shutterservice.ErrBlockNotIncreasing.Error())
225219
case <-time.After(1 * time.Second):
226-
// Test passes if no error is received
220+
t.Fatalf("expected monitor to throw error, but no error returned")
227221
}
228222

229223
// Verify the block number hasn't changed
@@ -232,19 +226,24 @@ func TestAPISyncMonitor_SkipsWhenDKGIsRunning(t *testing.T) {
232226
assert.Equal(t, initialBlockNumber, syncedData.BlockNumber, "block number should remain unchanged")
233227
}
234228

235-
func TestAPISyncMonitor_RunsNormallyWhenNoBatchConfig(t *testing.T) {
229+
func TestAPISyncMonitor_ContinuesWhenNoRows(t *testing.T) {
236230
ctx, cancel := context.WithCancel(context.Background())
237231
defer cancel()
238232

239233
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
240234
defer closeDB()
241-
db := database.New(dbpool)
242235

243-
// Only set up initial block data, no keyper set
244-
initialBlockNumber := int64(100)
245-
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
246-
BlockHash: []byte{0x01, 0x02, 0x03},
247-
BlockNumber: initialBlockNumber,
236+
// Set up eon and DKG result, but no block data
237+
keyperdb := keyperDB.New(dbpool)
238+
239+
err := keyperdb.InsertEon(ctx, keyperDB.InsertEonParams{
240+
Eon: 1,
241+
})
242+
assert.NilError(t, err)
243+
244+
err = keyperdb.InsertDKGResult(ctx, keyperDB.InsertDKGResultParams{
245+
Eon: 1,
246+
Success: true,
248247
})
249248
assert.NilError(t, err)
250249

@@ -264,44 +263,29 @@ func TestAPISyncMonitor_RunsNormallyWhenNoBatchConfig(t *testing.T) {
264263
}
265264
}()
266265

267-
// Let it run for a while without incrementing the block number
268266
time.Sleep(15 * time.Second)
269267
cancelMonitor()
270268

271269
select {
272270
case err := <-errCh:
273-
assert.ErrorContains(t, err, shutterservice.ErrBlockNotIncreasing.Error())
271+
t.Fatalf("expected monitor to continue without error, but got: %v", err)
274272
case <-time.After(1 * time.Second):
275-
t.Fatalf("expected monitor to throw error, but no error returned")
276273
}
277-
278-
// Verify the block number hasn't changed
279-
syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
280-
assert.NilError(t, err)
281-
assert.Equal(t, initialBlockNumber, syncedData.BlockNumber, "block number should remain unchanged")
282274
}
283275

284-
func TestAPISyncMonitor_RunsNormallyWhenBatchConfigNotStarted(t *testing.T) {
276+
func TestAPISyncMonitor_RunsNormallyWithCompletedDKG(t *testing.T) {
285277
ctx, cancel := context.WithCancel(context.Background())
286278
defer cancel()
287279

288280
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
289281
defer closeDB()
290282
db := database.New(dbpool)
291-
keyperdb := keyperDB.New(dbpool)
292283

293-
// Set up batch config with Started = false
294-
err := keyperdb.InsertBatchConfig(ctx, keyperDB.InsertBatchConfigParams{
295-
KeyperConfigIndex: 1,
296-
Keypers: []string{},
297-
Height: 50,
298-
Started: false,
299-
})
300-
assert.NilError(t, err)
284+
initialBlockNumber := int64(100)
285+
setupTestData(ctx, t, dbpool, initialBlockNumber)
301286

302287
// Set up initial block data
303-
initialBlockNumber := int64(100)
304-
err = db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
288+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
305289
BlockHash: []byte{0x01, 0x02, 0x03},
306290
BlockNumber: initialBlockNumber,
307291
})
@@ -323,7 +307,7 @@ func TestAPISyncMonitor_RunsNormallyWhenBatchConfigNotStarted(t *testing.T) {
323307
}
324308
}()
325309

326-
// Let it run for a while
310+
// Let it run for a while without incrementing the block number
327311
time.Sleep(15 * time.Second)
328312
cancelMonitor()
329313

rolling-shutter/medley/chainsync/syncer/eonpubkey.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (s *EonPubKeySyncer) Start(ctx context.Context, runner service.Runner) erro
6262
}
6363
runner.Defer(subs.Unsubscribe)
6464
runner.Go(func() error {
65-
return s.watchNewEonPubkey(ctx, subs.Err())
65+
return s.watchNewEonPubkey(ctx)
6666
})
6767
return nil
6868
}
@@ -126,7 +126,7 @@ func (s *EonPubKeySyncer) GetEonPubKeyForEon(ctx context.Context, opts *bind.Cal
126126
}, nil
127127
}
128128

129-
func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context, subsErr <-chan error) error {
129+
func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context) error {
130130
for {
131131
select {
132132
case newEonKey, ok := <-s.keyBroadcastCh:
@@ -147,8 +147,6 @@ func (s *EonPubKeySyncer) watchNewEonPubkey(ctx context.Context, subsErr <-chan
147147
err.Error(),
148148
)
149149
}
150-
case err := <-subsErr:
151-
s.Log.Error("subscription error for watchNewEonPubkey", err.Error())
152150
case <-ctx.Done():
153151
return ctx.Err()
154152
}

rolling-shutter/medley/chainsync/syncer/keyperset.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (s *KeyperSetSyncer) Start(ctx context.Context, runner service.Runner) erro
7272
}
7373
runner.Defer(subs.Unsubscribe)
7474
runner.Go(func() error {
75-
return s.watchNewKeypersService(ctx, subs.Err())
75+
return s.watchNewKeypersService(ctx)
7676
})
7777
return nil
7878
}
@@ -204,7 +204,7 @@ func (s *KeyperSetSyncer) newEvent(
204204
}, nil
205205
}
206206

207-
func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context, subsErr <-chan error) error {
207+
func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context) error {
208208
for {
209209
select {
210210
case newKeypers, ok := <-s.keyperAddedCh:
@@ -234,8 +234,6 @@ func (s *KeyperSetSyncer) watchNewKeypersService(ctx context.Context, subsErr <-
234234
err.Error(),
235235
)
236236
}
237-
case err := <-subsErr:
238-
s.Log.Error("subscription error for watchNewKeypersService", err.Error())
239237
case <-ctx.Done():
240238
return ctx.Err()
241239
}

rolling-shutter/medley/chainsync/syncer/shutterstate.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (s *ShutterStateSyncer) Start(ctx context.Context, runner service.Runner) e
6363
runner.Defer(subsUnpaused.Unsubscribe)
6464

6565
runner.Go(func() error {
66-
return s.watchPaused(ctx, subs.Err(), subsUnpaused.Err())
66+
return s.watchPaused(ctx)
6767
})
6868
return nil
6969
}
@@ -87,7 +87,7 @@ func (s *ShutterStateSyncer) handle(ctx context.Context, ev *event.ShutterState)
8787
}
8888
}
8989

90-
func (s *ShutterStateSyncer) watchPaused(ctx context.Context, subsErr <-chan error, subsErrUnpaused <-chan error) error {
90+
func (s *ShutterStateSyncer) watchPaused(ctx context.Context) error {
9191
isActive, err := s.pollIsActive(ctx)
9292
if err != nil {
9393
// XXX: this will fail everything, do we want that?
@@ -123,10 +123,6 @@ func (s *ShutterStateSyncer) watchPaused(ctx context.Context, subsErr <-chan err
123123
}
124124
isActive = ev.Active
125125
s.handle(ctx, ev)
126-
case err := <-subsErr:
127-
s.Log.Error("subscription error for watchPaused", err.Error())
128-
case err := <-subsErrUnpaused:
129-
s.Log.Error("subscription error for watchUnpaused", err.Error())
130126
case <-ctx.Done():
131127
return ctx.Err()
132128
}

rolling-shutter/medley/chainsync/syncer/unsafehead.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ func (s *UnsafeHeadSyncer) Start(ctx context.Context, runner service.Runner) err
3333
}
3434
runner.Defer(subs.Unsubscribe)
3535
runner.Go(func() error {
36-
return s.watchLatestUnsafeHead(ctx, subs.Err())
36+
return s.watchLatestUnsafeHead(ctx)
3737
})
3838
return nil
3939
}
4040

41-
func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context, subsErr <-chan error) error {
41+
func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context) error {
4242
for {
4343
select {
4444
case newHeader, ok := <-s.newLatestHeadCh:
@@ -58,8 +58,6 @@ func (s *UnsafeHeadSyncer) watchLatestUnsafeHead(ctx context.Context, subsErr <-
5858
err.Error(),
5959
)
6060
}
61-
case err := <-subsErr:
62-
s.Log.Error("subscription error for watchLatestUnsafeHead", err.Error())
6361
case <-ctx.Done():
6462
return ctx.Err()
6563
}

0 commit comments

Comments
 (0)