Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
8831db2
initial draft of market 2.0
LexLuthr May 14, 2025
acbd66b
PUT endpoint for deals
LexLuthr May 14, 2025
7f47865
add some type validation tests
LexLuthr May 14, 2025
7ca6276
info page and pdp router changes
LexLuthr May 16, 2025
f442ac7
client, commv2, ui
LexLuthr May 20, 2025
a72ecba
mk20 GC
LexLuthr May 20, 2025
8b68fea
aggregate deals
LexLuthr May 23, 2025
f3e99f6
offline deal and put deals
LexLuthr May 27, 2025
c80f1bf
finish UI
LexLuthr May 28, 2025
f670210
UI, reindex, retrievals
LexLuthr May 28, 2025
34b12f8
update pdp client service url
LexLuthr May 28, 2025
f10b1c0
basic docs, new api, UI changes
LexLuthr May 29, 2025
0b7a76f
chunk upload
LexLuthr Jun 11, 2025
6fa55eb
fix actor info page
LexLuthr Jun 11, 2025
701a0bd
add some PDP metrics
LexLuthr Jun 11, 2025
1a2d0aa
use piecepark for upload
LexLuthr Jun 16, 2025
4b8da8e
pcid2, pdpv1, retrievalv1
LexLuthr Jun 26, 2025
d7ba6f3
PDP pipeline
LexLuthr Jul 8, 2025
b790dae
openAPI, auth, streamline types and methods
LexLuthr Jul 15, 2025
cb98d48
pdp prove test
LexLuthr Jul 21, 2025
c72541a
deleteRoot, deleteProofSet
LexLuthr Jul 21, 2025
f3b430d
serial upload
LexLuthr Jul 22, 2025
1587e74
working PDP pipeline, indexing, ipni
LexLuthr Aug 8, 2025
99941cf
rename PDP
LexLuthr Aug 12, 2025
e8aa78e
Merge branch 'main' into feat/market2
LexLuthr Aug 12, 2025
29f099e
lotus version
LexLuthr Aug 13, 2025
25d75e9
CommPV2 integration with PDP contracts
LexLuthr Aug 15, 2025
066597b
piece cleanup, string client
LexLuthr Aug 21, 2025
9f96195
Merge branch 'main' into feat/market2
LexLuthr Aug 21, 2025
e2a193e
ts-client-swagger
snadrus Sep 1, 2025
73bc33a
CidV2ts, ez-upload
snadrus Sep 2, 2025
8071098
IPDPProvingSchedule changes
LexLuthr Sep 3, 2025
5d4ad52
fix swagger definitions
LexLuthr Sep 4, 2025
aafe7a7
niceties
snadrus Sep 4, 2025
bcb975e
minor fixes
snadrus Sep 4, 2025
9cbb71f
Merge branch 'feat/market2' into feat/tssdk
snadrus Sep 4, 2025
f015317
picked-up the changes, e2e test
snadrus Sep 5, 2025
420778d
auth fixes
snadrus Sep 7, 2025
32a9eba
added wait, sep deal calls
snadrus Sep 8, 2025
b05d9c9
switch to camelCase for json
LexLuthr Sep 8, 2025
1863ea1
undo m20 ddo download
LexLuthr Sep 8, 2025
10c986e
seperated submit and upload
snadrus Sep 9, 2025
967229d
name
snadrus Sep 9, 2025
90ceb6e
undo camelCase
LexLuthr Sep 9, 2025
a96fae0
Merge remote-tracking branch 'origin/feat/market2' into feat/tssdk
snadrus Sep 9, 2025
4cf1fb5
fix test
LexLuthr Sep 9, 2025
5217182
fix status, add status command
LexLuthr Sep 9, 2025
a4b10e6
Merge branch 'main' into feat/market2
LexLuthr Sep 9, 2025
25b1c16
Merge remote-tracking branch 'origin/feat/market2' into feat/tssdk
snadrus Sep 9, 2025
5d90eea
some progress
snadrus Sep 9, 2025
0df5b00
fix swagger types, UI fixes
LexLuthr Sep 9, 2025
6ec50bb
Merge remote-tracking branch 'origin/feat/market2' into feat/tssdk
snadrus Sep 9, 2025
8857f3f
update
snadrus Sep 10, 2025
5fe3e82
packageJson update
snadrus Sep 10, 2025
3523a02
naming
snadrus Sep 11, 2025
80d9896
readme
snadrus Sep 11, 2025
68157cf
version
snadrus Sep 11, 2025
699e043
progress
snadrus Sep 15, 2025
006173a
toplevel namespace, streaming piece
snadrus Sep 15, 2025
b91caa5
update ver
snadrus Sep 15, 2025
c3f52bf
rename
snadrus Sep 17, 2025
23633d2
packaging ts
snadrus Sep 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ jobs:
target: "./itests/harmonydb_test.go"
- name: test-itest-alertnow
target: "./itests/alertnow_test.go"
- name: test-itest-pdp-prove
target: "./itests/pdp_prove_test.go"
steps:
- uses: actions/checkout@v4

Expand Down Expand Up @@ -311,6 +313,10 @@ jobs:
run: go install github.com/hannahhoward/cbor-gen-for
shell: bash

- name: Install swag cli
run: go install github.com/swaggo/swag/cmd/[email protected]
shell: bash

# - name: Install gotext
# run: go install golang.org/x/text/cmd/gotext
# shell: bash
Expand Down
26 changes: 21 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ ENV RUSTUP_HOME=/usr/local/rustup \
PATH=/usr/local/cargo/bin:$PATH \
RUST_VERSION=1.63.0

COPY ./ /opt/curio
WORKDIR /opt/curio
RUN git submodule update --init
RUN go mod download

RUN set -eux; \
dpkgArch="$(dpkg --print-architecture)"; \
case "${dpkgArch##*-}" in \
Expand All @@ -32,9 +37,6 @@ RUN set -eux; \
cargo --version; \
rustc --version;

COPY ./ /opt/curio
WORKDIR /opt/curio

### make configurable filecoin-ffi build
ARG FFI_BUILD_FROM_SOURCE=0
ENV FFI_BUILD_FROM_SOURCE=${FFI_BUILD_FROM_SOURCE}
Expand All @@ -56,13 +58,26 @@ RUN go install github.com/ipld/go-car/cmd/car@latest \
RUN go install github.com/LexLuthr/piece-server@latest \
&& cp $GOPATH/bin/piece-server /usr/local/bin/

RUN go install github.com/ipni/storetheindex@v0.8.38 \
RUN go install github.com/ipni/storetheindex@latest \
&& cp $GOPATH/bin/storetheindex /usr/local/bin/

RUN go install github.com/ethereum/go-ethereum/cmd/geth@latest \
&& cp $GOPATH/bin/geth /usr/local/bin/

#####################################
FROM ubuntu:22.04 AS curio-all-in-one

RUN apt-get update && apt-get install -y dnsutils vim curl aria2 jq
RUN apt-get update && apt-get install -y dnsutils vim curl aria2 jq git wget nodejs npm

# Install Foundry
RUN curl -L https://foundry.paradigm.xyz | bash \
&& bash -c ". ~/.foundry/bin/foundryup"

# Make sure foundry binaries are available in PATH
ENV PATH="/root/.foundry/bin:${PATH}"

# Verify installation
RUN forge --version && cast --version && anvil --version

# Copy libraries and binaries from curio-builder
COPY --from=curio-builder /etc/ssl/certs /etc/ssl/certs
Expand Down Expand Up @@ -98,6 +113,7 @@ COPY --from=curio-builder /opt/curio/sptool /usr/local/bin/
COPY --from=piece-server-builder /usr/local/bin/piece-server /usr/local/bin/
COPY --from=piece-server-builder /usr/local/bin/car /usr/local/bin/
COPY --from=piece-server-builder /usr/local/bin/storetheindex /usr/local/bin/
COPY --from=piece-server-builder /usr/local/bin/geth /usr/local/bin/

# Set up directories and permissions
RUN mkdir /var/tmp/filecoin-proof-parameters \
Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,11 @@ go-generate:
gen: gensimple
.PHONY: gen

gensimple: api-gen go-generate cfgdoc-gen docsgen docsgen-cli
marketgen:
swag init -dir market/mk20/http -g http.go -o market/mk20/http --parseDependencyLevel 3 --parseDependency
.PHONY: marketgen

gensimple: api-gen go-generate cfgdoc-gen docsgen marketgen docsgen-cli
$(GOCC) run ./scripts/fiximports
go mod tidy
.PHONY: gen
Expand Down
6 changes: 3 additions & 3 deletions alertmanager/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package alertmanager
import (
"bytes"
"context"
"database/sql"
"fmt"
"math"
"strings"
Expand All @@ -13,6 +12,7 @@ import (
"github.com/dustin/go-humanize"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/samber/lo"
"github.com/yugabyte/pgx/v5"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -346,7 +346,7 @@ func (al *alerts) getAddresses() ([]address.Address, []address.Address, error) {
cfg := config.DefaultCurioConfig()
err := al.db.QueryRow(al.ctx, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text)
if err != nil {
if strings.Contains(err.Error(), sql.ErrNoRows.Error()) {
if strings.Contains(err.Error(), pgx.ErrNoRows.Error()) {
return nil, nil, xerrors.Errorf("missing layer '%s' ", layer)
}
return nil, nil, xerrors.Errorf("could not read layer '%s': %w", layer, err)
Expand Down Expand Up @@ -731,7 +731,7 @@ func missingSectorCheck(al *alerts) {
SectorID int64 `db:"sector_num"`
}

err := al.db.Select(al.ctx, &sectors, `SELECT miner_id, sector_num FROM sector_location WHERE sector_filetype = 2 GROUP BY miner_id, sector_num ORDER BY miner_id, sector_num`)
err := al.db.Select(al.ctx, &sectors, `SELECT miner_id, sector_num FROM sector_location WHERE sector_filetype = ANY(ARRAY[2,8]) GROUP BY miner_id, sector_num ORDER BY miner_id, sector_num`)
if err != nil {
al.alertMap[Name].err = xerrors.Errorf("getting sealed sectors from database: %w", err)
return
Expand Down
66 changes: 49 additions & 17 deletions alertmanager/plugin/slack_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,60 @@ func (s *SlackWebhook) SendAlert(data *AlertPayload) error {

// Iterate through the map to construct the remaining blocks
for key, value := range data.Details {
// Split value into sentences by period followed by space
// Split value into sentences by period followed by space.
sentences := strings.Split(value.(string), ". ")
formattedValue := fmt.Sprintf("• *%s*\n", key)

// Add a bullet point before each trimmed sentence
// Add the key as the header for each block.
baseFormattedValue := fmt.Sprintf("• *%s*\n", key)
currentFormattedValue := baseFormattedValue

// Keep track of the character limit (3000) when adding sentences.
for _, sentence := range sentences {
trimmedSentence := strings.TrimSpace(sentence) // Trim leading and trailing spaces
trimmedSentence := strings.TrimSpace(sentence) // Trim leading and trailing spaces.
if trimmedSentence != "" {
formattedValue += fmt.Sprintf("• %s.\n", trimmedSentence) // Add period back and newline
// Add a bullet point and sentence, restoring the period and newline.
newSection := fmt.Sprintf("• %s.\n", trimmedSentence)

// Check if adding this section exceeds the 3000-character limit.
if len(currentFormattedValue)+len(newSection) > 3000 {
// If limit exceeds, add the currentFormattedValue block to payload and start a new block.
payload.Blocks = append(payload.Blocks,
Block{
Type: "section",
Text: &TextBlock{
Type: "mrkdwn",
Text: currentFormattedValue,
},
},
Block{
Type: "divider",
},
)

// Start a new formatted value with the baseFormattedValue.
currentFormattedValue = baseFormattedValue
}

// Append the newSection to the currentFormattedValue.
currentFormattedValue += newSection
}
}
payload.Blocks = append(payload.Blocks,
Block{
Type: "section",
Text: &TextBlock{
Type: "mrkdwn",
Text: formattedValue,

// Add the last block if it contains any content.
if currentFormattedValue != baseFormattedValue {
payload.Blocks = append(payload.Blocks,
Block{
Type: "section",
Text: &TextBlock{
Type: "mrkdwn",
Text: currentFormattedValue,
},
},
},
Block{
Type: "divider",
},
)
Block{
Type: "divider",
},
)
}
}

// Marshal the payload to JSON
Expand Down Expand Up @@ -163,7 +194,8 @@ func (s *SlackWebhook) SendAlert(data *AlertPayload) error {
}
})
if err != nil {
return fmt.Errorf("after %d retries,last error: %w", iter, err)
log.Errorw("Slack Webhook payload:", string(jsonData))
return fmt.Errorf("after %d retries,last error: %w, %s", iter, err, string(jsonData))
}
return nil
}
57 changes: 42 additions & 15 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
machine := dependencies.ListenAddr
prover := dependencies.Prover
iStore := dependencies.IndexStore
pp := dependencies.SectorReader

chainSched := chainsched.New(full)

Expand Down Expand Up @@ -234,12 +233,13 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
{
// Piece handling
if cfg.Subsystems.EnableParkPiece {
parkPieceTask, err := piece2.NewParkPieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.ParkPieceMaxTasks)
parkPieceTask, err := piece2.NewParkPieceTask(db, must.One(slrLazy.Val()), stor, cfg.Subsystems.ParkPieceMaxTasks)
if err != nil {
return nil, err
}
cleanupPieceTask := piece2.NewCleanupPieceTask(db, must.One(slrLazy.Val()), 0)
activeTasks = append(activeTasks, parkPieceTask, cleanupPieceTask)
aggregateChunksTask := piece2.NewAggregateChunksTask(db, stor, must.One(slrLazy.Val()))
activeTasks = append(activeTasks, parkPieceTask, cleanupPieceTask, aggregateChunksTask)
}
}

Expand All @@ -257,21 +257,27 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
}

{
var sdeps cuhttp.ServiceDeps
// Market tasks
var dm *storage_market.CurioStorageDealMarket
if cfg.Subsystems.EnableDealMarket {
// Main market poller should run on all nodes
dm = storage_market.NewCurioStorageDealMarket(miners, db, cfg, si, full, as)
dm = storage_market.NewCurioStorageDealMarket(miners, db, cfg, must.One(dependencies.EthClient.Val()), si, full, as, must.One(slrLazy.Val()))
err := dm.StartMarket(ctx)
if err != nil {
return nil, err
}

sdeps.DealMarket = dm

if cfg.Subsystems.EnableCommP {
commpTask := storage_market.NewCommpTask(dm, db, must.One(slrLazy.Val()), full, cfg.Subsystems.CommPMaxTasks)
activeTasks = append(activeTasks, commpTask)
}

aggTask := storage_market.NewAggregateTask(dm, db, must.One(slrLazy.Val()), lstor, full)
activeTasks = append(activeTasks, aggTask)

// PSD and Deal find task do not require many resources. They can run on all machines
psdTask := storage_market.NewPSDTask(dm, db, sender, as, &cfg.Market.StorageMarketConfig.MK12, full)
dealFindTask := storage_market.NewFindDealTask(dm, db, full, &cfg.Market.StorageMarketConfig.MK12)
Expand All @@ -288,30 +294,48 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
if err != nil {
return nil, err
}
var sdeps cuhttp.ServiceDeps

if cfg.Subsystems.EnablePDP {
es := getSenderEth()
sdeps.EthSender = es

pdp.NewWatcherCreate(db, must.One(dependencies.EthClient.Val()), chainSched)
pdp.NewWatcherRootAdd(db, must.One(dependencies.EthClient.Val()), chainSched)
ethClient := must.One(dependencies.EthClient.Val())

pdp.NewWatcherDataSetCreate(db, ethClient, chainSched)
pdp.NewWatcherPieceAdd(db, chainSched, ethClient)
pdp.NewWatcherDelete(db, chainSched)
pdp.NewWatcherPieceDelete(db, chainSched)

pdpProveTask := pdp.NewProveTask(chainSched, db, must.One(dependencies.EthClient.Val()), dependencies.Chain, es, dependencies.CachedPieceReader)
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
pdpInitProvingPeriodTask := pdp.NewInitProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
pdpProveTask := pdp.NewProveTask(chainSched, db, ethClient, dependencies.Chain, es, dependencies.CachedPieceReader, iStore)
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, ethClient, dependencies.Chain, chainSched, es)
pdpInitProvingPeriodTask := pdp.NewInitProvingPeriodTask(db, ethClient, dependencies.Chain, chainSched, es)
pdpNotifTask := pdp.NewPDPNotifyTask(db)
activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask)

addProofSetTask := pdp.NewPDPTaskAddDataSet(db, es, ethClient, full)
pdpAddRoot := pdp.NewPDPTaskAddPiece(db, es, ethClient)
pdpDelRoot := pdp.NewPDPTaskDeletePiece(db, es, ethClient)
pdpDelProofSetTask := pdp.NewPDPTaskDeleteDataSet(db, es, ethClient, full)

pdpAggregateTask := pdp.NewAggregatePDPDealTask(db, sc)
pdpCache := pdp.NewTaskPDPSaveCache(db, dependencies.CachedPieceReader, iStore)
commPTask := pdp.NewPDPCommpTask(db, sc, cfg.Subsystems.CommPMaxTasks)

activeTasks = append(activeTasks, pdpNotifTask, pdpProveTask, pdpNextProvingPeriodTask, pdpInitProvingPeriodTask, commPTask, pdpAddRoot, addProofSetTask, pdpAggregateTask, pdpCache, pdpDelRoot, pdpDelProofSetTask)
}

idxMax := taskhelp.Max(cfg.Subsystems.IndexingMaxTasks)

indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg, idxMax)
activeTasks = append(activeTasks, ipniTask, indexingTask)
indexingTask := indexing.NewIndexingTask(db, sc, iStore, dependencies.SectorReader, dependencies.CachedPieceReader, cfg, idxMax)
ipniTask := indexing.NewIPNITask(db, sc, dependencies.SectorReader, dependencies.CachedPieceReader, cfg, idxMax)
pdpIdxTask := indexing.NewPDPIndexingTask(db, sc, iStore, dependencies.CachedPieceReader, cfg, idxMax)
pdpIPNITask := indexing.NewPDPIPNITask(db, sc, dependencies.CachedPieceReader, cfg, idxMax)
activeTasks = append(activeTasks, ipniTask, indexingTask, pdpIdxTask, pdpIPNITask)

if cfg.HTTP.Enable {
err = cuhttp.StartHTTPServer(ctx, dependencies, &sdeps, dm)
if !cfg.Subsystems.EnableDealMarket {
return nil, xerrors.New("deal market must be enabled on HTTP server")
}
err = cuhttp.StartHTTPServer(ctx, dependencies, &sdeps)
if err != nil {
return nil, xerrors.Errorf("failed to start the HTTP server: %w", err)
}
Expand All @@ -321,6 +345,9 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
amTask := alertmanager.NewAlertTask(full, db, cfg.Alerting, dependencies.Al)
activeTasks = append(activeTasks, amTask)

pcl := gc.NewPieceCleanupTask(db, iStore)
activeTasks = append(activeTasks, pcl)

log.Infow("This Curio instance handles",
"miner_addresses", miners,
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))
Expand Down
Loading