Skip to content

Commit e10e62d

Browse files
magik6krvagg
authored andcommitted
feat: Snark Market support in Balance Manager (#594)
* snark market balancemgr ui/schema * balancemgr filter supported types in CanAccept * implement proofshare bmgr do * make gen * cleanup * fix msg tracking address type
1 parent 7d5a249 commit e10e62d

File tree

7 files changed

+494
-190
lines changed

7 files changed

+494
-190
lines changed

harmony/harmonydb/sql/20250727-balancemgr.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ CREATE TABLE balance_manager_addresses (
1313
-- "active-provider" - if subject above high watermark, send fil from subject to second address up to low watermark
1414
action_type TEXT NOT NULL, -- "requester", "active-provider"
1515

16+
-- added in 20250817-balancemgr-pshare.sql
17+
-- subject_type TEXT NOT NULL DEFAULT 'wallet', -- "wallet", "proofshare"
18+
1619
low_watermark_fil_balance TEXT NOT NULL DEFAULT '0',
1720
high_watermark_fil_balance TEXT NOT NULL DEFAULT '0',
1821

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
ALTER TABLE balance_manager_addresses ADD COLUMN subject_type TEXT NOT NULL DEFAULT 'wallet';
2+
3+
-- For proofshare rules we allow subject_address == second_address.
4+
-- Relax the constraint accordingly.
5+
ALTER TABLE balance_manager_addresses DROP CONSTRAINT IF EXISTS subject_not_equal_second;
6+
ALTER TABLE balance_manager_addresses ADD CONSTRAINT subject_not_equal_second
7+
CHECK (subject_type = 'proofshare' OR subject_address != second_address);
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package balancemgr
2+
3+
import (
4+
"context"
5+
6+
"github.com/ipfs/go-cid"
7+
"golang.org/x/xerrors"
8+
9+
"github.com/filecoin-project/go-address"
10+
11+
"github.com/filecoin-project/curio/harmony/harmonydb"
12+
"github.com/filecoin-project/curio/harmony/harmonytask"
13+
"github.com/filecoin-project/curio/lib/proofsvc/common"
14+
15+
"github.com/filecoin-project/lotus/api"
16+
"github.com/filecoin-project/lotus/chain/types"
17+
)
18+
19+
func (b *BalanceMgrTask) adderProofshare(ctx context.Context, taskFunc harmonytask.AddTaskFunc, addr *balanceManagerAddress) error {
20+
svc := common.NewService(b.chain)
21+
idAddr, err := b.chain.StateLookupID(ctx, addr.SubjectAddress, types.EmptyTSK)
22+
if err != nil {
23+
return xerrors.Errorf("getting address ID: %w", err)
24+
}
25+
26+
addrID, err := address.IDFromAddress(idAddr)
27+
if err != nil {
28+
return xerrors.Errorf("getting address ID: %w", err)
29+
}
30+
31+
clientState, err := svc.GetClientState(ctx, addrID)
32+
if err != nil {
33+
return xerrors.Errorf("PSClientWallets: failed to get client state: %w", err)
34+
}
35+
36+
sourceBalance, err := b.chain.StateGetActor(ctx, addr.SubjectAddress, types.EmptyTSK)
37+
if err != nil {
38+
return xerrors.Errorf("getting source balance: %w", err)
39+
}
40+
41+
addr.SubjectBalance = types.BigInt(clientState.Balance)
42+
addr.SecondBalance = types.BigInt(sourceBalance.Balance)
43+
44+
var shouldCreateTask bool
45+
switch addr.ActionType {
46+
case "requester":
47+
shouldCreateTask = addr.SubjectBalance.LessThan(addr.LowWatermarkFilBalance)
48+
}
49+
50+
if shouldCreateTask {
51+
taskFunc(func(taskID harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
52+
// check that address.ID has active_task_id = null, set the task ID, set last_ to null
53+
n, err := tx.Exec(`
54+
UPDATE balance_manager_addresses
55+
SET active_task_id = $1, last_msg_cid = NULL, last_msg_sent_at = NULL, last_msg_landed_at = NULL
56+
WHERE id = $2 AND active_task_id IS NULL AND (last_msg_cid IS NULL OR last_msg_landed_at IS NOT NULL)
57+
`, taskID, addr.ID)
58+
if err != nil {
59+
return false, xerrors.Errorf("updating balance manager address: %w", err)
60+
}
61+
62+
return n > 0, nil
63+
})
64+
}
65+
return nil
66+
}
67+
68+
func (b *BalanceMgrTask) doProofshare(ctx context.Context, taskID harmonytask.TaskID, addr *balanceManagerAddress) (bool, error) {
69+
log.Infow("balancemgr proofshare Do",
70+
"id", addr.ID,
71+
"subject", addr.SubjectAddress,
72+
"low", types.FIL(addr.LowWatermarkFilBalance),
73+
"high", types.FIL(addr.HighWatermarkFilBalance))
74+
75+
svc := common.NewServiceCustomSend(b.chain, func(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec) (cid.Cid, error) {
76+
mss.MaximizeFeeCap = true
77+
return b.sender.Send(ctx, msg, mss, "balancemgr-proofshare")
78+
})
79+
idAddr, err := b.chain.StateLookupID(ctx, addr.SubjectAddress, types.EmptyTSK)
80+
if err != nil {
81+
return false, xerrors.Errorf("getting address ID: %w", err)
82+
}
83+
84+
addrID, err := address.IDFromAddress(idAddr)
85+
if err != nil {
86+
return false, xerrors.Errorf("getting address ID: %w", err)
87+
}
88+
89+
clientState, err := svc.GetClientState(ctx, addrID)
90+
if err != nil {
91+
return false, xerrors.Errorf("PSClientWallets: failed to get client state: %w", err)
92+
}
93+
94+
sourceBalance, err := b.chain.StateGetActor(ctx, addr.SubjectAddress, types.EmptyTSK)
95+
if err != nil {
96+
return false, xerrors.Errorf("getting source balance: %w", err)
97+
}
98+
99+
addr.SubjectBalance = types.BigInt(clientState.Balance)
100+
addr.SecondBalance = types.BigInt(sourceBalance.Balance)
101+
102+
// calculate amount to send (based on latest chain balance)
103+
var amount types.BigInt
104+
var to address.Address
105+
var shouldSend bool
106+
107+
if addr.ActionType != "requester" {
108+
return false, xerrors.Errorf("action type is not requester: %s", addr.ActionType)
109+
}
110+
111+
if addr.SubjectBalance.LessThan(addr.LowWatermarkFilBalance) {
112+
amount = types.BigSub(addr.HighWatermarkFilBalance, addr.SubjectBalance)
113+
to = addr.SubjectAddress
114+
shouldSend = true
115+
}
116+
117+
if !shouldSend {
118+
log.Infow("balance within watermarks, no action needed",
119+
"subject", addr.SubjectAddress,
120+
"balance", types.FIL(addr.SubjectBalance),
121+
"low", types.FIL(addr.LowWatermarkFilBalance),
122+
"high", types.FIL(addr.HighWatermarkFilBalance))
123+
124+
_, err = b.db.Exec(ctx, `
125+
UPDATE balance_manager_addresses
126+
SET active_task_id = NULL, last_action = NOW()
127+
WHERE id = $1
128+
`, addr.ID)
129+
if err != nil {
130+
return false, xerrors.Errorf("clearing task id: %w", err)
131+
}
132+
return true, nil
133+
}
134+
135+
msgCid, err := svc.ClientDeposit(ctx, addr.SubjectAddress, amount)
136+
if err != nil {
137+
return false, xerrors.Errorf("ClientDeposit: %w", err)
138+
}
139+
140+
_, err = b.db.Exec(ctx, `INSERT INTO message_waits (signed_message_cid) VALUES ($1)`, msgCid)
141+
if err != nil {
142+
return false, xerrors.Errorf("inserting into message_waits: %w", err)
143+
}
144+
145+
_, err = b.db.Exec(ctx, `
146+
UPDATE balance_manager_addresses
147+
SET last_msg_cid = $2,
148+
last_msg_sent_at = NOW(),
149+
last_msg_landed_at = NULL,
150+
active_task_id = NULL
151+
WHERE id = $1
152+
`, addr.ID, msgCid.String())
153+
if err != nil {
154+
return false, xerrors.Errorf("updating message cid: %w", err)
155+
}
156+
157+
_, err = b.db.Exec(ctx, `
158+
INSERT INTO proofshare_client_messages (signed_cid, wallet, action)
159+
VALUES ($1, $2, $3)
160+
`, msgCid, addrID, "deposit-bmgr")
161+
if err != nil {
162+
return false, xerrors.Errorf("addMessageTracking: failed to insert proofshare_client_messages: %w", err)
163+
}
164+
165+
log.Infow("sent balance management message",
166+
"from", addr.SecondAddress,
167+
"to", to,
168+
"subjectType", "proofshare",
169+
"amount", types.FIL(amount),
170+
"msgCid", msgCid,
171+
"actionType", addr.ActionType)
172+
173+
return true, nil
174+
}
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package balancemgr
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"golang.org/x/xerrors"
8+
9+
"github.com/filecoin-project/go-address"
10+
"github.com/filecoin-project/go-state-types/abi"
11+
"github.com/filecoin-project/go-state-types/builtin"
12+
13+
"github.com/filecoin-project/curio/harmony/harmonydb"
14+
"github.com/filecoin-project/curio/harmony/harmonytask"
15+
16+
"github.com/filecoin-project/lotus/api"
17+
"github.com/filecoin-project/lotus/chain/types"
18+
)
19+
20+
func (b *BalanceMgrTask) adderWallet(ctx context.Context, taskFunc harmonytask.AddTaskFunc, addr *balanceManagerAddress) error {
21+
// get balances
22+
subjectBalance, err := b.chain.WalletBalance(ctx, addr.SubjectAddress)
23+
if err != nil {
24+
return xerrors.Errorf("getting subject balance: %w", err)
25+
}
26+
27+
secondBalance, err := b.chain.WalletBalance(ctx, addr.SecondAddress)
28+
if err != nil {
29+
return xerrors.Errorf("getting second balance: %w", err)
30+
}
31+
32+
addr.SubjectBalance = subjectBalance
33+
addr.SecondBalance = secondBalance
34+
35+
var shouldCreateTask bool
36+
switch addr.ActionType {
37+
case "requester":
38+
shouldCreateTask = addr.SubjectBalance.LessThan(addr.LowWatermarkFilBalance)
39+
case "active-provider":
40+
shouldCreateTask = addr.SubjectBalance.GreaterThan(addr.HighWatermarkFilBalance)
41+
}
42+
43+
if shouldCreateTask {
44+
taskFunc(func(taskID harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
45+
// check that address.ID has active_task_id = null, set the task ID, set last_ to null
46+
n, err := tx.Exec(`
47+
UPDATE balance_manager_addresses
48+
SET active_task_id = $1, last_msg_cid = NULL, last_msg_sent_at = NULL, last_msg_landed_at = NULL
49+
WHERE id = $2 AND active_task_id IS NULL AND (last_msg_cid IS NULL OR last_msg_landed_at IS NOT NULL)
50+
`, taskID, addr.ID)
51+
if err != nil {
52+
return false, xerrors.Errorf("updating balance manager address: %w", err)
53+
}
54+
55+
return n > 0, nil
56+
})
57+
}
58+
59+
return nil
60+
}
61+
62+
// doWallet handles wallet subject rules.
63+
func (b *BalanceMgrTask) doWallet(ctx context.Context, taskID harmonytask.TaskID, addr *balanceManagerAddress) (bool, error) {
64+
// Get current balances
65+
subjectBalance, err := b.chain.WalletBalance(ctx, addr.SubjectAddress)
66+
if err != nil {
67+
return false, xerrors.Errorf("getting subject balance: %w", err)
68+
}
69+
70+
secondBalance, err := b.chain.WalletBalance(ctx, addr.SecondAddress)
71+
if err != nil {
72+
return false, xerrors.Errorf("getting second balance: %w", err)
73+
}
74+
75+
addr.SubjectBalance = subjectBalance
76+
addr.SecondBalance = secondBalance
77+
78+
// calculate amount to send (based on latest chain balance)
79+
var amount types.BigInt
80+
var from, to address.Address
81+
var shouldSend bool
82+
83+
switch addr.ActionType {
84+
case "requester":
85+
// If subject below low watermark, send from second to subject up to high watermark
86+
if addr.SubjectBalance.LessThan(addr.LowWatermarkFilBalance) {
87+
targetAmount := types.BigSub(addr.HighWatermarkFilBalance, addr.SubjectBalance)
88+
// Make sure we don't send more than second address has
89+
if targetAmount.GreaterThan(addr.SecondBalance) {
90+
log.Warnw("second address has insufficient balance",
91+
"needed", types.FIL(targetAmount),
92+
"available", types.FIL(addr.SecondBalance))
93+
94+
// clear the task
95+
_, err = b.db.Exec(ctx, `
96+
UPDATE balance_manager_addresses
97+
SET active_task_id = NULL, last_action = NOW()
98+
WHERE id = $1
99+
`, addr.ID)
100+
if err != nil {
101+
return false, xerrors.Errorf("clearing task id: %w", err)
102+
}
103+
104+
return true, nil
105+
}
106+
amount = targetAmount
107+
from = addr.SecondAddress
108+
to = addr.SubjectAddress
109+
shouldSend = true
110+
}
111+
112+
case "active-provider":
113+
// If subject above high watermark, send from subject to second down to low watermark
114+
if addr.SubjectBalance.GreaterThan(addr.HighWatermarkFilBalance) {
115+
amount = types.BigSub(addr.SubjectBalance, addr.LowWatermarkFilBalance)
116+
from = addr.SubjectAddress
117+
to = addr.SecondAddress
118+
shouldSend = true
119+
}
120+
121+
default:
122+
return false, xerrors.Errorf("unknown action type: %s", addr.ActionType)
123+
}
124+
125+
// If no need to send, clear the task and return
126+
if !shouldSend {
127+
log.Infow("balance within watermarks, no action needed",
128+
"subject", addr.SubjectAddress,
129+
"balance", types.FIL(addr.SubjectBalance),
130+
"low", types.FIL(addr.LowWatermarkFilBalance),
131+
"high", types.FIL(addr.HighWatermarkFilBalance))
132+
133+
_, err = b.db.Exec(ctx, `
134+
UPDATE balance_manager_addresses
135+
SET active_task_id = NULL, last_action = NOW()
136+
WHERE id = $1
137+
`, addr.ID)
138+
if err != nil {
139+
return false, xerrors.Errorf("clearing task id: %w", err)
140+
}
141+
return true, nil
142+
}
143+
144+
// send msg
145+
msg := &types.Message{
146+
From: from,
147+
To: to,
148+
Value: amount,
149+
Method: builtin.MethodSend,
150+
}
151+
152+
mss := &api.MessageSendSpec{
153+
MaxFee: abi.TokenAmount(MaxSendFee),
154+
MaximizeFeeCap: true,
155+
}
156+
157+
// Send the message - sender will handle message_wait insertion
158+
msgCid, err := b.sender.Send(ctx, msg, mss, fmt.Sprintf("balancemgr-%s", addr.ActionType))
159+
if err != nil {
160+
return false, xerrors.Errorf("sending message: %w", err)
161+
}
162+
163+
_, err = b.db.Exec(ctx, `INSERT INTO message_waits (signed_message_cid) VALUES ($1)`, msgCid)
164+
if err != nil {
165+
return false, xerrors.Errorf("inserting into message_waits: %w", err)
166+
}
167+
168+
// Update the database with message info
169+
_, err = b.db.Exec(ctx, `
170+
UPDATE balance_manager_addresses
171+
SET last_msg_cid = $2,
172+
last_msg_sent_at = NOW(),
173+
last_msg_landed_at = NULL,
174+
active_task_id = NULL
175+
WHERE id = $1
176+
`, addr.ID, msgCid.String())
177+
if err != nil {
178+
return false, xerrors.Errorf("updating message cid: %w", err)
179+
}
180+
181+
log.Infow("sent balance management message",
182+
"from", from,
183+
"to", to,
184+
"subjectType", "wallet",
185+
"amount", types.FIL(amount),
186+
"msgCid", msgCid,
187+
"actionType", addr.ActionType)
188+
189+
// Task complete - chain handler will clear active_task_id when message lands
190+
return true, nil
191+
}

0 commit comments

Comments
 (0)