Skip to content

Commit 2334ffa

Browse files
author
jeffyanta
authored
Make intent, action and fulfillment models resistent to DB races (#198)
1 parent 69c9a0c commit 2334ffa

File tree

33 files changed

+458
-262
lines changed

33 files changed

+458
-262
lines changed

pkg/code/async/account/gift_card.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,8 @@ func markFulfillmentAsActivelyScheduled(ctx context.Context, data code_data.Prov
351351
return errors.New("expected fulfillment in unknown state")
352352
}
353353

354-
// Note: different than Save, since we don't have distributed locks
355-
return data.MarkFulfillmentAsActivelyScheduled(ctx, fulfillmentRecord.Id)
354+
fulfillmentRecord.DisableActiveScheduling = false
355+
return data.UpdateFulfillment(ctx, fulfillmentRecord)
356356
}
357357

358358
// Must be unique, but consistent for idempotency, and ideally fit in a 32

pkg/code/async/geyser/external_deposit.go

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package async_geyser
33
import (
44
"context"
55
"crypto/sha256"
6+
"database/sql"
67
"fmt"
78
"strconv"
89
"strings"
@@ -274,6 +275,8 @@ func processPotentialExternalDepositIntoVm(ctx context.Context, data code_data.P
274275
if err == nil {
275276
syncedDepositCache.Insert(cacheKey, true, 1)
276277
return nil
278+
} else if err != deposit.ErrDepositNotFound {
279+
return errors.Wrap(err, "error checking for existing external deposit record")
277280
}
278281

279282
ownerAccount, err := common.NewAccountFromPublicKeyString(accountInfoRecord.OwnerAccount)
@@ -287,42 +290,49 @@ func processPotentialExternalDepositIntoVm(ctx context.Context, data code_data.P
287290
}
288291
usdMarketValue := usdExchangeRecord.Rate * float64(deltaQuarksIntoOmnibus) / float64(common.CoreMintQuarksPerUnit)
289292

290-
// For transaction history
291-
intentRecord := &intent.Record{
292-
IntentId: getExternalDepositIntentID(signature, userVirtualTimelockVaultAccount),
293-
IntentType: intent.ExternalDeposit,
293+
err = data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error {
294+
// For transaction history
295+
intentRecord := &intent.Record{
296+
IntentId: getExternalDepositIntentID(signature, userVirtualTimelockVaultAccount),
297+
IntentType: intent.ExternalDeposit,
294298

295-
InitiatorOwnerAccount: ownerAccount.PublicKey().ToBase58(),
299+
InitiatorOwnerAccount: ownerAccount.PublicKey().ToBase58(),
296300

297-
ExternalDepositMetadata: &intent.ExternalDepositMetadata{
298-
DestinationTokenAccount: userVirtualTimelockVaultAccount.PublicKey().ToBase58(),
299-
Quantity: uint64(deltaQuarksIntoOmnibus),
300-
UsdMarketValue: usdMarketValue,
301-
},
301+
ExternalDepositMetadata: &intent.ExternalDepositMetadata{
302+
DestinationTokenAccount: userVirtualTimelockVaultAccount.PublicKey().ToBase58(),
303+
Quantity: uint64(deltaQuarksIntoOmnibus),
304+
UsdMarketValue: usdMarketValue,
305+
},
302306

303-
State: intent.StateConfirmed,
304-
CreatedAt: time.Now(),
305-
}
306-
err = data.SaveIntent(ctx, intentRecord)
307-
if err != nil {
308-
return errors.Wrap(err, "error saving intent record")
309-
}
307+
State: intent.StateConfirmed,
308+
CreatedAt: time.Now(),
309+
}
310+
err = data.SaveIntent(ctx, intentRecord)
311+
if err != nil {
312+
return errors.Wrap(err, "error saving intent record")
313+
}
310314

311-
// For tracking in cached balances
312-
externalDepositRecord := &deposit.Record{
313-
Signature: signature,
314-
Destination: userVirtualTimelockVaultAccount.PublicKey().ToBase58(),
315-
Amount: uint64(deltaQuarksIntoOmnibus),
316-
UsdMarketValue: usdMarketValue,
315+
// For tracking in cached balances
316+
externalDepositRecord := &deposit.Record{
317+
Signature: signature,
318+
Destination: userVirtualTimelockVaultAccount.PublicKey().ToBase58(),
319+
Amount: uint64(deltaQuarksIntoOmnibus),
320+
UsdMarketValue: usdMarketValue,
317321

318-
Slot: tokenBalances.Slot,
319-
ConfirmationState: transaction.ConfirmationFinalized,
322+
Slot: tokenBalances.Slot,
323+
ConfirmationState: transaction.ConfirmationFinalized,
320324

321-
CreatedAt: time.Now(),
322-
}
323-
err = data.SaveExternalDeposit(ctx, externalDepositRecord)
325+
CreatedAt: time.Now(),
326+
}
327+
err = data.SaveExternalDeposit(ctx, externalDepositRecord)
328+
if err != nil {
329+
return errors.Wrap(err, "error saving external deposit record")
330+
}
331+
332+
return nil
333+
})
324334
if err != nil {
325-
return errors.Wrap(err, "error saving external deposit record")
335+
return err
326336
}
327337

328338
syncedDepositCache.Insert(cacheKey, true, 1)

pkg/code/async/sequencer/action_handler.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -170,25 +170,6 @@ func markActionFailed(ctx context.Context, data code_data.Provider, intentId str
170170
return data.UpdateAction(ctx, record)
171171
}
172172

173-
func markActionRevoked(ctx context.Context, data code_data.Provider, intentId string, actionId uint32) error {
174-
record, err := data.GetActionById(ctx, intentId, actionId)
175-
if err != nil {
176-
return err
177-
}
178-
179-
if record.State == action.StateRevoked {
180-
return nil
181-
}
182-
183-
err = validateActionState(record, action.StateUnknown)
184-
if err != nil {
185-
return err
186-
}
187-
188-
record.State = action.StateRevoked
189-
return data.UpdateAction(ctx, record)
190-
}
191-
192173
func getActionHandlers(data code_data.Provider) map[action.Type]ActionHandler {
193174
handlersByType := make(map[action.Type]ActionHandler)
194175
handlersByType[action.OpenAccount] = NewOpenAccountActionHandler(data)

pkg/code/async/sequencer/utils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ func markFulfillmentAsActivelyScheduled(ctx context.Context, data code_data.Prov
116116
return nil
117117
}
118118

119-
// Note: different than Save, since we don't have distributed locks
120-
return data.MarkFulfillmentAsActivelyScheduled(ctx, fulfillmentRecord.Id)
119+
fulfillmentRecord.DisableActiveScheduling = false
120+
return data.UpdateFulfillment(ctx, fulfillmentRecord)
121121
}
122122

123123
func (p *service) sendToBlockchain(ctx context.Context, record *fulfillment.Record) error {

pkg/code/data/action/action.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ type Record struct {
6666

6767
State State
6868

69+
Version uint64
70+
6971
CreatedAt time.Time
7072
}
7173

@@ -121,6 +123,8 @@ func (r *Record) Clone() Record {
121123

122124
State: r.State,
123125

126+
Version: r.Version,
127+
124128
CreatedAt: r.CreatedAt,
125129
}
126130
}
@@ -142,6 +146,8 @@ func (r *Record) CopyTo(dst *Record) {
142146

143147
dst.State = r.State
144148

149+
dst.Version = r.Version
150+
145151
dst.CreatedAt = r.CreatedAt
146152
}
147153

pkg/code/data/action/memory/store.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ func (s *store) PutAll(ctx context.Context, records ...*action.Record) error {
179179
if record.CreatedAt.IsZero() {
180180
record.CreatedAt = time.Now()
181181
}
182+
record.Version++
182183

183184
cloned := record.Clone()
184185
s.records = append(s.records, &cloned)
@@ -193,14 +194,20 @@ func (s *store) Update(ctx context.Context, record *action.Record) error {
193194
defer s.mu.Unlock()
194195

195196
if item := s.find(record); item != nil {
197+
if record.Version != item.Version {
198+
return action.ErrStaleVersion
199+
}
200+
record.Version++
201+
196202
if record.IntentType == intent.SendPublicPayment && record.ActionType == action.NoPrivacyWithdraw {
197203
item.Quantity = pointer.Uint64Copy(record.Quantity)
198204
}
199205
item.State = record.State
206+
item.Version = record.Version
200207
return nil
201208
}
202209

203-
return action.ErrActionNotFound
210+
return action.ErrStaleVersion
204211
}
205212

206213
// GetById implements action.store.GetById

pkg/code/data/action/postgres/model.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type model struct {
3232
Quantity sql.NullInt64 `db:"quantity"`
3333
FeeType sql.NullInt32 `db:"fee_type"`
3434
State uint `db:"state"`
35+
Version int64 `db:"version"`
3536
CreatedAt time.Time `db:"created_at"`
3637
}
3738

@@ -68,6 +69,7 @@ func toModel(obj *action.Record) (*model, error) {
6869
Quantity: quantity,
6970
FeeType: feeType,
7071
State: uint(obj.State),
72+
Version: int64(obj.Version),
7173
CreatedAt: obj.CreatedAt,
7274
}, nil
7375
}
@@ -84,6 +86,7 @@ func fromModel(obj *model) *action.Record {
8486
Quantity: pointer.Uint64IfValid(obj.Quantity.Valid, uint64(obj.Quantity.Int64)),
8587
FeeType: (*transactionpb.FeePaymentAction_FeeType)(pointer.Int32IfValid(obj.FeeType.Valid, obj.FeeType.Int32)),
8688
State: action.State(obj.State),
89+
Version: uint64(obj.Version),
8790
CreatedAt: obj.CreatedAt,
8891
}
8992
}
@@ -94,18 +97,19 @@ func (m *model) dbUpdate(ctx context.Context, db *sqlx.DB) error {
9497
params := []interface{}{
9598
m.Intent,
9699
m.ActionId,
100+
m.Version,
97101
m.State,
98102
}
99103

100104
if m.IntentType == uint(intent.SendPublicPayment) && m.ActionType == uint(action.NoPrivacyWithdraw) {
101-
quantityUpdateStmt = ", quantity = $4"
105+
quantityUpdateStmt = ", quantity = $5"
102106
params = append(params, m.Quantity)
103107
}
104108

105109
query := fmt.Sprintf(`UPDATE `+tableName+`
106-
SET state = $3%s
107-
WHERE intent = $1 AND action_id = $2
108-
RETURNING id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
110+
SET state = $4%s, version = version + 1
111+
WHERE intent = $1 AND action_id = $2 AND version = $3
112+
RETURNING id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
109113
`, quantityUpdateStmt)
110114

111115
err := tx.QueryRowxContext(
@@ -114,7 +118,7 @@ func (m *model) dbUpdate(ctx context.Context, db *sqlx.DB) error {
114118
params...,
115119
).StructScan(m)
116120
if err != nil {
117-
return pgutil.CheckNoRows(err, action.ErrActionNotFound)
121+
return pgutil.CheckNoRows(err, action.ErrStaleVersion)
118122
}
119123

120124
return nil
@@ -124,7 +128,7 @@ func (m *model) dbUpdate(ctx context.Context, db *sqlx.DB) error {
124128
func dbPutAllInTx(ctx context.Context, tx *sqlx.Tx, models []*model) ([]*model, error) {
125129
var res []*model
126130

127-
query := `INSERT INTO ` + tableName + ` (intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at) VALUES `
131+
query := `INSERT INTO ` + tableName + ` (intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at) VALUES `
128132

129133
var parameters []interface{}
130134
for i, model := range models {
@@ -134,8 +138,8 @@ func dbPutAllInTx(ctx context.Context, tx *sqlx.Tx, models []*model) ([]*model,
134138

135139
baseIndex := len(parameters)
136140
query += fmt.Sprintf(
137-
`($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)`,
138-
baseIndex+1, baseIndex+2, baseIndex+3, baseIndex+4, baseIndex+5, baseIndex+6, baseIndex+7, baseIndex+8, baseIndex+9, baseIndex+10,
141+
`($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d + 1, $%d)`,
142+
baseIndex+1, baseIndex+2, baseIndex+3, baseIndex+4, baseIndex+5, baseIndex+6, baseIndex+7, baseIndex+8, baseIndex+9, baseIndex+10, baseIndex+11,
139143
)
140144

141145
if i != len(models)-1 {
@@ -153,11 +157,12 @@ func dbPutAllInTx(ctx context.Context, tx *sqlx.Tx, models []*model) ([]*model,
153157
model.Quantity,
154158
model.FeeType,
155159
model.State,
160+
model.Version,
156161
model.CreatedAt,
157162
)
158163
}
159164

160-
query += ` RETURNING id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at`
165+
query += ` RETURNING id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at`
161166

162167
err := tx.SelectContext(
163168
ctx,
@@ -175,7 +180,7 @@ func dbPutAllInTx(ctx context.Context, tx *sqlx.Tx, models []*model) ([]*model,
175180
func dbGetById(ctx context.Context, db *sqlx.DB, intent string, actionId uint32) (*model, error) {
176181
res := &model{}
177182

178-
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
183+
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
179184
FROM ` + tableName + `
180185
WHERE intent = $1 AND action_id = $2
181186
LIMIT 1`
@@ -190,7 +195,7 @@ func dbGetById(ctx context.Context, db *sqlx.DB, intent string, actionId uint32)
190195
func dbGetAllByIntent(ctx context.Context, db *sqlx.DB, intent string) ([]*model, error) {
191196
res := []*model{}
192197

193-
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
198+
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
194199
FROM ` + tableName + `
195200
WHERE intent = $1
196201
ORDER BY action_id ASC`
@@ -210,7 +215,7 @@ func dbGetAllByIntent(ctx context.Context, db *sqlx.DB, intent string) ([]*model
210215
func dbGetAllByAddress(ctx context.Context, db *sqlx.DB, address string) ([]*model, error) {
211216
res := []*model{}
212217

213-
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
218+
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
214219
FROM ` + tableName + `
215220
WHERE source = $1 OR destination = $1`
216221

@@ -300,7 +305,7 @@ func dbGetNetBalanceBatch(ctx context.Context, db *sqlx.DB, accounts ...string)
300305
func dbGetGiftCardClaimedAction(ctx context.Context, db *sqlx.DB, giftCardVault string) (*model, error) {
301306
res := []*model{}
302307

303-
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
308+
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
304309
FROM ` + tableName + `
305310
WHERE source = $1 AND action_type = $2 AND intent_type = $3 AND state != $4
306311
LIMIT 2`
@@ -330,7 +335,7 @@ func dbGetGiftCardClaimedAction(ctx context.Context, db *sqlx.DB, giftCardVault
330335
func dbGetGiftCardAutoReturnAction(ctx context.Context, db *sqlx.DB, giftCardVault string) (*model, error) {
331336
res := []*model{}
332337

333-
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
338+
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
334339
FROM ` + tableName + `
335340
WHERE source = $1 AND action_type = $2 AND intent_type = $3 AND state != $4
336341
LIMIT 2`

pkg/code/data/action/postgres/store.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,15 @@ func (s *store) Update(ctx context.Context, record *action.Record) error {
8989
return err
9090
}
9191

92-
return model.dbUpdate(ctx, s.db)
92+
err = model.dbUpdate(ctx, s.db)
93+
if err != nil {
94+
return err
95+
}
96+
97+
updated := fromModel(model)
98+
updated.CopyTo(record)
99+
100+
return nil
93101
}
94102

95103
// GetById implements action.store.GetById

pkg/code/data/action/postgres/store_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const (
3636
3737
state INTEGER NOT NULL,
3838
39+
version INTEGER NOT NULL,
40+
3941
created_at timestamp with time zone NOT NULL,
4042
4143
CONSTRAINT codewallet__core_action__uniq__intent__and__action_id UNIQUE (intent, action_id)

pkg/code/data/action/store.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ import (
88
)
99

1010
var (
11-
ErrActionNotFound = errors.New("no action ecord could be found")
12-
ErrMultipleActionsFound = errors.New("multiple action records found")
13-
ErrActionExists = errors.New("action record already exists")
11+
ErrActionNotFound = errors.New("no action could be found")
12+
ErrMultipleActionsFound = errors.New("multiple actions found")
13+
ErrActionExists = errors.New("action already exists")
14+
ErrStaleVersion = errors.New("action version is stale")
1415
)
1516

1617
type Store interface {

0 commit comments

Comments
 (0)