Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/code/async/account/gift_card.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ func markFulfillmentAsActivelyScheduled(ctx context.Context, data code_data.Prov
return errors.New("expected fulfillment in unknown state")
}

// Note: different than Save, since we don't have distributed locks
return data.MarkFulfillmentAsActivelyScheduled(ctx, fulfillmentRecord.Id)
fulfillmentRecord.DisableActiveScheduling = false
return data.UpdateFulfillment(ctx, fulfillmentRecord)
}

// Must be unique, but consistent for idempotency, and ideally fit in a 32
Expand Down
68 changes: 39 additions & 29 deletions pkg/code/async/geyser/external_deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package async_geyser
import (
"context"
"crypto/sha256"
"database/sql"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -274,6 +275,8 @@ func processPotentialExternalDepositIntoVm(ctx context.Context, data code_data.P
if err == nil {
syncedDepositCache.Insert(cacheKey, true, 1)
return nil
} else if err != deposit.ErrDepositNotFound {
return errors.Wrap(err, "error checking for existing external deposit record")
}

ownerAccount, err := common.NewAccountFromPublicKeyString(accountInfoRecord.OwnerAccount)
Expand All @@ -287,42 +290,49 @@ func processPotentialExternalDepositIntoVm(ctx context.Context, data code_data.P
}
usdMarketValue := usdExchangeRecord.Rate * float64(deltaQuarksIntoOmnibus) / float64(common.CoreMintQuarksPerUnit)

// For transaction history
intentRecord := &intent.Record{
IntentId: getExternalDepositIntentID(signature, userVirtualTimelockVaultAccount),
IntentType: intent.ExternalDeposit,
err = data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error {
// For transaction history
intentRecord := &intent.Record{
IntentId: getExternalDepositIntentID(signature, userVirtualTimelockVaultAccount),
IntentType: intent.ExternalDeposit,

InitiatorOwnerAccount: ownerAccount.PublicKey().ToBase58(),
InitiatorOwnerAccount: ownerAccount.PublicKey().ToBase58(),

ExternalDepositMetadata: &intent.ExternalDepositMetadata{
DestinationTokenAccount: userVirtualTimelockVaultAccount.PublicKey().ToBase58(),
Quantity: uint64(deltaQuarksIntoOmnibus),
UsdMarketValue: usdMarketValue,
},
ExternalDepositMetadata: &intent.ExternalDepositMetadata{
DestinationTokenAccount: userVirtualTimelockVaultAccount.PublicKey().ToBase58(),
Quantity: uint64(deltaQuarksIntoOmnibus),
UsdMarketValue: usdMarketValue,
},

State: intent.StateConfirmed,
CreatedAt: time.Now(),
}
err = data.SaveIntent(ctx, intentRecord)
if err != nil {
return errors.Wrap(err, "error saving intent record")
}
State: intent.StateConfirmed,
CreatedAt: time.Now(),
}
err = data.SaveIntent(ctx, intentRecord)
if err != nil {
return errors.Wrap(err, "error saving intent record")
}

// For tracking in cached balances
externalDepositRecord := &deposit.Record{
Signature: signature,
Destination: userVirtualTimelockVaultAccount.PublicKey().ToBase58(),
Amount: uint64(deltaQuarksIntoOmnibus),
UsdMarketValue: usdMarketValue,
// For tracking in cached balances
externalDepositRecord := &deposit.Record{
Signature: signature,
Destination: userVirtualTimelockVaultAccount.PublicKey().ToBase58(),
Amount: uint64(deltaQuarksIntoOmnibus),
UsdMarketValue: usdMarketValue,

Slot: tokenBalances.Slot,
ConfirmationState: transaction.ConfirmationFinalized,
Slot: tokenBalances.Slot,
ConfirmationState: transaction.ConfirmationFinalized,

CreatedAt: time.Now(),
}
err = data.SaveExternalDeposit(ctx, externalDepositRecord)
CreatedAt: time.Now(),
}
err = data.SaveExternalDeposit(ctx, externalDepositRecord)
if err != nil {
return errors.Wrap(err, "error saving external deposit record")
}

return nil
})
if err != nil {
return errors.Wrap(err, "error saving external deposit record")
return err
}

syncedDepositCache.Insert(cacheKey, true, 1)
Expand Down
19 changes: 0 additions & 19 deletions pkg/code/async/sequencer/action_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,6 @@ func markActionFailed(ctx context.Context, data code_data.Provider, intentId str
return data.UpdateAction(ctx, record)
}

func markActionRevoked(ctx context.Context, data code_data.Provider, intentId string, actionId uint32) error {
record, err := data.GetActionById(ctx, intentId, actionId)
if err != nil {
return err
}

if record.State == action.StateRevoked {
return nil
}

err = validateActionState(record, action.StateUnknown)
if err != nil {
return err
}

record.State = action.StateRevoked
return data.UpdateAction(ctx, record)
}

func getActionHandlers(data code_data.Provider) map[action.Type]ActionHandler {
handlersByType := make(map[action.Type]ActionHandler)
handlersByType[action.OpenAccount] = NewOpenAccountActionHandler(data)
Expand Down
4 changes: 2 additions & 2 deletions pkg/code/async/sequencer/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func markFulfillmentAsActivelyScheduled(ctx context.Context, data code_data.Prov
return nil
}

// Note: different than Save, since we don't have distributed locks
return data.MarkFulfillmentAsActivelyScheduled(ctx, fulfillmentRecord.Id)
fulfillmentRecord.DisableActiveScheduling = false
return data.UpdateFulfillment(ctx, fulfillmentRecord)
}

func (p *service) sendToBlockchain(ctx context.Context, record *fulfillment.Record) error {
Expand Down
6 changes: 6 additions & 0 deletions pkg/code/data/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type Record struct {

State State

Version uint64

CreatedAt time.Time
}

Expand Down Expand Up @@ -121,6 +123,8 @@ func (r *Record) Clone() Record {

State: r.State,

Version: r.Version,

CreatedAt: r.CreatedAt,
}
}
Expand All @@ -142,6 +146,8 @@ func (r *Record) CopyTo(dst *Record) {

dst.State = r.State

dst.Version = r.Version

dst.CreatedAt = r.CreatedAt
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/code/data/action/memory/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (s *store) PutAll(ctx context.Context, records ...*action.Record) error {
if record.CreatedAt.IsZero() {
record.CreatedAt = time.Now()
}
record.Version++

cloned := record.Clone()
s.records = append(s.records, &cloned)
Expand All @@ -193,14 +194,20 @@ func (s *store) Update(ctx context.Context, record *action.Record) error {
defer s.mu.Unlock()

if item := s.find(record); item != nil {
if record.Version != item.Version {
return action.ErrStaleVersion
}
record.Version++

if record.IntentType == intent.SendPublicPayment && record.ActionType == action.NoPrivacyWithdraw {
item.Quantity = pointer.Uint64Copy(record.Quantity)
}
item.State = record.State
item.Version = record.Version
return nil
}

return action.ErrActionNotFound
return action.ErrStaleVersion
}

// GetById implements action.store.GetById
Expand Down
33 changes: 19 additions & 14 deletions pkg/code/data/action/postgres/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type model struct {
Quantity sql.NullInt64 `db:"quantity"`
FeeType sql.NullInt32 `db:"fee_type"`
State uint `db:"state"`
Version int64 `db:"version"`
CreatedAt time.Time `db:"created_at"`
}

Expand Down Expand Up @@ -68,6 +69,7 @@ func toModel(obj *action.Record) (*model, error) {
Quantity: quantity,
FeeType: feeType,
State: uint(obj.State),
Version: int64(obj.Version),
CreatedAt: obj.CreatedAt,
}, nil
}
Expand All @@ -84,6 +86,7 @@ func fromModel(obj *model) *action.Record {
Quantity: pointer.Uint64IfValid(obj.Quantity.Valid, uint64(obj.Quantity.Int64)),
FeeType: (*transactionpb.FeePaymentAction_FeeType)(pointer.Int32IfValid(obj.FeeType.Valid, obj.FeeType.Int32)),
State: action.State(obj.State),
Version: uint64(obj.Version),
CreatedAt: obj.CreatedAt,
}
}
Expand All @@ -94,18 +97,19 @@ func (m *model) dbUpdate(ctx context.Context, db *sqlx.DB) error {
params := []interface{}{
m.Intent,
m.ActionId,
m.Version,
m.State,
}

if m.IntentType == uint(intent.SendPublicPayment) && m.ActionType == uint(action.NoPrivacyWithdraw) {
quantityUpdateStmt = ", quantity = $4"
quantityUpdateStmt = ", quantity = $5"
params = append(params, m.Quantity)
}

query := fmt.Sprintf(`UPDATE `+tableName+`
SET state = $3%s
WHERE intent = $1 AND action_id = $2
RETURNING id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
SET state = $4%s, version = version + 1
WHERE intent = $1 AND action_id = $2 AND version = $3
RETURNING id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
`, quantityUpdateStmt)

err := tx.QueryRowxContext(
Expand All @@ -114,7 +118,7 @@ func (m *model) dbUpdate(ctx context.Context, db *sqlx.DB) error {
params...,
).StructScan(m)
if err != nil {
return pgutil.CheckNoRows(err, action.ErrActionNotFound)
return pgutil.CheckNoRows(err, action.ErrStaleVersion)
}

return nil
Expand All @@ -124,7 +128,7 @@ func (m *model) dbUpdate(ctx context.Context, db *sqlx.DB) error {
func dbPutAllInTx(ctx context.Context, tx *sqlx.Tx, models []*model) ([]*model, error) {
var res []*model

query := `INSERT INTO ` + tableName + ` (intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at) VALUES `
query := `INSERT INTO ` + tableName + ` (intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at) VALUES `

var parameters []interface{}
for i, model := range models {
Expand All @@ -134,8 +138,8 @@ func dbPutAllInTx(ctx context.Context, tx *sqlx.Tx, models []*model) ([]*model,

baseIndex := len(parameters)
query += fmt.Sprintf(
`($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d)`,
baseIndex+1, baseIndex+2, baseIndex+3, baseIndex+4, baseIndex+5, baseIndex+6, baseIndex+7, baseIndex+8, baseIndex+9, baseIndex+10,
`($%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d, $%d + 1, $%d)`,
baseIndex+1, baseIndex+2, baseIndex+3, baseIndex+4, baseIndex+5, baseIndex+6, baseIndex+7, baseIndex+8, baseIndex+9, baseIndex+10, baseIndex+11,
)

if i != len(models)-1 {
Expand All @@ -153,11 +157,12 @@ func dbPutAllInTx(ctx context.Context, tx *sqlx.Tx, models []*model) ([]*model,
model.Quantity,
model.FeeType,
model.State,
model.Version,
model.CreatedAt,
)
}

query += ` RETURNING id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at`
query += ` RETURNING id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at`

err := tx.SelectContext(
ctx,
Expand All @@ -175,7 +180,7 @@ func dbPutAllInTx(ctx context.Context, tx *sqlx.Tx, models []*model) ([]*model,
func dbGetById(ctx context.Context, db *sqlx.DB, intent string, actionId uint32) (*model, error) {
res := &model{}

query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
FROM ` + tableName + `
WHERE intent = $1 AND action_id = $2
LIMIT 1`
Expand All @@ -190,7 +195,7 @@ func dbGetById(ctx context.Context, db *sqlx.DB, intent string, actionId uint32)
func dbGetAllByIntent(ctx context.Context, db *sqlx.DB, intent string) ([]*model, error) {
res := []*model{}

query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
FROM ` + tableName + `
WHERE intent = $1
ORDER BY action_id ASC`
Expand All @@ -210,7 +215,7 @@ func dbGetAllByIntent(ctx context.Context, db *sqlx.DB, intent string) ([]*model
func dbGetAllByAddress(ctx context.Context, db *sqlx.DB, address string) ([]*model, error) {
res := []*model{}

query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
FROM ` + tableName + `
WHERE source = $1 OR destination = $1`

Expand Down Expand Up @@ -300,7 +305,7 @@ func dbGetNetBalanceBatch(ctx context.Context, db *sqlx.DB, accounts ...string)
func dbGetGiftCardClaimedAction(ctx context.Context, db *sqlx.DB, giftCardVault string) (*model, error) {
res := []*model{}

query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
FROM ` + tableName + `
WHERE source = $1 AND action_type = $2 AND intent_type = $3 AND state != $4
LIMIT 2`
Expand Down Expand Up @@ -330,7 +335,7 @@ func dbGetGiftCardClaimedAction(ctx context.Context, db *sqlx.DB, giftCardVault
func dbGetGiftCardAutoReturnAction(ctx context.Context, db *sqlx.DB, giftCardVault string) (*model, error) {
res := []*model{}

query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, created_at
query := `SELECT id, intent, intent_type, action_id, action_type, source, destination, quantity, fee_type, state, version, created_at
FROM ` + tableName + `
WHERE source = $1 AND action_type = $2 AND intent_type = $3 AND state != $4
LIMIT 2`
Expand Down
10 changes: 9 additions & 1 deletion pkg/code/data/action/postgres/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,15 @@ func (s *store) Update(ctx context.Context, record *action.Record) error {
return err
}

return model.dbUpdate(ctx, s.db)
err = model.dbUpdate(ctx, s.db)
if err != nil {
return err
}

updated := fromModel(model)
updated.CopyTo(record)

return nil
}

// GetById implements action.store.GetById
Expand Down
2 changes: 2 additions & 0 deletions pkg/code/data/action/postgres/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (

state INTEGER NOT NULL,

version INTEGER NOT NULL,

created_at timestamp with time zone NOT NULL,

CONSTRAINT codewallet__core_action__uniq__intent__and__action_id UNIQUE (intent, action_id)
Expand Down
7 changes: 4 additions & 3 deletions pkg/code/data/action/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
)

var (
ErrActionNotFound = errors.New("no action ecord could be found")
ErrMultipleActionsFound = errors.New("multiple action records found")
ErrActionExists = errors.New("action record already exists")
ErrActionNotFound = errors.New("no action could be found")
ErrMultipleActionsFound = errors.New("multiple actions found")
ErrActionExists = errors.New("action already exists")
ErrStaleVersion = errors.New("action version is stale")
)

type Store interface {
Expand Down
Loading
Loading