Skip to content

Commit db0b6f2

Browse files
updated sync monitor to not throw error when dkg is happening
1 parent 275bb51 commit db0b6f2

File tree

2 files changed

+205
-38
lines changed

2 files changed

+205
-38
lines changed

rolling-shutter/keyperimpl/shutterservice/syncmonitor.go

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ package shutterservice
22

33
import (
44
"context"
5+
"fmt"
56
"time"
67

78
"github.com/jackc/pgx/v4"
89
"github.com/jackc/pgx/v4/pgxpool"
910
"github.com/pkg/errors"
1011
"github.com/rs/zerolog/log"
1112

13+
keyperDB "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database"
1214
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database"
1315
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
1416
)
@@ -29,36 +31,68 @@ func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error {
2931
func (s *SyncMonitor) runMonitor(ctx context.Context) error {
3032
var lastBlockNumber int64
3133
db := database.New(s.DBPool)
34+
keyperdb := keyperDB.New(s.DBPool)
3235

3336
log.Debug().Msg("starting the sync monitor")
3437

3538
for {
3639
select {
3740
case <-time.After(s.CheckInterval):
38-
record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
39-
if err != nil {
40-
if errors.Is(err, pgx.ErrNoRows) {
41-
log.Warn().Err(err).Msg("no rows found in table identity_registered_events_synced_until")
42-
continue
41+
if err := s.runCheck(ctx, db, keyperdb, &lastBlockNumber); err != nil {
42+
if errors.Is(err, ErrBlockNotIncreasing) {
43+
return err
4344
}
44-
return errors.Wrap(err, "error getting identity_registered_events_synced_until")
45-
}
46-
47-
currentBlockNumber := record.BlockNumber
48-
log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number")
49-
50-
if currentBlockNumber > lastBlockNumber {
51-
lastBlockNumber = currentBlockNumber
52-
} else {
53-
log.Error().
54-
Int64("last-block-number", lastBlockNumber).
55-
Int64("current-block-number", currentBlockNumber).
56-
Msg("block number has not increased between checks")
57-
return errors.New("block number has not increased between checks")
45+
log.Debug().Err(err).Msg("skipping sync check due to error")
5846
}
5947
case <-ctx.Done():
6048
log.Info().Msg("stopping syncMonitor due to context cancellation")
6149
return ctx.Err()
6250
}
6351
}
6452
}
53+
54+
var ErrBlockNotIncreasing = errors.New("block number has not increased between checks")
55+
56+
func (s *SyncMonitor) runCheck(
57+
ctx context.Context,
58+
db *database.Queries,
59+
keyperdb *keyperDB.Queries,
60+
lastBlockNumber *int64,
61+
) error {
62+
batchConfig, err := keyperdb.GetLatestBatchConfig(ctx)
63+
if err != nil {
64+
return fmt.Errorf("error getting batchconfig: %w", err)
65+
}
66+
67+
dkgResult, err := keyperdb.GetDKGResult(ctx, int64(batchConfig.KeyperConfigIndex))
68+
if err != nil {
69+
return fmt.Errorf("error getting dkgresult: %w", err)
70+
}
71+
72+
record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
73+
if err != nil {
74+
if errors.Is(err, pgx.ErrNoRows) {
75+
log.Warn().Err(err).Msg("no rows found in table identity_registered_events_synced_until")
76+
return nil // This is not an error condition that should stop monitoring
77+
}
78+
return errors.Wrap(err, "error getting identity_registered_events_synced_until")
79+
}
80+
81+
currentBlockNumber := record.BlockNumber
82+
log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number")
83+
84+
if currentBlockNumber > *lastBlockNumber {
85+
*lastBlockNumber = currentBlockNumber
86+
return nil
87+
}
88+
89+
if dkgResult.Success {
90+
log.Error().
91+
Int64("last-block-number", *lastBlockNumber).
92+
Int64("current-block-number", currentBlockNumber).
93+
Msg("block number has not increased between checks")
94+
return ErrBlockNotIncreasing
95+
}
96+
97+
return nil
98+
}

rolling-shutter/keyperimpl/shutterservice/syncmonitor_test.go

Lines changed: 152 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,37 +7,56 @@ import (
77

88
"gotest.tools/assert"
99

10+
"github.com/jackc/pgx/v4/pgxpool"
11+
keyperDB "github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database"
1012
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice"
1113
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database"
1214
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
1315
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup"
1416
)
1517

18+
func setupTestData(ctx context.Context, t *testing.T, dbpool *pgxpool.Pool, blockNumber int64) {
19+
db := database.New(dbpool)
20+
keyperdb := keyperDB.New(dbpool)
21+
22+
// Set up batch config
23+
err := keyperdb.InsertBatchConfig(ctx, keyperDB.InsertBatchConfigParams{
24+
KeyperConfigIndex: 1,
25+
Keypers: []string{},
26+
})
27+
assert.NilError(t, err)
28+
29+
// Set up DKG result
30+
err = keyperdb.InsertDKGResult(ctx, keyperDB.InsertDKGResultParams{
31+
Eon: 1,
32+
Success: true,
33+
})
34+
assert.NilError(t, err)
35+
36+
// Set up initial block
37+
err = db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
38+
BlockHash: []byte{0x01, 0x02, 0x03},
39+
BlockNumber: blockNumber,
40+
})
41+
assert.NilError(t, err)
42+
}
43+
1644
func TestAPISyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) {
1745
ctx, cancel := context.WithCancel(context.Background())
1846
defer cancel()
1947

2048
dbpool, dbclose := testsetup.NewTestDBPool(ctx, t, database.Definition)
2149
defer dbclose()
22-
db := database.New(dbpool)
2350

2451
initialBlockNumber := int64(100)
25-
26-
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
27-
BlockHash: []byte{0x01, 0x02, 0x03},
28-
BlockNumber: initialBlockNumber,
29-
})
30-
if err != nil {
31-
t.Fatalf("failed to set initial synced data: %v", err)
32-
}
52+
setupTestData(ctx, t, dbpool, initialBlockNumber)
3353

3454
monitor := &shutterservice.SyncMonitor{
3555
DBPool: dbpool,
3656
CheckInterval: 5 * time.Second,
3757
}
3858

3959
errCh := make(chan error, 1)
40-
4160
go func() {
4261
err := service.RunWithSighandler(ctx, monitor)
4362
if err != nil {
@@ -49,7 +68,7 @@ func TestAPISyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) {
4968

5069
select {
5170
case err := <-errCh:
52-
assert.ErrorContains(t, err, "block number has not increased between checks")
71+
assert.ErrorContains(t, err, shutterservice.ErrBlockNotIncreasing.Error())
5372
case <-time.After(5 * time.Second):
5473
t.Fatal("expected an error, but none was returned")
5574
}
@@ -64,13 +83,7 @@ func TestAPISyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) {
6483
db := database.New(dbpool)
6584

6685
initialBlockNumber := int64(100)
67-
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
68-
BlockHash: []byte{0x01, 0x02, 0x03},
69-
BlockNumber: initialBlockNumber,
70-
})
71-
if err != nil {
72-
t.Fatalf("failed to set initial synced data: %v", err)
73-
}
86+
setupTestData(ctx, t, dbpool, initialBlockNumber)
7487

7588
monitor := &shutterservice.SyncMonitor{
7689
DBPool: dbpool,
@@ -114,7 +127,21 @@ func TestAPISyncMonitor_ContinuesWhenNoRows(t *testing.T) {
114127

115128
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
116129
defer closeDB()
117-
_ = database.New(dbpool)
130+
131+
// Only set up keyper set and DKG result, but no block data
132+
keyperdb := keyperDB.New(dbpool)
133+
134+
err := keyperdb.InsertBatchConfig(ctx, keyperDB.InsertBatchConfigParams{
135+
KeyperConfigIndex: 1,
136+
Keypers: []string{},
137+
})
138+
assert.NilError(t, err)
139+
140+
err = keyperdb.InsertDKGResult(ctx, keyperDB.InsertDKGResultParams{
141+
Eon: 1,
142+
Success: true,
143+
})
144+
assert.NilError(t, err)
118145

119146
monitor := &shutterservice.SyncMonitor{
120147
DBPool: dbpool,
@@ -141,3 +168,109 @@ func TestAPISyncMonitor_ContinuesWhenNoRows(t *testing.T) {
141168
case <-time.After(1 * time.Second):
142169
}
143170
}
171+
172+
func TestAPISyncMonitor_ContinuesWhenNoDKGResult(t *testing.T) {
173+
ctx, cancel := context.WithCancel(context.Background())
174+
defer cancel()
175+
176+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
177+
defer closeDB()
178+
db := database.New(dbpool)
179+
keyperdb := keyperDB.New(dbpool)
180+
181+
// Set up batch config, but no DKG result
182+
err := keyperdb.InsertBatchConfig(ctx, keyperDB.InsertBatchConfigParams{
183+
KeyperConfigIndex: 1,
184+
Keypers: []string{},
185+
})
186+
assert.NilError(t, err)
187+
188+
// Set up initial block data
189+
initialBlockNumber := int64(100)
190+
err = db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
191+
BlockHash: []byte{0x01, 0x02, 0x03},
192+
BlockNumber: initialBlockNumber,
193+
})
194+
assert.NilError(t, err)
195+
196+
monitor := &shutterservice.SyncMonitor{
197+
DBPool: dbpool,
198+
CheckInterval: 5 * time.Second,
199+
}
200+
201+
monitorCtx, cancelMonitor := context.WithCancel(ctx)
202+
defer cancelMonitor()
203+
204+
errCh := make(chan error, 1)
205+
go func() {
206+
err := service.RunWithSighandler(monitorCtx, monitor)
207+
if err != nil {
208+
errCh <- err
209+
}
210+
}()
211+
212+
// Let it run for a while without incrementing the block number
213+
time.Sleep(15 * time.Second)
214+
cancelMonitor()
215+
216+
select {
217+
case err := <-errCh:
218+
t.Fatalf("expected monitor to continue without error, but got: %v", err)
219+
case <-time.After(1 * time.Second):
220+
// Test passes if no error is received
221+
}
222+
223+
// Verify the block number hasn't changed
224+
syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
225+
assert.NilError(t, err)
226+
assert.Equal(t, initialBlockNumber, syncedData.BlockNumber, "block number should remain unchanged")
227+
}
228+
229+
func TestAPISyncMonitor_ContinuesWhenNoBatchConfig(t *testing.T) {
230+
ctx, cancel := context.WithCancel(context.Background())
231+
defer cancel()
232+
233+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
234+
defer closeDB()
235+
db := database.New(dbpool)
236+
237+
// Only set up initial block data, no keyper set
238+
initialBlockNumber := int64(100)
239+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
240+
BlockHash: []byte{0x01, 0x02, 0x03},
241+
BlockNumber: initialBlockNumber,
242+
})
243+
assert.NilError(t, err)
244+
245+
monitor := &shutterservice.SyncMonitor{
246+
DBPool: dbpool,
247+
CheckInterval: 5 * time.Second,
248+
}
249+
250+
monitorCtx, cancelMonitor := context.WithCancel(ctx)
251+
defer cancelMonitor()
252+
253+
errCh := make(chan error, 1)
254+
go func() {
255+
err := service.RunWithSighandler(monitorCtx, monitor)
256+
if err != nil {
257+
errCh <- err
258+
}
259+
}()
260+
261+
// Let it run for a while without incrementing the block number
262+
time.Sleep(15 * time.Second)
263+
cancelMonitor()
264+
265+
select {
266+
case err := <-errCh:
267+
t.Fatalf("expected monitor to continue without error, but got: %v", err)
268+
case <-time.After(1 * time.Second):
269+
// Test passes if no error is received
270+
}
271+
272+
// Verify the block number hasn't changed
273+
syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
274+
assert.NilError(t, err)
275+
assert.Equal(t, initialBlockNumber, syncedData.BlockNumber, "block number should remain unchanged")
276+
}

0 commit comments

Comments
 (0)