Skip to content

Commit e17237d

Browse files
added sync monitor for api keyper
1 parent 029a80b commit e17237d

File tree

4 files changed

+223
-7
lines changed

4 files changed

+223
-7
lines changed

rolling-shutter/keyperimpl/shutterservice/config.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,18 @@ func (c *Config) GetAddress() common.Address {
8181
}
8282

8383
type ChainConfig struct {
84-
Node *configuration.EthnodeConfig `shconfig:",required"`
85-
Contracts *ContractsConfig `shconfig:",required"`
86-
SyncStartBlockNumber uint64 `shconfig:",required"`
84+
Node *configuration.EthnodeConfig `shconfig:",required"`
85+
Contracts *ContractsConfig `shconfig:",required"`
86+
SyncStartBlockNumber uint64 `shconfig:",required"`
87+
SyncMonitorCheckInterval uint64 `shconfig:",required"`
8788
}
8889

8990
func NewChainConfig() *ChainConfig {
9091
c := &ChainConfig{
91-
Node: configuration.NewEthnodeConfig(),
92-
Contracts: NewContractsConfig(),
93-
SyncStartBlockNumber: 0,
92+
Node: configuration.NewEthnodeConfig(),
93+
Contracts: NewContractsConfig(),
94+
SyncStartBlockNumber: 0,
95+
SyncMonitorCheckInterval: 0,
9496
}
9597
c.Init()
9698
return c
@@ -111,10 +113,12 @@ func (c *ChainConfig) Validate() error {
111113

112114
func (c *ChainConfig) SetDefaultValues() error {
113115
c.SyncStartBlockNumber = 0
116+
c.SyncMonitorCheckInterval = 30
114117
return c.Contracts.SetDefaultValues()
115118
}
116119

117120
func (c *ChainConfig) SetExampleValues() error {
121+
c.SyncMonitorCheckInterval = 30
118122
return nil
119123
}
120124

rolling-shutter/keyperimpl/shutterservice/keyper.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package shutterservice
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/ethereum/go-ethereum/ethclient"
78
gethLog "github.com/ethereum/go-ethereum/log"
@@ -35,6 +36,7 @@ type Keyper struct {
3536
registrySyncer *RegistrySyncer
3637
eonKeyPublisher *eonkeypublisher.EonKeyPublisher
3738
latestTriggeredTime *uint64
39+
syncMonitor *SyncMonitor
3840

3941
// input events
4042
newBlocks chan *syncevent.LatestBlock
@@ -111,8 +113,12 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error {
111113
return err
112114
}
113115

116+
kpr.syncMonitor = &SyncMonitor{
117+
DBPool: kpr.dbpool,
118+
CheckInterval: time.Duration(kpr.config.Chain.SyncMonitorCheckInterval) * time.Second,
119+
}
114120
runner.Go(func() error { return kpr.processInputs(ctx) })
115-
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.eonKeyPublisher)
121+
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.eonKeyPublisher, kpr.syncMonitor)
116122
}
117123

118124
func NewKeyper(kpr *Keyper, messagingMiddleware *MessagingMiddleware) (*keyper.KeyperCore, error) {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package shutterservice
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/jackc/pgx/v4"
8+
"github.com/jackc/pgx/v4/pgxpool"
9+
"github.com/pkg/errors"
10+
"github.com/rs/zerolog/log"
11+
12+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database"
13+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
14+
)
15+
16+
type SyncMonitor struct {
17+
DBPool *pgxpool.Pool
18+
CheckInterval time.Duration
19+
}
20+
21+
func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error {
22+
runner.Go(func() error {
23+
return s.runMonitor(ctx)
24+
})
25+
26+
return nil
27+
}
28+
29+
func (s *SyncMonitor) runMonitor(ctx context.Context) error {
30+
var lastBlockNumber int64
31+
db := database.New(s.DBPool)
32+
33+
log.Debug().Msg("starting the sync monitor")
34+
35+
for {
36+
select {
37+
case <-time.After(s.CheckInterval):
38+
record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
39+
if err != nil {
40+
if errors.Is(err, pgx.ErrNoRows) {
41+
log.Warn().Err(err).Msg("no rows found in table identity_registered_events_synced_until")
42+
continue
43+
}
44+
return errors.Wrap(err, "error getting identity_registered_events_synced_until")
45+
}
46+
47+
currentBlockNumber := record.BlockNumber
48+
log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number")
49+
50+
if currentBlockNumber > lastBlockNumber {
51+
lastBlockNumber = currentBlockNumber
52+
} else {
53+
log.Error().
54+
Int64("last-block-number", lastBlockNumber).
55+
Int64("current-block-number", currentBlockNumber).
56+
Msg("block number has not increased between checks")
57+
return errors.New("block number has not increased between checks")
58+
}
59+
case <-ctx.Done():
60+
log.Info().Msg("stopping syncMonitor due to context cancellation")
61+
return ctx.Err()
62+
}
63+
}
64+
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package shutterservice
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"gotest.tools/assert"
9+
10+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database"
11+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
12+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup"
13+
)
14+
15+
func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) {
16+
ctx, cancel := context.WithCancel(context.Background())
17+
defer cancel()
18+
19+
dbpool, dbclose := testsetup.NewTestDBPool(ctx, t, database.Definition)
20+
defer dbclose()
21+
db := database.New(dbpool)
22+
23+
initialBlockNumber := int64(100)
24+
25+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
26+
BlockHash: []byte{0x01, 0x02, 0x03},
27+
BlockNumber: initialBlockNumber,
28+
})
29+
if err != nil {
30+
t.Fatalf("failed to set initial synced data: %v", err)
31+
}
32+
33+
monitor := &SyncMonitor{
34+
DBPool: dbpool,
35+
CheckInterval: 5 * time.Second,
36+
}
37+
38+
errCh := make(chan error, 1)
39+
40+
go func() {
41+
err := service.RunWithSighandler(ctx, monitor)
42+
if err != nil {
43+
errCh <- err
44+
}
45+
}()
46+
47+
time.Sleep(12 * time.Second)
48+
49+
select {
50+
case err := <-errCh:
51+
assert.ErrorContains(t, err, "block number has not increased between checks")
52+
case <-time.After(5 * time.Second):
53+
t.Fatal("expected an error, but none was returned")
54+
}
55+
}
56+
57+
func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) {
58+
ctx, cancel := context.WithCancel(context.Background())
59+
defer cancel()
60+
61+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
62+
defer closeDB()
63+
db := database.New(dbpool)
64+
65+
initialBlockNumber := int64(100)
66+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
67+
BlockHash: []byte{0x01, 0x02, 0x03},
68+
BlockNumber: initialBlockNumber,
69+
})
70+
if err != nil {
71+
t.Fatalf("failed to set initial synced data: %v", err)
72+
}
73+
74+
monitor := &SyncMonitor{
75+
DBPool: dbpool,
76+
CheckInterval: 5 * time.Second,
77+
}
78+
79+
_, deferFn := service.RunBackground(ctx, monitor)
80+
defer deferFn()
81+
82+
doneCh := make(chan struct{})
83+
go func() {
84+
for i := 0; i < 5; i++ {
85+
newBlockNumber := initialBlockNumber + int64(i+1)
86+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
87+
BlockHash: []byte{0x01, 0x02, 0x03},
88+
BlockNumber: newBlockNumber,
89+
})
90+
if err != nil {
91+
t.Errorf("failed to update block number: %v", err)
92+
return
93+
}
94+
95+
time.Sleep(5 * time.Second)
96+
}
97+
98+
doneCh <- struct{}{}
99+
}()
100+
101+
<-doneCh
102+
syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
103+
if err != nil {
104+
t.Fatalf("failed to retrieve final block number: %v", err)
105+
}
106+
107+
assert.Equal(t, initialBlockNumber+5, syncedData.BlockNumber, "block number should have been incremented correctly")
108+
}
109+
110+
func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) {
111+
ctx, cancel := context.WithCancel(context.Background())
112+
defer cancel()
113+
114+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
115+
defer closeDB()
116+
_ = database.New(dbpool)
117+
118+
monitor := &SyncMonitor{
119+
DBPool: dbpool,
120+
CheckInterval: 5 * time.Second,
121+
}
122+
123+
monitorCtx, cancelMonitor := context.WithCancel(ctx)
124+
defer cancelMonitor()
125+
126+
errCh := make(chan error, 1)
127+
go func() {
128+
err := service.RunWithSighandler(monitorCtx, monitor)
129+
if err != nil {
130+
errCh <- err
131+
}
132+
}()
133+
134+
time.Sleep(15 * time.Second)
135+
cancelMonitor()
136+
137+
select {
138+
case err := <-errCh:
139+
t.Fatalf("expected monitor to continue without error, but got: %v", err)
140+
case <-time.After(1 * time.Second):
141+
}
142+
}

0 commit comments

Comments
 (0)