Skip to content

Commit d3f0d46

Browse files
author
ylembachar
committed
add last event synched monitor
1 parent a3a0ae8 commit d3f0d46

File tree

4 files changed

+222
-4
lines changed

4 files changed

+222
-4
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,7 @@ log
88
node_modules
99
result
1010
temp
11-
work
11+
work
12+
.env
13+
.idea
14+
.DS_Store

rolling-shutter/keyperimpl/gnosis/keyper.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type Keyper struct {
4949
validatorSyncer *ValidatorSyncer
5050
eonKeyPublisher *eonkeypublisher.EonKeyPublisher
5151
latestTriggeredSlot *uint64
52+
syncMonitor *SyncMonitor
5253

5354
// input events
5455
newBlocks chan *syncevent.LatestBlock
@@ -62,7 +63,8 @@ type Keyper struct {
6263

6364
func New(c *Config) *Keyper {
6465
return &Keyper{
65-
config: c,
66+
config: c,
67+
syncMonitor: &SyncMonitor{},
6668
}
6769
}
6870

@@ -154,8 +156,12 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error {
154156
return errors.Wrap(err, "failed to reset transaction pointer age")
155157
}
156158

159+
kpr.syncMonitor = &SyncMonitor{
160+
DBPool: kpr.dbpool,
161+
}
162+
157163
runner.Go(func() error { return kpr.processInputs(ctx) })
158-
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.slotTicker, kpr.eonKeyPublisher)
164+
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.slotTicker, kpr.eonKeyPublisher, kpr.syncMonitor)
159165
}
160166

161167
func NewKeyper(kpr *Keyper, messagingMiddleware *MessagingMiddleware) (*keyper.KeyperCore, error) {
@@ -295,4 +301,4 @@ func (kpr *Keyper) channelNewKeyperSet(_ context.Context, ev *syncevent.KeyperSe
295301
func (kpr *Keyper) channelNewEonPublicKey(_ context.Context, key keyper.EonPublicKey) error {
296302
kpr.newEonPublicKeys <- key
297303
return nil
298-
}
304+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package gnosis
2+
3+
import (
4+
"context"
5+
"github.com/jackc/pgx/v4/pgxpool"
6+
"github.com/pkg/errors"
7+
"github.com/rs/zerolog/log"
8+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis/database"
9+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
10+
"time"
11+
12+
_ "github.com/lib/pq"
13+
)
14+
15+
const (
16+
checkInterval = 30 * time.Second
17+
)
18+
19+
type SyncMonitor struct {
20+
DBPool *pgxpool.Pool
21+
}
22+
23+
func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error {
24+
runner.Go(func() error {
25+
return s.runMonitor(ctx)
26+
})
27+
28+
return nil
29+
}
30+
31+
func (s *SyncMonitor) runMonitor(ctx context.Context) error {
32+
var lastBlockNumber int64
33+
db := database.New(s.DBPool)
34+
35+
for {
36+
select {
37+
case <-time.After(checkInterval):
38+
record, err := db.GetTransactionSubmittedEventsSyncedUntil(ctx)
39+
if err != nil {
40+
log.Warn().Err(err).Msg("error fetching block number")
41+
continue
42+
}
43+
44+
currentBlockNumber := record.BlockNumber
45+
log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number")
46+
47+
if currentBlockNumber > lastBlockNumber {
48+
lastBlockNumber = currentBlockNumber
49+
} else {
50+
log.Error().
51+
Int64("last-block-number", lastBlockNumber).
52+
Int64("current-block-number", currentBlockNumber).
53+
Msg("block number has not increased between checks")
54+
return errors.New("block number has not increased between checks")
55+
}
56+
case <-ctx.Done():
57+
log.Info().Msg("stopping syncMonitor due to context cancellation")
58+
return nil
59+
}
60+
}
61+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package gnosis_test
2+
3+
import (
4+
"context"
5+
"github.com/rs/zerolog/log"
6+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyper/database"
7+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/gnosis"
8+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
9+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup"
10+
"gotest.tools/assert"
11+
"testing"
12+
"time"
13+
)
14+
15+
func TestSyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) {
16+
ctx, cancel := context.WithCancel(context.Background())
17+
defer cancel()
18+
19+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
20+
defer closeDB()
21+
22+
_, err := dbpool.Exec(ctx, `
23+
CREATE TABLE IF NOT EXISTS transaction_submitted_events_synced_until(
24+
enforce_one_row bool PRIMARY KEY DEFAULT true,
25+
block_hash bytea NOT NULL,
26+
block_number bigint NOT NULL CHECK (block_number >= 0),
27+
slot bigint NOT NULL CHECK (slot >= 0)
28+
);
29+
`)
30+
if err != nil {
31+
t.Fatalf("failed to create table: %v", err)
32+
}
33+
34+
initialBlockNumber := int64(100)
35+
_, err = dbpool.Exec(ctx, `
36+
INSERT INTO transaction_submitted_events_synced_until (block_hash, block_number, slot)
37+
VALUES ($1, $2, $3);
38+
`, []byte{0x01, 0x02, 0x03}, initialBlockNumber, 1)
39+
if err != nil {
40+
t.Fatalf("failed to insert initial data: %v", err)
41+
}
42+
43+
monitor := &gnosis.SyncMonitor{
44+
DBPool: dbpool,
45+
}
46+
47+
errCh := make(chan error, 1)
48+
49+
go func() {
50+
err := service.RunWithSighandler(ctx, monitor)
51+
if err != nil {
52+
errCh <- err
53+
}
54+
}()
55+
56+
time.Sleep(80 * time.Second)
57+
58+
select {
59+
case err := <-errCh:
60+
assert.ErrorContains(t, err, "block number has not increased between checks")
61+
case <-time.After(5 * time.Second):
62+
t.Fatal("expected an error, but none was returned")
63+
}
64+
65+
}
66+
67+
func TestSyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) {
68+
ctx, cancel := context.WithCancel(context.Background())
69+
defer cancel()
70+
71+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
72+
defer closeDB()
73+
74+
_, err := dbpool.Exec(ctx, `
75+
CREATE TABLE IF NOT EXISTS transaction_submitted_events_synced_until(
76+
enforce_one_row bool PRIMARY KEY DEFAULT true,
77+
block_hash bytea NOT NULL,
78+
block_number bigint NOT NULL CHECK (block_number >= 0),
79+
slot bigint NOT NULL CHECK (slot >= 0)
80+
);
81+
`)
82+
if err != nil {
83+
t.Fatalf("failed to create table: %v", err)
84+
}
85+
86+
initialBlockNumber := int64(100)
87+
_, err = dbpool.Exec(ctx, `
88+
INSERT INTO transaction_submitted_events_synced_until (block_hash, block_number, slot)
89+
VALUES ($1, $2, $3);
90+
`, []byte{0x01, 0x02, 0x03}, initialBlockNumber, 1)
91+
if err != nil {
92+
t.Fatalf("failed to insert initial data: %v", err)
93+
}
94+
95+
var count int
96+
err = dbpool.QueryRow(ctx, `
97+
SELECT count(*) FROM transaction_submitted_events_synced_until
98+
WHERE block_number = $1;
99+
`, initialBlockNumber).Scan(&count)
100+
if err != nil {
101+
t.Fatalf("failed to verify initial data: %v", err)
102+
}
103+
104+
assert.Equal(t, 1, count, "initial data should be inserted")
105+
106+
monitor := &gnosis.SyncMonitor{
107+
DBPool: dbpool,
108+
}
109+
110+
_, deferFn := service.RunBackground(ctx, monitor)
111+
defer deferFn()
112+
113+
doneCh := make(chan struct{})
114+
go func() {
115+
for i := 0; i < 5; i++ {
116+
// Simulate block number increment by updating the database
117+
newBlockNumber := initialBlockNumber + int64(i+1)
118+
log.Info().
119+
Int64("previous-block-number", initialBlockNumber+int64(i)).
120+
Int64("new-block-number", newBlockNumber).
121+
Msg("comparing blocks")
122+
123+
_, err := dbpool.Exec(ctx, `
124+
UPDATE transaction_submitted_events_synced_until
125+
SET block_number = $1
126+
WHERE block_number = $2;
127+
`, newBlockNumber, initialBlockNumber+int64(i))
128+
if err != nil {
129+
t.Fatalf("failed to update block number: %v", err)
130+
}
131+
132+
time.Sleep(30 * time.Second)
133+
}
134+
135+
doneCh <- struct{}{}
136+
}()
137+
138+
select {
139+
case <-doneCh:
140+
var finalBlockNumber int64
141+
err = dbpool.QueryRow(ctx, `SELECT block_number FROM transaction_submitted_events_synced_until LIMIT 1;`).Scan(&finalBlockNumber)
142+
if err != nil {
143+
t.Fatalf("failed to retrieve final block number: %v", err)
144+
}
145+
146+
assert.Equal(t, initialBlockNumber+5, finalBlockNumber, "block number should have been incremented correctly")
147+
}
148+
}

0 commit comments

Comments
 (0)