Skip to content

Commit 48e953d

Browse files
committed
poller redesign
1 parent 1c11a76 commit 48e953d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2015
-2665
lines changed

.circleci/config.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,3 +275,10 @@ workflows:
275275
target: "./itests/curio_test.go"
276276
get-params: true
277277
resource_class: 2xlarge
278+
- test:
279+
name: test-idxStore
280+
requires:
281+
- build
282+
suite: idxStore
283+
target: "./lib/indexing/indexstore/indexstore_test.go"
284+
get-params: true

cmd/curio/market.go

Lines changed: 141 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,122 +1,206 @@
11
package main
22

33
import (
4+
"errors"
45
"fmt"
5-
"sort"
6+
"net/url"
7+
"os"
68
"strconv"
79

10+
"github.com/google/uuid"
11+
"github.com/mitchellh/go-homedir"
812
"github.com/urfave/cli/v2"
13+
"github.com/yugabyte/pgx/v5"
914
"golang.org/x/xerrors"
1015

1116
"github.com/filecoin-project/go-address"
1217
"github.com/filecoin-project/go-state-types/abi"
1318

1419
"github.com/filecoin-project/curio/deps"
20+
"github.com/filecoin-project/curio/harmony/harmonydb"
1521
"github.com/filecoin-project/curio/lib/reqcontext"
16-
"github.com/filecoin-project/curio/market/dealmarket"
17-
"github.com/filecoin-project/curio/market/lmrpc"
22+
"github.com/filecoin-project/curio/market/storageIngest"
1823
)
1924

2025
var marketCmd = &cli.Command{
2126
Name: "market",
2227
Subcommands: []*cli.Command{
23-
marketRPCInfoCmd,
2428
marketSealCmd,
29+
marketImportdataCmd,
2530
},
2631
}
2732

28-
var marketRPCInfoCmd = &cli.Command{
33+
var marketSealCmd = &cli.Command{
34+
Name: "seal",
35+
Usage: "start sealing a deal sector early",
2936
Flags: []cli.Flag{
30-
&cli.StringSliceFlag{
31-
Name: "layers",
32-
Usage: "list of layers to be interpreted (atop defaults). Default: base",
37+
&cli.StringFlag{
38+
Name: "actor",
39+
Usage: "Specify actor address to start sealing sectors for",
40+
Required: true,
41+
},
42+
&cli.BoolFlag{
43+
Name: "synthetic",
44+
Usage: "Use synthetic PoRep",
45+
Value: false,
3346
},
3447
},
48+
ArgsUsage: "<sector>",
3549
Action: func(cctx *cli.Context) error {
36-
db, err := deps.MakeDB(cctx)
37-
if err != nil {
38-
return err
39-
}
40-
41-
layers := cctx.StringSlice("layers")
42-
43-
cfg, err := deps.GetConfig(cctx.Context, layers, db)
50+
act, err := address.NewFromString(cctx.String("actor"))
4451
if err != nil {
45-
return xerrors.Errorf("get config: %w", err)
52+
return xerrors.Errorf("parsing --actor: %w", err)
4653
}
4754

48-
ts, err := lmrpc.MakeTokens(cfg)
49-
if err != nil {
50-
return xerrors.Errorf("make tokens: %w", err)
55+
if cctx.Args().Len() > 1 {
56+
return xerrors.Errorf("specify only one sector")
5157
}
5258

53-
var addrTokens []struct {
54-
Address string
55-
Token string
56-
}
59+
sec := cctx.Args().First()
5760

58-
for address, s := range ts {
59-
addrTokens = append(addrTokens, struct {
60-
Address string
61-
Token string
62-
}{
63-
Address: address.String(),
64-
Token: s,
65-
})
61+
sector, err := strconv.ParseUint(sec, 10, 64)
62+
if err != nil {
63+
return xerrors.Errorf("failed to parse the sector number: %w", err)
6664
}
6765

68-
sort.Slice(addrTokens, func(i, j int) bool {
69-
return addrTokens[i].Address < addrTokens[j].Address
70-
})
71-
72-
for _, at := range addrTokens {
73-
fmt.Printf("[lotus-miner/boost compatible] %s %s\n", at.Address, at.Token)
66+
ctx := reqcontext.ReqContext(cctx)
67+
dep, err := deps.GetDepsCLI(ctx, cctx)
68+
if err != nil {
69+
return err
7470
}
7571

76-
return nil
72+
return storageIngest.SealNow(ctx, dep.Chain, dep.DB, act, abi.SectorNumber(sector), cctx.Bool("synthetic"))
7773
},
78-
Name: "rpc-info",
7974
}
8075

81-
var marketSealCmd = &cli.Command{
82-
Name: "seal",
83-
Usage: "start sealing a deal sector early",
76+
var marketImportdataCmd = &cli.Command{
77+
Name: "import-data",
78+
Usage: "Import data for offline deal",
8479
Flags: []cli.Flag{
8580
&cli.StringFlag{
8681
Name: "actor",
8782
Usage: "Specify actor address to start sealing sectors for",
8883
Required: true,
8984
},
90-
&cli.BoolFlag{
91-
Name: "synthetic",
92-
Usage: "Use synthetic PoRep",
93-
Value: false,
94-
},
9585
},
96-
ArgsUsage: "<sector>",
86+
ArgsUsage: "<deal UUID> <file> <host:port>",
9787
Action: func(cctx *cli.Context) error {
98-
act, err := address.NewFromString(cctx.String("actor"))
88+
if cctx.Args().Len() != 3 {
89+
return xerrors.Errorf("incorrect number of arguments")
90+
}
91+
92+
idStr := cctx.Args().First()
93+
94+
id, err := uuid.Parse(idStr)
9995
if err != nil {
100-
return xerrors.Errorf("parsing --actor: %w", err)
96+
return err
10197
}
10298

103-
if cctx.Args().Len() > 1 {
104-
return xerrors.Errorf("specify only one sector")
99+
fileStr := cctx.Args().Get(1)
100+
fpath, err := homedir.Expand(fileStr)
101+
if err != nil {
102+
return err
105103
}
106104

107-
sec := cctx.Args().First()
105+
f, err := os.Open(fpath)
106+
if err != nil {
107+
return err
108+
}
108109

109-
sector, err := strconv.ParseUint(sec, 10, 64)
110+
defer func() {
111+
_ = f.Close()
112+
}()
113+
114+
st, err := f.Stat()
110115
if err != nil {
111-
return xerrors.Errorf("failed to parse the sector number: %w", err)
116+
return err
112117
}
113118

119+
rawSize := st.Size()
120+
121+
fUrl := "file:///" + fpath
122+
114123
ctx := reqcontext.ReqContext(cctx)
115124
dep, err := deps.GetDepsCLI(ctx, cctx)
116125
if err != nil {
117126
return err
118127
}
119128

120-
return dealmarket.SealNow(ctx, dep.Chain, dep.DB, act, abi.SectorNumber(sector), cctx.Bool("synthetic"))
129+
comm, err := dep.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
130+
var details []struct {
131+
Offline bool `db:"offline"`
132+
Piece string `db:"piece_cid"`
133+
Size abi.PaddedPieceSize `db:"piece_size"`
134+
}
135+
err = tx.Select(&details, `SELECT offline, piece_cid, piece_size FROM market_mk12_deals WHERE uuid = $1`, id.String())
136+
if err != nil {
137+
return false, xerrors.Errorf("getting deal details from DB: %w", err)
138+
}
139+
140+
if len(details) != 1 {
141+
return false, xerrors.Errorf("expected 1 row but got %d", len(details))
142+
}
143+
144+
deal := details[0]
145+
146+
if !deal.Offline {
147+
return false, xerrors.Errorf("provided deal %s is an online deal", id.String())
148+
}
149+
150+
if abi.UnpaddedPieceSize(rawSize).Padded() != deal.Size {
151+
return false, xerrors.Errorf("piece size mismatch: database %d and calculated %d", deal.Size, abi.UnpaddedPieceSize(rawSize).Padded())
152+
}
153+
154+
var pieceID int64
155+
// Attempt to select the piece ID first
156+
err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.Piece).Scan(&pieceID)
157+
158+
if err != nil {
159+
if errors.Is(err, pgx.ErrNoRows) {
160+
// Piece does not exist, attempt to insert
161+
err = tx.QueryRow(`
162+
INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
163+
VALUES ($1, $2, $3)
164+
ON CONFLICT (piece_cid) DO NOTHING
165+
RETURNING id`, deal.Piece, deal.Size, rawSize).Scan(&pieceID)
166+
if err != nil {
167+
return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
168+
}
169+
} else {
170+
// Some other error occurred during select
171+
return false, xerrors.Errorf("checking existing parked piece: %w", err)
172+
}
173+
}
174+
175+
// Add parked_piece_ref
176+
var refID int64
177+
err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
178+
VALUES ($1, $2) RETURNING ref_id`, pieceID, fUrl).Scan(&refID)
179+
if err != nil {
180+
return false, xerrors.Errorf("inserting parked piece ref: %w", err)
181+
}
182+
183+
pieceIDUrl := url.URL{
184+
Scheme: "pieceref",
185+
Opaque: fmt.Sprintf("%d", refID),
186+
}
187+
188+
// Insert the offline deal into the deal pipeline
189+
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (url, file_size)
190+
VALUES ($1, $2) WHERE uuid = $3 ON CONFLICT (uuid) DO NOTHING`,
191+
pieceIDUrl, rawSize)
192+
if err != nil {
193+
return false, xerrors.Errorf("inserting deal into deal pipeline: %w", err)
194+
}
195+
196+
return true, nil
197+
}, harmonydb.OptionRetry())
198+
if err != nil {
199+
return err
200+
}
201+
if !comm {
202+
return xerrors.Errorf("failed to commit the transaction")
203+
}
204+
return nil
121205
},
122206
}

cmd/curio/pipeline.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,17 +140,33 @@ var sealStartCmd = &cli.Command{
140140
}
141141
}
142142

143-
num, err := seal.AllocateSectorNumbers(ctx, dep.Chain, dep.DB, act, cctx.Int("count"), func(tx *harmonydb.Tx, numbers []abi.SectorNumber) (bool, error) {
144-
for _, n := range numbers {
145-
_, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof, user_sector_duration_epochs) values ($1, $2, $3, $4)", mid, n, spt, userDuration)
143+
var num []abi.SectorNumber
144+
145+
comm, err := dep.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
146+
num, err = seal.AllocateSectorNumbers(ctx, dep.Chain, tx, act, cctx.Int("count"))
147+
if err != nil {
148+
return false, err
149+
}
150+
151+
for _, n := range num {
152+
_, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) values ($1, $2, $3)", mid, n, spt)
146153
if err != nil {
147154
return false, xerrors.Errorf("inserting into sectors_sdr_pipeline: %w", err)
148155
}
149156
}
157+
158+
if err != nil {
159+
return false, xerrors.Errorf("allocating sector numbers: %w", err)
160+
}
150161
return true, nil
151162
})
163+
152164
if err != nil {
153-
return xerrors.Errorf("allocating sector numbers: %w", err)
165+
return xerrors.Errorf("failed to allocate new sectors: %w", err)
166+
}
167+
168+
if !comm {
169+
return xerrors.Errorf("failed to commit the transaction")
154170
}
155171

156172
for _, number := range num {

cmd/curio/run.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/filecoin-project/curio/cmd/curio/tasks"
1717
"github.com/filecoin-project/curio/deps"
1818
"github.com/filecoin-project/curio/lib/shutdown"
19-
"github.com/filecoin-project/curio/market/lmrpc"
2019

2120
"github.com/filecoin-project/lotus/lib/ulimit"
2221
"github.com/filecoin-project/lotus/metrics"
@@ -131,10 +130,6 @@ var runCmd = &cli.Command{
131130
}
132131
defer taskEngine.GracefullyTerminate()
133132

134-
if err := lmrpc.ServeCurioMarketRPCFromConfig(dependencies.DB, dependencies.Chain, dependencies.Cfg); err != nil {
135-
return xerrors.Errorf("starting market RPCs: %w", err)
136-
}
137-
138133
err = rpc.ListenAndServe(ctx, dependencies, shutdownChan) // Monitor for shutdown.
139134
if err != nil {
140135
return err

0 commit comments

Comments
 (0)