diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go index d45514e..71b68a4 100644 --- a/cmd/cli/cli.go +++ b/cmd/cli/cli.go @@ -3,10 +3,14 @@ package cli import ( "context" "fmt" + "net/http" "os" + "runtime" "strconv" "strings" + _ "net/http/pprof" + "github.com/rs/zerolog/log" "github.com/mitchellh/mapstructure" @@ -112,6 +116,11 @@ func Cmd() *cobra.Command { } func Start() error { + log.Info().Msg("go run pprof") + go func() { + log.Info().Any("pprof", http.ListenAndServe("0.0.0.0:7070", nil)).Msg("started pprof") + }() + runtime.SetBlockProfileRate(1) log.Info().Msg("Starting Observer") // start services here ctx := context.Background() diff --git a/docker-compose.yml b/docker-compose.yml index 0572d98..3680f87 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,11 +19,14 @@ services: - P2P_DISCOVERY_NAMESPACE=${P2P_DISCOVERY_NAMESPACE} ports: - "23003:23003" + - "7070:6060" command: ["start", "--rpc-url", "${RPC_URL}", "--beacon-api-url", "${BEACON_API_URL}", "--sequencer-contract-address", "${SEQUENCER_CONTRACT_ADDRESS}", "--validator-registry-contract-address", "${VALIDATOR_REGISTRY_CONTRACT_ADDRESS}", "--p2pkey", "${P2P_KEY}"] db: image: postgres:14.12 restart: unless-stopped + ports: + - 5432:5432 environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=${DB_PASSWORD} diff --git a/internal/watcher/blocks.go b/internal/watcher/blocks.go index 51c68bf..f10bda4 100644 --- a/internal/watcher/blocks.go +++ b/internal/watcher/blocks.go @@ -34,13 +34,16 @@ func (bw *BlocksWatcher) Start(ctx context.Context, runner service.Runner) error newHeads := make(chan *types.Header) sub, err := bw.ethClient.SubscribeNewHead(ctx, newHeads) if err != nil { + log.Info().Err(err).Msg("error on subscribe, no new head") return err } runner.Defer(sub.Unsubscribe) runner.Go(func() error { for { + log.Info().Msg("looking for new head") select { case <-ctx.Done(): + log.Info().Msg("context cancelled, no new head") return ctx.Err() case head := <-newHeads: log.Info(). @@ -53,6 +56,7 @@ func (bw *BlocksWatcher) Start(ctx context.Context, runner service.Runner) error } bw.blocksChannel <- ev case err := <-sub.Err(): + log.Info().Err(err).Msg("got an err, no new head") return err } } diff --git a/internal/watcher/decryption_keys.go b/internal/watcher/decryption_keys.go index e348281..d9018a2 100644 --- a/internal/watcher/decryption_keys.go +++ b/internal/watcher/decryption_keys.go @@ -54,24 +54,41 @@ func (pmw *P2PMsgsWatcher) handleDecryptionKeyMsg(msg *p2pmsg.DecryptionKeys) ([ func (pmw *P2PMsgsWatcher) insertBlocks(ctx context.Context) error { for { + log.Info().Msg("polling insertBlocks, for new head channel") select { case <-ctx.Done(): + log.Info().Msg("context cancelled for insertBlocks, no new head") return ctx.Err() case ev, ok := <-pmw.blocksChannel: if !ok { + log.Info().Msg("returning NIL from insertBlocks, no new head") return nil } + log.Info(). + Hex("block-hash", ev.Header.Hash().Bytes()). + Int64("block-number", ev.Header.Number.Int64()). + Msg("planning to insertBlock, with new head") err := pmw.insertBlock(ctx, ev) if err != nil { + log.Info().Err(err).Msg("error in insertBlock, no new head") return err } + log.Info().Msg("calling ClearOldBlocks, before new head") pmw.clearOldBlocks(ev) } } } func (pmw *P2PMsgsWatcher) insertBlock(ctx context.Context, ev *BlockReceivedEvent) error { + log.Info(). + Hex("block-hash", ev.Header.Hash().Bytes()). + Int64("block-number", ev.Header.Number.Int64()). + Msg("trying to obtain lock for insertBlock new head") pmw.recentBlocksMux.Lock() + log.Info(). + Hex("block-hash", ev.Header.Hash().Bytes()). + Int64("block-number", ev.Header.Number.Int64()). + Msg("obtained lock for insertBlock new head") defer pmw.recentBlocksMux.Unlock() pmw.recentBlocks[ev.Header.Number.Uint64()] = ev if ev.Header.Number.Uint64() > pmw.mostRecentBlock { @@ -91,7 +108,15 @@ func (pmw *P2PMsgsWatcher) insertBlock(ctx context.Context, ev *BlockReceivedEve } func (pmw *P2PMsgsWatcher) clearOldBlocks(latestEv *BlockReceivedEvent) { + log.Info(). + Hex("block-hash", latestEv.Header.Hash().Bytes()). + Int64("block-number", latestEv.Header.Number.Int64()). + Msg("trying to obtain lock for clearOldBlocks new head") pmw.recentBlocksMux.Lock() + log.Info(). + Hex("block-hash", latestEv.Header.Hash().Bytes()). + Int64("block-number", latestEv.Header.Number.Int64()). + Msg("obtained lock for clearOldBlocks new head") defer pmw.recentBlocksMux.Unlock() tooOld := []uint64{} diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 284d3bc..612c7f1 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -140,7 +140,7 @@ func (w *Watcher) Start(ctx context.Context, runner service.Runner) error { return err } log.Info(). - Bytes("encrypted transaction", txEvent.EncryptedTransaction). + Hex("encrypted transaction (hex)", txEvent.EncryptedTransaction). Msg("new encrypted transaction") case dd := <-decryptionDataChannel: keys, identites := getDecryptionKeysAndIdentities(dd.Keys) @@ -178,7 +178,7 @@ func (w *Watcher) Start(ctx context.Context, runner service.Runner) error { return err } log.Info(). - Bytes("key shares", share.Share). + Hex("key shares (hex)", share.Share). Int64("slot", ks.Slot). Msg("new key shares") } diff --git a/main.go b/main.go index e5c5e85..ac01fc1 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,19 @@ package main import ( + "net/http" "os" "github.com/rs/zerolog/log" "github.com/shutter-network/gnosh-metrics/cmd/cli" + + _ "net/http/pprof" ) func main() { + go func() { + log.Info().Any("pprof", http.ListenAndServe("0.0.0.0:6060", nil)).Msg("started pprof") + }() status := 0 if err := cli.Cmd().Execute(); err != nil { log.Info().Err(err).Msg("failed running server")