Skip to content

Commit ae514cb

Browse files
LexLuthrrvagg
authored andcommitted
fix: per sector commit simulation (#567)
* per sector commit simulation * fix batching timezone
1 parent d12af46 commit ae514cb

File tree

2 files changed

+208
-12
lines changed

2 files changed

+208
-12
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
Logic:
3+
1) Select all rows needing a commit message:
4+
- after_porep = TRUE
5+
- porep_proof IS NOT NULL
6+
- task_id_commit_msg IS NULL
7+
- after_commit_msg = FALSE
8+
- start_epoch IS NOT NULL
9+
10+
2) In the same query, number these rows per group (sp_id, reg_seal_proof),
11+
chunk them by p_max_batch size, and group them to get each batch's:
12+
- sp_id, reg_seal_proof
13+
- Array of sector_number
14+
- MIN(start_epoch) as batch_start_epoch
15+
- MIN(commit_ready_at) as earliest_ready_at
16+
17+
3) Iterate over these batches in ascending order, checking for the first batch
18+
that meets ANY of:
19+
(a) (batch_start_epoch - p_slack_epoch - p_current_height) <= 0
20+
(b) (earliest_ready_at + p_timeout_secs) > now()
21+
If found, update those rows in sectors_sdr_pipeline => task_id_commit_msg = p_new_task_id
22+
and return the number of rows updated.
23+
24+
4) If no batch meets a condition, return 0.
25+
26+
Usage example:
27+
SELECT poll_start_batch_commit_msg(
28+
slack_epoch_value,
29+
current_height_value,
30+
max_batch_size,
31+
new_task_id_value,
32+
timeout_secs
33+
);
34+
*/
35+
36+
CREATE OR REPLACE FUNCTION poll_start_batch_commit_msgs(
37+
p_slack_epoch BIGINT, -- "Slack" epoch offset
38+
p_current_height BIGINT, -- Current on-chain height
39+
p_max_batch INT, -- Max sectors per batch
40+
p_new_task_id BIGINT, -- Task ID to set for a chosen batch
41+
p_timeout_secs INT -- If earliest_ready + this > now(), condition is met
42+
)
43+
-- We return a TABLE of (updated_count BIGINT, reason TEXT),
44+
-- but in practice it will yield exactly one row.
45+
RETURNS TABLE (
46+
updated_count BIGINT,
47+
reason TEXT
48+
)
49+
LANGUAGE plpgsql
50+
AS $$
51+
DECLARE
52+
batch_rec RECORD;
53+
cond_slack BOOLEAN;
54+
cond_timeout BOOLEAN;
55+
cond_fee BOOLEAN;
56+
BEGIN
57+
-- Default outputs if we never find a batch
58+
updated_count := 0;
59+
reason := 'NONE';
60+
/*
61+
Single query logic:
62+
(1) Select the rows that need commit assignment.
63+
(2) Partition them by (sp_id, reg_seal_proof), using ROW_NUMBER() to break
64+
them into sub-batches of size p_max_batch.
65+
(3) GROUP those sub-batches to get:
66+
- batch_start_epoch = min(start_epoch)
67+
- earliest_ready_at = min(commit_ready_at)
68+
- sector_nums = array of sector_number
69+
(4) Loop over results, check conditions, update if found, return count.
70+
(5) If we finish the loop, return 0.
71+
*/
72+
FOR batch_rec IN
73+
WITH initial AS (
74+
SELECT
75+
sp_id,
76+
sector_number,
77+
start_epoch,
78+
commit_ready_at,
79+
reg_seal_proof
80+
FROM sectors_sdr_pipeline
81+
WHERE after_porep = TRUE
82+
AND porep_proof IS NOT NULL
83+
AND task_id_commit_msg IS NULL
84+
AND after_commit_msg = FALSE
85+
AND start_epoch IS NOT NULL
86+
ORDER BY sp_id, reg_seal_proof, start_epoch
87+
),
88+
numbered AS (
89+
SELECT
90+
l.*,
91+
ROW_NUMBER() OVER (
92+
PARTITION BY l.sp_id, l.reg_seal_proof
93+
ORDER BY l.commit_ready_at
94+
) AS rn
95+
FROM initial l
96+
),
97+
chunked AS (
98+
SELECT
99+
sp_id,
100+
reg_seal_proof,
101+
FLOOR((rn - 1)::NUMERIC / p_max_batch) AS batch_index,
102+
start_epoch,
103+
commit_ready_at,
104+
sector_number
105+
FROM numbered
106+
),
107+
grouped AS (
108+
SELECT
109+
sp_id,
110+
reg_seal_proof,
111+
batch_index,
112+
MIN(start_epoch) AS batch_start_epoch,
113+
MIN(commit_ready_at) AS earliest_ready_at,
114+
ARRAY_AGG(sector_number) AS sector_nums
115+
FROM chunked
116+
GROUP BY sp_id, reg_seal_proof, batch_index
117+
ORDER BY sp_id, reg_seal_proof, batch_index
118+
)
119+
SELECT
120+
sp_id,
121+
reg_seal_proof,
122+
sector_nums,
123+
batch_start_epoch,
124+
earliest_ready_at
125+
FROM grouped
126+
LOOP
127+
-- Evaluate conditions separately so we can pick a 'reason' if triggered.
128+
cond_slack := ((batch_rec.batch_start_epoch - p_slack_epoch) <= p_current_height);
129+
cond_timeout := ((batch_rec.earliest_ready_at + MAKE_INTERVAL(secs => p_timeout_secs)) < NOW() AT TIME ZONE 'UTC');
130+
131+
IF (cond_slack OR cond_timeout OR cond_fee) THEN
132+
-- If multiple conditions are true, pick an order of precedence.
133+
IF cond_slack THEN
134+
reason := 'SLACK (min start epoch: ' || batch_rec.batch_start_epoch || ')';
135+
ELSIF cond_timeout THEN
136+
reason := 'TIMEOUT (earliest_ready_at: ' || batch_rec.earliest_ready_at || ')';
137+
END IF;
138+
139+
-- Perform the update
140+
UPDATE sectors_sdr_pipeline t
141+
SET task_id_commit_msg = p_new_task_id
142+
WHERE t.sp_id = batch_rec.sp_id
143+
AND t.reg_seal_proof = batch_rec.reg_seal_proof
144+
AND t.sector_number = ANY(batch_rec.sector_nums)
145+
AND t.after_porep = TRUE
146+
AND t.task_id_commit_msg IS NULL
147+
AND t.after_commit_msg = FALSE;
148+
149+
GET DIAGNOSTICS updated_count = ROW_COUNT;
150+
151+
RETURN NEXT;
152+
RETURN; -- Return immediately with updated_count and reason
153+
END IF;
154+
END LOOP;
155+
156+
-- If we finish the loop with no triggered condition, we return updated_count=0, reason='NONE'
157+
RETURN NEXT;
158+
RETURN;
159+
END;
160+
$$;

tasks/seal/task_submit_commit.go

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,16 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
148148

149149
regProof := sectorParamsArr[0].RegSealProof
150150

151+
balance, err := s.api.StateMinerAvailableBalance(ctx, maddr, types.EmptyTSK)
152+
if err != nil {
153+
return false, xerrors.Errorf("getting miner balance: %w", err)
154+
}
155+
156+
mi, err := s.api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
157+
if err != nil {
158+
return false, xerrors.Errorf("getting miner info: %w", err)
159+
}
160+
151161
params := miner.ProveCommitSectors3Params{
152162
RequireActivationSuccess: s.cfg.RequireActivationSuccess,
153163
RequireNotificationSuccess: s.cfg.RequireNotificationSuccess,
@@ -306,6 +316,26 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
306316
collateralPerSector = big.Zero()
307317
}
308318

319+
simulateSendParam := miner.ProveCommitSectors3Params{
320+
SectorActivations: []miner.SectorActivationManifest{
321+
{
322+
SectorNumber: abi.SectorNumber(sectorParams.SectorNumber),
323+
Pieces: pams,
324+
},
325+
},
326+
SectorProofs: [][]byte{
327+
sectorParams.Proof,
328+
},
329+
RequireActivationSuccess: s.cfg.RequireActivationSuccess,
330+
RequireNotificationSuccess: s.cfg.RequireNotificationSuccess,
331+
}
332+
333+
err = s.simuateCommitPerSector(ctx, maddr, mi, balance, collateral, ts, simulateSendParam)
334+
if err != nil {
335+
log.Errorw("failed to simulate commit for sector", "Miner", maddr.String(), "Sector", sectorParams.SectorNumber, "err", err)
336+
continue
337+
}
338+
309339
collateral = big.Add(collateral, collateralPerSector)
310340

311341
params.SectorActivations = append(params.SectorActivations, miner.SectorActivationManifest{
@@ -337,7 +367,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
337367

338368
maxFee := s.cfg.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos))
339369

340-
msg, err := s.createCommitMessage(ctx, maddr, sectorParamsArr[0].RegSealProof, sectorParamsArr[0].SpID, collateral, params, infos, ts)
370+
msg, err := s.createCommitMessage(ctx, maddr, mi, balance, sectorParamsArr[0].RegSealProof, sectorParamsArr[0].SpID, collateral, params, infos, ts)
341371
if err != nil {
342372
return false, xerrors.Errorf("failed to create the commit message: %w", err)
343373
}
@@ -365,7 +395,23 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
365395
return true, nil
366396
}
367397

368-
func (s *SubmitCommitTask) createCommitMessage(ctx context.Context, maddr address.Address, sealProof abi.RegisteredSealProof, SpID int64, collateral abi.TokenAmount, params miner.ProveCommitSectors3Params, infos []proof.AggregateSealVerifyInfo, ts *types.TipSet) (*types.Message, error) {
398+
func (s *SubmitCommitTask) simuateCommitPerSector(ctx context.Context, maddr address.Address, mi api.MinerInfo, balance big.Int, collateral abi.TokenAmount, ts *types.TipSet, param miner.ProveCommitSectors3Params) error {
399+
maxFee := s.cfg.feeCfg.MaxCommitBatchGasFee.FeeForSectors(1)
400+
401+
collateral = s.calculateCollateral(balance, collateral)
402+
goodFunds := big.Add(maxFee, collateral)
403+
enc := new(bytes.Buffer)
404+
if err := param.MarshalCBOR(enc); err != nil {
405+
return xerrors.Errorf("could not serialize commit params: %w", err)
406+
}
407+
_, err := s.gasEstimateCommit(ctx, maddr, enc.Bytes(), mi, goodFunds, collateral, maxFee, ts.Key())
408+
if err != nil {
409+
return err
410+
}
411+
return nil
412+
}
413+
414+
func (s *SubmitCommitTask) createCommitMessage(ctx context.Context, maddr address.Address, mi api.MinerInfo, balance big.Int, sealProof abi.RegisteredSealProof, SpID int64, collateral abi.TokenAmount, params miner.ProveCommitSectors3Params, infos []proof.AggregateSealVerifyInfo, ts *types.TipSet) (*types.Message, error) {
369415
aggParams := params
370416
var aggCost, cost big.Int
371417
var msg, aggMsg *types.Message
@@ -376,16 +422,6 @@ func (s *SubmitCommitTask) createCommitMessage(ctx context.Context, maddr addres
376422
return nil, xerrors.Errorf("getting network version: %s", err)
377423
}
378424

379-
balance, err := s.api.StateMinerAvailableBalance(ctx, maddr, types.EmptyTSK)
380-
if err != nil {
381-
return nil, xerrors.Errorf("getting miner balance: %w", err)
382-
}
383-
384-
mi, err := s.api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
385-
if err != nil {
386-
return nil, xerrors.Errorf("getting miner info: %w", err)
387-
}
388-
389425
if len(infos) >= miner.MinAggregatedSectors {
390426
arp, err := aggregateProofType(nv)
391427
if err != nil {

0 commit comments

Comments
 (0)