Skip to content

Commit ed000a4

Browse files
committed
feat(mempool): add reservation-aware pre-pack simulation
1 parent 9cf6fb1 commit ed000a4

File tree

4 files changed

+281
-0
lines changed

4 files changed

+281
-0
lines changed

chain/messagepool/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,8 @@ func DefaultConfig() *types.MpoolConfig {
9898
ReplaceByFeeRatio: ReplaceByFeePercentageDefault,
9999
PruneCooldown: PruneCooldownDefault,
100100
GasLimitOverestimation: GasLimitOverestimation,
101+
// Reservation-aware pre-pack simulation is opt-in and advisory. It is
102+
// additionally gated by the global reservations feature flag.
103+
EnableReservationPrePack: false,
101104
}
102105
}

chain/messagepool/selection.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/filecoin-project/go-state-types/crypto"
1616

1717
"github.com/filecoin-project/lotus/build/buildconstants"
18+
"github.com/filecoin-project/lotus/chain/consensus"
1819
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
1920
"github.com/filecoin-project/lotus/chain/types"
2021
"github.com/filecoin-project/lotus/chain/vm"
@@ -89,6 +90,124 @@ type selectedMessages struct {
8990
gasLimit int64
9091
secpLimit int
9192
blsLimit int
93+
94+
// mp references the parent message pool; it is used for reservation-aware
95+
// pre-pack simulation when enabled.
96+
mp *MessagePool
97+
98+
// reservationEnabled toggles reservation-aware pre-pack simulation for this
99+
// selection pass. When true, reservedBySender and balanceBySender track
100+
// per-sender Σ(gas_limit * gas_fee_cap) and the sender's on-chain balance
101+
// at the selection base state.
102+
reservationEnabled bool
103+
reservationCtx context.Context
104+
reservationTipset *types.TipSet
105+
reservedBySender map[address.Address]types.BigInt
106+
balanceBySender map[address.Address]types.BigInt
107+
}
108+
109+
func (mp *MessagePool) reservationsPrePackEnabled() bool {
110+
cfg := mp.getConfig()
111+
if !cfg.EnableReservationPrePack {
112+
return false
113+
}
114+
// Pre-pack simulation shares the same activation/gating switch as tipset
115+
// reservations. Before activation, it is additionally gated by the mempool
116+
// config flag. After activation, it follows consensus reservations
117+
// activation and is always enabled when building blocks.
118+
//
119+
// We use the network version at the next epoch (the height at which the
120+
// selected messages will be applied).
121+
mp.curTsLk.RLock()
122+
defer mp.curTsLk.RUnlock()
123+
124+
if mp.curTs == nil {
125+
return false
126+
}
127+
128+
nextEpoch := mp.curTs.Height() + 1
129+
nv, err := mp.getNtwkVersion(nextEpoch)
130+
if err != nil {
131+
log.Warnw("mpool reservation pre-pack: failed to get network version; disabling reservation heuristics",
132+
"height", nextEpoch, "error", err)
133+
return false
134+
}
135+
136+
return consensus.ReservationsEnabled(nv)
137+
}
138+
139+
func (sm *selectedMessages) initReservations(ctx context.Context, ts *types.TipSet, mp *MessagePool) {
140+
sm.mp = mp
141+
if !mp.reservationsPrePackEnabled() {
142+
return
143+
}
144+
145+
sm.reservationEnabled = true
146+
sm.reservationCtx = ctx
147+
sm.reservationTipset = ts
148+
sm.reservedBySender = make(map[address.Address]types.BigInt)
149+
sm.balanceBySender = make(map[address.Address]types.BigInt)
150+
}
151+
152+
func (sm *selectedMessages) reserveForMessages(sender address.Address, msgs []*types.SignedMessage) bool {
153+
if !sm.reservationEnabled || len(msgs) == 0 {
154+
return true
155+
}
156+
157+
balance, ok := sm.balanceBySender[sender]
158+
if !ok {
159+
// If we don't already have a cached balance, attempt to load it from
160+
// chain state. If we cannot, disable reservation heuristics to avoid
161+
// partial behaviour.
162+
if sm.mp == nil || sm.reservationCtx == nil || sm.reservationTipset == nil {
163+
sm.reservationEnabled = false
164+
sm.reservedBySender = nil
165+
sm.balanceBySender = nil
166+
return true
167+
}
168+
169+
var err error
170+
balance, err = sm.mp.getStateBalance(sm.reservationCtx, sender, sm.reservationTipset)
171+
if err != nil {
172+
log.Warnw("mpool reservation pre-pack: failed to load sender balance; disabling reservation heuristics",
173+
"from", sender, "error", err)
174+
sm.reservationEnabled = false
175+
sm.reservedBySender = nil
176+
sm.balanceBySender = nil
177+
return true
178+
}
179+
sm.balanceBySender[sender] = balance
180+
}
181+
182+
existing := types.NewInt(0)
183+
if prev, ok := sm.reservedBySender[sender]; ok {
184+
existing = prev
185+
}
186+
187+
// Compute additional Σ(cap * limit) for this sender.
188+
additional := types.NewInt(0)
189+
for _, m := range msgs {
190+
// All messages in a chain share the same sender, but we are defensive.
191+
if m.Message.From != sender {
192+
continue
193+
}
194+
gasLimit := types.NewInt(uint64(m.Message.GasLimit))
195+
cost := types.BigMul(m.Message.GasFeeCap, gasLimit)
196+
additional = types.BigAdd(additional, cost)
197+
}
198+
199+
if types.BigCmp(additional, types.NewInt(0)) == 0 {
200+
return true
201+
}
202+
203+
nextTotal := types.BigAdd(existing, additional)
204+
if types.BigCmp(nextTotal, balance) > 0 {
205+
// Over-committed for this sender at the selection base state.
206+
return false
207+
}
208+
209+
sm.reservedBySender[sender] = nextTotal
210+
return true
92211
}
93212

94213
// returns false if chain can't be added due to block constraints
@@ -99,6 +218,16 @@ func (sm *selectedMessages) tryToAdd(mc *msgChain) bool {
99218
return false
100219
}
101220

221+
if sm.reservationEnabled && len(mc.msgs) > 0 {
222+
sender := mc.msgs[0].Message.From
223+
if !sm.reserveForMessages(sender, mc.msgs) {
224+
// Over-committed for this sender; invalidate this chain (and any
225+
// dependents) and treat it as not addable in this pass.
226+
mc.Invalidate()
227+
return false
228+
}
229+
}
230+
102231
if mc.sigType == crypto.SigTypeBLS {
103232
if sm.blsLimit < l {
104233
return false
@@ -170,6 +299,22 @@ func (sm *selectedMessages) tryToAddWithDeps(mc *msgChain, mp *MessagePool, base
170299
return false
171300
}
172301

302+
if sm.reservationEnabled && len(mc.msgs) > 0 {
303+
sender := mc.msgs[0].Message.From
304+
var toReserve []*types.SignedMessage
305+
for i := len(chainDeps) - 1; i >= 0; i-- {
306+
toReserve = append(toReserve, chainDeps[i].msgs...)
307+
}
308+
toReserve = append(toReserve, mc.msgs...)
309+
310+
if !sm.reserveForMessages(sender, toReserve) {
311+
// Over-committed for this sender; invalidate the chain (and its
312+
// dependents) so subsequent passes skip them.
313+
mc.Invalidate()
314+
return false
315+
}
316+
}
317+
173318
// the chain fits! include it together with all dependencies
174319
for i := len(chainDeps) - 1; i >= 0; i-- {
175320
curChain := chainDeps[i]
@@ -603,7 +748,9 @@ func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[a
603748
gasLimit: buildconstants.BlockGasLimit,
604749
blsLimit: cbg.MaxLength,
605750
secpLimit: cbg.MaxLength,
751+
mp: mp,
606752
}
753+
result.initReservations(ctx, ts, mp)
607754
minGas := int64(gasguess.MinGas)
608755

609756
// 1. Get priority actor chains

chain/messagepool/selection_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,6 +1498,130 @@ func TestGasReward(t *testing.T) {
14981498
}
14991499
}
15001500

1501+
func TestReservationPrePackPrunesOverCommittedChain(t *testing.T) {
1502+
// Construct a selectedMessages instance with reservation heuristics enabled
1503+
// and a single chain whose Σ(cap×limit) exceeds the configured sender
1504+
// balance. The chain should be invalidated and not added.
1505+
1506+
var (
1507+
addr, _ = address.NewIDAddress(100)
1508+
)
1509+
1510+
sm := &selectedMessages{
1511+
msgs: nil,
1512+
gasLimit: buildconstants.BlockGasLimit,
1513+
blsLimit: cbg.MaxLength,
1514+
secpLimit: cbg.MaxLength,
1515+
reservationEnabled: true,
1516+
reservedBySender: make(map[address.Address]types.BigInt),
1517+
balanceBySender: make(map[address.Address]types.BigInt),
1518+
}
1519+
1520+
// Sender balance is 50 atto.
1521+
sm.balanceBySender[addr] = types.NewInt(50)
1522+
1523+
m1 := &types.SignedMessage{
1524+
Message: types.Message{
1525+
From: addr,
1526+
GasLimit: 1,
1527+
GasFeeCap: types.NewInt(10),
1528+
},
1529+
}
1530+
m2 := &types.SignedMessage{
1531+
Message: types.Message{
1532+
From: addr,
1533+
GasLimit: 1,
1534+
GasFeeCap: types.NewInt(100),
1535+
},
1536+
}
1537+
1538+
mc := &msgChain{
1539+
msgs: []*types.SignedMessage{m1, m2},
1540+
gasLimit: m1.Message.GasLimit + m2.Message.GasLimit,
1541+
valid: true,
1542+
sigType: crypto.SigTypeSecp256k1,
1543+
}
1544+
1545+
ok := sm.tryToAdd(mc)
1546+
if ok {
1547+
t.Fatalf("expected tryToAdd to fail due to reservation pre-pack, but it succeeded")
1548+
}
1549+
if mc.valid {
1550+
t.Fatalf("expected chain to be invalidated when over-committed")
1551+
}
1552+
if len(sm.msgs) != 0 {
1553+
t.Fatalf("expected no messages to be added, got %d", len(sm.msgs))
1554+
}
1555+
}
1556+
1557+
func TestReservationPrePackAccumulatesCapTimesLimit(t *testing.T) {
1558+
// Verify that reservation-aware pre-pack simulation uses Σ(cap×limit) per
1559+
// sender when accumulating reserved totals.
1560+
1561+
addr, _ := address.NewIDAddress(200)
1562+
1563+
sm := &selectedMessages{
1564+
msgs: nil,
1565+
gasLimit: buildconstants.BlockGasLimit,
1566+
blsLimit: cbg.MaxLength,
1567+
secpLimit: cbg.MaxLength,
1568+
reservationEnabled: true,
1569+
reservedBySender: make(map[address.Address]types.BigInt),
1570+
balanceBySender: make(map[address.Address]types.BigInt),
1571+
}
1572+
1573+
// Large balance so neither chain is pruned.
1574+
sm.balanceBySender[addr] = types.NewInt(1_000_000)
1575+
1576+
m1 := &types.SignedMessage{
1577+
Message: types.Message{
1578+
From: addr,
1579+
GasLimit: 5,
1580+
GasFeeCap: types.NewInt(10),
1581+
},
1582+
}
1583+
// cost1 = 5 * 10 = 50
1584+
mc1 := &msgChain{
1585+
msgs: []*types.SignedMessage{m1},
1586+
gasLimit: m1.Message.GasLimit,
1587+
valid: true,
1588+
sigType: crypto.SigTypeSecp256k1,
1589+
}
1590+
1591+
if !sm.tryToAdd(mc1) {
1592+
t.Fatalf("expected first chain to be accepted")
1593+
}
1594+
1595+
expected := types.NewInt(50)
1596+
if got := sm.reservedBySender[addr]; !got.Equals(expected) {
1597+
t.Fatalf("expected reserved total %s, got %s", expected, got)
1598+
}
1599+
1600+
m2 := &types.SignedMessage{
1601+
Message: types.Message{
1602+
From: addr,
1603+
GasLimit: 3,
1604+
GasFeeCap: types.NewInt(20),
1605+
},
1606+
}
1607+
// cost2 = 3 * 20 = 60; cumulative = 110
1608+
mc2 := &msgChain{
1609+
msgs: []*types.SignedMessage{m2},
1610+
gasLimit: m2.Message.GasLimit,
1611+
valid: true,
1612+
sigType: crypto.SigTypeSecp256k1,
1613+
}
1614+
1615+
if !sm.tryToAdd(mc2) {
1616+
t.Fatalf("expected second chain to be accepted")
1617+
}
1618+
1619+
expected = types.NewInt(110)
1620+
if got := sm.reservedBySender[addr]; !got.Equals(expected) {
1621+
t.Fatalf("expected reserved total %s after second chain, got %s", expected, got)
1622+
}
1623+
}
1624+
15011625
func TestRealWorldSelection(t *testing.T) {
15021626

15031627
// load test-messages.json.gz and rewrite the messages so that

chain/types/mpool.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ type MpoolConfig struct {
1313
ReplaceByFeeRatio Percent
1414
PruneCooldown time.Duration
1515
GasLimitOverestimation float64
16+
17+
// EnableReservationPrePack enables reservation-aware pre-pack simulation in
18+
// the mpool selector. When combined with the tipset reservation feature
19+
// gating, this provides an advisory check that per-sender Σ(gas_limit *
20+
// gas_fee_cap) across the selected block does not exceed the sender's
21+
// on-chain balance at the selection base state.
22+
EnableReservationPrePack bool
1623
}
1724

1825
func (mc *MpoolConfig) Clone() *MpoolConfig {

0 commit comments

Comments
 (0)