Skip to content

Commit 49b2de4

Browse files
authored
chore: refactor allocate command (#1912)
* refactor allocate command * add command description
1 parent 6343680 commit 49b2de4

File tree

4 files changed

+295
-130
lines changed

4 files changed

+295
-130
lines changed

cmd/boost/direct_deal.go

Lines changed: 203 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"bufio"
45
"fmt"
56
"os"
67
"strconv"
@@ -14,32 +15,31 @@ import (
1415
"github.com/filecoin-project/go-address"
1516
"github.com/filecoin-project/go-state-types/abi"
1617
verifreg13types "github.com/filecoin-project/go-state-types/builtin/v13/verifreg"
17-
lapi "github.com/filecoin-project/lotus/api"
1818
"github.com/filecoin-project/lotus/build"
1919
"github.com/filecoin-project/lotus/chain/actors/builtin/verifreg"
2020
"github.com/filecoin-project/lotus/chain/types"
2121
lcli "github.com/filecoin-project/lotus/cli"
2222
"github.com/filecoin-project/lotus/lib/tablewriter"
2323
"github.com/ipfs/go-cid"
24+
"github.com/mitchellh/go-homedir"
2425
"github.com/urfave/cli/v2"
2526
"golang.org/x/sync/errgroup"
2627
)
2728

2829
var directDealAllocate = &cli.Command{
29-
Name: "allocate",
30-
Usage: "Create new allocation[s] for verified deals",
30+
Name: "allocate",
31+
Usage: "Create new allocation[s] for verified deals",
32+
Description: "The command can accept a CSV formatted file in the format 'pieceCid,pieceSize,miner,tmin,tmax,expiration'",
3133
Flags: []cli.Flag{
3234
&cli.StringSliceFlag{
33-
Name: "miner",
34-
Usage: "storage provider address[es]",
35-
Required: true,
36-
Aliases: []string{"m", "provider", "p"},
35+
Name: "miner",
36+
Usage: "storage provider address[es]",
37+
Aliases: []string{"m", "provider", "p"},
3738
},
3839
&cli.StringSliceFlag{
39-
Name: "piece-info",
40-
Usage: "data piece-info[s] to create the allocation. The format must be --piece-info pieceCid1=pieceSize1 --piece-info pieceCid2=pieceSize2",
41-
Required: true,
42-
Aliases: []string{"pi"},
40+
Name: "piece-info",
41+
Usage: "data piece-info[s] to create the allocation. The format must be --piece-info pieceCid1=pieceSize1 --piece-info pieceCid2=pieceSize2",
42+
Aliases: []string{"pi"},
4343
},
4444
&cli.StringFlag{
4545
Name: "wallet",
@@ -70,11 +70,165 @@ var directDealAllocate = &cli.Command{
7070
"Default is 60 days.",
7171
Value: verifreg13types.MaximumVerifiedAllocationExpiration,
7272
},
73+
&cli.StringFlag{
74+
Name: "piece-file",
75+
Usage: "file containing piece-info[s] to create the allocation. Each line in the file should be in the format 'pieceCid,pieceSize,miner,tmin,tmax,expiration'",
76+
Aliases: []string{"pf"},
77+
},
78+
&cli.IntFlag{
79+
Name: "batch-size",
80+
Usage: "number of extend requests per batch. If set incorrectly, this will lead to out of gas error",
81+
Value: 500,
82+
},
83+
&cli.IntFlag{
84+
Name: "confidence",
85+
Usage: "number of block confirmations to wait for",
86+
Value: int(build.MessageConfidence),
87+
},
7388
},
7489
Before: before,
7590
Action: func(cctx *cli.Context) error {
7691
ctx := bcli.ReqContext(cctx)
7792

93+
pieceFile := cctx.String("piece-file")
94+
miners := cctx.StringSlice("miner")
95+
pinfos := cctx.StringSlice("piece-info")
96+
if pieceFile == "" && len(pinfos) < 1 {
97+
return fmt.Errorf("must provide at least one --piece-info or use --piece-file")
98+
}
99+
100+
if pieceFile == "" && len(miners) < 1 {
101+
return fmt.Errorf("must provide at least one miner address or use --piece-file")
102+
}
103+
104+
if pieceFile != "" && len(pinfos) > 0 {
105+
return fmt.Errorf("cannot use both --piece-info and --piece-file flags at once")
106+
}
107+
108+
var pieceInfos []util.PieceInfos
109+
110+
if pieceFile != "" {
111+
// Read file line by line
112+
loc, err := homedir.Expand(pieceFile)
113+
if err != nil {
114+
return err
115+
}
116+
file, err := os.Open(loc)
117+
if err != nil {
118+
return err
119+
}
120+
defer file.Close()
121+
scanner := bufio.NewScanner(file)
122+
for scanner.Scan() {
123+
line := scanner.Text()
124+
// Extract pieceCid, pieceSize and MinerAddr from line
125+
parts := strings.Split(line, ",")
126+
if len(parts) != 6 {
127+
return fmt.Errorf("invalid line format. Expected pieceCid, pieceSize, MinerAddr, TMin, TMax, Exp at %s", line)
128+
}
129+
if parts[0] == "" || parts[1] == "" || parts[2] == "" || parts[3] == "" || parts[4] == "" || parts[5] == "" {
130+
return fmt.Errorf("empty column value in the input file at %s", line)
131+
}
132+
133+
pieceCid, err := cid.Parse(parts[0])
134+
if err != nil {
135+
return fmt.Errorf("failed to parse CID: %w", err)
136+
}
137+
pieceSize, err := strconv.ParseInt(parts[1], 10, 64)
138+
if err != nil {
139+
return fmt.Errorf("failed to parse size %w", err)
140+
}
141+
maddr, err := address.NewFromString(parts[2])
142+
if err != nil {
143+
return fmt.Errorf("failed to parse miner address %w", err)
144+
}
145+
146+
mid, err := address.IDFromAddress(maddr)
147+
if err != nil {
148+
return fmt.Errorf("failed to convert miner address %w", err)
149+
}
150+
151+
tmin, err := strconv.ParseUint(parts[3], 10, 64)
152+
if err != nil {
153+
return fmt.Errorf("failed to tmin %w", err)
154+
}
155+
156+
tmax, err := strconv.ParseUint(parts[4], 10, 64)
157+
if err != nil {
158+
return fmt.Errorf("failed to tmax %w", err)
159+
}
160+
161+
exp, err := strconv.ParseUint(parts[5], 10, 64)
162+
if err != nil {
163+
return fmt.Errorf("failed to expiration %w", err)
164+
}
165+
166+
if tmax < tmin {
167+
return fmt.Errorf("maximum duration %d cannot be smaller than minimum duration %d", tmax, tmin)
168+
}
169+
170+
pieceInfos = append(pieceInfos, util.PieceInfos{
171+
Cid: pieceCid,
172+
Size: pieceSize,
173+
Miner: abi.ActorID(mid),
174+
MinerAddr: maddr,
175+
Tmin: abi.ChainEpoch(tmin),
176+
Tmax: abi.ChainEpoch(tmax),
177+
Exp: abi.ChainEpoch(exp),
178+
})
179+
if err := scanner.Err(); err != nil {
180+
return err
181+
}
182+
}
183+
} else {
184+
for _, miner := range miners {
185+
maddr, err := address.NewFromString(miner)
186+
if err != nil {
187+
return fmt.Errorf("failed to parse miner address %w", err)
188+
}
189+
190+
mid, err := address.IDFromAddress(maddr)
191+
if err != nil {
192+
return fmt.Errorf("failed to convert miner address %w", err)
193+
}
194+
for _, p := range cctx.StringSlice("piece-info") {
195+
pieceDetail := strings.Split(p, "=")
196+
if len(pieceDetail) != 2 {
197+
return fmt.Errorf("incorrect pieceInfo format: %s", pieceDetail)
198+
}
199+
200+
size, err := strconv.ParseInt(pieceDetail[1], 10, 64)
201+
if err != nil {
202+
return fmt.Errorf("failed to parse the piece size for %s for pieceCid %s: %w", pieceDetail[0], pieceDetail[1], err)
203+
}
204+
pcid, err := cid.Parse(pieceDetail[0])
205+
if err != nil {
206+
return fmt.Errorf("failed to parse the pieceCid for %s: %w", pieceDetail[0], err)
207+
}
208+
209+
tmin := abi.ChainEpoch(cctx.Int64("term-min"))
210+
211+
tmax := abi.ChainEpoch(cctx.Int64("term-max"))
212+
213+
exp := abi.ChainEpoch(cctx.Int64("expiration"))
214+
215+
if tmax < tmin {
216+
return fmt.Errorf("maximum duration %d cannot be smaller than minimum duration %d", tmax, tmin)
217+
}
218+
219+
pieceInfos = append(pieceInfos, util.PieceInfos{
220+
Cid: pcid,
221+
Size: size,
222+
Miner: abi.ActorID(mid),
223+
MinerAddr: maddr,
224+
Tmin: tmin,
225+
Tmax: tmax,
226+
Exp: exp,
227+
})
228+
}
229+
}
230+
}
231+
78232
n, err := clinode.Setup(cctx.String(cmd.FlagRepo.Name))
79233
if err != nil {
80234
return err
@@ -94,7 +248,7 @@ var directDealAllocate = &cli.Command{
94248

95249
log.Debugw("selected wallet", "wallet", walletAddr)
96250

97-
msg, err := util.CreateAllocationMsg(ctx, gapi, cctx.StringSlice("piece-info"), cctx.StringSlice("miner"), walletAddr, abi.ChainEpoch(cctx.Int64("term-min")), abi.ChainEpoch(cctx.Int64("term-max")), abi.ChainEpoch(cctx.Int64("expiration")))
251+
msgs, err := util.CreateAllocationMsg(ctx, gapi, pieceInfos, walletAddr, cctx.Int("batch-size"))
98252

99253
if err != nil {
100254
return err
@@ -105,24 +259,49 @@ var directDealAllocate = &cli.Command{
105259
return fmt.Errorf("failed to get allocations: %w", err)
106260
}
107261

108-
mcid, sent, err := lib.SignAndPushToMpool(cctx, ctx, gapi, n, msg)
109-
if err != nil {
110-
return err
262+
var mcids []cid.Cid
263+
264+
for _, msg := range msgs {
265+
mcid, sent, err := lib.SignAndPushToMpool(cctx, ctx, gapi, n, msg)
266+
if err != nil {
267+
return err
268+
}
269+
if !sent {
270+
fmt.Printf("message %s with method %s not sent\n", msg.Cid(), msg.Method.String())
271+
continue
272+
}
273+
mcids = append(mcids, mcid)
111274
}
112-
if !sent {
113-
return nil
275+
276+
var mcidStr []string
277+
for _, c := range mcids {
278+
mcidStr = append(mcidStr, c.String())
114279
}
115280

116-
log.Infow("submitted data cap allocation message", "cid", mcid.String())
281+
log.Infow("submitted data cap allocation message[s]", mcidStr)
117282
log.Info("waiting for message to be included in a block")
118283

119-
res, err := gapi.StateWaitMsg(ctx, mcid, 1, lapi.LookbackNoLimit, true)
120-
if err != nil {
121-
return fmt.Errorf("waiting for message to be included in a block: %w", err)
122-
}
284+
// wait for msgs to get mined into a block
285+
eg := errgroup.Group{}
286+
eg.SetLimit(10)
287+
for _, msg := range mcids {
288+
m := msg
289+
eg.Go(func() error {
290+
wait, err := gapi.StateWaitMsg(ctx, m, uint64(cctx.Int("confidence")), 2000, true)
291+
if err != nil {
292+
return fmt.Errorf("timeout waiting for message to land on chain %s", wait.Message)
123293

124-
if !res.Receipt.ExitCode.IsSuccess() {
125-
return fmt.Errorf("failed to execute the message with error: %s", res.Receipt.ExitCode.Error())
294+
}
295+
296+
if wait.Receipt.ExitCode.IsError() {
297+
return fmt.Errorf("failed to execute message %s: %w", wait.Message, wait.Receipt.ExitCode)
298+
}
299+
return nil
300+
})
301+
}
302+
err = eg.Wait()
303+
if err != nil {
304+
return err
126305
}
127306

128307
// Return early of quiet flag is set

0 commit comments

Comments
 (0)