Skip to content

Commit 5fef969

Browse files
committed
Basic and minimal swap worker
1 parent 76e6897 commit 5fef969

File tree

5 files changed

+487
-0
lines changed

5 files changed

+487
-0
lines changed

pkg/code/async/swap/config.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package async_swap
2+
3+
import (
4+
"github.com/code-payments/code-server/pkg/config"
5+
"github.com/code-payments/code-server/pkg/config/env"
6+
"github.com/code-payments/code-server/pkg/config/memory"
7+
"github.com/code-payments/code-server/pkg/config/wrapper"
8+
)
9+
10+
const (
11+
envConfigPrefix = "SWAP_SERVICE_"
12+
13+
BatchSizeConfigEnvName = envConfigPrefix + "WORKER_BATCH_SIZE"
14+
defaultFulfillmentBatchSize = 100
15+
)
16+
17+
type conf struct {
18+
batchSize config.Uint64
19+
enableCachedTransactionLookup config.Bool
20+
}
21+
22+
// ConfigProvider defines how config values are pulled
23+
type ConfigProvider func() *conf
24+
25+
// WithEnvConfigs returns configuration pulled from environment variables
26+
func WithEnvConfigs() ConfigProvider {
27+
return func() *conf {
28+
return &conf{
29+
batchSize: env.NewUint64Config(BatchSizeConfigEnvName, defaultFulfillmentBatchSize),
30+
enableCachedTransactionLookup: wrapper.NewBoolConfig(memory.NewConfig(false), false),
31+
}
32+
}
33+
}

pkg/code/async/swap/service.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package async_swap
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/sirupsen/logrus"
8+
9+
"github.com/code-payments/code-server/pkg/code/async"
10+
code_data "github.com/code-payments/code-server/pkg/code/data"
11+
"github.com/code-payments/code-server/pkg/code/data/swap"
12+
)
13+
14+
type service struct {
15+
log *logrus.Entry
16+
conf *conf
17+
data code_data.Provider
18+
}
19+
20+
func New(data code_data.Provider, configProvider ConfigProvider) async.Service {
21+
return &service{
22+
log: logrus.StandardLogger().WithField("service", "swap"),
23+
conf: configProvider(),
24+
data: data,
25+
}
26+
27+
}
28+
29+
func (p *service) Start(ctx context.Context, interval time.Duration) error {
30+
31+
for _, state := range []swap.State{
32+
swap.StateFunding,
33+
swap.StateSubmitting,
34+
} {
35+
go func(state swap.State) {
36+
37+
err := p.worker(ctx, state, interval)
38+
if err != nil && err != context.Canceled {
39+
p.log.WithError(err).Warnf("swap processing loop terminated unexpectedly for state %s", state.String())
40+
}
41+
42+
}(state)
43+
}
44+
45+
select {
46+
case <-ctx.Done():
47+
return ctx.Err()
48+
}
49+
}

pkg/code/async/swap/util.go

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package async_swap
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"database/sql"
7+
"fmt"
8+
"strconv"
9+
"time"
10+
11+
"github.com/mr-tron/base58"
12+
"github.com/pkg/errors"
13+
14+
"github.com/code-payments/code-server/pkg/code/common"
15+
currency_util "github.com/code-payments/code-server/pkg/code/currency"
16+
"github.com/code-payments/code-server/pkg/code/data/deposit"
17+
"github.com/code-payments/code-server/pkg/code/data/intent"
18+
"github.com/code-payments/code-server/pkg/code/data/nonce"
19+
"github.com/code-payments/code-server/pkg/code/data/swap"
20+
"github.com/code-payments/code-server/pkg/code/data/transaction"
21+
"github.com/code-payments/code-server/pkg/solana"
22+
)
23+
24+
func (p *service) validateSwapState(record *swap.Record, states ...swap.State) error {
25+
for _, validState := range states {
26+
if record.State == validState {
27+
return nil
28+
}
29+
}
30+
return errors.New("invalid swap state")
31+
}
32+
33+
func (p *service) markSwapFunded(ctx context.Context, record *swap.Record) error {
34+
err := p.validateSwapState(record, swap.StateFunding)
35+
if err != nil {
36+
return err
37+
}
38+
39+
record.State = swap.StateFunded
40+
return p.data.SaveSwap(ctx, record)
41+
}
42+
43+
func (p *service) markSwapFinalized(ctx context.Context, record *swap.Record) error {
44+
err := p.validateSwapState(record, swap.StateSubmitting)
45+
if err != nil {
46+
return err
47+
}
48+
49+
err = p.markNonceReleasedDueToSubmittedTransaction(ctx, record)
50+
if err != nil {
51+
return err
52+
}
53+
54+
record.State = swap.StateFinalized
55+
return p.data.SaveSwap(ctx, record)
56+
}
57+
58+
func (p *service) markSwapFailed(ctx context.Context, record *swap.Record) error {
59+
err := p.validateSwapState(record, swap.StateSubmitting)
60+
if err != nil {
61+
return err
62+
}
63+
64+
err = p.markNonceReleasedDueToSubmittedTransaction(ctx, record)
65+
if err != nil {
66+
return err
67+
}
68+
69+
record.State = swap.StateFailed
70+
return p.data.SaveSwap(ctx, record)
71+
}
72+
73+
// todo: commonalities between this and geyser external deposit logic
74+
func (p *service) updateBalances(ctx context.Context, record *swap.Record) error {
75+
owner, err := common.NewAccountFromPublicKeyString(record.Owner)
76+
if err != nil {
77+
return err
78+
}
79+
80+
toMint, err := common.NewAccountFromPublicKeyString(record.ToMint)
81+
if err != nil {
82+
return err
83+
}
84+
85+
destinationVmConfig, err := common.GetVmConfigForMint(ctx, p.data, toMint)
86+
if err != nil {
87+
return err
88+
}
89+
90+
ownerDestinationTimelockVault, err := owner.ToTimelockVault(destinationVmConfig)
91+
if err != nil {
92+
return err
93+
}
94+
95+
tokenBalances, err := p.data.GetBlockchainTransactionTokenBalances(ctx, *record.TransactionSignature)
96+
if err != nil {
97+
return err
98+
}
99+
100+
deltaQuarksIntoOmnibus, err := getDeltaQuarksFromTokenBalances(destinationVmConfig.Omnibus, tokenBalances)
101+
if err != nil {
102+
return err
103+
}
104+
if deltaQuarksIntoOmnibus <= 0 {
105+
return errors.New("delta quarks into destination vm omnibus is not positive")
106+
}
107+
108+
usdMarketValue, _, err := currency_util.CalculateUsdMarketValue(ctx, p.data, toMint, uint64(deltaQuarksIntoOmnibus), time.Now())
109+
if err != nil {
110+
return err
111+
}
112+
113+
return p.data.ExecuteInTx(ctx, sql.LevelDefault, func(ctx context.Context) error {
114+
// For transaction history
115+
intentRecord := &intent.Record{
116+
IntentId: getSwapDepositIntentID(*record.TransactionSignature, ownerDestinationTimelockVault),
117+
IntentType: intent.ExternalDeposit,
118+
119+
MintAccount: toMint.PublicKey().ToBase58(),
120+
121+
InitiatorOwnerAccount: owner.PublicKey().ToBase58(),
122+
123+
ExternalDepositMetadata: &intent.ExternalDepositMetadata{
124+
DestinationTokenAccount: ownerDestinationTimelockVault.PublicKey().ToBase58(),
125+
Quantity: uint64(deltaQuarksIntoOmnibus),
126+
UsdMarketValue: usdMarketValue,
127+
},
128+
129+
State: intent.StateConfirmed,
130+
CreatedAt: time.Now(),
131+
}
132+
err = p.data.SaveIntent(ctx, intentRecord)
133+
if err != nil {
134+
return err
135+
}
136+
137+
// For tracking in cached balances
138+
externalDepositRecord := &deposit.Record{
139+
Signature: *record.TransactionSignature,
140+
Destination: ownerDestinationTimelockVault.PublicKey().ToBase58(),
141+
Amount: uint64(deltaQuarksIntoOmnibus),
142+
UsdMarketValue: usdMarketValue,
143+
144+
Slot: tokenBalances.Slot,
145+
ConfirmationState: transaction.ConfirmationFinalized,
146+
147+
CreatedAt: time.Now(),
148+
}
149+
return p.data.SaveExternalDeposit(ctx, externalDepositRecord)
150+
})
151+
}
152+
153+
func (p *service) markNonceReleasedDueToSubmittedTransaction(ctx context.Context, record *swap.Record) error {
154+
err := p.validateSwapState(record, swap.StateSubmitting)
155+
if err != nil {
156+
return err
157+
}
158+
159+
nonceRecord, err := p.data.GetNonce(ctx, record.Nonce)
160+
if err != nil {
161+
return err
162+
}
163+
164+
if *record.TransactionSignature != nonceRecord.Signature {
165+
return errors.New("unexpected nonce signature")
166+
}
167+
168+
if record.Blockhash != nonceRecord.Blockhash {
169+
return errors.New("unexpected nonce blockhash")
170+
}
171+
172+
if nonceRecord.State != nonce.StateReserved {
173+
return errors.New("unexpected nonce state")
174+
}
175+
176+
nonceRecord.State = nonce.StateReleased
177+
return p.data.SaveNonce(ctx, nonceRecord)
178+
}
179+
180+
func (p *service) getTransaction(ctx context.Context, record *swap.Record) (*transaction.Record, error) {
181+
if record.TransactionSignature == nil || len(*record.TransactionSignature) == 0 {
182+
return nil, transaction.ErrNotFound
183+
}
184+
185+
if p.conf.enableCachedTransactionLookup.Get(ctx) {
186+
return p.data.GetTransaction(ctx, *record.TransactionSignature)
187+
}
188+
189+
return p.getTransactionFromBlockchain(ctx, record)
190+
}
191+
192+
func (p *service) getTransactionFromBlockchain(ctx context.Context, record *swap.Record) (*transaction.Record, error) {
193+
stx, err := p.data.GetBlockchainTransaction(ctx, *record.TransactionSignature, solana.CommitmentFinalized)
194+
if err == solana.ErrSignatureNotFound {
195+
return nil, transaction.ErrNotFound
196+
}
197+
if err != nil {
198+
return nil, err
199+
}
200+
201+
tx, err := transaction.FromConfirmedTransaction(stx)
202+
if err != nil {
203+
return nil, err
204+
}
205+
206+
return tx, nil
207+
}
208+
209+
func getDeltaQuarksFromTokenBalances(tokenAccount *common.Account, tokenBalances *solana.TransactionTokenBalances) (int64, error) {
210+
var preQuarkBalance, postQuarkBalance int64
211+
var err error
212+
for _, tokenBalance := range tokenBalances.PreTokenBalances {
213+
if tokenBalances.Accounts[tokenBalance.AccountIndex] == tokenAccount.PublicKey().ToBase58() {
214+
preQuarkBalance, err = strconv.ParseInt(tokenBalance.TokenAmount.Amount, 10, 64)
215+
if err != nil {
216+
return 0, errors.Wrap(err, "error parsing pre token balance")
217+
}
218+
break
219+
}
220+
}
221+
for _, tokenBalance := range tokenBalances.PostTokenBalances {
222+
if tokenBalances.Accounts[tokenBalance.AccountIndex] == tokenAccount.PublicKey().ToBase58() {
223+
postQuarkBalance, err = strconv.ParseInt(tokenBalance.TokenAmount.Amount, 10, 64)
224+
if err != nil {
225+
return 0, errors.Wrap(err, "error parsing post token balance")
226+
}
227+
break
228+
}
229+
}
230+
231+
return postQuarkBalance - preQuarkBalance, nil
232+
}
233+
234+
func getSwapDepositIntentID(signature string, destination *common.Account) string {
235+
combined := fmt.Sprintf("%s-%s", signature, destination.PublicKey().ToBase58())
236+
hashed := sha256.Sum256([]byte(combined))
237+
return base58.Encode(hashed[:])
238+
}

0 commit comments

Comments
 (0)