Skip to content

Commit 4d01b01

Browse files
committed
Add auto-expiration for locked account keys and metrics for available keys left
1 parent feb2103 commit 4d01b01

File tree

8 files changed

+169
-135
lines changed

8 files changed

+169
-135
lines changed

bootstrap/bootstrap.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ type Bootstrap struct {
6060
events *ingestion.Engine
6161
profiler *api.ProfileServer
6262
db *pebbleDB.DB
63-
keystore *requester.Keystore
63+
keystore *requester.KeyStore
6464
}
6565

6666
func New(config config.Config) (*Bootstrap, error) {
@@ -220,7 +220,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
220220
})
221221
}
222222

223-
b.keystore = requester.NewKeystore(accountKeys)
223+
b.keystore = requester.NewKeyStore(accountKeys)
224224

225225
evm, err := requester.NewEVM(
226226
b.storages.Registers,

metrics/collector.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type Collector interface {
1919
EVMAccountInteraction(address string)
2020
MeasureRequestDuration(start time.Time, method string)
2121
OperatorBalance(account *flow.Account)
22+
AvailableSigningKeys(count int)
2223
}
2324

2425
var _ Collector = &DefaultCollector{}
@@ -35,6 +36,7 @@ type DefaultCollector struct {
3536
operatorBalance prometheus.Gauge
3637
evmAccountCallCounters *prometheus.CounterVec
3738
requestDurations *prometheus.HistogramVec
39+
availableSigningkeys prometheus.Gauge
3840
}
3941

4042
func NewCollector(logger zerolog.Logger) Collector {
@@ -90,6 +92,11 @@ func NewCollector(logger zerolog.Logger) Collector {
9092
Buckets: prometheus.DefBuckets,
9193
}, []string{"method"})
9294

95+
availableSigningKeys := prometheus.NewGauge(prometheus.GaugeOpts{
96+
Name: prefixedName("available_signing_keys"),
97+
Help: "Number of keys available for transaction signing",
98+
})
99+
93100
metrics := []prometheus.Collector{
94101
apiErrors,
95102
traceDownloadErrorCounter,
@@ -101,6 +108,7 @@ func NewCollector(logger zerolog.Logger) Collector {
101108
operatorBalance,
102109
evmAccountCallCounters,
103110
requestDurations,
111+
availableSigningKeys,
104112
}
105113
if err := registerMetrics(logger, metrics...); err != nil {
106114
logger.Info().Msg("using noop collector as metric register failed")
@@ -118,6 +126,7 @@ func NewCollector(logger zerolog.Logger) Collector {
118126
evmAccountCallCounters: evmAccountCallCounters,
119127
requestDurations: requestDurations,
120128
operatorBalance: operatorBalance,
129+
availableSigningkeys: availableSigningKeys,
121130
}
122131
}
123132

@@ -172,6 +181,10 @@ func (c *DefaultCollector) MeasureRequestDuration(start time.Time, method string
172181
Observe(time.Since(start).Seconds())
173182
}
174183

184+
func (c *DefaultCollector) AvailableSigningKeys(count int) {
185+
c.availableSigningkeys.Set(float64(count))
186+
}
187+
175188
func prefixedName(name string) string {
176189
return fmt.Sprintf("evm_gateway_%s", name)
177190
}

metrics/nop.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ func (c *nopCollector) EVMTransactionIndexed(int) {}
2121
func (c *nopCollector) EVMAccountInteraction(string) {}
2222
func (c *nopCollector) MeasureRequestDuration(time.Time, string) {}
2323
func (c *nopCollector) OperatorBalance(*flow.Account) {}
24+
func (c *nopCollector) AvailableSigningKeys(count int) {}

services/ingestion/event_subscriber.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ var _ EventSubscriber = &RPCEventSubscriber{}
3333
type RPCEventSubscriber struct {
3434
logger zerolog.Logger
3535

36-
client *requester.CrossSporkClient
37-
chain flowGo.ChainID
38-
keystore *requester.Keystore
39-
height uint64
36+
client *requester.CrossSporkClient
37+
chain flowGo.ChainID
38+
keyLock requester.KeyLock
39+
height uint64
4040

4141
recovery bool
4242
recoveredEvents []flow.Event
@@ -46,17 +46,17 @@ func NewRPCEventSubscriber(
4646
logger zerolog.Logger,
4747
client *requester.CrossSporkClient,
4848
chainID flowGo.ChainID,
49-
keystore *requester.Keystore,
49+
keyLock requester.KeyLock,
5050
startHeight uint64,
5151
) *RPCEventSubscriber {
5252
logger = logger.With().Str("component", "subscriber").Logger()
5353
return &RPCEventSubscriber{
5454
logger: logger,
5555

56-
client: client,
57-
chain: chainID,
58-
keystore: keystore,
59-
height: startHeight,
56+
client: client,
57+
chain: chainID,
58+
keyLock: keyLock,
59+
height: startHeight,
6060
}
6161
}
6262

@@ -173,7 +173,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
173173
}
174174
}
175175
for _, evt := range blockEvents.Events {
176-
r.keystore.UnlockKey(evt.TransactionID)
176+
r.keyLock.UnlockKey(evt.TransactionID)
177177
}
178178

179179
eventsChan <- evmEvents

services/ingestion/event_subscriber_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func Test_Subscribing(t *testing.T) {
4343
)
4444
require.NoError(t, err)
4545

46-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeystore(nil), 1)
46+
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeyStore(nil), 1)
4747

4848
events := subscriber.Subscribe(context.Background())
4949

@@ -83,7 +83,7 @@ func Test_MissingBlockEvent(t *testing.T) {
8383
)
8484
require.NoError(t, err)
8585

86-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeystore(nil), 1)
86+
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeyStore(nil), 1)
8787

8888
events := subscriber.Subscribe(context.Background())
8989

@@ -185,7 +185,7 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
185185
)
186186
require.NoError(t, err)
187187

188-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeystore(nil), 1)
188+
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeyStore(nil), 1)
189189

190190
events := subscriber.Subscribe(context.Background())
191191

@@ -248,7 +248,7 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
248248
)
249249
require.NoError(t, err)
250250

251-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeystore(nil), 1)
251+
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeyStore(nil), 1)
252252

253253
events := subscriber.Subscribe(context.Background())
254254

@@ -310,7 +310,7 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) {
310310
)
311311
require.NoError(t, err)
312312

313-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeystore(nil), 1)
313+
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, requester.NewKeyStore(nil), 1)
314314

315315
events := subscriber.Subscribe(context.Background())
316316

services/requester/key_store.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package requester
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
8+
flowsdk "github.com/onflow/flow-go-sdk"
9+
"github.com/onflow/flow-go-sdk/crypto"
10+
flowGo "github.com/onflow/flow-go/model/flow"
11+
)
12+
13+
var ErrNoKeysAvailable = fmt.Errorf("no keys available")
14+
15+
const accountKeyExpiry = 10 * time.Second
16+
17+
type AccountKey struct {
18+
flowsdk.AccountKey
19+
20+
mu sync.Mutex
21+
ks *KeyStore
22+
Address flowsdk.Address
23+
Signer crypto.Signer
24+
lastUsed time.Time
25+
}
26+
27+
// Done unlocks a key after use and puts it back into the pool.
28+
func (k *AccountKey) Done() {
29+
k.mu.Lock()
30+
defer k.mu.Unlock()
31+
32+
k.ks.availableKeys <- k
33+
}
34+
35+
func (k *AccountKey) SetProposerPayerAndSign(
36+
tx *flowsdk.Transaction,
37+
account *flowsdk.Account,
38+
) error {
39+
if k.Address != account.Address {
40+
return fmt.Errorf(
41+
"expected address: %v, got address: %v",
42+
k.Address,
43+
account.Address,
44+
)
45+
}
46+
if k.Index >= uint32(len(account.Keys)) {
47+
return fmt.Errorf(
48+
"key index: %d exceeds keys length: %d",
49+
k.Index,
50+
len(account.Keys),
51+
)
52+
}
53+
seqNumber := account.Keys[k.Index].SequenceNumber
54+
55+
return tx.
56+
SetProposalKey(k.Address, k.Index, seqNumber).
57+
SetPayer(k.Address).
58+
SignEnvelope(k.Address, k.Index, k.Signer)
59+
}
60+
61+
func (k *AccountKey) expired() bool {
62+
return time.Since(k.lastUsed) > flowGo.DefaultTransactionExpiry
63+
}
64+
65+
type KeyLock interface {
66+
LockKey(txID flowsdk.Identifier, key *AccountKey)
67+
UnlockKey(txID flowsdk.Identifier)
68+
}
69+
70+
type KeyStore struct {
71+
availableKeys chan *AccountKey
72+
usedKeys map[flowsdk.Identifier]*AccountKey
73+
size int
74+
}
75+
76+
var _ KeyLock = (*KeyStore)(nil)
77+
78+
func NewKeyStore(keys []*AccountKey) *KeyStore {
79+
ks := &KeyStore{
80+
usedKeys: map[flowsdk.Identifier]*AccountKey{},
81+
}
82+
83+
availableKeys := make(chan *AccountKey, len(keys))
84+
for _, key := range keys {
85+
key.ks = ks
86+
availableKeys <- key
87+
}
88+
ks.size = len(keys)
89+
ks.availableKeys = availableKeys
90+
91+
go ks.keyExpiryChecker()
92+
93+
return ks
94+
}
95+
96+
func (k *KeyStore) AvailableKeys() int {
97+
return k.size - len(k.usedKeys)
98+
}
99+
100+
func (k *KeyStore) Take() (*AccountKey, error) {
101+
select {
102+
case key := <-k.availableKeys:
103+
return key, nil
104+
default:
105+
return nil, ErrNoKeysAvailable
106+
}
107+
}
108+
109+
func (k *KeyStore) LockKey(txID flowsdk.Identifier, key *AccountKey) {
110+
key.mu.Lock()
111+
defer key.mu.Unlock()
112+
113+
key.lastUsed = time.Now()
114+
k.usedKeys[txID] = key
115+
}
116+
117+
func (k *KeyStore) UnlockKey(txID flowsdk.Identifier) {
118+
key, ok := k.usedKeys[txID]
119+
if ok && key != nil {
120+
key.Done()
121+
delete(k.usedKeys, txID)
122+
}
123+
}
124+
125+
func (k *KeyStore) keyExpiryChecker() {
126+
for range time.Tick(accountKeyExpiry) {
127+
for txID, key := range k.usedKeys {
128+
if key.expired() {
129+
k.UnlockKey(txID)
130+
}
131+
}
132+
}
133+
}

0 commit comments

Comments
 (0)