Skip to content

Commit 37b767b

Browse files
updated CI for asdf plugins and resolved conflicts
2 parents 5d42d99 + eae7aa2 commit 37b767b

File tree

8 files changed

+246
-12
lines changed

8 files changed

+246
-12
lines changed

.circleci/config.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,13 @@ jobs:
150150
name: "Install asdf managed executables"
151151
command: |
152152
asdf install tinygo latest
153-
asdf install binaryen
154-
asdf install nodejs
153+
asdf install binaryen latest
154+
asdf install nodejs latest
155155
- run:
156156
name: "Run unit tests with gotestsum"
157157
command: |
158158
mkdir -p report/unit report/integration
159-
gotestsum -f standard-verbose --junitfile report/unit/tests.xml -- -short -race ./...
159+
gotestsum -f standard-verbose --junitfile report/unit/tests.xml -- -short -race -count=1 -p 1 ./...
160160
- run:
161161
name: Wait for PostgreSQL
162162
command: |

rolling-shutter/keyper/database/keyper.sqlc.gen.go

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rolling-shutter/keyper/database/sql/queries/keyper.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ LIMIT 1;
121121
-- name: DeleteShutterMessage :exec
122122
DELETE FROM tendermint_outgoing_messages WHERE id=$1;
123123

124+
-- name: DeleteShutterMessageByDesc :exec
125+
DELETE FROM tendermint_outgoing_messages WHERE description=$1;
126+
124127
-- name: InsertEon :exec
125128
INSERT INTO eons (eon, height, activation_block_number, keyper_config_index)
126129
VALUES ($1, $2, $3, $4);

rolling-shutter/keyper/smobserver/smstate.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func (st *ShuttermintState) handleBatchConfig(
274274
keypers = append(keypers, shdb.EncodeAddress(k))
275275
}
276276
keypermetrics.MetricsKeyperBatchConfigInfo.WithLabelValues(strconv.FormatUint(e.KeyperConfigIndex, 10), strings.Join(keypers, ",")).Set(1)
277-
return queries.InsertBatchConfig(
277+
if err := queries.InsertBatchConfig(
278278
ctx,
279279
database.InsertBatchConfigParams{
280280
KeyperConfigIndex: int32(e.KeyperConfigIndex),
@@ -284,7 +284,12 @@ func (st *ShuttermintState) handleBatchConfig(
284284
Started: e.Started,
285285
ActivationBlockNumber: int64(e.ActivationBlockNumber),
286286
},
287-
)
287+
); err != nil {
288+
return err
289+
}
290+
291+
return queries.DeleteShutterMessageByDesc(ctx, fmt.Sprintf("new batch config (activation-block-number=%d, config-index=%d)",
292+
e.ActivationBlockNumber, e.KeyperConfigIndex))
288293
}
289294

290295
func (st *ShuttermintState) handleBatchConfigStarted(

rolling-shutter/keyperimpl/shutterservice/config.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,18 @@ func (c *Config) GetAddress() common.Address {
8181
}
8282

8383
type ChainConfig struct {
84-
Node *configuration.EthnodeConfig `shconfig:",required"`
85-
Contracts *ContractsConfig `shconfig:",required"`
86-
SyncStartBlockNumber uint64 `shconfig:",required"`
84+
Node *configuration.EthnodeConfig `shconfig:",required"`
85+
Contracts *ContractsConfig `shconfig:",required"`
86+
SyncStartBlockNumber uint64 `shconfig:",required"`
87+
SyncMonitorCheckInterval uint64 `shconfig:",required"`
8788
}
8889

8990
func NewChainConfig() *ChainConfig {
9091
c := &ChainConfig{
91-
Node: configuration.NewEthnodeConfig(),
92-
Contracts: NewContractsConfig(),
93-
SyncStartBlockNumber: 0,
92+
Node: configuration.NewEthnodeConfig(),
93+
Contracts: NewContractsConfig(),
94+
SyncStartBlockNumber: 0,
95+
SyncMonitorCheckInterval: 0,
9496
}
9597
c.Init()
9698
return c
@@ -111,10 +113,12 @@ func (c *ChainConfig) Validate() error {
111113

112114
func (c *ChainConfig) SetDefaultValues() error {
113115
c.SyncStartBlockNumber = 0
116+
c.SyncMonitorCheckInterval = 30
114117
return c.Contracts.SetDefaultValues()
115118
}
116119

117120
func (c *ChainConfig) SetExampleValues() error {
121+
c.SyncMonitorCheckInterval = 30
118122
return nil
119123
}
120124

rolling-shutter/keyperimpl/shutterservice/keyper.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package shutterservice
33
import (
44
"context"
55
"log/slog"
6+
"time"
67

78
"github.com/ethereum/go-ethereum/ethclient"
89
gethLog "github.com/ethereum/go-ethereum/log"
@@ -35,6 +36,7 @@ type Keyper struct {
3536
registrySyncer *RegistrySyncer
3637
eonKeyPublisher *eonkeypublisher.EonKeyPublisher
3738
latestTriggeredTime *uint64
39+
syncMonitor *SyncMonitor
3840

3941
// input events
4042
newBlocks chan *syncevent.LatestBlock
@@ -111,8 +113,12 @@ func (kpr *Keyper) Start(ctx context.Context, runner service.Runner) error {
111113
return err
112114
}
113115

116+
kpr.syncMonitor = &SyncMonitor{
117+
DBPool: kpr.dbpool,
118+
CheckInterval: time.Duration(kpr.config.Chain.SyncMonitorCheckInterval) * time.Second,
119+
}
114120
runner.Go(func() error { return kpr.processInputs(ctx) })
115-
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.eonKeyPublisher)
121+
return runner.StartService(kpr.core, kpr.chainSyncClient, kpr.eonKeyPublisher, kpr.syncMonitor)
116122
}
117123

118124
func NewKeyper(kpr *Keyper, messagingMiddleware *MessagingMiddleware) (*keyper.KeyperCore, error) {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package shutterservice
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/jackc/pgx/v4"
8+
"github.com/jackc/pgx/v4/pgxpool"
9+
"github.com/pkg/errors"
10+
"github.com/rs/zerolog/log"
11+
12+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database"
13+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
14+
)
15+
16+
type SyncMonitor struct {
17+
DBPool *pgxpool.Pool
18+
CheckInterval time.Duration
19+
}
20+
21+
func (s *SyncMonitor) Start(ctx context.Context, runner service.Runner) error {
22+
runner.Go(func() error {
23+
return s.runMonitor(ctx)
24+
})
25+
26+
return nil
27+
}
28+
29+
func (s *SyncMonitor) runMonitor(ctx context.Context) error {
30+
var lastBlockNumber int64
31+
db := database.New(s.DBPool)
32+
33+
log.Debug().Msg("starting the sync monitor")
34+
35+
for {
36+
select {
37+
case <-time.After(s.CheckInterval):
38+
record, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
39+
if err != nil {
40+
if errors.Is(err, pgx.ErrNoRows) {
41+
log.Warn().Err(err).Msg("no rows found in table identity_registered_events_synced_until")
42+
continue
43+
}
44+
return errors.Wrap(err, "error getting identity_registered_events_synced_until")
45+
}
46+
47+
currentBlockNumber := record.BlockNumber
48+
log.Debug().Int64("current-block-number", currentBlockNumber).Msg("current block number")
49+
50+
if currentBlockNumber > lastBlockNumber {
51+
lastBlockNumber = currentBlockNumber
52+
} else {
53+
log.Error().
54+
Int64("last-block-number", lastBlockNumber).
55+
Int64("current-block-number", currentBlockNumber).
56+
Msg("block number has not increased between checks")
57+
return errors.New("block number has not increased between checks")
58+
}
59+
case <-ctx.Done():
60+
log.Info().Msg("stopping syncMonitor due to context cancellation")
61+
return ctx.Err()
62+
}
63+
}
64+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package shutterservice_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"gotest.tools/assert"
9+
10+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice"
11+
"github.com/shutter-network/rolling-shutter/rolling-shutter/keyperimpl/shutterservice/database"
12+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/service"
13+
"github.com/shutter-network/rolling-shutter/rolling-shutter/medley/testsetup"
14+
)
15+
16+
func TestAPISyncMonitor_ThrowsErrorWhenBlockNotIncreasing(t *testing.T) {
17+
ctx, cancel := context.WithCancel(context.Background())
18+
defer cancel()
19+
20+
dbpool, dbclose := testsetup.NewTestDBPool(ctx, t, database.Definition)
21+
defer dbclose()
22+
db := database.New(dbpool)
23+
24+
initialBlockNumber := int64(100)
25+
26+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
27+
BlockHash: []byte{0x01, 0x02, 0x03},
28+
BlockNumber: initialBlockNumber,
29+
})
30+
if err != nil {
31+
t.Fatalf("failed to set initial synced data: %v", err)
32+
}
33+
34+
monitor := &shutterservice.SyncMonitor{
35+
DBPool: dbpool,
36+
CheckInterval: 5 * time.Second,
37+
}
38+
39+
errCh := make(chan error, 1)
40+
41+
go func() {
42+
err := service.RunWithSighandler(ctx, monitor)
43+
if err != nil {
44+
errCh <- err
45+
}
46+
}()
47+
48+
time.Sleep(12 * time.Second)
49+
50+
select {
51+
case err := <-errCh:
52+
assert.ErrorContains(t, err, "block number has not increased between checks")
53+
case <-time.After(5 * time.Second):
54+
t.Fatal("expected an error, but none was returned")
55+
}
56+
}
57+
58+
func TestAPISyncMonitor_HandlesBlockNumberIncreasing(t *testing.T) {
59+
ctx, cancel := context.WithCancel(context.Background())
60+
defer cancel()
61+
62+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
63+
defer closeDB()
64+
db := database.New(dbpool)
65+
66+
initialBlockNumber := int64(100)
67+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
68+
BlockHash: []byte{0x01, 0x02, 0x03},
69+
BlockNumber: initialBlockNumber,
70+
})
71+
if err != nil {
72+
t.Fatalf("failed to set initial synced data: %v", err)
73+
}
74+
75+
monitor := &shutterservice.SyncMonitor{
76+
DBPool: dbpool,
77+
CheckInterval: 5 * time.Second,
78+
}
79+
80+
_, deferFn := service.RunBackground(ctx, monitor)
81+
defer deferFn()
82+
83+
doneCh := make(chan struct{})
84+
go func() {
85+
for i := 0; i < 5; i++ {
86+
newBlockNumber := initialBlockNumber + int64(i+1)
87+
err := db.SetIdentityRegisteredEventSyncedUntil(ctx, database.SetIdentityRegisteredEventSyncedUntilParams{
88+
BlockHash: []byte{0x01, 0x02, 0x03},
89+
BlockNumber: newBlockNumber,
90+
})
91+
if err != nil {
92+
t.Errorf("failed to update block number: %v", err)
93+
return
94+
}
95+
96+
time.Sleep(5 * time.Second)
97+
}
98+
99+
doneCh <- struct{}{}
100+
}()
101+
102+
<-doneCh
103+
syncedData, err := db.GetIdentityRegisteredEventsSyncedUntil(ctx)
104+
if err != nil {
105+
t.Fatalf("failed to retrieve final block number: %v", err)
106+
}
107+
108+
assert.Equal(t, initialBlockNumber+5, syncedData.BlockNumber, "block number should have been incremented correctly")
109+
}
110+
111+
func TestAPISyncMonitor_ContinuesWhenNoRows(t *testing.T) {
112+
ctx, cancel := context.WithCancel(context.Background())
113+
defer cancel()
114+
115+
dbpool, closeDB := testsetup.NewTestDBPool(ctx, t, database.Definition)
116+
defer closeDB()
117+
_ = database.New(dbpool)
118+
119+
monitor := &shutterservice.SyncMonitor{
120+
DBPool: dbpool,
121+
CheckInterval: 5 * time.Second,
122+
}
123+
124+
monitorCtx, cancelMonitor := context.WithCancel(ctx)
125+
defer cancelMonitor()
126+
127+
errCh := make(chan error, 1)
128+
go func() {
129+
err := service.RunWithSighandler(monitorCtx, monitor)
130+
if err != nil {
131+
errCh <- err
132+
}
133+
}()
134+
135+
time.Sleep(15 * time.Second)
136+
cancelMonitor()
137+
138+
select {
139+
case err := <-errCh:
140+
t.Fatalf("expected monitor to continue without error, but got: %v", err)
141+
case <-time.After(1 * time.Second):
142+
}
143+
}

0 commit comments

Comments
 (0)