@@ -11,13 +11,19 @@ import (
1111 "syscall"
1212 "time"
1313
14+ "github.com/jmoiron/sqlx"
1415 "github.com/kilnfi/cardano-validator-watcher/cmd/watcher/app/config"
1516 "github.com/kilnfi/cardano-validator-watcher/internal/blockfrost"
1617 "github.com/kilnfi/cardano-validator-watcher/internal/blockfrost/blockfrostapi"
18+ "github.com/kilnfi/cardano-validator-watcher/internal/cardano"
19+ "github.com/kilnfi/cardano-validator-watcher/internal/cardano/cardanocli"
20+ "github.com/kilnfi/cardano-validator-watcher/internal/database"
1721 "github.com/kilnfi/cardano-validator-watcher/internal/metrics"
1822 "github.com/kilnfi/cardano-validator-watcher/internal/pools"
1923 "github.com/kilnfi/cardano-validator-watcher/internal/server/http"
24+ "github.com/kilnfi/cardano-validator-watcher/internal/slotleader"
2025 "github.com/kilnfi/cardano-validator-watcher/internal/watcher"
26+ "github.com/kilnfi/cardano-validator-watcher/migrations"
2127 "github.com/prometheus/client_golang/prometheus"
2228 "golang.org/x/sync/errgroup"
2329
@@ -76,29 +82,40 @@ func NewWatcherCommand() *cobra.Command {
7682 cmd .Flags ().StringP ("http-server-host" , "" , http .ServerDefaultHost , "host on which HTTP server should listen" )
7783 cmd .Flags ().IntP ("http-server-port" , "" , http .ServerDefaultPort , "port on which HTTP server should listen" )
7884 cmd .Flags ().StringP ("network" , "" , "preprod" , "cardano network ID" )
85+ cmd .Flags ().StringP ("database-path" , "" , "watcher.db" , "path to the local database mainly used by cardano client" )
86+ cmd .Flags ().StringP ("cardano-config-dir" , "" , "/config" , "path to the directory where the cardano config files are stored" )
87+ cmd .Flags ().StringP ("cardano-timezone" , "" , "UTC" , "timezone to use with cardano-cli - https://en.wikipedia.org/wiki/List_of_tz_database_time_zones" )
88+ cmd .Flags ().StringP ("cardano-socket-path" , "" , "/var/run/cardano.socket" , "socket path to communicate with a cardano node" )
7989 cmd .Flags ().StringP ("blockfrost-project-id" , "" , "" , "blockfrost project id" )
8090 cmd .Flags ().StringP ("blockfrost-endpoint" , "" , "" , "blockfrost API endpoint" )
8191 cmd .Flags ().IntP ("blockfrost-max-routines" , "" , 10 , "number of routines used by blockfrost to perform concurrent actions" )
8292 cmd .Flags ().IntP ("blockfrost-timeout" , "" , 60 , "Timeout for requests to the Blockfrost API (in seconds)" )
83- cmd .Flags ().BoolP ("pool-watcher-enabled" , "" , true , "Enable pool watcher" )
84- cmd .Flags ().IntP ("pool-watcher-refresh-interval" , "" , 60 , "Interval at which the pool watcher collects data about the monitored pools (in seconds)" )
8593 cmd .Flags ().BoolP ("network-watcher-enabled" , "" , true , "Enable network watcher" )
8694 cmd .Flags ().IntP ("network-watcher-refresh-interval" , "" , 60 , "Interval at which the network watcher collects data about the network (in seconds)" )
95+ cmd .Flags ().BoolP ("pool-watcher-enabled" , "" , true , "Enable pool watcher" )
96+ cmd .Flags ().IntP ("pool-watcher-refresh-interval" , "" , 60 , "Interval at which the pool watcher collects data about the monitored pools (in seconds)" )
97+ cmd .Flags ().BoolP ("block-watcher-enabled" , "" , true , "Enable block watcher" )
98+ cmd .Flags ().IntP ("block-watcher-refresh-interval" , "" , 60 , "Interval at which the block watcher collects and process slots (in seconds)" )
8799
88100 // bind flag to viper
89101 checkError (viper .BindPFlag ("log-level" , cmd .Flag ("log-level" )), "unable to bind log-level flag" )
90102 checkError (viper .BindPFlag ("http.host" , cmd .Flag ("http-server-host" )), "unable to bind http-server-host flag" )
91103 checkError (viper .BindPFlag ("http.port" , cmd .Flag ("http-server-port" )), "unable to bind http-server-port flag" )
92104 checkError (viper .BindPFlag ("network" , cmd .Flag ("network" )), "unable to bind network flag" )
105+ checkError (viper .BindPFlag ("database.path" , cmd .Flag ("database-path" )), "unable to bind database-path flag" )
106+ checkError (viper .BindPFlag ("cardano.config-dir" , cmd .Flag ("cardano-config-dir" )), "unable to bind cardano-config-dir flag" )
107+ checkError (viper .BindPFlag ("cardano.timezone" , cmd .Flag ("cardano-timezone" )), "unable to bind cardano-timezone flag" )
108+ checkError (viper .BindPFlag ("cardano.socket-path" , cmd .Flag ("cardano-socket-path" )), "unable to bind cardano-socket-path flag" )
93109 checkError (viper .BindPFlag ("blockfrost.project-id" , cmd .Flag ("blockfrost-project-id" )), "unable to bind blockfrost-project-id flag" )
94110 checkError (viper .BindPFlag ("blockfrost.endpoint" , cmd .Flag ("blockfrost-endpoint" )), "unable to bind blockfrost-endpoint flag" )
95111 checkError (viper .BindPFlag ("blockfrost.max-routines" , cmd .Flag ("blockfrost-max-routines" )), "unable to bind blockfrost-max-routines flag" )
96112 checkError (viper .BindPFlag ("blockfrost.timeout" , cmd .Flag ("blockfrost-timeout" )), "unable to bind blockfrost-timeout flag" )
97- checkError (viper .BindPFlag ("pool-watcher.enabled" , cmd .Flag ("pool-watcher-enabled" )), "unable to bind pool-watcher-enabled flag" )
98- checkError (viper .BindPFlag ("pool-watcher.refresh-interval" , cmd .Flag ("pool-watcher-refresh-interval" )), "unable to bind pool-watcher-refresh-interval flag" )
99113 checkError (viper .BindPFlag ("network-watcher.enabled" , cmd .Flag ("network-watcher-enabled" )), "unable to bind network-watcher-enabled flag" )
100114 checkError (viper .BindPFlag ("network-watcher.refresh-interval" , cmd .Flag ("network-watcher-refresh-interval" )), "unable to bind network-watcher-refresh-interval flag" )
101-
115+ checkError (viper .BindPFlag ("pool-watcher.enabled" , cmd .Flag ("pool-watcher-enabled" )), "unable to bind pool-watcher-enabled flag" )
116+ checkError (viper .BindPFlag ("pool-watcher.refresh-interval" , cmd .Flag ("pool-watcher-refresh-interval" )), "unable to bind pool-watcher-refresh-interval flag" )
117+ checkError (viper .BindPFlag ("block-watcher.enabled" , cmd .Flag ("block-watcher-enabled" )), "unable to bind block-watcher-enabled flag" )
118+ checkError (viper .BindPFlag ("block-watcher.refresh-interval" , cmd .Flag ("block-watcher-refresh-interval" )), "unable to bind block-watcher-refresh-interval flag" )
102119 return cmd
103120}
104121
@@ -146,29 +163,60 @@ func run(_ *cobra.Command, _ []string) error {
146163
147164 eg , ctx := errgroup .WithContext (ctx )
148165
166+ // Connect and Init the DB
167+ dbOpts := database.Options {
168+ URL : "?_journal=WAL&_timeout=15000&_fk=true&cache=shared" ,
169+ Path : cfg .Database .Path ,
170+ MaxOpenConns : 1 ,
171+ }
172+ database := database .NewDatabase (dbOpts )
173+ if err := database .Connect (ctx ); err != nil {
174+ return fmt .Errorf ("unable to connect to database: %w" , err )
175+ }
176+ if err := database .MigrateUp (migrations .FS ); err != nil {
177+ return fmt .Errorf ("unable to migrate database: %w" , err )
178+ }
179+
149180 // Initialize blockfrost and cardano clients with options
150181 blockfrost := createBlockfrostClient ()
182+ cardano := createCardanoClient (blockfrost )
151183
152184 // Initialize prometheus metrics
153185 registry := prometheus .NewRegistry ()
154186 metrics := metrics .NewCollection ()
155187 metrics .MustRegister (registry )
156188
189+ epoch , err := blockfrost .GetLatestEpoch (ctx )
190+ if err != nil {
191+ return fmt .Errorf ("unable to get latest epoch: %w" , err )
192+ }
193+
194+ // Launch slot leader calculation for the current slot
195+ slotLeaderService := slotleader .NewSlotLeaderService (database .DB , cardano , blockfrost , cfg .Pools , metrics )
196+ if err := slotLeaderService .Refresh (ctx , epoch ); err != nil {
197+ return fmt .Errorf ("unable to refresh slot leaders: %w" , err )
198+ }
199+
157200 // Start HTTP server
158201 if err := startHTTPServer (eg , registry ); err != nil {
159202 return fmt .Errorf ("unable to start http server: %w" , err )
160203 }
161204
162- // Start Network Watcher
163- if cfg .NetworkWatcherConfig .Enabled {
164- startNetworkWatcher (ctx , eg , blockfrost , metrics )
165- }
166-
167205 // Start Pool Watcher
168206 if cfg .PoolWatcherConfig .Enabled {
169207 startPoolWatcher (ctx , eg , blockfrost , metrics , cfg .Pools )
170208 }
171209
210+ // Start Block Watcher
211+ if cfg .BlockWatcherConfig .Enabled {
212+ startBlockWatcher (ctx , eg , cardano , blockfrost , slotLeaderService , metrics , cfg .Pools , database .DB )
213+ }
214+
215+ // Start Network Watcher
216+ if cfg .NetworkWatcherConfig .Enabled {
217+ startNetworkWatcher (ctx , eg , blockfrost , metrics )
218+ }
219+
172220 <- ctx .Done ()
173221 logger .Info ("shutting down" )
174222
@@ -200,6 +248,17 @@ func createBlockfrostClient() blockfrost.Client {
200248 return blockfrostapi .NewClient (opts )
201249}
202250
251+ func createCardanoClient (blockfrost blockfrost.Client ) cardano.CardanoClient {
252+ opts := cardanocli.ClientOptions {
253+ ConfigDir : cfg .Cardano .ConfigDir ,
254+ Network : cfg .Network ,
255+ SocketPath : cfg .Cardano .SocketPath ,
256+ Timezone : cfg .Cardano .Timezone ,
257+ DBPath : cfg .Database .Path ,
258+ }
259+ return cardanocli .NewClient (opts , blockfrost , & cardanocli.RealCommandExecutor {})
260+ }
261+
203262func startHTTPServer (eg * errgroup.Group , registry * prometheus.Registry ) error {
204263 var err error
205264
@@ -255,6 +314,7 @@ func startPoolWatcher(
255314 })
256315}
257316
317+ // startNetworkWatcher starts the network watcher service
258318func startNetworkWatcher (
259319 ctx context.Context ,
260320 eg * errgroup.Group ,
@@ -279,6 +339,33 @@ func startNetworkWatcher(
279339 })
280340}
281341
342+ // startBlockWatcher starts the block watcher service
343+ func startBlockWatcher (
344+ ctx context.Context ,
345+ eg * errgroup.Group ,
346+ cardano cardano.CardanoClient ,
347+ blockfrost blockfrost.Client ,
348+ sl slotleader.SlotLeader ,
349+ metrics * metrics.Collection ,
350+ pools pools.Pools ,
351+ db * sqlx.DB ,
352+ ) {
353+ eg .Go (func () error {
354+ options := watcher.BlockWatcherOptions {
355+ RefreshInterval : time .Second * time .Duration (cfg .BlockWatcherConfig .RefreshInterval ),
356+ }
357+ blockWatcher := watcher .NewBlockWatcher (cardano , blockfrost , sl , pools , metrics , db , options )
358+ logger .Info (
359+ "starting watcher" ,
360+ slog .String ("component" , "block-watcher" ),
361+ )
362+ if err := blockWatcher .Start (ctx ); err != nil {
363+ return fmt .Errorf ("unable to start block watcher: %w" , err )
364+ }
365+ return nil
366+ })
367+ }
368+
282369// checkError is a helper function to log an error and exit the program
283370// used for the flag parsing
284371func checkError (err error , msg string ) {
0 commit comments