|
1 | 1 | package main
|
2 | 2 |
|
3 | 3 | import (
|
| 4 | + "errors" |
4 | 5 | "fmt"
|
5 |
| - "sort" |
| 6 | + "net/url" |
| 7 | + "os" |
6 | 8 | "strconv"
|
7 | 9 |
|
| 10 | + "github.com/google/uuid" |
| 11 | + "github.com/mitchellh/go-homedir" |
8 | 12 | "github.com/urfave/cli/v2"
|
| 13 | + "github.com/yugabyte/pgx/v5" |
9 | 14 | "golang.org/x/xerrors"
|
10 | 15 |
|
11 | 16 | "github.com/filecoin-project/go-address"
|
12 | 17 | "github.com/filecoin-project/go-state-types/abi"
|
13 | 18 |
|
14 | 19 | "github.com/filecoin-project/curio/deps"
|
| 20 | + "github.com/filecoin-project/curio/harmony/harmonydb" |
15 | 21 | "github.com/filecoin-project/curio/lib/reqcontext"
|
16 |
| - "github.com/filecoin-project/curio/market/dealmarket" |
17 |
| - "github.com/filecoin-project/curio/market/lmrpc" |
| 22 | + "github.com/filecoin-project/curio/market/storageIngest" |
18 | 23 | )
|
19 | 24 |
|
20 | 25 | var marketCmd = &cli.Command{
|
21 | 26 | Name: "market",
|
22 | 27 | Subcommands: []*cli.Command{
|
23 |
| - marketRPCInfoCmd, |
24 | 28 | marketSealCmd,
|
| 29 | + marketImportdataCmd, |
25 | 30 | },
|
26 | 31 | }
|
27 | 32 |
|
28 |
| -var marketRPCInfoCmd = &cli.Command{ |
| 33 | +var marketSealCmd = &cli.Command{ |
| 34 | + Name: "seal", |
| 35 | + Usage: "start sealing a deal sector early", |
29 | 36 | Flags: []cli.Flag{
|
30 |
| - &cli.StringSliceFlag{ |
31 |
| - Name: "layers", |
32 |
| - Usage: "list of layers to be interpreted (atop defaults). Default: base", |
| 37 | + &cli.StringFlag{ |
| 38 | + Name: "actor", |
| 39 | + Usage: "Specify actor address to start sealing sectors for", |
| 40 | + Required: true, |
| 41 | + }, |
| 42 | + &cli.BoolFlag{ |
| 43 | + Name: "synthetic", |
| 44 | + Usage: "Use synthetic PoRep", |
| 45 | + Value: false, |
33 | 46 | },
|
34 | 47 | },
|
| 48 | + ArgsUsage: "<sector>", |
35 | 49 | Action: func(cctx *cli.Context) error {
|
36 |
| - db, err := deps.MakeDB(cctx) |
37 |
| - if err != nil { |
38 |
| - return err |
39 |
| - } |
40 |
| - |
41 |
| - layers := cctx.StringSlice("layers") |
42 |
| - |
43 |
| - cfg, err := deps.GetConfig(cctx.Context, layers, db) |
| 50 | + act, err := address.NewFromString(cctx.String("actor")) |
44 | 51 | if err != nil {
|
45 |
| - return xerrors.Errorf("get config: %w", err) |
| 52 | + return xerrors.Errorf("parsing --actor: %w", err) |
46 | 53 | }
|
47 | 54 |
|
48 |
| - ts, err := lmrpc.MakeTokens(cfg) |
49 |
| - if err != nil { |
50 |
| - return xerrors.Errorf("make tokens: %w", err) |
| 55 | + if cctx.Args().Len() > 1 { |
| 56 | + return xerrors.Errorf("specify only one sector") |
51 | 57 | }
|
52 | 58 |
|
53 |
| - var addrTokens []struct { |
54 |
| - Address string |
55 |
| - Token string |
56 |
| - } |
| 59 | + sec := cctx.Args().First() |
57 | 60 |
|
58 |
| - for address, s := range ts { |
59 |
| - addrTokens = append(addrTokens, struct { |
60 |
| - Address string |
61 |
| - Token string |
62 |
| - }{ |
63 |
| - Address: address.String(), |
64 |
| - Token: s, |
65 |
| - }) |
| 61 | + sector, err := strconv.ParseUint(sec, 10, 64) |
| 62 | + if err != nil { |
| 63 | + return xerrors.Errorf("failed to parse the sector number: %w", err) |
66 | 64 | }
|
67 | 65 |
|
68 |
| - sort.Slice(addrTokens, func(i, j int) bool { |
69 |
| - return addrTokens[i].Address < addrTokens[j].Address |
70 |
| - }) |
71 |
| - |
72 |
| - for _, at := range addrTokens { |
73 |
| - fmt.Printf("[lotus-miner/boost compatible] %s %s\n", at.Address, at.Token) |
| 66 | + ctx := reqcontext.ReqContext(cctx) |
| 67 | + dep, err := deps.GetDepsCLI(ctx, cctx) |
| 68 | + if err != nil { |
| 69 | + return err |
74 | 70 | }
|
75 | 71 |
|
76 |
| - return nil |
| 72 | + return storageIngest.SealNow(ctx, dep.Chain, dep.DB, act, abi.SectorNumber(sector), cctx.Bool("synthetic")) |
77 | 73 | },
|
78 |
| - Name: "rpc-info", |
79 | 74 | }
|
80 | 75 |
|
81 |
| -var marketSealCmd = &cli.Command{ |
82 |
| - Name: "seal", |
83 |
| - Usage: "start sealing a deal sector early", |
| 76 | +var marketImportdataCmd = &cli.Command{ |
| 77 | + Name: "import-data", |
| 78 | + Usage: "Import data for offline deal", |
84 | 79 | Flags: []cli.Flag{
|
85 | 80 | &cli.StringFlag{
|
86 | 81 | Name: "actor",
|
87 | 82 | Usage: "Specify actor address to start sealing sectors for",
|
88 | 83 | Required: true,
|
89 | 84 | },
|
90 |
| - &cli.BoolFlag{ |
91 |
| - Name: "synthetic", |
92 |
| - Usage: "Use synthetic PoRep", |
93 |
| - Value: false, |
94 |
| - }, |
95 | 85 | },
|
96 |
| - ArgsUsage: "<sector>", |
| 86 | + ArgsUsage: "<deal UUID> <file> <host:port>", |
97 | 87 | Action: func(cctx *cli.Context) error {
|
98 |
| - act, err := address.NewFromString(cctx.String("actor")) |
| 88 | + if cctx.Args().Len() != 3 { |
| 89 | + return xerrors.Errorf("incorrect number of arguments") |
| 90 | + } |
| 91 | + |
| 92 | + idStr := cctx.Args().First() |
| 93 | + |
| 94 | + id, err := uuid.Parse(idStr) |
99 | 95 | if err != nil {
|
100 |
| - return xerrors.Errorf("parsing --actor: %w", err) |
| 96 | + return err |
101 | 97 | }
|
102 | 98 |
|
103 |
| - if cctx.Args().Len() > 1 { |
104 |
| - return xerrors.Errorf("specify only one sector") |
| 99 | + fileStr := cctx.Args().Get(1) |
| 100 | + fpath, err := homedir.Expand(fileStr) |
| 101 | + if err != nil { |
| 102 | + return err |
105 | 103 | }
|
106 | 104 |
|
107 |
| - sec := cctx.Args().First() |
| 105 | + f, err := os.Open(fpath) |
| 106 | + if err != nil { |
| 107 | + return err |
| 108 | + } |
108 | 109 |
|
109 |
| - sector, err := strconv.ParseUint(sec, 10, 64) |
| 110 | + defer func() { |
| 111 | + _ = f.Close() |
| 112 | + }() |
| 113 | + |
| 114 | + st, err := f.Stat() |
110 | 115 | if err != nil {
|
111 |
| - return xerrors.Errorf("failed to parse the sector number: %w", err) |
| 116 | + return err |
112 | 117 | }
|
113 | 118 |
|
| 119 | + rawSize := st.Size() |
| 120 | + |
| 121 | + fUrl := "file:///" + fpath |
| 122 | + |
114 | 123 | ctx := reqcontext.ReqContext(cctx)
|
115 | 124 | dep, err := deps.GetDepsCLI(ctx, cctx)
|
116 | 125 | if err != nil {
|
117 | 126 | return err
|
118 | 127 | }
|
119 | 128 |
|
120 |
| - return dealmarket.SealNow(ctx, dep.Chain, dep.DB, act, abi.SectorNumber(sector), cctx.Bool("synthetic")) |
| 129 | + comm, err := dep.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { |
| 130 | + var details []struct { |
| 131 | + Offline bool `db:"offline"` |
| 132 | + Piece string `db:"piece_cid"` |
| 133 | + Size abi.PaddedPieceSize `db:"piece_size"` |
| 134 | + } |
| 135 | + err = tx.Select(&details, `SELECT offline, piece_cid, piece_size FROM market_mk12_deals WHERE uuid = $1`, id.String()) |
| 136 | + if err != nil { |
| 137 | + return false, xerrors.Errorf("getting deal details from DB: %w", err) |
| 138 | + } |
| 139 | + |
| 140 | + if len(details) != 1 { |
| 141 | + return false, xerrors.Errorf("expected 1 row but got %d", len(details)) |
| 142 | + } |
| 143 | + |
| 144 | + deal := details[0] |
| 145 | + |
| 146 | + if !deal.Offline { |
| 147 | + return false, xerrors.Errorf("provided deal %s is an online deal", id.String()) |
| 148 | + } |
| 149 | + |
| 150 | + if abi.UnpaddedPieceSize(rawSize).Padded() != deal.Size { |
| 151 | + return false, xerrors.Errorf("piece size mismatch: database %d and calculated %d", deal.Size, abi.UnpaddedPieceSize(rawSize).Padded()) |
| 152 | + } |
| 153 | + |
| 154 | + var pieceID int64 |
| 155 | + // Attempt to select the piece ID first |
| 156 | + err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.Piece).Scan(&pieceID) |
| 157 | + |
| 158 | + if err != nil { |
| 159 | + if errors.Is(err, pgx.ErrNoRows) { |
| 160 | + // Piece does not exist, attempt to insert |
| 161 | + err = tx.QueryRow(` |
| 162 | + INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) |
| 163 | + VALUES ($1, $2, $3) |
| 164 | + ON CONFLICT (piece_cid) DO NOTHING |
| 165 | + RETURNING id`, deal.Piece, deal.Size, rawSize).Scan(&pieceID) |
| 166 | + if err != nil { |
| 167 | + return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err) |
| 168 | + } |
| 169 | + } else { |
| 170 | + // Some other error occurred during select |
| 171 | + return false, xerrors.Errorf("checking existing parked piece: %w", err) |
| 172 | + } |
| 173 | + } |
| 174 | + |
| 175 | + // Add parked_piece_ref |
| 176 | + var refID int64 |
| 177 | + err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url) |
| 178 | + VALUES ($1, $2) RETURNING ref_id`, pieceID, fUrl).Scan(&refID) |
| 179 | + if err != nil { |
| 180 | + return false, xerrors.Errorf("inserting parked piece ref: %w", err) |
| 181 | + } |
| 182 | + |
| 183 | + pieceIDUrl := url.URL{ |
| 184 | + Scheme: "pieceref", |
| 185 | + Opaque: fmt.Sprintf("%d", refID), |
| 186 | + } |
| 187 | + |
| 188 | + // Insert the offline deal into the deal pipeline |
| 189 | + _, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (url, file_size) |
| 190 | + VALUES ($1, $2) WHERE uuid = $3 ON CONFLICT (uuid) DO NOTHING`, |
| 191 | + pieceIDUrl, rawSize) |
| 192 | + if err != nil { |
| 193 | + return false, xerrors.Errorf("inserting deal into deal pipeline: %w", err) |
| 194 | + } |
| 195 | + |
| 196 | + return true, nil |
| 197 | + }, harmonydb.OptionRetry()) |
| 198 | + if err != nil { |
| 199 | + return err |
| 200 | + } |
| 201 | + if !comm { |
| 202 | + return xerrors.Errorf("failed to commit the transaction") |
| 203 | + } |
| 204 | + return nil |
121 | 205 | },
|
122 | 206 | }
|
0 commit comments