Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package cli
import (
"context"
"fmt"
"net/http"
"os"
"runtime"
"strconv"
"strings"

_ "net/http/pprof"

"github.com/rs/zerolog/log"

"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -112,6 +116,11 @@ func Cmd() *cobra.Command {
}

func Start() error {
log.Info().Msg("go run pprof")
go func() {
log.Info().Any("pprof", http.ListenAndServe("0.0.0.0:7070", nil)).Msg("started pprof")
}()
runtime.SetBlockProfileRate(1)
log.Info().Msg("Starting Observer")
// start services here
ctx := context.Background()
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ services:
- P2P_DISCOVERY_NAMESPACE=${P2P_DISCOVERY_NAMESPACE}
ports:
- "23003:23003"
- "7070:6060"
command: ["start", "--rpc-url", "${RPC_URL}", "--beacon-api-url", "${BEACON_API_URL}", "--sequencer-contract-address", "${SEQUENCER_CONTRACT_ADDRESS}", "--validator-registry-contract-address", "${VALIDATOR_REGISTRY_CONTRACT_ADDRESS}", "--p2pkey", "${P2P_KEY}"]

db:
image: postgres:14.12
restart: unless-stopped
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=${DB_PASSWORD}
Expand Down
4 changes: 4 additions & 0 deletions internal/watcher/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ func (bw *BlocksWatcher) Start(ctx context.Context, runner service.Runner) error
newHeads := make(chan *types.Header)
sub, err := bw.ethClient.SubscribeNewHead(ctx, newHeads)
if err != nil {
log.Info().Err(err).Msg("error on subscribe, no new head")
return err
}
runner.Defer(sub.Unsubscribe)
runner.Go(func() error {
for {
log.Info().Msg("looking for new head")
select {
case <-ctx.Done():
log.Info().Msg("context cancelled, no new head")
return ctx.Err()
case head := <-newHeads:
log.Info().
Expand All @@ -53,6 +56,7 @@ func (bw *BlocksWatcher) Start(ctx context.Context, runner service.Runner) error
}
bw.blocksChannel <- ev
case err := <-sub.Err():
log.Info().Err(err).Msg("got an err, no new head")
return err
}
}
Expand Down
25 changes: 25 additions & 0 deletions internal/watcher/decryption_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,41 @@ func (pmw *P2PMsgsWatcher) handleDecryptionKeyMsg(msg *p2pmsg.DecryptionKeys) ([

func (pmw *P2PMsgsWatcher) insertBlocks(ctx context.Context) error {
for {
log.Info().Msg("polling insertBlocks, for new head channel")
select {
case <-ctx.Done():
log.Info().Msg("context cancelled for insertBlocks, no new head")
return ctx.Err()
case ev, ok := <-pmw.blocksChannel:
if !ok {
log.Info().Msg("returning NIL from insertBlocks, no new head")
return nil
}
log.Info().
Hex("block-hash", ev.Header.Hash().Bytes()).
Int64("block-number", ev.Header.Number.Int64()).
Msg("planning to insertBlock, with new head")
err := pmw.insertBlock(ctx, ev)
if err != nil {
log.Info().Err(err).Msg("error in insertBlock, no new head")
return err
}
log.Info().Msg("calling ClearOldBlocks, before new head")
pmw.clearOldBlocks(ev)
}
}
}

func (pmw *P2PMsgsWatcher) insertBlock(ctx context.Context, ev *BlockReceivedEvent) error {
log.Info().
Hex("block-hash", ev.Header.Hash().Bytes()).
Int64("block-number", ev.Header.Number.Int64()).
Msg("trying to obtain lock for insertBlock new head")
pmw.recentBlocksMux.Lock()
log.Info().
Hex("block-hash", ev.Header.Hash().Bytes()).
Int64("block-number", ev.Header.Number.Int64()).
Msg("obtained lock for insertBlock new head")
defer pmw.recentBlocksMux.Unlock()
pmw.recentBlocks[ev.Header.Number.Uint64()] = ev
if ev.Header.Number.Uint64() > pmw.mostRecentBlock {
Expand All @@ -91,7 +108,15 @@ func (pmw *P2PMsgsWatcher) insertBlock(ctx context.Context, ev *BlockReceivedEve
}

func (pmw *P2PMsgsWatcher) clearOldBlocks(latestEv *BlockReceivedEvent) {
log.Info().
Hex("block-hash", latestEv.Header.Hash().Bytes()).
Int64("block-number", latestEv.Header.Number.Int64()).
Msg("trying to obtain lock for clearOldBlocks new head")
pmw.recentBlocksMux.Lock()
log.Info().
Hex("block-hash", latestEv.Header.Hash().Bytes()).
Int64("block-number", latestEv.Header.Number.Int64()).
Msg("obtained lock for clearOldBlocks new head")
defer pmw.recentBlocksMux.Unlock()

tooOld := []uint64{}
Expand Down
4 changes: 2 additions & 2 deletions internal/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (w *Watcher) Start(ctx context.Context, runner service.Runner) error {
return err
}
log.Info().
Bytes("encrypted transaction", txEvent.EncryptedTransaction).
Hex("encrypted transaction (hex)", txEvent.EncryptedTransaction).
Msg("new encrypted transaction")
case dd := <-decryptionDataChannel:
keys, identites := getDecryptionKeysAndIdentities(dd.Keys)
Expand Down Expand Up @@ -178,7 +178,7 @@ func (w *Watcher) Start(ctx context.Context, runner service.Runner) error {
return err
}
log.Info().
Bytes("key shares", share.Share).
Hex("key shares (hex)", share.Share).
Int64("slot", ks.Slot).
Msg("new key shares")
}
Expand Down
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package main

import (
"net/http"
"os"

"github.com/rs/zerolog/log"
"github.com/shutter-network/gnosh-metrics/cmd/cli"

_ "net/http/pprof"
)

func main() {
go func() {
log.Info().Any("pprof", http.ListenAndServe("0.0.0.0:6060", nil)).Msg("started pprof")
}()
status := 0
if err := cli.Cmd().Execute(); err != nil {
log.Info().Err(err).Msg("failed running server")
Expand Down