Skip to content

Commit efa4fef

Browse files
committed
feat: add blocks watcher
1 parent a1f823b commit efa4fef

File tree

11 files changed

+2069
-14
lines changed

11 files changed

+2069
-14
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
config.yaml
22
.codegpt
3-
bin/
3+
bin/
4+
config/
5+
*.db*

cmd/watcher/app/config/config.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,19 @@ type Config struct {
1111
Pools pools.Pools `mapstructure:"pools"`
1212
HTTP HTTPConfig `mapstructure:"http"`
1313
Network string `mapstructure:"network"`
14+
Cardano CardanoConfig `mapstructure:"cardano"`
15+
Database DatabaseConfig `mapstructure:"database"`
1416
Blockfrost BlockFrostConfig `mapstructure:"blockfrost"`
17+
BlockWatcherConfig BlockWatcherConfig `mapstructure:"block-watcher"`
1518
PoolWatcherConfig PoolWatcherConfig `mapstructure:"pool-watcher"`
1619
NetworkWatcherConfig NetworkWatcherConfig `mapstructure:"network-watcher"`
1720
}
1821

22+
type BlockWatcherConfig struct {
23+
Enabled bool `mapstructure:"enabled"`
24+
RefreshInterval int `mapstructure:"refresh-interval"`
25+
}
26+
1927
type PoolWatcherConfig struct {
2028
Enabled bool `mapstructure:"enabled"`
2129
RefreshInterval int `mapstructure:"refresh-interval"`
@@ -38,6 +46,16 @@ type BlockFrostConfig struct {
3846
Timeout int `mapstructure:"timeout"`
3947
}
4048

49+
type CardanoConfig struct {
50+
ConfigDir string `mapstructure:"config-dir"`
51+
Timezone string `mapstructure:"timezone"`
52+
SocketPath string `mapstructure:"socket-path"`
53+
}
54+
55+
type DatabaseConfig struct {
56+
Path string `mapstructure:"path"`
57+
}
58+
4159
func (c *Config) Validate() error {
4260
switch c.Network {
4361
case "mainnet", "preprod":

cmd/watcher/app/watcher.go

Lines changed: 97 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,19 @@ import (
1111
"syscall"
1212
"time"
1313

14+
"github.com/jmoiron/sqlx"
1415
"github.com/kilnfi/cardano-validator-watcher/cmd/watcher/app/config"
1516
"github.com/kilnfi/cardano-validator-watcher/internal/blockfrost"
1617
"github.com/kilnfi/cardano-validator-watcher/internal/blockfrost/blockfrostapi"
18+
"github.com/kilnfi/cardano-validator-watcher/internal/cardano"
19+
"github.com/kilnfi/cardano-validator-watcher/internal/cardano/cardanocli"
20+
"github.com/kilnfi/cardano-validator-watcher/internal/database"
1721
"github.com/kilnfi/cardano-validator-watcher/internal/metrics"
1822
"github.com/kilnfi/cardano-validator-watcher/internal/pools"
1923
"github.com/kilnfi/cardano-validator-watcher/internal/server/http"
24+
"github.com/kilnfi/cardano-validator-watcher/internal/slotleader"
2025
"github.com/kilnfi/cardano-validator-watcher/internal/watcher"
26+
"github.com/kilnfi/cardano-validator-watcher/migrations"
2127
"github.com/prometheus/client_golang/prometheus"
2228
"golang.org/x/sync/errgroup"
2329

@@ -76,29 +82,40 @@ func NewWatcherCommand() *cobra.Command {
7682
cmd.Flags().StringP("http-server-host", "", http.ServerDefaultHost, "host on which HTTP server should listen")
7783
cmd.Flags().IntP("http-server-port", "", http.ServerDefaultPort, "port on which HTTP server should listen")
7884
cmd.Flags().StringP("network", "", "preprod", "cardano network ID")
85+
cmd.Flags().StringP("database-path", "", "watcher.db", "path to the local database mainly used by cardano client")
86+
cmd.Flags().StringP("cardano-config-dir", "", "/config", "path to the directory where the cardano config files are stored")
87+
cmd.Flags().StringP("cardano-timezone", "", "UTC", "timezone to use with cardano-cli - https://en.wikipedia.org/wiki/List_of_tz_database_time_zones")
88+
cmd.Flags().StringP("cardano-socket-path", "", "/var/run/cardano.socket", "socket path to communicate with a cardano node")
7989
cmd.Flags().StringP("blockfrost-project-id", "", "", "blockfrost project id")
8090
cmd.Flags().StringP("blockfrost-endpoint", "", "", "blockfrost API endpoint")
8191
cmd.Flags().IntP("blockfrost-max-routines", "", 10, "number of routines used by blockfrost to perform concurrent actions")
8292
cmd.Flags().IntP("blockfrost-timeout", "", 60, "Timeout for requests to the Blockfrost API (in seconds)")
83-
cmd.Flags().BoolP("pool-watcher-enabled", "", true, "Enable pool watcher")
84-
cmd.Flags().IntP("pool-watcher-refresh-interval", "", 60, "Interval at which the pool watcher collects data about the monitored pools (in seconds)")
8593
cmd.Flags().BoolP("network-watcher-enabled", "", true, "Enable network watcher")
8694
cmd.Flags().IntP("network-watcher-refresh-interval", "", 60, "Interval at which the network watcher collects data about the network (in seconds)")
95+
cmd.Flags().BoolP("pool-watcher-enabled", "", true, "Enable pool watcher")
96+
cmd.Flags().IntP("pool-watcher-refresh-interval", "", 60, "Interval at which the pool watcher collects data about the monitored pools (in seconds)")
97+
cmd.Flags().BoolP("block-watcher-enabled", "", true, "Enable block watcher")
98+
cmd.Flags().IntP("block-watcher-refresh-interval", "", 60, "Interval at which the block watcher collects and process slots (in seconds)")
8799

88100
// bind flag to viper
89101
checkError(viper.BindPFlag("log-level", cmd.Flag("log-level")), "unable to bind log-level flag")
90102
checkError(viper.BindPFlag("http.host", cmd.Flag("http-server-host")), "unable to bind http-server-host flag")
91103
checkError(viper.BindPFlag("http.port", cmd.Flag("http-server-port")), "unable to bind http-server-port flag")
92104
checkError(viper.BindPFlag("network", cmd.Flag("network")), "unable to bind network flag")
105+
checkError(viper.BindPFlag("database.path", cmd.Flag("database-path")), "unable to bind database-path flag")
106+
checkError(viper.BindPFlag("cardano.config-dir", cmd.Flag("cardano-config-dir")), "unable to bind cardano-config-dir flag")
107+
checkError(viper.BindPFlag("cardano.timezone", cmd.Flag("cardano-timezone")), "unable to bind cardano-timezone flag")
108+
checkError(viper.BindPFlag("cardano.socket-path", cmd.Flag("cardano-socket-path")), "unable to bind cardano-socket-path flag")
93109
checkError(viper.BindPFlag("blockfrost.project-id", cmd.Flag("blockfrost-project-id")), "unable to bind blockfrost-project-id flag")
94110
checkError(viper.BindPFlag("blockfrost.endpoint", cmd.Flag("blockfrost-endpoint")), "unable to bind blockfrost-endpoint flag")
95111
checkError(viper.BindPFlag("blockfrost.max-routines", cmd.Flag("blockfrost-max-routines")), "unable to bind blockfrost-max-routines flag")
96112
checkError(viper.BindPFlag("blockfrost.timeout", cmd.Flag("blockfrost-timeout")), "unable to bind blockfrost-timeout flag")
97-
checkError(viper.BindPFlag("pool-watcher.enabled", cmd.Flag("pool-watcher-enabled")), "unable to bind pool-watcher-enabled flag")
98-
checkError(viper.BindPFlag("pool-watcher.refresh-interval", cmd.Flag("pool-watcher-refresh-interval")), "unable to bind pool-watcher-refresh-interval flag")
99113
checkError(viper.BindPFlag("network-watcher.enabled", cmd.Flag("network-watcher-enabled")), "unable to bind network-watcher-enabled flag")
100114
checkError(viper.BindPFlag("network-watcher.refresh-interval", cmd.Flag("network-watcher-refresh-interval")), "unable to bind network-watcher-refresh-interval flag")
101-
115+
checkError(viper.BindPFlag("pool-watcher.enabled", cmd.Flag("pool-watcher-enabled")), "unable to bind pool-watcher-enabled flag")
116+
checkError(viper.BindPFlag("pool-watcher.refresh-interval", cmd.Flag("pool-watcher-refresh-interval")), "unable to bind pool-watcher-refresh-interval flag")
117+
checkError(viper.BindPFlag("block-watcher.enabled", cmd.Flag("block-watcher-enabled")), "unable to bind block-watcher-enabled flag")
118+
checkError(viper.BindPFlag("block-watcher.refresh-interval", cmd.Flag("block-watcher-refresh-interval")), "unable to bind block-watcher-refresh-interval flag")
102119
return cmd
103120
}
104121

@@ -146,29 +163,60 @@ func run(_ *cobra.Command, _ []string) error {
146163

147164
eg, ctx := errgroup.WithContext(ctx)
148165

166+
// Connect and Init the DB
167+
dbOpts := database.Options{
168+
URL: "?_journal=WAL&_timeout=15000&_fk=true&cache=shared",
169+
Path: cfg.Database.Path,
170+
MaxOpenConns: 1,
171+
}
172+
database := database.NewDatabase(dbOpts)
173+
if err := database.Connect(ctx); err != nil {
174+
return fmt.Errorf("unable to connect to database: %w", err)
175+
}
176+
if err := database.MigrateUp(migrations.FS); err != nil {
177+
return fmt.Errorf("unable to migrate database: %w", err)
178+
}
179+
149180
// Initialize blockfrost and cardano clients with options
150181
blockfrost := createBlockfrostClient()
182+
cardano := createCardanoClient(blockfrost)
151183

152184
// Initialize prometheus metrics
153185
registry := prometheus.NewRegistry()
154186
metrics := metrics.NewCollection()
155187
metrics.MustRegister(registry)
156188

189+
epoch, err := blockfrost.GetLatestEpoch(ctx)
190+
if err != nil {
191+
return fmt.Errorf("unable to get latest epoch: %w", err)
192+
}
193+
194+
// Launch slot leader calculation for the current slot
195+
slotLeaderService := slotleader.NewSlotLeaderService(database.DB, cardano, blockfrost, cfg.Pools, metrics)
196+
if err := slotLeaderService.Refresh(ctx, epoch); err != nil {
197+
return fmt.Errorf("unable to refresh slot leaders: %w", err)
198+
}
199+
157200
// Start HTTP server
158201
if err := startHTTPServer(eg, registry); err != nil {
159202
return fmt.Errorf("unable to start http server: %w", err)
160203
}
161204

162-
// Start Network Watcher
163-
if cfg.NetworkWatcherConfig.Enabled {
164-
startNetworkWatcher(ctx, eg, blockfrost, metrics)
165-
}
166-
167205
// Start Pool Watcher
168206
if cfg.PoolWatcherConfig.Enabled {
169207
startPoolWatcher(ctx, eg, blockfrost, metrics, cfg.Pools)
170208
}
171209

210+
// Start Block Watcher
211+
if cfg.BlockWatcherConfig.Enabled {
212+
startBlockWatcher(ctx, eg, cardano, blockfrost, slotLeaderService, metrics, cfg.Pools, database.DB)
213+
}
214+
215+
// Start Network Watcher
216+
if cfg.NetworkWatcherConfig.Enabled {
217+
startNetworkWatcher(ctx, eg, blockfrost, metrics)
218+
}
219+
172220
<-ctx.Done()
173221
logger.Info("shutting down")
174222

@@ -200,6 +248,17 @@ func createBlockfrostClient() blockfrost.Client {
200248
return blockfrostapi.NewClient(opts)
201249
}
202250

251+
func createCardanoClient(blockfrost blockfrost.Client) cardano.CardanoClient {
252+
opts := cardanocli.ClientOptions{
253+
ConfigDir: cfg.Cardano.ConfigDir,
254+
Network: cfg.Network,
255+
SocketPath: cfg.Cardano.SocketPath,
256+
Timezone: cfg.Cardano.Timezone,
257+
DBPath: cfg.Database.Path,
258+
}
259+
return cardanocli.NewClient(opts, blockfrost, &cardanocli.RealCommandExecutor{})
260+
}
261+
203262
func startHTTPServer(eg *errgroup.Group, registry *prometheus.Registry) error {
204263
var err error
205264

@@ -255,6 +314,7 @@ func startPoolWatcher(
255314
})
256315
}
257316

317+
// startNetworkWatcher starts the network watcher service
258318
func startNetworkWatcher(
259319
ctx context.Context,
260320
eg *errgroup.Group,
@@ -279,6 +339,33 @@ func startNetworkWatcher(
279339
})
280340
}
281341

342+
// startBlockWatcher starts the block watcher service
343+
func startBlockWatcher(
344+
ctx context.Context,
345+
eg *errgroup.Group,
346+
cardano cardano.CardanoClient,
347+
blockfrost blockfrost.Client,
348+
sl slotleader.SlotLeader,
349+
metrics *metrics.Collection,
350+
pools pools.Pools,
351+
db *sqlx.DB,
352+
) {
353+
eg.Go(func() error {
354+
options := watcher.BlockWatcherOptions{
355+
RefreshInterval: time.Second * time.Duration(cfg.BlockWatcherConfig.RefreshInterval),
356+
}
357+
blockWatcher := watcher.NewBlockWatcher(cardano, blockfrost, sl, pools, metrics, db, options)
358+
logger.Info(
359+
"starting watcher",
360+
slog.String("component", "block-watcher"),
361+
)
362+
if err := blockWatcher.Start(ctx); err != nil {
363+
return fmt.Errorf("unable to start block watcher: %w", err)
364+
}
365+
return nil
366+
})
367+
}
368+
282369
// checkError is a helper function to log an error and exit the program
283370
// used for the flag parsing
284371
func checkError(err error, msg string) {

internal/metrics/metrics.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ type Collection struct {
2020
PoolsPledgeMet *prometheus.GaugeVec
2121
PoolsSaturationLevel *prometheus.GaugeVec
2222
MonitoredValidatorsCount *prometheus.GaugeVec
23+
MissedBlocks *prometheus.CounterVec
24+
ConsecutiveMissedBlocks *prometheus.GaugeVec
25+
OrphanedBlocks *prometheus.CounterVec
26+
ValidatedBlocks *prometheus.CounterVec
27+
ExpectedBlocks *prometheus.GaugeVec
28+
LatestSlotProcessedByBlockWatcher prometheus.Gauge
29+
NextSlotLeader *prometheus.GaugeVec
2330
}
2431

2532
func NewCollection() *Collection {
@@ -127,6 +134,61 @@ func NewCollection() *Collection {
127134
},
128135
[]string{"status"},
129136
),
137+
MissedBlocks: prometheus.NewCounterVec(
138+
prometheus.CounterOpts{
139+
Namespace: "cardano_validator_watcher",
140+
Name: "missed_blocks_total",
141+
Help: "number of missed blocks in the current epoch",
142+
},
143+
[]string{"pool_name", "pool_id", "pool_instance", "epoch"},
144+
),
145+
ConsecutiveMissedBlocks: prometheus.NewGaugeVec(
146+
prometheus.GaugeOpts{
147+
Namespace: "cardano_validator_watcher",
148+
Name: "consecutive_missed_blocks",
149+
Help: "number of consecutive missed blocks in a row",
150+
},
151+
[]string{"pool_name", "pool_id", "pool_instance", "epoch"},
152+
),
153+
ValidatedBlocks: prometheus.NewCounterVec(
154+
prometheus.CounterOpts{
155+
Namespace: "cardano_validator_watcher",
156+
Name: "validated_blocks_total",
157+
Help: "number of validated blocks in the current epoch",
158+
},
159+
[]string{"pool_name", "pool_id", "pool_instance", "epoch"},
160+
),
161+
OrphanedBlocks: prometheus.NewCounterVec(
162+
prometheus.CounterOpts{
163+
Namespace: "cardano_validator_watcher",
164+
Name: "orphaned_blocks_total",
165+
Help: "number of orphaned blocks in the current epoch",
166+
},
167+
[]string{"pool_name", "pool_id", "pool_instance", "epoch"},
168+
),
169+
ExpectedBlocks: prometheus.NewGaugeVec(
170+
prometheus.GaugeOpts{
171+
Namespace: "cardano_validator_watcher",
172+
Name: "expected_blocks",
173+
Help: "number of expected blocks in the current epoch",
174+
},
175+
[]string{"pool_name", "pool_id", "pool_instance", "epoch"},
176+
),
177+
LatestSlotProcessedByBlockWatcher: prometheus.NewGauge(
178+
prometheus.GaugeOpts{
179+
Namespace: "cardano_validator_watcher",
180+
Name: "latest_slot_processed_by_block_watcher",
181+
Help: "latest slot processed by the block watcher",
182+
},
183+
),
184+
NextSlotLeader: prometheus.NewGaugeVec(
185+
prometheus.GaugeOpts{
186+
Namespace: "cardano_validator_watcher",
187+
Name: "next_slot_leader",
188+
Help: "next slot leader for each monitored pool",
189+
},
190+
[]string{"pool_name", "pool_id", "pool_instance", "epoch"},
191+
),
130192
}
131193
}
132194

@@ -147,4 +209,11 @@ func (m *Collection) MustRegister(reg prometheus.Registerer) {
147209
reg.MustRegister(m.PoolsPledgeMet)
148210
reg.MustRegister(m.PoolsSaturationLevel)
149211
reg.MustRegister(m.MonitoredValidatorsCount)
212+
reg.MustRegister(m.MissedBlocks)
213+
reg.MustRegister(m.ConsecutiveMissedBlocks)
214+
reg.MustRegister(m.ValidatedBlocks)
215+
reg.MustRegister(m.OrphanedBlocks)
216+
reg.MustRegister(m.ExpectedBlocks)
217+
reg.MustRegister(m.LatestSlotProcessedByBlockWatcher)
218+
reg.MustRegister(m.NextSlotLeader)
150219
}

internal/metrics/metrics_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,18 @@ func TestMustRegister(t *testing.T) {
2222
metrics.PoolsPledgeMet.WithLabelValues("pool_name", "pool_id", "pool_instance").Set(1)
2323
metrics.PoolsSaturationLevel.WithLabelValues("pool_name", "pool_id", "pool_instance").Set(85)
2424
metrics.MonitoredValidatorsCount.WithLabelValues("active").Set(10)
25+
metrics.MissedBlocks.WithLabelValues("pool_name", "pool_id", "pool_instance", "epoch").Inc()
26+
metrics.ConsecutiveMissedBlocks.WithLabelValues("pool_name", "pool_id", "pool_instance", "epoch").Inc()
27+
metrics.ValidatedBlocks.WithLabelValues("pool_name", "pool_id", "pool_instance", "epoch").Inc()
28+
metrics.OrphanedBlocks.WithLabelValues("pool_name", "pool_id", "pool_instance", "epoch").Inc()
29+
metrics.ExpectedBlocks.WithLabelValues("pool_name", "pool_id", "pool_instance", "epoch").Set(2)
30+
metrics.NextSlotLeader.WithLabelValues("pool_name", "pool_id", "pool_instance", "epoch").Set(2)
2531

2632
registry := prometheus.NewRegistry()
2733
metrics.MustRegister(registry)
2834

2935
// The expected number of metrics to be registered, based on the definitions provided in the Collection struct.
30-
expectedMetricsCount := 14
36+
expectedMetricsCount := 21
3137

3238
var totalRegisteredMetrics int
3339
size, _ := registry.Gather()

internal/watcher/errors.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package watcher
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
)
7+
8+
var (
9+
ErrBlockFrostAPINotReachable = errors.New("blockfrost API is not reachable")
10+
ErrCardanoNodeNotReachable = errors.New("cardano node is not reachable")
11+
)
12+
13+
type ErrNoSlotsAssignedToPool struct {
14+
PoolID string
15+
Epoch int
16+
}
17+
18+
func (e *ErrNoSlotsAssignedToPool) Error() string {
19+
return fmt.Sprintf("Pool %s has no slots assigned for epoch %d. Consider excluding this pool.", e.PoolID, e.Epoch)
20+
}

0 commit comments

Comments
 (0)