Skip to content

Commit eae7aa2

Browse files
Merge pull request #581 from shutter-network/feat/api-sync-monitor
added sync monitor for api keyper
2 parents 029a80b + 275bb51 commit eae7aa2

File tree

5 files changed

+225
-8
lines changed

5 files changed

+225
-8
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ jobs:
156156
name: "Run unit tests with gotestsum"
157157
command: |
158158
mkdir -p report/unit report/integration
159-
gotestsum -f standard-verbose --junitfile report/unit/tests.xml -- -short -race ./...
159+
gotestsum -f standard-verbose --junitfile report/unit/tests.xml -- -short -race -count=1 -p 1 ./...
160160
- run:
161161
name: Wait for PostgreSQL
162162
command: |

rolling-shutter/keyperimpl/shutterservice/config.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,18 @@ func (c *Config) GetAddress() common.Address {
8181
}
8282

8383
type ChainConfig struct {
84-
Node *configuration.EthnodeConfig `shconfig:",required"`
85-
Contracts *ContractsConfig `shconfig:",required"`
86-
SyncStartBlockNumber uint64 `shconfig:",required"`
84+
Node *configuration.EthnodeConfig `shconfig:",required"`
85+
Contracts *ContractsConfig `shconfig:",required"`
86+
SyncStartBlockNumber uint64 `shconfig:",required"`
87+
SyncMonitorCheckInterval uint64 `shconfig:",required"`
8788
}
8889

8990
func NewChainConfig() *ChainConfig {
9091
c := &ChainConfig{
91-
Node: configuration.NewEthnodeConfig(),
92-
Contracts: NewContractsConfig(),
93-
SyncStartBlockNumber: 0,
92+
Node: configuration.NewEthnodeConfig(),
93+
Contracts: NewContractsConfig(),
94+
SyncStartBlockNumber: 0,
95+
SyncMonitorCheckInterval: 0,
9496
}
9597
c.Init()
9698
return c
@@ -111,10 +113,12 @@ func (c *ChainConfig) Validate() error {
111113

112114
func (c *ChainConfig) SetDefaultValues() error {
113115
c.SyncStartBlockNumber = 0
116+
c.SyncMonitorCheckInterval = 30
114117
return c.Contracts.SetDefaultValues()
115118
}
116119

117120
func (c *ChainConfig) SetExampleValues() error {
121+
c.SyncMonitorCheckInterval = 30
118122
return nil
119123
}
120124

rolling-shutter/keyperimpl/shutterservice/keyper.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package shutterservice
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/ethereum/go-ethereum/ethclient"
78
gethLog "github.com/ethereum/go-ethereum/log"
@@ -35,6 +36,7 @@ type Keyper struct {
3536
registrySyncer *RegistrySyncer
3637
eonKeyPublisher *eonkeypublisher.EonKeyPublisher
3738
latestTriggeredTime *uint64
39+
syncMonitor *SyncMonitor
3840

3941
// input events
4042
newBlocks chan *syncevent.LatestBlock
@@ -111,8 +113,12 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error {
111113
return err
112114
}
113115

116+
kpr.syncMonitor = &SyncMonitor{
117+
DBPool: kpr.dbpool,
118+
CheckInterval: time.Duration(kpr.config.Chain.SyncMonitorCheckInterval) * time.Second,
119+
}
114120
runner.Go(func() error { return kpr.processInputs(ctx) })
115-
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.eonKeyPublisher)
121+
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.eonKeyPublisher, kpr.syncMonitor)
116122
}
117123

118124
func NewKeyper(kpr *Keyper, messagingMiddleware *MessagingMiddleware) (*keyper.KeyperCore, error) {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package shutterservice
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/jackc/pgx/v4"
8+
"github.com/jackc/pgx/v4/pgxpool"
9+
"github.com/pkg/errors"
10+
"github.com/rs/zerolog/log"
11+
12+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database"
13+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
14+
)
15+
16+
type SyncMonitor struct {
17+
DBPool *pgxpool.Pool
18+
CheckInterval time.Duration
19+
}
20+
21+
func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error {
22+
runner.Go(func() error {
23+
return s.runMonitor(ctx)
24+
})
25+
26+
return nil
27+
}
28+
29+
func (s *SyncMonitor) runMonitor(ctx context.Context) error {
30+
var lastBlockNumber int64
31+
db := database.New(s.DBPool)
32+
33+
log.Debug().Msg("starting the sync monitor")
34+
35+
for {
36+
select {
37+
case <-time.After(s.CheckInterval):
38+
record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
39+
if err != nil {
40+
if errors.Is(err, pgx.ErrNoRows) {
41+
log.Warn().Err(err).Msg("no rows found in table identity_registered_events_synced_until")
42+
continue
43+
}
44+
return errors.Wrap(err, "error getting identity_registered_events_synced_until")
45+
}
46+
47+
currentBlockNumber := record.BlockNumber
48+
log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number")
49+
50+
if currentBlockNumber > lastBlockNumber {
51+
lastBlockNumber = currentBlockNumber
52+
} else {
53+
log.Error().
54+
Int64("last-block-number", lastBlockNumber).
55+
Int64("current-block-number", currentBlockNumber).
56+
Msg("block number has not increased between checks")
57+
return errors.New("block number has not increased between checks")
58+
}
59+
case <-ctx.Done():
60+
log.Info().Msg("stopping syncMonitor due to context cancellation")
61+
return ctx.Err()
62+
}
63+
}
64+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package shutterservice_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"gotest.tools/assert"
9+
10+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice"
11+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database"
12+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
13+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup"
14+
)
15+
16+
func TestAPISyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) {
17+
ctx, cancel := context.WithCancel(context.Background())
18+
defer cancel()
19+
20+
dbpool, dbclose := testsetup.NewTestDBPool(ctx, t, database.Definition)
21+
defer dbclose()
22+
db := database.New(dbpool)
23+
24+
initialBlockNumber := int64(100)
25+
26+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
27+
BlockHash: []byte{0x01, 0x02, 0x03},
28+
BlockNumber: initialBlockNumber,
29+
})
30+
if err != nil {
31+
t.Fatalf("failed to set initial synced data: %v", err)
32+
}
33+
34+
monitor := &shutterservice.SyncMonitor{
35+
DBPool: dbpool,
36+
CheckInterval: 5 * time.Second,
37+
}
38+
39+
errCh := make(chan error, 1)
40+
41+
go func() {
42+
err := service.RunWithSighandler(ctx, monitor)
43+
if err != nil {
44+
errCh <- err
45+
}
46+
}()
47+
48+
time.Sleep(12 * time.Second)
49+
50+
select {
51+
case err := <-errCh:
52+
assert.ErrorContains(t, err, "block number has not increased between checks")
53+
case <-time.After(5 * time.Second):
54+
t.Fatal("expected an error, but none was returned")
55+
}
56+
}
57+
58+
func TestAPISyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) {
59+
ctx, cancel := context.WithCancel(context.Background())
60+
defer cancel()
61+
62+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
63+
defer closeDB()
64+
db := database.New(dbpool)
65+
66+
initialBlockNumber := int64(100)
67+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
68+
BlockHash: []byte{0x01, 0x02, 0x03},
69+
BlockNumber: initialBlockNumber,
70+
})
71+
if err != nil {
72+
t.Fatalf("failed to set initial synced data: %v", err)
73+
}
74+
75+
monitor := &shutterservice.SyncMonitor{
76+
DBPool: dbpool,
77+
CheckInterval: 5 * time.Second,
78+
}
79+
80+
_, deferFn := service.RunBackground(ctx, monitor)
81+
defer deferFn()
82+
83+
doneCh := make(chan struct{})
84+
go func() {
85+
for i := 0; i < 5; i++ {
86+
newBlockNumber := initialBlockNumber + int64(i+1)
87+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
88+
BlockHash: []byte{0x01, 0x02, 0x03},
89+
BlockNumber: newBlockNumber,
90+
})
91+
if err != nil {
92+
t.Errorf("failed to update block number: %v", err)
93+
return
94+
}
95+
96+
time.Sleep(5 * time.Second)
97+
}
98+
99+
doneCh <- struct{}{}
100+
}()
101+
102+
<-doneCh
103+
syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
104+
if err != nil {
105+
t.Fatalf("failed to retrieve final block number: %v", err)
106+
}
107+
108+
assert.Equal(t, initialBlockNumber+5, syncedData.BlockNumber, "block number should have been incremented correctly")
109+
}
110+
111+
func TestAPISyncMonitor_ContinuesWhenNoRows(t *testing.T) {
112+
ctx, cancel := context.WithCancel(context.Background())
113+
defer cancel()
114+
115+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
116+
defer closeDB()
117+
_ = database.New(dbpool)
118+
119+
monitor := &shutterservice.SyncMonitor{
120+
DBPool: dbpool,
121+
CheckInterval: 5 * time.Second,
122+
}
123+
124+
monitorCtx, cancelMonitor := context.WithCancel(ctx)
125+
defer cancelMonitor()
126+
127+
errCh := make(chan error, 1)
128+
go func() {
129+
err := service.RunWithSighandler(monitorCtx, monitor)
130+
if err != nil {
131+
errCh <- err
132+
}
133+
}()
134+
135+
time.Sleep(15 * time.Second)
136+
cancelMonitor()
137+
138+
select {
139+
case err := <-errCh:
140+
t.Fatalf("expected monitor to continue without error, but got: %v", err)
141+
case <-time.After(1 * time.Second):
142+
}
143+
}

0 commit comments

Comments
 (0)