Skip to content

Commit 822d55a

Browse files
author
Jeff Yanta
committed
DB-level changes to support distributed nonce pools
1 parent 021ecc0 commit 822d55a

File tree

12 files changed

+894
-285
lines changed

12 files changed

+894
-285
lines changed

pkg/code/async/nonce/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func (p *service) metricsGaugeWorker(ctx context.Context) error {
3131
nonce.StateAvailable,
3232
nonce.StateReserved,
3333
nonce.StateInvalid,
34+
nonce.StateClaimed,
3435
} {
3536
count, err := p.data.GetNonceCountByStateAndPurpose(ctx, nonce.EnvironmentCvm, common.CodeVmAccount.PublicKey().ToBase58(), state, nonce.PurposeClientTransaction)
3637
if err != nil {

pkg/code/async/nonce/pool.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,31 +95,36 @@ func (p *service) handle(ctx context.Context, record *nonce.Record) error {
9595
blockhash yet.
9696
9797
StateAvailable:
98-
Available to be used by a payment intent, subscription, or
99-
other nonce-related transaction/instruction.
98+
Available to be by a fulfillment for a virtual instruction
99+
or transaction.
100100
101101
StateReserved:
102-
Reserved by a payment intent, subscription, or other
103-
nonce-related transaction/instruction.
102+
Reserved by a by a fulfillment for a virtual instruction
103+
or transaction.
104104
105105
StateInvalid:
106106
The nonce account is invalid (e.g. insufficient funds, etc).
107107
108+
StateClaimed:
109+
The nonce is claimed by a process for future use (identified
110+
by a node ID)
111+
108112
Transitions:
109113
StateUnknown
110114
-> StateInvalid
111115
-> StateReleased
112116
StateReleased
113117
-> StateAvailable
114118
StateAvailable
115-
-> [externally] StateReserved (nonce used in a new transaction)
119+
-> [externally] StateClaimed (nonce claimed by a process for future use)
120+
StateClaimed
121+
-> [externally] StateAvailable (nonce released by the process that claimed it)
122+
-> [externally] StateReserved (nonce reserved for future use in a virtual instruction or transaction)
116123
StateReserved
117-
-> [externally] StateReleased (nonce used in a submitted transaction)
118-
-> [externally] StateAvailable (nonce will never be submitted in the transaction - eg. it became revoked)
124+
-> [externally] StateReleased (nonce used in a submitted virtual instruction or transaction)
125+
-> [externally] StateAvailable (nonce will never be submitted in the virtual instruction or transaction - eg. it became revoked)
119126
*/
120127

121-
// todo: distributed lock on the nonce
122-
123128
switch record.State {
124129
case nonce.StateUnknown:
125130
return p.handleUnknown(ctx, record)
@@ -131,6 +136,8 @@ func (p *service) handle(ctx context.Context, record *nonce.Record) error {
131136
return p.handleReserved(ctx, record)
132137
case nonce.StateInvalid:
133138
return p.handleInvalid(ctx, record)
139+
case nonce.StateClaimed:
140+
return p.handleClaimed(ctx, record)
134141
default:
135142
return nil
136143
}
@@ -259,3 +266,8 @@ func (p *service) handleInvalid(ctx context.Context, record *nonce.Record) error
259266
// as is for further investigation.
260267
return nil
261268
}
269+
270+
func (p *service) handleClaimed(ctx context.Context, record *nonce.Record) error {
271+
// Nothing to do here
272+
return nil
273+
}

pkg/code/data/internal.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ type DatabaseData interface {
136136
GetNonceCountByStateAndPurpose(ctx context.Context, env nonce.Environment, instance string, state nonce.State, purpose nonce.Purpose) (uint64, error)
137137
GetAllNonceByState(ctx context.Context, env nonce.Environment, instance string, state nonce.State, opts ...query.Option) ([]*nonce.Record, error)
138138
GetRandomAvailableNonceByPurpose(ctx context.Context, env nonce.Environment, instance string, purpose nonce.Purpose) (*nonce.Record, error)
139+
BatchClaimAvailableNoncesByPurpose(ctx context.Context, env nonce.Environment, instance string, purpose nonce.Purpose, limit int, nodeID string, minExpireAt, maxExpireAt time.Time) ([]*nonce.Record, error)
139140
SaveNonce(ctx context.Context, record *nonce.Record) error
140141

141142
// Fulfillment
@@ -532,6 +533,9 @@ func (dp *DatabaseProvider) GetAllNonceByState(ctx context.Context, env nonce.En
532533
func (dp *DatabaseProvider) GetRandomAvailableNonceByPurpose(ctx context.Context, env nonce.Environment, instance string, purpose nonce.Purpose) (*nonce.Record, error) {
533534
return dp.nonces.GetRandomAvailableByPurpose(ctx, env, instance, purpose)
534535
}
536+
func (dp *DatabaseProvider) BatchClaimAvailableNoncesByPurpose(ctx context.Context, env nonce.Environment, instance string, purpose nonce.Purpose, limit int, nodeID string, minExpireAt, maxExpireAt time.Time) ([]*nonce.Record, error) {
537+
return dp.nonces.BatchClaimAvailableByPurpose(ctx, env, instance, purpose, limit, nodeID, minExpireAt, maxExpireAt)
538+
}
535539
func (dp *DatabaseProvider) SaveNonce(ctx context.Context, record *nonce.Record) error {
536540
return dp.nonces.Save(ctx, record)
537541
}

pkg/code/data/nonce/memory/store.go

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import (
55
"math/rand"
66
"sort"
77
"sync"
8+
"time"
89

910
"github.com/code-payments/code-server/pkg/code/data/nonce"
1011
"github.com/code-payments/code-server/pkg/database/query"
12+
"github.com/code-payments/code-server/pkg/pointer"
1113
)
1214

1315
type store struct {
@@ -131,6 +133,16 @@ func (s *store) filter(items []*nonce.Record, cursor query.Cursor, limit uint64,
131133
return res
132134
}
133135

136+
func (s *store) filterAvailable(items []*nonce.Record) []*nonce.Record {
137+
var res []*nonce.Record
138+
for _, item := range items {
139+
if item.IsAvailable() {
140+
res = append(res, item)
141+
}
142+
}
143+
return res
144+
}
145+
134146
func (s *store) Count(ctx context.Context, env nonce.Environment, instance string) (uint64, error) {
135147
s.mu.Lock()
136148
defer s.mu.Unlock()
@@ -165,13 +177,25 @@ func (s *store) Save(ctx context.Context, data *nonce.Record) error {
165177

166178
s.last++
167179
if item := s.find(data); item != nil {
180+
if item.Version != data.Version {
181+
return nonce.ErrStaleVersion
182+
}
183+
184+
data.Version++
185+
168186
item.Blockhash = data.Blockhash
169-
item.State = data.State
170187
item.Signature = data.Signature
188+
item.State = data.State
189+
item.ClaimNodeID = pointer.StringCopy(data.ClaimNodeID)
190+
item.ClaimExpiresAt = pointer.TimeCopy(data.ClaimExpiresAt)
191+
item.Version = data.Version
171192
} else {
172193
if data.Id == 0 {
173194
data.Id = s.last
174195
}
196+
197+
data.Version++
198+
175199
c := data.Clone()
176200
s.records = append(s.records, &c)
177201
}
@@ -184,7 +208,8 @@ func (s *store) Get(ctx context.Context, address string) (*nonce.Record, error)
184208
defer s.mu.Unlock()
185209

186210
if item := s.findAddress(address); item != nil {
187-
return item, nil
211+
cloned := item.Clone()
212+
return &cloned, nil
188213
}
189214

190215
return nil, nonce.ErrNonceNotFound
@@ -201,7 +226,7 @@ func (s *store) GetAllByState(ctx context.Context, env nonce.Environment, instan
201226
return nil, nonce.ErrNonceNotFound
202227
}
203228

204-
return res, nil
229+
return clonedRecords(res), nil
205230
}
206231

207232
return nil, nonce.ErrNonceNotFound
@@ -212,10 +237,53 @@ func (s *store) GetRandomAvailableByPurpose(ctx context.Context, env nonce.Envir
212237
defer s.mu.Unlock()
213238

214239
items := s.findByStateAndPurpose(env, instance, nonce.StateAvailable, purpose)
240+
items = append(items, s.findByStateAndPurpose(env, instance, nonce.StateClaimed, purpose)...)
241+
items = s.filterAvailable(items)
215242
if len(items) == 0 {
216243
return nil, nonce.ErrNonceNotFound
217244
}
218245

219246
index := rand.Intn(len(items))
220-
return items[index], nil
247+
cloned := items[index].Clone()
248+
return &cloned, nil
249+
}
250+
251+
func (s *store) BatchClaimAvailableByPurpose(ctx context.Context, env nonce.Environment, instance string, purpose nonce.Purpose, limit int, nodeID string, minExpireAt, maxExpireAt time.Time) ([]*nonce.Record, error) {
252+
s.mu.Lock()
253+
defer s.mu.Unlock()
254+
255+
items := s.findByStateAndPurpose(env, instance, nonce.StateAvailable, purpose)
256+
items = append(items, s.findByStateAndPurpose(env, instance, nonce.StateClaimed, purpose)...)
257+
items = s.filterAvailable(items)
258+
if len(items) == 0 {
259+
return nil, nonce.ErrNonceNotFound
260+
}
261+
if len(items) > limit {
262+
items = items[:limit]
263+
}
264+
265+
for i, l := 0, len(items); i < l; i++ {
266+
j := rand.Intn(l)
267+
items[i], items[j] = items[j], items[i]
268+
}
269+
for i := 0; i < len(items); i++ {
270+
window := maxExpireAt.Sub(minExpireAt)
271+
expiry := minExpireAt.Add(time.Duration(rand.Intn(int(window))))
272+
273+
items[i].State = nonce.StateClaimed
274+
items[i].ClaimNodeID = pointer.String(nodeID)
275+
items[i].ClaimExpiresAt = pointer.Time(expiry)
276+
items[i].Version++
277+
}
278+
279+
return clonedRecords(items), nil
280+
}
281+
282+
func clonedRecords(items []*nonce.Record) []*nonce.Record {
283+
res := make([]*nonce.Record, len(items))
284+
for i, item := range items {
285+
cloned := item.Clone()
286+
res[i] = &cloned
287+
}
288+
return res
221289
}

pkg/code/data/nonce/nonce.go

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package nonce
33
import (
44
"crypto/ed25519"
55
"errors"
6+
"time"
67

8+
"github.com/code-payments/code-server/pkg/pointer"
79
"github.com/mr-tron/base58"
810
)
911

@@ -22,22 +24,22 @@ const (
2224
)
2325

2426
var (
27+
ErrStaleVersion = errors.New("nonce version is stale")
2528
ErrNonceNotFound = errors.New("no records could be found")
26-
ErrInvalidNonce = errors.New("invalid nonce")
2729
)
2830

2931
type State uint8
3032

3133
const (
3234
StateUnknown State = iota
3335
StateReleased // The nonce is almost ready but we don't know its blockhash yet.
34-
StateAvailable // The nonce is available to be used by a payment intent, subscription, or other nonce-related transaction/instruction.
35-
StateReserved // The nonce is reserved by a payment intent, subscription, or other nonce-related transaction/instruction.
36+
StateAvailable // The nonce is available to be used by a fulfillment for a virtual instruction or transaction.
37+
StateReserved // The nonce is reserved by a fulfillment for a virtual instruction or transaction.
3638
StateInvalid // The nonce account is invalid (e.g. insufficient funds, etc).
39+
StateClaimed // The nonce is claimed by a process for future use (identified by a node ID)
3740
)
3841

39-
// Split nonce pool across different use cases. This has an added benefit of:
40-
// - Solving for race conditions without distributed locks.
42+
// Split nonce pool across different use cases. This has an added benefit of:.
4143
// - Avoiding different use cases from starving each other and ending up in a
4244
// deadlocked state. Concretely, it would be really bad if clients could starve
4345
// internal processes from creating transactions that would allow us to progress
@@ -65,10 +67,11 @@ type Record struct {
6567
State State
6668

6769
Signature string
68-
}
6970

70-
func (r *Record) GetPublicKey() (ed25519.PublicKey, error) {
71-
return base58.Decode(r.Address)
71+
ClaimNodeID *string
72+
ClaimExpiresAt *time.Time
73+
74+
Version uint64
7275
}
7376

7477
func (r *Record) Clone() Record {
@@ -82,6 +85,9 @@ func (r *Record) Clone() Record {
8285
Purpose: r.Purpose,
8386
State: r.State,
8487
Signature: r.Signature,
88+
ClaimNodeID: pointer.StringCopy(r.ClaimNodeID),
89+
ClaimExpiresAt: pointer.TimeCopy(r.ClaimExpiresAt),
90+
Version: r.Version,
8591
}
8692
}
8793

@@ -95,32 +101,72 @@ func (r *Record) CopyTo(dst *Record) {
95101
dst.Purpose = r.Purpose
96102
dst.State = r.State
97103
dst.Signature = r.Signature
104+
dst.ClaimNodeID = pointer.StringCopy(r.ClaimNodeID)
105+
dst.ClaimExpiresAt = pointer.TimeCopy(r.ClaimExpiresAt)
106+
dst.Version = r.Version
98107
}
99108

100-
func (v *Record) Validate() error {
101-
if len(v.Address) == 0 {
109+
func (r *Record) Validate() error {
110+
if len(r.Address) == 0 {
102111
return errors.New("nonce account address is required")
103112
}
104113

105-
if len(v.Authority) == 0 {
114+
if len(r.Authority) == 0 {
106115
return errors.New("authority address is required")
107116
}
108117

109-
if v.Environment == EnvironmentUnknown {
118+
if r.Environment == EnvironmentUnknown {
110119
return errors.New("nonce environment must be set")
111120
}
112121

113-
if len(v.EnvironmentInstance) == 0 {
122+
if len(r.EnvironmentInstance) == 0 {
114123
return errors.New("nonce environment instance must be set")
115124
}
116125

117-
if v.Purpose == PurposeUnknown {
126+
if r.Purpose == PurposeUnknown {
118127
return errors.New("nonce purpose must be set")
119128
}
120129

130+
switch r.State {
131+
case StateClaimed:
132+
if r.ClaimNodeID == nil {
133+
return errors.New("claim node id is required")
134+
}
135+
136+
if r.ClaimExpiresAt == nil {
137+
return errors.New("claim expiration timestamp is required")
138+
}
139+
default:
140+
if r.ClaimNodeID != nil {
141+
return errors.New("claim node id cannot be set")
142+
}
143+
144+
if r.ClaimExpiresAt != nil {
145+
return errors.New("claim expiration timestamp cannot be set")
146+
}
147+
}
148+
121149
return nil
122150
}
123151

152+
func (r *Record) IsAvailable() bool {
153+
if r.State == StateAvailable {
154+
return true
155+
}
156+
if r.State != StateClaimed {
157+
return false
158+
}
159+
if r.ClaimExpiresAt == nil {
160+
return false
161+
}
162+
163+
return time.Now().After(*r.ClaimExpiresAt)
164+
}
165+
166+
func (r *Record) GetPublicKey() (ed25519.PublicKey, error) {
167+
return base58.Decode(r.Address)
168+
}
169+
124170
func (e Environment) String() string {
125171
switch e {
126172
case EnvironmentUnknown:
@@ -145,6 +191,8 @@ func (s State) String() string {
145191
return "reserved"
146192
case StateInvalid:
147193
return "invalid"
194+
case StateClaimed:
195+
return "claimed"
148196
}
149197

150198
return "unknown"

0 commit comments

Comments
 (0)