@@ -2,6 +2,9 @@ package gnosiskeyperwatcher
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
6
+ "sync"
7
+ "time"
5
8
6
9
pubsub "github.com/libp2p/go-libp2p-pubsub"
7
10
"github.com/rs/zerolog/log"
@@ -13,22 +16,33 @@ import (
13
16
)
14
17
15
18
type KeysWatcher struct {
16
- config * keyper.Config
19
+ config * keyper.Config
20
+ blocksChannel chan * BlockReceivedEvent
21
+
22
+ recentBlocksMux sync.Mutex
23
+ recentBlocks map [uint64 ]* BlockReceivedEvent
24
+ mostRecentBlock uint64
17
25
}
18
26
19
- func NewKeysWatcher (config * keyper.Config ) * KeysWatcher {
27
+ func NewKeysWatcher (config * keyper.Config , blocksChannel chan * BlockReceivedEvent ) * KeysWatcher {
20
28
return & KeysWatcher {
21
- config : config ,
29
+ config : config ,
30
+ blocksChannel : blocksChannel ,
31
+ recentBlocksMux : sync.Mutex {},
32
+ recentBlocks : make (map [uint64 ]* BlockReceivedEvent ),
33
+ mostRecentBlock : 0 ,
22
34
}
23
35
}
24
36
25
- func (w * KeysWatcher ) Start (_ context.Context , runner service.Runner ) error {
37
+ func (w * KeysWatcher ) Start (ctx context.Context , runner service.Runner ) error {
26
38
p2pService , err := p2p .New (w .config .P2P )
27
39
if err != nil {
28
40
return err
29
41
}
30
42
p2pService .AddMessageHandler (w )
31
43
44
+ runner .Go (func () error { return w .insertBlocks (ctx ) })
45
+
32
46
return runner .StartService (p2pService )
33
47
}
34
48
@@ -43,11 +57,70 @@ func (w *KeysWatcher) ValidateMessage(_ context.Context, _ p2pmsg.Message) (pubs
43
57
}
44
58
45
59
func (w * KeysWatcher ) HandleMessage (_ context.Context , msgUntyped p2pmsg.Message ) ([]p2pmsg.Message , error ) {
60
+ t := time .Now ()
46
61
msg := msgUntyped .(* p2pmsg.DecryptionKeys )
47
62
extra := msg .Extra .(* p2pmsg.DecryptionKeys_Gnosis ).Gnosis
63
+
64
+ ev , ok := w .getRecentBlock (extra .Slot )
65
+ if ! ok {
66
+ log .Warn ().
67
+ Uint64 ("keys-block" , extra .Slot ).
68
+ Uint64 ("most-recent-block" , w .mostRecentBlock ).
69
+ Msg ("received keys for unknown block" )
70
+ return []p2pmsg.Message {}, nil
71
+ }
72
+
73
+ dt := t .Sub (ev .Time )
48
74
log .Info ().
49
75
Uint64 ("block" , extra .Slot ).
50
76
Int ("num-keys" , len (msg .Keys )).
77
+ Str ("latency" , fmt .Sprintf ("%.2fs" , dt .Seconds ())).
51
78
Msg ("new keys" )
52
79
return []p2pmsg.Message {}, nil
53
80
}
81
+
82
+ func (w * KeysWatcher ) insertBlocks (ctx context.Context ) error {
83
+ for {
84
+ select {
85
+ case <- ctx .Done ():
86
+ return ctx .Err ()
87
+ case ev , ok := <- w .blocksChannel :
88
+ if ! ok {
89
+ return nil
90
+ }
91
+ w .insertBlock (ev )
92
+ w .clearOldBlocks (ev )
93
+ }
94
+ }
95
+ }
96
+
97
+ func (w * KeysWatcher ) insertBlock (ev * BlockReceivedEvent ) {
98
+ w .recentBlocksMux .Lock ()
99
+ defer w .recentBlocksMux .Unlock ()
100
+ w .recentBlocks [ev .Header .Number .Uint64 ()] = ev
101
+ if ev .Header .Number .Uint64 () > w .mostRecentBlock {
102
+ w .mostRecentBlock = ev .Header .Number .Uint64 ()
103
+ }
104
+ }
105
+
106
+ func (w * KeysWatcher ) clearOldBlocks (latestEv * BlockReceivedEvent ) {
107
+ w .recentBlocksMux .Lock ()
108
+ defer w .recentBlocksMux .Unlock ()
109
+
110
+ tooOld := []uint64 {}
111
+ for block := range w .recentBlocks {
112
+ if block < latestEv .Header .Number .Uint64 ()- 100 {
113
+ tooOld = append (tooOld , block )
114
+ }
115
+ }
116
+ for _ , block := range tooOld {
117
+ delete (w .recentBlocks , block )
118
+ }
119
+ }
120
+
121
+ func (w * KeysWatcher ) getRecentBlock (blockNumber uint64 ) (* BlockReceivedEvent , bool ) {
122
+ w .recentBlocksMux .Lock ()
123
+ defer w .recentBlocksMux .Unlock ()
124
+ ev , ok := w .recentBlocks [blockNumber ]
125
+ return ev , ok
126
+ }
0 commit comments