Skip to content

Commit c4980f3

Browse files
authored
Merge pull request #559 from shutter-network/feat/monitor-event-synch
Add last event synched monitor
2 parents a3a0ae8 + 40bf046 commit c4980f3

File tree

8 files changed

+256
-21
lines changed

8 files changed

+256
-21
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,7 @@ log
88
node_modules
99
result
1010
temp
11-
work
11+
work
12+
.env
13+
.idea
14+
.DS_Store

play/manual/keyper-0.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ CustomBootstrapAddresses = ['/ip4/127.0.0.1/tcp/2001/p2p/12D3KooWMyutShWdqYj7fre
1515
Environment = "local"
1616
DiscoveryNamespace = "shutter-42"
1717

18+
[P2P.FloodSubDiscovery]
19+
enabled = false
20+
1821
[Gnosis]
1922
EncryptedGasLimit = 1000000
2023
MinGasPerTransaction = 21000
@@ -23,6 +26,7 @@ SecondsPerSlot = 5
2326
SlotsPerEpoch = 16
2427
GenesisSlotTimestamp = 1665396300
2528
SyncStartBlockNumber = 0
29+
SyncMonitorCheckInterval = 30
2630

2731
[Gnosis.Node]
2832
PrivateKey = 'dcb23da56656b3c8a11e2b4cdd92f430c500862f7f7fc762807d11b734e9500c'

play/manual/keyper-1.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ CustomBootstrapAddresses = ['/ip4/127.0.0.1/tcp/2001/p2p/12D3KooWMyutShWdqYj7fre
1515
Environment = "local"
1616
DiscoveryNamespace = "shutter-42"
1717

18+
[P2P.FloodSubDiscovery]
19+
enabled = false
20+
1821
[Gnosis]
1922
EncryptedGasLimit = 1000000
2023
MinGasPerTransaction = 21000
@@ -23,6 +26,7 @@ SecondsPerSlot = 5
2326
SlotsPerEpoch = 16
2427
GenesisSlotTimestamp = 1665396300
2528
SyncStartBlockNumber = 0
29+
SyncMonitorCheckInterval = 30
2630

2731
[Gnosis.Node]
2832
PrivateKey = '00b4a53228e3761ad78bd376b8293f19af36777f71d8e55a61975f8eecd8c1c1'

play/manual/keyper-2.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ CustomBootstrapAddresses = ['/ip4/127.0.0.1/tcp/2001/p2p/12D3KooWMyutShWdqYj7fre
1515
Environment = "local"
1616
DiscoveryNamespace = "shutter-42"
1717

18+
[P2P.FloodSubDiscovery]
19+
enabled = false
20+
1821
[Gnosis]
1922
EncryptedGasLimit = 1000000
2023
MinGasPerTransaction = 21000
@@ -23,6 +26,7 @@ SecondsPerSlot = 5
2326
SlotsPerEpoch = 16
2427
GenesisSlotTimestamp = 1665396300
2528
SyncStartBlockNumber = 0
29+
SyncMonitorCheckInterval = 30
2630

2731
[Gnosis.Node]
2832
PrivateKey = 'a4e901b1df81ff8fc5fa77f5bf0c15a4c8410e85fcaf19fbec47a2241b9d65d6'

rolling-shutter/keyperimpl/gnosis/config.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -98,28 +98,30 @@ func (c *Config) GetAddress() common.Address {
9898
}
9999

100100
type GnosisConfig struct {
101-
Node *configuration.EthnodeConfig `shconfig:",required"`
102-
Contracts *GnosisContractsConfig `shconfig:",required"`
103-
EncryptedGasLimit uint64 `shconfig:",required"`
104-
MinGasPerTransaction uint64 `shconfig:",required"`
105-
MaxTxPointerAge uint64 `shconfig:",required"`
106-
SecondsPerSlot uint64 `shconfig:",required"`
107-
SlotsPerEpoch uint64 `shconfig:",required"`
108-
GenesisSlotTimestamp uint64 `shconfig:",required"`
109-
SyncStartBlockNumber uint64 `shconfig:",required"`
101+
Node *configuration.EthnodeConfig `shconfig:",required"`
102+
Contracts *GnosisContractsConfig `shconfig:",required"`
103+
EncryptedGasLimit uint64 `shconfig:",required"`
104+
MinGasPerTransaction uint64 `shconfig:",required"`
105+
MaxTxPointerAge uint64 `shconfig:",required"`
106+
SecondsPerSlot uint64 `shconfig:",required"`
107+
SlotsPerEpoch uint64 `shconfig:",required"`
108+
GenesisSlotTimestamp uint64 `shconfig:",required"`
109+
SyncStartBlockNumber uint64 `shconfig:",required"`
110+
SyncMonitorCheckInterval uint64 `shconfig:",required"`
110111
}
111112

112113
func NewGnosisConfig() *GnosisConfig {
113114
c := &GnosisConfig{
114-
Node: configuration.NewEthnodeConfig(),
115-
Contracts: NewGnosisContractsConfig(),
116-
EncryptedGasLimit: 0,
117-
MinGasPerTransaction: 0,
118-
MaxTxPointerAge: 0,
119-
SecondsPerSlot: 0,
120-
SlotsPerEpoch: 0,
121-
GenesisSlotTimestamp: 0,
122-
SyncStartBlockNumber: 0,
115+
Node: configuration.NewEthnodeConfig(),
116+
Contracts: NewGnosisContractsConfig(),
117+
EncryptedGasLimit: 0,
118+
MinGasPerTransaction: 0,
119+
MaxTxPointerAge: 0,
120+
SecondsPerSlot: 0,
121+
SlotsPerEpoch: 0,
122+
GenesisSlotTimestamp: 0,
123+
SyncStartBlockNumber: 0,
124+
SyncMonitorCheckInterval: 0,
123125
}
124126
c.Init()
125127
return c
@@ -156,6 +158,7 @@ func (c *GnosisConfig) SetExampleValues() error {
156158
c.SlotsPerEpoch = 16
157159
c.GenesisSlotTimestamp = 1665410700
158160
c.SyncStartBlockNumber = 0
161+
c.SyncMonitorCheckInterval = 30
159162
return nil
160163
}
161164

rolling-shutter/keyperimpl/gnosis/keyper.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type Keyper struct {
4949
validatorSyncer *ValidatorSyncer
5050
eonKeyPublisher *eonkeypublisher.EonKeyPublisher
5151
latestTriggeredSlot *uint64
52+
syncMonitor *SyncMonitor
5253

5354
// input events
5455
newBlocks chan *syncevent.LatestBlock
@@ -62,7 +63,8 @@ type Keyper struct {
6263

6364
func New(c *Config) *Keyper {
6465
return &Keyper{
65-
config: c,
66+
config: c,
67+
syncMonitor: &SyncMonitor{},
6668
}
6769
}
6870

@@ -154,8 +156,13 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error {
154156
return errors.Wrap(err, "failed to reset transaction pointer age")
155157
}
156158

159+
kpr.syncMonitor = &SyncMonitor{
160+
DBPool: kpr.dbpool,
161+
CheckInterval: time.Duration(kpr.config.Gnosis.SyncMonitorCheckInterval) * time.Second,
162+
}
163+
157164
runner.Go(func() error { return kpr.processInputs(ctx) })
158-
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.slotTicker, kpr.eonKeyPublisher)
165+
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.slotTicker, kpr.eonKeyPublisher, kpr.syncMonitor)
159166
}
160167

161168
func NewKeyper(kpr *Keyper, messagingMiddleware *MessagingMiddleware) (*keyper.KeyperCore, error) {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package gnosis
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/jackc/pgx/v4"
8+
"github.com/jackc/pgx/v4/pgxpool"
9+
"github.com/pkg/errors"
10+
"github.com/rs/zerolog/log"
11+
12+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database"
13+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
14+
)
15+
16+
type SyncMonitor struct {
17+
DBPool *pgxpool.Pool
18+
CheckInterval time.Duration
19+
}
20+
21+
func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error {
22+
runner.Go(func() error {
23+
return s.runMonitor(ctx)
24+
})
25+
26+
return nil
27+
}
28+
29+
func (s *SyncMonitor) runMonitor(ctx context.Context) error {
30+
var lastBlockNumber int64
31+
db := database.New(s.DBPool)
32+
33+
log.Debug().Msg("starting the sync monitor")
34+
35+
for {
36+
select {
37+
case <-time.After(s.CheckInterval):
38+
record, err := db.GetTransactionSubmittedEventsSyncedUntil(ctx)
39+
if err != nil {
40+
if errors.Is(err, pgx.ErrNoRows) {
41+
log.Warn().Err(err).Msg("no rows found in table transaction_submitted_events_synced_until")
42+
continue
43+
}
44+
return errors.Wrap(err, "error getting transaction_submitted_events_synced_until")
45+
}
46+
47+
currentBlockNumber := record.BlockNumber
48+
log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number")
49+
50+
if currentBlockNumber > lastBlockNumber {
51+
lastBlockNumber = currentBlockNumber
52+
} else {
53+
log.Error().
54+
Int64("last-block-number", lastBlockNumber).
55+
Int64("current-block-number", currentBlockNumber).
56+
Msg("block number has not increased between checks")
57+
return errors.New("block number has not increased between checks")
58+
}
59+
case <-ctx.Done():
60+
log.Info().Msg("stopping syncMonitor due to context cancellation")
61+
return ctx.Err()
62+
}
63+
}
64+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package gnosis_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"gotest.tools/assert"
9+
10+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis"
11+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database"
12+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
13+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup"
14+
)
15+
16+
func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) {
17+
ctx, cancel := context.WithCancel(context.Background())
18+
defer cancel()
19+
20+
dbpool, dbclose := testsetup.NewTestDBPool(ctx, t, database.Definition)
21+
defer dbclose()
22+
db := database.New(dbpool)
23+
24+
initialBlockNumber := int64(100)
25+
26+
err := db.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{
27+
BlockHash: []byte{0x01, 0x02, 0x03},
28+
BlockNumber: initialBlockNumber,
29+
Slot: 1,
30+
})
31+
if err != nil {
32+
t.Fatalf("failed to set initial synced data: %v", err)
33+
}
34+
35+
monitor := &gnosis.SyncMonitor{
36+
DBPool: dbpool,
37+
CheckInterval: 5 * time.Second,
38+
}
39+
40+
errCh := make(chan error, 1)
41+
42+
go func() {
43+
err := service.RunWithSighandler(ctx, monitor)
44+
if err != nil {
45+
errCh <- err
46+
}
47+
}()
48+
49+
time.Sleep(12 * time.Second)
50+
51+
select {
52+
case err := <-errCh:
53+
assert.ErrorContains(t, err, "block number has not increased between checks")
54+
case <-time.After(5 * time.Second):
55+
t.Fatal("expected an error, but none was returned")
56+
}
57+
}
58+
59+
func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) {
60+
ctx, cancel := context.WithCancel(context.Background())
61+
defer cancel()
62+
63+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
64+
defer closeDB()
65+
db := database.New(dbpool)
66+
67+
initialBlockNumber := int64(100)
68+
err := db.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{
69+
BlockHash: []byte{0x01, 0x02, 0x03},
70+
BlockNumber: initialBlockNumber,
71+
Slot: 1,
72+
})
73+
if err != nil {
74+
t.Fatalf("failed to set initial synced data: %v", err)
75+
}
76+
77+
monitor := &gnosis.SyncMonitor{
78+
DBPool: dbpool,
79+
CheckInterval: 5 * time.Second,
80+
}
81+
82+
_, deferFn := service.RunBackground(ctx, monitor)
83+
defer deferFn()
84+
85+
doneCh := make(chan struct{})
86+
go func() {
87+
for i := 0; i < 5; i++ {
88+
newBlockNumber := initialBlockNumber + int64(i+1)
89+
err := db.SetTransactionSubmittedEventsSyncedUntil(ctx, database.SetTransactionSubmittedEventsSyncedUntilParams{
90+
BlockHash: []byte{0x01, 0x02, 0x03},
91+
BlockNumber: newBlockNumber,
92+
Slot: 1,
93+
})
94+
if err != nil {
95+
t.Errorf("failed to update block number: %v", err)
96+
return
97+
}
98+
99+
time.Sleep(5 * time.Second)
100+
}
101+
102+
doneCh <- struct{}{}
103+
}()
104+
105+
<-doneCh
106+
syncedData, err := db.GetTransactionSubmittedEventsSyncedUntil(ctx)
107+
if err != nil {
108+
t.Fatalf("failed to retrieve final block number: %v", err)
109+
}
110+
111+
assert.Equal(t, initialBlockNumber+5, syncedData.BlockNumber, "block number should have been incremented correctly")
112+
}
113+
114+
func TestSyncMonitor_ContinuesWhenNoRows(t *testing.T) {
115+
ctx, cancel := context.WithCancel(context.Background())
116+
defer cancel()
117+
118+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
119+
defer closeDB()
120+
_ = database.New(dbpool)
121+
122+
monitor := &gnosis.SyncMonitor{
123+
DBPool: dbpool,
124+
CheckInterval: 5 * time.Second,
125+
}
126+
127+
monitorCtx, cancelMonitor := context.WithCancel(ctx)
128+
defer cancelMonitor()
129+
130+
errCh := make(chan error, 1)
131+
go func() {
132+
err := service.RunWithSighandler(monitorCtx, monitor)
133+
if err != nil {
134+
errCh <- err
135+
}
136+
}()
137+
138+
time.Sleep(15 * time.Second)
139+
cancelMonitor()
140+
141+
select {
142+
case err := <-errCh:
143+
t.Fatalf("expected monitor to continue without error, but got: %v", err)
144+
case <-time.After(1 * time.Second):
145+
}
146+
}

0 commit comments

Comments
 (0)