diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..f566351 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,25 @@ +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: Test + +on: + push: + branches: ['main'] + pull_request: + branches: ['main'] + +jobs: + test: + name: Run tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.23' + + - name: Test + run: go test -v -failfast ./... diff --git a/go.mod b/go.mod index e677447..17d360a 100644 --- a/go.mod +++ b/go.mod @@ -33,9 +33,12 @@ require ( github.com/jackc/pgx/v4 v4.18.3 github.com/jsign/go-filsigner v0.4.1 github.com/labstack/echo/v4 v4.11.4 + github.com/libp2p/go-libp2p v0.42.0 + github.com/libp2p/go-yamux/v4 v4.0.2 github.com/mattn/go-isatty v0.0.20 github.com/multiformats/go-multiaddr v0.16.1 github.com/multiformats/go-multihash v0.2.3 + github.com/stretchr/testify v1.10.0 github.com/whyrusleeping/cbor-gen v0.3.1 golang.org/x/sync v0.16.0 golang.org/x/sys v0.35.0 @@ -70,6 +73,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect github.com/daaku/go.zipexe v1.0.2 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/dchest/blake2b v1.0.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect @@ -150,13 +154,11 @@ require ( github.com/lib/pq v1.10.9 // indirect github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-flow-metrics v0.2.0 // indirect - github.com/libp2p/go-libp2p v0.42.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect github.com/libp2p/go-libp2p-pubsub v0.13.0 // indirect 
github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-netroute v0.2.2 // indirect github.com/libp2p/go-reuseport v0.4.0 // indirect - github.com/libp2p/go-yamux/v4 v4.0.2 // indirect github.com/libp2p/go-yamux/v5 v5.0.1 // indirect github.com/magefile/mage v1.15.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -200,6 +202,7 @@ require ( github.com/pion/turn/v4 v4.0.2 // indirect github.com/pion/webrtc/v4 v4.1.2 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/prometheus/client_golang v1.22.0 // indirect github.com/prometheus/client_model v0.6.2 // indirect diff --git a/service/constants.go b/service/constants.go new file mode 100644 index 0000000..c122071 --- /dev/null +++ b/service/constants.go @@ -0,0 +1,7 @@ +package service + +const ( + ListEligibleDefaultSize = 500 + ListEligibleMaxSize = 2 << 20 + ShowRecentFailuresHours = 24 +) diff --git a/service/lotus.go b/service/lotus.go new file mode 100644 index 0000000..121fa2d --- /dev/null +++ b/service/lotus.go @@ -0,0 +1,119 @@ +package service + +import ( + "context" + "fmt" + "time" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + "code.riba.cloud/go/toolbox/cmn" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + filabi "github.com/filecoin-project/go-state-types/abi" + filbig "github.com/filecoin-project/go-state-types/big" + filbuiltin "github.com/filecoin-project/go-state-types/builtin" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/storacha/spade/internal/app" +) + +// AuthorizationLotusClient defines the minimal Filecoin client interface +// required to support SP authorization. 
+type AuthorizationLotusClient interface { + // ChainGetTipSetByHeight looks back for a tipset at the specified epoch. + ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) + // StateAccountKey retrieves the key address for an account at a given tipset. + StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) + // StateMinerInfo retrieves miner info at a given tipset. + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) +} + +// LookbackLotusClient defines the minimal Filecoin client interface required to +// support lookback tipset retrieval. +type LookbackLotusClient interface { + // ChainHead returns the current head of the chain. + ChainHead(context.Context) (*types.TipSet, error) + // ChainGetTipSetByHeight looks back for a tipset at the specified epoch. + ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) +} + +// ReservationLotusClient defines the minimal Filecoin client interface required +// to support reservation eligibility checks and related operations. +type ReservationLotusClient interface { + LookbackLotusClient + // MinerGetBaseInfo retrieves mining base info for a miner at a given tipset. + MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*api.MiningBaseInfo, error) +} + +// SpadeLotusClient defines the minimal Lotus client interface required to +// support all Spade service operations. 
+type SpadeLotusClient interface { + AuthorizationLotusClient + ReservationLotusClient +} + +var collateralCache, _ = lru.New[filabi.ChainEpoch, filbig.Int](128) + +func ProviderCollateralEstimateGiB(ctx context.Context, sourceEpoch filabi.ChainEpoch) (filbig.Int, error) { + if pc, didFind := collateralCache.Get(sourceEpoch); didFind { + return pc, nil + } + + collateralGiB, err := app.EpochMinProviderCollateralEstimateGiB(ctx, sourceEpoch) + if err != nil { + return collateralGiB, cmn.WrErr(err) + } + + // make it 1.7 times larger, so that fluctuations in the state won't prevent the deal from being proposed/published later + // capped by https://github.com/filecoin-project/lotus/blob/v1.13.2-rc2/markets/storageadapter/provider.go#L267 + // and https://github.com/filecoin-project/lotus/blob/v1.13.2-rc2/markets/storageadapter/provider.go#L41 + inflatedCollateralGiB := filbig.Div( + filbig.Product( + collateralGiB, + filbig.NewInt(17), + ), + filbig.NewInt(10), + ) + + collateralCache.Add(sourceEpoch, inflatedCollateralGiB) + return inflatedCollateralGiB, nil +} + +// GetTipset retrieves the tipset at the specified lookback epoch. It is a +// copy of [fil.GetTipset] adjusted to use the minimal interface +// [LookbackLotusClient]. 
+func GetTipset(ctx context.Context, lapi LookbackLotusClient, lookback uint) (*fil.LotusTS, error) { + latestHead, err := lapi.ChainHead(ctx) + if err != nil { + return nil, fmt.Errorf("failed getting chain head: %w", err) + } + + wallUnix := time.Now().Unix() + filUnix := int64(latestHead.Blocks()[0].Timestamp) + + if wallUnix < filUnix-3 || // allow few seconds clock-drift tolerance + wallUnix > filUnix+int64( + fil.PropagationDelaySecs+(fil.APIMaxTipsetsBehind*filbuiltin.EpochDurationSeconds), + ) { + return nil, fmt.Errorf( + "lotus API out of sync: chainHead reports unixtime %d (height: %d) while walltime is %d (delta: %s)", + filUnix, + latestHead.Height(), + wallUnix, + time.Second*time.Duration(wallUnix-filUnix), + ) + } + + if lookback == 0 { + return latestHead, nil + } + + latestHeight := latestHead.Height() + tipsetAtLookback, err := lapi.ChainGetTipSetByHeight(ctx, latestHeight-filabi.ChainEpoch(lookback), latestHead.Key()) + if err != nil { + return nil, fmt.Errorf("determining target tipset %d epochs ago failed: %w", lookback, err) + } + + return tipsetAtLookback, nil +} diff --git a/service/pg/authorization.go b/service/pg/authorization.go new file mode 100644 index 0000000..85f5fc2 --- /dev/null +++ b/service/pg/authorization.go @@ -0,0 +1,113 @@ +package pg + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + "github.com/google/uuid" + "github.com/storacha/spade/apitypes" + "github.com/storacha/spade/service" + "github.com/storacha/spade/spid" + spid_lotus "github.com/storacha/spade/spid/lotus" +) + +func (p *PgLotusSpadeService) Authorize(ctx context.Context, req service.Request) (service.Authorization, error) { + challenge, err := spid.Parse(req.Headers.Get("Authorization")) + if err != nil { + return service.Authorization{}, fmt.Errorf("parsing authorization header: %w", err) + } + + err = spid_lotus.ResolveAndVerify(ctx, p.lotusAPI, challenge) + if err != nil 
{ + return service.Authorization{}, fmt.Errorf("verifying authorization signature: %w", err) + } + + sp := fil.MustParseActorString(challenge.Addr().String()) + signedArgs, err := challenge.Args().Values() + if err != nil { + return service.Authorization{}, fmt.Errorf("getting signed args values: %w", err) + } + + reqJ, err := json.Marshal( + struct { + Method string + Host string + Path string + Params string + ParamsSigned url.Values + Headers http.Header + }{ + Method: req.Method, + Host: req.Host, + Path: req.Path, + Params: req.Params.Encode(), + ParamsSigned: signedArgs, + Headers: req.Headers, + }, + ) + if err != nil { + return service.Authorization{}, err + } + + spDetails := [4]int16{-1, -1, -1, -1} + var requestUUID string + var stateEpoch int64 + var spInfo apitypes.SPInfo + var spInfoLastPoll *time.Time + if err := p.db.QueryRow( + ctx, + ` + INSERT INTO spd.requests ( provider_id, request_dump ) + VALUES ( $1, $2 ) + RETURNING + request_uuid, + ( SELECT ( metadata->'market_state'->'epoch' )::INTEGER FROM spd.global ), + COALESCE( ( + SELECT + ARRAY[ + COALESCE( org_id, -1 ), + COALESCE( city_id, -1), + COALESCE( country_id, -1), + COALESCE( continent_id, -1) + ] + FROM spd.providers + WHERE provider_id = $1 + LIMIT 1 + ), ARRAY[-1, -1, -1, -1] ), + ( + SELECT info + FROM spd.providers_info + WHERE provider_id = $1 + ), + ( + SELECT provider_last_polled + FROM spd.providers_info + WHERE provider_id = $1 + ) + `, + sp, + reqJ, + ).Scan(&requestUUID, &stateEpoch, &spDetails, &spInfo, &spInfoLastPoll); err != nil { + return service.Authorization{}, err + } + + reqID, err := uuid.Parse(requestUUID) + if err != nil { + return service.Authorization{}, fmt.Errorf("parsing UUID: %w", err) + } + + return service.Authorization{ + RequestID: reqID, + StateEpoch: stateEpoch, + SignedArgs: signedArgs, + ProviderID: sp, + ProviderDetails: spDetails, + ProviderInfo: spInfo, + LastPoll: spInfoLastPoll, + }, nil +} diff --git a/service/pg/eligibility.go 
b/service/pg/eligibility.go new file mode 100644 index 0000000..5083ccf --- /dev/null +++ b/service/pg/eligibility.go @@ -0,0 +1,47 @@ +package pg + +import ( + "context" + "fmt" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + "github.com/georgysavva/scany/pgxscan" + "github.com/storacha/spade/service" +) + +func (p *PgLotusSpadeService) EligiblePieces(ctx context.Context, sp fil.ActorID, options ...service.EligiblePiecesOption) ([]service.EligiblePiece, bool, error) { + cfg := service.EligiblePiecesConfig{Limit: service.ListEligibleDefaultSize} + for _, opt := range options { + opt(&cfg) + } + + lim := cfg.Limit + tenantID := cfg.TenantID + // how to list: start small, find setting below + useQueryFunc := "pieces_eligible_head" + if lim > service.ListEligibleDefaultSize { // deduce from requested lim + useQueryFunc = "pieces_eligible_full" + } + + orderedPieces := make([]service.EligiblePiece, 0, lim+1) + if err := pgxscan.Select( + ctx, + p.db, + &orderedPieces, + fmt.Sprintf("SELECT * FROM spd.%s( $1, $2, $3, $4, $5 )", useQueryFunc), + sp, + lim+1, // ask for one extra, to disambiguate "there is more" + tenantID, + cfg.IncludeSourceless, + false, + ); err != nil { + return nil, false, err + } + + var more bool + if uint64(len(orderedPieces)) > lim { + orderedPieces = orderedPieces[:lim] + more = true + } + return orderedPieces, more, nil +} diff --git a/service/pg/errorlogger.go b/service/pg/errorlogger.go new file mode 100644 index 0000000..7a53b04 --- /dev/null +++ b/service/pg/errorlogger.go @@ -0,0 +1,36 @@ +package pg + +import ( + "context" + "encoding/json" + + "github.com/google/uuid" + "github.com/storacha/spade/apitypes" +) + +func (s *PgLotusSpadeService) RequestError(ctx context.Context, requestID uuid.UUID, code apitypes.APIErrorCode, message string, payload any) error { + jPayload, err := json.Marshal(payload) + if err != nil { + return err + } + _, err = s.db.Exec( + ctx, + ` + UPDATE spd.requests SET + request_meta = JSONB_STRIP_NULLS( 
request_meta || JSONB_BUILD_OBJECT( + 'error', $1::TEXT, + 'error_code', $2::INTEGER, + 'error_slug', $3::TEXT, + 'payload', $4::JSONB + ) ) + WHERE + request_uuid = $5 + `, + message, + code, + code.String(), + jPayload, + requestID.String(), + ) + return err +} diff --git a/service/pg/manifest.go b/service/pg/manifest.go new file mode 100644 index 0000000..805832d --- /dev/null +++ b/service/pg/manifest.go @@ -0,0 +1,85 @@ +package pg + +import ( + "context" + "fmt" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + filabi "github.com/filecoin-project/go-state-types/abi" + "github.com/georgysavva/scany/pgxscan" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/storacha/spade/service" +) + +func (p *PgLotusSpadeService) PieceManifest(ctx context.Context, sp fil.ActorID, proposal uuid.UUID) (service.PieceManifest, error) { + pcs := make([]struct { + AggLog2Size int `db:"agg_log2size"` + AggPCidV1 string `db:"agg_pcid_v1"` + SegPCidV2 string `db:"seg_pcid_v2"` + UrlTemplate string + }, 0, 8<<10) + + if err := pgxscan.Select( + ctx, + p.db, + &pcs, + ` + SELECT + ap.piece_cid AS agg_pcid_v1, + ap.piece_log2_size AS agg_log2size, + sp.piece_cid AS seg_pcid_v2, + t.tenant_meta->'bulk_piece_source'->>'url_template' AS url_template + FROM spd.piece_segments ps + JOIN spd.pieces ap USING ( piece_id ) + JOIN spd.pieces sp ON ( ps.segment_id = sp.piece_id ) + JOIN spd.proposals pr ON ( pr.piece_id = ps.piece_id ) + JOIN spd.clients cl USING ( client_id ) + JOIN spd.tenants t USING ( tenant_id ) + WHERE + (ap.piece_meta->'is_frc58_segmented')::bool + AND + pr.proposal_uuid = $1 + AND + -- ensure we only display SPs own proposals, no list-sharing + pr.provider_id = $2 + AND + -- only pending proposals + pr.proposal_delivered IS NOT NULL AND pr.proposal_failstamp = 0 AND pr.activated_deal_id IS NULL + + -- ordering is critical + ORDER BY ps.position + `, + proposal.String(), + sp, + ); err != nil { + return service.PieceManifest{}, err + } + + if len(pcs) 
== 0 { + return service.PieceManifest{}, service.ErrManifestNotFound + } + + aggCP, err := fil.CommPFromPieceInfo(filabi.PieceInfo{ + Size: 1 << pcs[0].AggLog2Size, + PieceCID: cid.MustParse(pcs[0].AggPCidV1), + }) + if err != nil { + return service.PieceManifest{}, err + } + + segCids := make([]cid.Cid, 0, len(pcs)) + for _, pc := range pcs { + s, err := cid.Parse(pc.SegPCidV2) + if err != nil { + return service.PieceManifest{}, fmt.Errorf("parsing segment CID %q: %w", pc.SegPCidV2, err) + } + segCids = append(segCids, s) + } + + return service.PieceManifest{ + PieceCid: aggCP.PCidV2(), + SegmentCids: segCids, + UrlTemplate: pcs[0].UrlTemplate, + }, nil +} diff --git a/service/pg/pg.go b/service/pg/pg.go new file mode 100644 index 0000000..1493539 --- /dev/null +++ b/service/pg/pg.go @@ -0,0 +1,44 @@ +package pg + +import ( + "context" + + "github.com/georgysavva/scany/pgxscan" + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" + "github.com/storacha/spade/internal/app" + "github.com/storacha/spade/service" +) + +type PgClient interface { + pgxscan.Querier + BeginFunc(ctx context.Context, f func(pgx.Tx) error) error + Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) + QueryRow(ctx context.Context, sql string, args ...any) pgx.Row +} + +type PgLotusSpadeService struct { + db PgClient + lotusAPI service.SpadeLotusClient + lookback uint +} + +type PgLotusSpadeServiceOption func(*PgLotusSpadeService) + +// WithLookback sets the number of lookback epochs. +func WithLookback(lookback uint) PgLotusSpadeServiceOption { + return func(s *PgLotusSpadeService) { + s.lookback = lookback + } +} + +// New creates a new Spade service backed by Postgres and Lotus. 
+func New(db PgClient, lotusAPI service.SpadeLotusClient, opts ...PgLotusSpadeServiceOption) *PgLotusSpadeService { + s := &PgLotusSpadeService{db: db, lotusAPI: lotusAPI, lookback: uint(app.FilDefaultLookback)} + for _, opt := range opts { + opt(s) + } + return s +} + +var _ service.SpadeService = (*PgLotusSpadeService)(nil) diff --git a/service/pg/proposal.go b/service/pg/proposal.go new file mode 100644 index 0000000..2549650 --- /dev/null +++ b/service/pg/proposal.go @@ -0,0 +1,81 @@ +package pg + +import ( + "context" + "time" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + filbuiltin "github.com/filecoin-project/go-state-types/builtin" + "github.com/georgysavva/scany/pgxscan" + "github.com/storacha/spade/service" +) + +func (p *PgLotusSpadeService) PendingProposals(ctx context.Context, sp fil.ActorID) ([]service.PendingProposal, error) { + pending := make([]service.PendingProposal, 0, 4096) + + if err := pgxscan.Select( + ctx, + p.db, + &pending, + ` + SELECT + pr.proposal_uuid AS proposal_id, + pr.piece_id, + pr.proposal_meta->>'signed_proposal_cid' AS proposal_cid, + pr.start_epoch, + pr.client_id, + pr.proposal_delivered, + c.tenant_id, + p.piece_cid, + pr.proxied_log2_size AS piece_log2_size, + pr.proposal_failstamp, + pr.proposal_meta->>'failure' AS error, + ( EXISTS ( + SELECT 42 + FROM spd.published_deals pd + WHERE + pd.piece_id = pr.piece_id + AND + pd.provider_id = pr.provider_id + AND + pd.client_id = pr.client_id + AND + pd.status = 'published' + ) ) AS is_published, + ARRAY( + SELECT uri FROM spd.sources_uri WHERE sources_uri.piece_id = pr.piece_id + ) AS data_sources, + ( + CASE WHEN (p.piece_meta->'is_frc58_segmented')::bool THEN 'frc58' ELSE NULL END + ) AS segmentation + FROM spd.proposals pr + JOIN spd.pieces p USING ( piece_id ) + JOIN spd.clients c USING ( client_id ) + LEFT JOIN spd.mv_pieces_availability pa USING ( piece_id ) + WHERE + pr.provider_id = $1 + AND + pr.start_epoch > $2 + AND + pr.activated_deal_id is NULL + AND + ( + 
pr.proposal_failstamp = 0 + OR + -- show everything failed in the past N hours + pr.proposal_failstamp > ( spd.big_now() - $3::BIGINT * 3600 * 1000 * 1000 * 1000 ) + ) + ORDER BY + pr.proposal_failstamp DESC, + ( pr.start_epoch / 360 ), -- 3h sort granularity + pr.proxied_log2_size, + p.piece_cid + `, + sp, + fil.ClockMainnet.TimeToEpoch(time.Now())+filbuiltin.EpochsInHour, + service.ShowRecentFailuresHours, + ); err != nil { + return nil, err + } + return pending, nil +} diff --git a/service/pg/reservation.go b/service/pg/reservation.go new file mode 100644 index 0000000..6d2cd78 --- /dev/null +++ b/service/pg/reservation.go @@ -0,0 +1,320 @@ +package pg + +import ( + "context" + "math/bits" + "time" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + "code.riba.cloud/go/toolbox/cmn" + "github.com/dgraph-io/ristretto" + filabi "github.com/filecoin-project/go-state-types/abi" + filbig "github.com/filecoin-project/go-state-types/big" + filbuiltin "github.com/filecoin-project/go-state-types/builtin" + filmarket "github.com/filecoin-project/go-state-types/builtin/v9/market" + "github.com/georgysavva/scany/pgxscan" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/jackc/pgx/v4" + "github.com/storacha/spade/apitypes" + "github.com/storacha/spade/internal/app" + "github.com/storacha/spade/service" +) + +const requestPieceLockStatement = `SELECT PG_ADVISORY_XACT_LOCK( 1234567890111 )` + +func (p *PgLotusSpadeService) ReservePiece(ctx context.Context, sp fil.ActorID, spInfo apitypes.SPInfo, piece cid.Cid, options ...service.ReservePieceOption) ([]apitypes.TenantReplicationState, error) { + cfg := service.ReservePieceConfig{} + for _, opt := range options { + opt(&cfg) + } + + err := spIneligibleErr(ctx, p.db, p.lotusAPI, sp, p.lookback) + if err != nil { + return nil, err + } + + var replStates []apitypes.TenantReplicationState + if err := p.db.BeginFunc(ctx, func(tx pgx.Tx) error { + _, err := tx.Exec(ctx, requestPieceLockStatement) + if err != nil { + 
return err + } + + type tenantEligible struct { + apitypes.TenantReplicationState + IsExclusive bool `db:"exclusive_replication"` + TenantClientID *fil.ActorID `db:"client_id_to_use"` + TenantClientAddress *string `db:"client_address_to_use"` + + PieceID int64 + PieceSizeBytes int64 + + DealDurationDays int16 + StartWithinHours int16 + RecentlyUsedStartEpoch *int64 + + TenantMeta []byte + } + + tenantsEligible := make([]tenantEligible, 0, 8) + + if err := pgxscan.Select( + ctx, + tx, + &tenantsEligible, + ` + SELECT + * + FROM spd.piece_realtime_eligibility( $1, $2 ) + WHERE + ( 0 = $3 OR tenant_id = $3) + `, + sp, + piece, + cfg.TenantID, + ); err != nil { + return err + } + + if len(tenantsEligible) == 0 { + return service.ErrUnclaimedPiece + } + + if tenantsEligible[0].PieceSizeBytes > 1<= te.MaxTotal || + te.InOrg >= te.MaxOrg || + te.InCity >= te.MaxCity || + te.InCountry >= te.MaxCountry || + te.InContinent >= te.MaxContinent { + countOverReplicated++ + invalidated = true + } + if te.SpInFlightBytes+te.PieceSizeBytes > te.MaxInFlightBytes { + countOverPending++ + invalidated = true + } + + if !invalidated && chosenTenant == nil { + chosenTenant = &te + } + } + + // handle "no takers" here, for ease of reading further down + // this is slightly convoluted since we can have a "mixed error condition" - handled in the default: + if chosenTenant == nil { + switch len(tenantsEligible) { + case countAlreadyDealt: + return service.ErrProviderHasReplica + case countNoDataCap: + return service.ErrTenantsOutOfDatacap + case countOverReplicated: + return service.ErrTooManyReplicas + case countOverPending: + return service.ErrProviderAboveMaxInFlight + default: + return service.ErrReplicationRulesViolation + } + } + + if cfg.TenantPolicy != app.TEMPPolicies[chosenTenant.TenantID] { + return service.ErrTenantPolicyMismatch + } + + // + // Here, at the very end, is where we would make a tightly-timeboxed outbound call + // to check for potential external eligibility 
criteria + // Then either return ErrExternalReservationRefused or proceed below. + // + // We *DO* always check using our own replication rules first, and keep a lock for the duration + // ( in order to maintain a uniform "decency floor" among our esteemed SPs ;) + // + + // We got that far - let's do it! + startEpoch := fil.ClockMainnet.TimeToEpoch(time.Now().Add( + time.Hour * time.Duration(chosenTenant.StartWithinHours), + )) + + // a lot of this logic is broken / needs to be replaced by something saner. But... in another life. + if chosenTenant.RecentlyUsedStartEpoch != nil { + startEpoch = filabi.ChainEpoch(*chosenTenant.RecentlyUsedStartEpoch) + } + + // round the epoch down to a day boundary + // we *must* work with startEpoch/StartWithinHours to produce identical retry-deals + // 2h +/- because network started at 22:00 UTC + rde := ((startEpoch-app.FilDefaultLookback-(filbuiltin.EpochsInHour*filabi.ChainEpoch(chosenTenant.StartWithinHours))-240)/2880)*2880 + 240 + + // this is relatively expensive to do within the txn lock + // however we cache it and call it exactly once per day, so we should be fine + gbpce, err := service.ProviderCollateralEstimateGiB( + ctx, rde, + ) + if err != nil { + return cmn.WrErr(err) + } + + encodedLabel, err := filmarket.NewLabelFromString(piece.String()) + if err != nil { + return cmn.WrErr(err) + } + + prop := struct { + ProposalV0 filmarket.DealProposal `json:"filmarket_proposal"` + }{ + ProposalV0: filmarket.DealProposal{ + // do not change under any circumstances: even when payments eventually happen, they will happen explicitly out of band + // ( a notable exception here would be contract-listener style interactions, but that's way off ) + StoragePricePerEpoch: filbig.Zero(), // DO NOT CHANGE + + VerifiedDeal: true, + PieceCID: piece, + PieceSize: filabi.PaddedPieceSize(chosenTenant.PieceSizeBytes), + + Provider: sp.AsFilAddr(), + Client: chosenTenant.TenantClientID.AsFilAddr(), + + StartEpoch: startEpoch, + EndEpoch: 
startEpoch + filabi.ChainEpoch(chosenTenant.DealDurationDays)*filbuiltin.EpochsInDay, + Label: encodedLabel, + + ClientCollateral: filbig.Zero(), + ProviderCollateral: filbig.Rsh( + filbig.Mul(gbpce, filbig.NewInt(chosenTenant.PieceSizeBytes)), + 30, + ), + }, + } + + // inherit the request uuid as the proposal uuid (a uuid is a uuid is a uuid) + proposalID := cfg.RequestID + if proposalID == uuid.Nil { + newUUID, err := uuid.NewRandom() + if err != nil { + return err + } + proposalID = newUUID + } + + if _, err := tx.Exec( + ctx, + ` + INSERT INTO spd.proposals + ( proposal_uuid, piece_id, provider_id, client_id, start_epoch, end_epoch, proxied_log2_size, proposal_meta ) + VALUES ( $1, $2, $3, $4, $5, $6, $7, $8 ) + `, + proposalID.String(), + chosenTenant.PieceID, + sp, + *chosenTenant.TenantClientID, + prop.ProposalV0.StartEpoch, + prop.ProposalV0.EndEpoch, + bits.TrailingZeros64(uint64(chosenTenant.PieceSizeBytes)), + prop, + ); err != nil { + return err + } + + // we managed - bump the counts where applicable and return stats + for i := range tenantsEligible { + if tenantsEligible[i].IsExclusive && replStates[i].TenantID != chosenTenant.TenantID { + continue + } + replStates[i].Total++ + replStates[i].InOrg++ + replStates[i].InCity++ + replStates[i].InCountry++ + replStates[i].InContinent++ + replStates[i].DealAlreadyExists = true + replStates[i].SpInFlightBytes += chosenTenant.PieceSizeBytes + } + + return nil + }); err != nil { + return replStates, err + } + + return replStates, nil +} + +// using ristretto here because of SetWithTTL() below +var providerEligibleCache, _ = ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e7, BufferItems: 64, + MaxCost: 1024, + Cost: func(interface{}) int64 { return 1 }, +}) + +func spIneligibleErr(ctx context.Context, db PgClient, filClient service.ReservationLotusClient, spID fil.ActorID, lookback uint) (defErr error) { + // do not cache chain-independent factors + var ignoreChainEligibility bool + err := db.QueryRow( 
+ ctx, + ` + SELECT COALESCE( ( provider_meta->'ignore_chain_eligibility' )::BOOL, false ) + FROM spd.providers + WHERE + NOT COALESCE( ( provider_meta->'globally_inactivated' )::BOOL, false ) + AND + provider_id = $1 + `, + spID, + ).Scan(&ignoreChainEligibility) + if err == pgx.ErrNoRows { + return service.ErrStorageProviderSuspended + } else if err != nil { + return err + } else if ignoreChainEligibility { + return nil + } + + defer func() { + if defErr != nil { + providerEligibleCache.Del(uint64(spID)) + } else { + providerEligibleCache.SetWithTTL(uint64(spID), true, 1, time.Minute) + } + }() + + if _, found := providerEligibleCache.Get(uint64(spID)); found { + return nil + } + + curTipset, err := service.GetTipset(ctx, filClient, lookback) + if err != nil { + return err + } + + mbi, err := filClient.MinerGetBaseInfo(ctx, spID.AsFilAddr(), curTipset.Height(), curTipset.Key()) + if err != nil { + return err + } + if mbi == nil || !mbi.EligibleForMining { + return service.ErrStorageProviderIneligibleToMine + } + + return nil +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..c2d5bd4 --- /dev/null +++ b/service/service.go @@ -0,0 +1,187 @@ +package service + +import ( + "context" + "errors" + "net/http" + "net/url" + "time" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/storacha/spade/apitypes" +) + +// Service errors map to [apitypes.APIErrorCode] codes. 
+var ( + ErrManifestNotFound = errors.New("piece manifest not found") + ErrUnclaimedPiece = errors.New("piece is not claimed by any selected tenant") + ErrOversizedPiece = errors.New("piece size exceeds provider's sector size") + ErrProviderHasReplica = errors.New("provider already has proposed or active replica according to all selected replication rules") + ErrTenantsOutOfDatacap = errors.New("all selected tenants are out of DataCap") + ErrTooManyReplicas = errors.New("piece is over-replicated according to all selected replication rules") + ErrProviderAboveMaxInFlight = errors.New("provider has more proposals in-flight than permitted by selected tenant rules") + ErrReplicationRulesViolation = errors.New("no selected tenants would grant a deal according to their individual rules") + ErrTenantPolicyMismatch = errors.New("tenant policy does not match the expected value") + ErrStorageProviderSuspended = errors.New("provider is suspended") + ErrStorageProviderIneligibleToMine = errors.New("provider is ineligible to mine new deals") +) + +type EligiblePiece struct { + PieceID int64 + PieceLog2Size uint8 + Tenants []int16 `db:"tenant_ids"` + apitypes.Piece +} + +type EligiblePiecesOption = func(*EligiblePiecesConfig) + +type EligiblePiecesConfig struct { + TenantID int16 + Limit uint64 + IncludeSourceless bool +} + +func WithEligiblePiecesIncludeSourceless(include bool) EligiblePiecesOption { + return func(opts *EligiblePiecesConfig) { + opts.IncludeSourceless = include + } +} + +func WithEligiblePiecesTenantID(tenantID int16) EligiblePiecesOption { + return func(opts *EligiblePiecesConfig) { + opts.TenantID = tenantID + } +} + +func WithEligiblePiecesLimit(limit uint64) EligiblePiecesOption { + return func(opts *EligiblePiecesConfig) { + opts.Limit = limit + } +} + +type ReservePieceOption = func(*ReservePieceConfig) + +type ReservePieceConfig struct { + TenantID int16 + TenantPolicy string + RequestID uuid.UUID +} + +// WithReservePieceTenantID sets an optional tenant 
ID the piece should be +// reserved with. +func WithReservePieceTenantID(tenantID int16) ReservePieceOption { + return func(opts *ReservePieceConfig) { + opts.TenantID = tenantID + } +} + +// WithReservePieceTenantPolicy sets an optional tenant policy string to +// validate against when reserving the piece. Note: if the chosen tenant has a +// policy configured, this must match. i.e. this is mandatory for tenants that +// have a policy set. +func WithReservePieceTenantPolicy(tenantPolicy string) ReservePieceOption { + return func(opts *ReservePieceConfig) { + opts.TenantPolicy = tenantPolicy + } +} + +// WithReservePieceRequestID sets an optional request ID to associate with the +// reservation. +func WithReservePieceRequestID(requestID uuid.UUID) ReservePieceOption { + return func(opts *ReservePieceConfig) { + opts.RequestID = requestID + } +} + +type PendingProposal struct { + apitypes.DealProposal + ClientID fil.ActorID + PieceID int64 + ProposalFailstamp int64 + Error *string + ProposalDelivered *time.Time + IsPublished bool + PieceLog2Size int8 +} + +type PieceManifest struct { + PieceCid cid.Cid // aggregated piece CID (v2) + SegmentCids []cid.Cid // segment piece CIDs (v2) + UrlTemplate string +} + +type Request struct { + Method string + Host string + Path string + Params url.Values + Headers http.Header +} + +type Authorization struct { + RequestID uuid.UUID + SignedArgs url.Values + StateEpoch int64 + ProviderID fil.ActorID + ProviderDetails [4]int16 + ProviderInfo apitypes.SPInfo + LastPoll *time.Time +} + +type ErrorLogger interface { + // RequestError logs an error associated with a specific request ID. + RequestError(ctx context.Context, requestID uuid.UUID, code apitypes.APIErrorCode, message string, payload any) error +} + +type AuthorizationService interface { + ErrorLogger + // Authorize validates and verifies a request's SPID challenge and returns + // authorization details. 
+ Authorize(ctx context.Context, req Request) (Authorization, error) +} + +type EligibilityService interface { + ErrorLogger + // EligiblePieces lists Piece CIDs a storage provider is eligible to receive a + // deal for. Unless configured differently, [listEligibleDefaultSize] pieces + // are returned. The boolean return value indicates whether there are more + // pieces available beyond those returned. + EligiblePieces(ctx context.Context, storageProvider fil.ActorID, options ...EligiblePiecesOption) ([]EligiblePiece, bool, error) +} + +type ProposalService interface { + ErrorLogger + // PendingProposals lists current outstanding reservations including those in + // error. + PendingProposals(ctx context.Context, storageProvider fil.ActorID) ([]PendingProposal, error) +} + +type PieceManifestService interface { + ErrorLogger + // PieceManifest produces a manifest for a segmented piece. + PieceManifest(ctx context.Context, storageProvider fil.ActorID, proposal uuid.UUID) (PieceManifest, error) +} + +type ReservationService interface { + ErrorLogger + // ReservePiece requests a deal proposal (and thus reservation) for a specific + // Piece CID. Note: replication states may be returned for feedback to users + // even when an error occurs. + ReservePiece(ctx context.Context, storageProvider fil.ActorID, storageProviderInfo apitypes.SPInfo, piece cid.Cid, options ...ReservePieceOption) ([]apitypes.TenantReplicationState, error) +} + +type StatusService interface { + ErrorLogger +} + +// SpadeService defines the core business logic of the Spade SP API. 
+type SpadeService interface { + AuthorizationService + ErrorLogger + EligibilityService + ProposalService + PieceManifestService + ReservationService +} diff --git a/spid/args/args.go b/spid/args/args.go new file mode 100644 index 0000000..34605d8 --- /dev/null +++ b/spid/args/args.go @@ -0,0 +1,59 @@ +package args + +import ( + "fmt" + "net/url" + + "github.com/storacha/spade/spid/signature" +) + +// Args are optional signed SPID authorization arguments. +type Args struct { + data []byte + sig []byte +} + +// Raw retrieves the raw argument bytes. +func (a Args) Raw() []byte { + return a.data +} + +func (a Args) Signature() []byte { + return a.sig +} + +// Values parses the argument bytes as URL-encoded query values. +func (a Args) Values() (url.Values, error) { + v, err := url.ParseQuery(string(a.data)) + if err != nil { + return nil, fmt.Errorf("parsing arguments as URL query: %w", err) + } + return v, nil +} + +func New(data, sig []byte) Args { + return Args{data: data, sig: sig} +} + +// NewFromValues creates signed [Args] from URL values and signed using the +// provided signer and drand randomness beacon data. +func NewFromValues(signer signature.Signer, entropy []byte, values url.Values) (Args, error) { + msg := []byte(values.Encode()) + sig, err := Sign(signer, entropy, msg) + if err != nil { + return Args{}, err + } + return New(msg, sig), nil +} + +func SignaturePayload(entropy []byte, data []byte) []byte { + return append(append([]byte{0x20, 0x20, 0x20}, entropy...), data...) 
+} + +func Sign(signer signature.Signer, entropy []byte, data []byte) ([]byte, error) { + return signer.Sign(SignaturePayload(entropy, data)) +} + +func Verify(verifier signature.Verifier, entropy []byte, args Args) error { + return verifier.Verify(SignaturePayload(entropy, args.Raw()), args.Signature()) +} diff --git a/spid/lotus/lotus.go b/spid/lotus/lotus.go new file mode 100644 index 0000000..4bee954 --- /dev/null +++ b/spid/lotus/lotus.go @@ -0,0 +1,158 @@ +// SPID Lotus integration utilities +package lotus + +import ( + "context" + "fmt" + "net/url" + "time" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + filaddr "github.com/filecoin-project/go-address" + filabi "github.com/filecoin-project/go-state-types/abi" + filprovider "github.com/filecoin-project/go-state-types/builtin/v9/miner" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/storacha/spade/spid" + "github.com/storacha/spade/spid/args" +) + +type LotusWalletSignerClient interface { + // WalletSign signs the given bytes with the specified address. + WalletSign(context.Context, filaddr.Address, []byte) (*crypto.Signature, error) +} + +// LotusSigner is a [signature.Signer] that uses a Lotus node's wallet to sign +// messages. +type LotusSigner struct { + Context context.Context // context for the API call + Addr filaddr.Address // address to sign with + Client LotusWalletSignerClient // lotus wallet client +} + +func (ls LotusSigner) Sign(msg []byte) ([]byte, error) { + sig, err := ls.Client.WalletSign(ls.Context, ls.Addr, msg) + if err != nil { + return nil, fmt.Errorf("signing message: %w", err) + } + return sig.Data, nil +} + +type LotusTipSetGetterClient interface { + // ChainGetTipSetByHeight looks back for a tipset at the specified epoch. 
+ ChainGetTipSetByHeight(context.Context, filabi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) +} + +type LotusWorkerAddressResolverClient interface { + LotusTipSetGetterClient + // StateAccountKey retrieves the key address for an account at a given tipset. + StateAccountKey(context.Context, filaddr.Address, types.TipSetKey) (filaddr.Address, error) + // StateMinerInfo retrieves miner info at a given tipset. + StateMinerInfo(context.Context, filaddr.Address, types.TipSetKey) (api.MinerInfo, error) +} + +type LotusSignatureVerifierClient interface { + LotusTipSetGetterClient + LotusWorkerAddressResolverClient +} + +type LotusBeaconGetterAndWalletSignerClient interface { + LotusTipSetGetterClient + LotusWalletSignerClient +} + +var beaconCache, _ = lru.New[filabi.ChainEpoch, types.BeaconEntry](spid.SigGraceEpochs * 4) + +func BeaconByHeight(ctx context.Context, client LotusTipSetGetterClient, epoch filabi.ChainEpoch) (types.BeaconEntry, error) { + be, didFind := beaconCache.Get(epoch) + if !didFind { + var curChallengeTs *fil.LotusTS + var err error + + // Do it a few times because lotus is getting slower and slower to finalize 😭 + // Can't sleep too much though not to timeout the call + // spid.bash has been adjusted with a backoff to deal with this as well + for i := 0; i < 3; i++ { + curChallengeTs, err = client.ChainGetTipSetByHeight(ctx, epoch, fil.LotusTSK{}) + if err == nil { + break + } + time.Sleep(200 * time.Millisecond) + } + + if err != nil { + // do not make slow-chain a 500 + return types.BeaconEntry{}, fmt.Errorf( + "unable to get tipset at height %d (%s): %s", + epoch, fil.ClockMainnet.EpochToTime(epoch), + err, + ) + } + be = curChallengeTs.Blocks()[0].BeaconEntries[len(curChallengeTs.Blocks()[0].BeaconEntries)-1] + beaconCache.Add(epoch, be) + } + return be, nil +} + +// ResolveWorkerAddress resolves the worker address for the given storage +// provider address. 
The address must be resolvable as of +// [filprovider.ChainFinality] epochs before the specified epoch. +func ResolveWorkerAddress(ctx context.Context, client LotusWorkerAddressResolverClient, addr filaddr.Address, epoch filabi.ChainEpoch) (filaddr.Address, error) { + miFinTs, err := client.ChainGetTipSetByHeight(ctx, epoch-filprovider.ChainFinality, fil.LotusTSK{}) + if err != nil { + return filaddr.Address{}, err + } + mi, err := client.StateMinerInfo(ctx, addr, miFinTs.Key()) + if err != nil { + return filaddr.Address{}, err + } + return client.StateAccountKey(ctx, mi.Worker, miFinTs.Key()) +} + +// ResolveAndVerify resolves the worker address for the given storage provider +// address at the challenge epoch, retrieves the beacon entry for that epoch, +// and verifies the signature over the challenge args. +// +// This is a convenience function that combines [ResolveWorkerAddress], +// [BeaconByHeight], and [Verify] in one step. +func ResolveAndVerify(ctx context.Context, client LotusSignatureVerifierClient, challenge spid.Challenge) error { + worker, err := ResolveWorkerAddress(ctx, client, challenge.Addr(), challenge.Epoch()) + if err != nil { + return fmt.Errorf("resolving worker address %q at height %d: %w", challenge.Addr(), challenge.Epoch(), err) + } + + beacon, err := BeaconByHeight(ctx, client, challenge.Epoch()) + if err != nil { + return fmt.Errorf("getting beacon at height %d: %w", challenge.Epoch(), err) + } + + return spid.Verify(worker, beacon.Data, challenge) +} + +// New creates a new SPID challenge for the given storage provider address and +// values, using the Lotus API to retrieve the beacon data for the current epoch +// and sign the args. 
+func New(ctx context.Context, client LotusBeaconGetterAndWalletSignerClient, addr filaddr.Address, values url.Values) (spid.Challenge, error) { + epoch := fil.ClockMainnet.TimeToEpoch(time.Now()) + beacon, err := BeaconByHeight(ctx, client, epoch) + if err != nil { + return spid.Challenge{}, fmt.Errorf("getting beacon at height %d: %w", epoch, err) + } + args, err := NewArgs(ctx, client, addr, beacon.Data, values) + if err != nil { + return spid.Challenge{}, fmt.Errorf("creating SPID args: %w", err) + } + return spid.New(addr, epoch, args) +} + +// NewArgs creates SPID authorization args using the Lotus API to sign them. +func NewArgs(ctx context.Context, client LotusWalletSignerClient, addr filaddr.Address, entropy []byte, values url.Values) (args.Args, error) { + signer := LotusSigner{ + Context: ctx, + Addr: addr, + Client: client, + } + return args.NewFromValues(signer, entropy, values) +} diff --git a/spid/lotus/lotus_test.go b/spid/lotus/lotus_test.go new file mode 100644 index 0000000..26e5e6f --- /dev/null +++ b/spid/lotus/lotus_test.go @@ -0,0 +1,149 @@ +package lotus_test + +import ( + "context" + "encoding/base64" + "fmt" + "net/url" + "testing" + "time" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + filaddr "github.com/filecoin-project/go-address" + filabi "github.com/filecoin-project/go-state-types/abi" + filprovider "github.com/filecoin-project/go-state-types/builtin/v9/miner" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" + blsgo "github.com/jsign/go-filsigner/bls" + "github.com/storacha/spade/spid" + spid_lotus "github.com/storacha/spade/spid/lotus" + "github.com/stretchr/testify/require" +) + +func TestLotus(t *testing.T) { + sp, err := filaddr.NewIDAddress(1000) + require.NoError(t, err) + + workerID, err := filaddr.NewIDAddress(1001) + require.NoError(t, err) + + pk, err := 
base64.StdEncoding.DecodeString("DHdFHAqEZV/DJ8WlHqHkvxyEGqUfOJd78QwrkqkFrp4=") + require.NoError(t, err) + + workerKey, err := blsgo.GetPubKey(pk) + require.NoError(t, err) + + argVals := url.Values{} + argVals.Set("foo", "bar") + + t.Run("roundtrip", func(t *testing.T) { + epoch := fil.ClockMainnet.TimeToEpoch(time.Now()) + + lotusAPI := &mockLotusClient{ + t: t, + wallets: map[filaddr.Address][]byte{sp: pk}, + miners: map[filaddr.Address]filaddr.Address{sp: workerID}, + workers: map[filaddr.Address]filaddr.Address{workerID: workerKey}, + beacons: map[filabi.ChainEpoch]fil.LotusBeaconEntry{ + epoch: {Round: 1000, Data: []byte("beacon_for_1000")}, + // we add a beacon for the next epoch, just incase we transition during + // the test. + epoch + 1: {Round: 1000, Data: []byte("beacon_for_1001")}, + // we must add a beacon for the finality epoch too, as it's used + // to resolve the worker address. + epoch - filprovider.ChainFinality: {Round: 100, Data: []byte("beacon_for_100")}, + epoch - filprovider.ChainFinality + 1: {Round: 101, Data: []byte("beacon_for_101")}, + }, + } + + id, err := spid_lotus.New(context.Background(), lotusAPI, sp, argVals) + require.NoError(t, err) + + idstr := spid.Format(id) + t.Logf("Authorization: %s", idstr) + + parsed, err := spid.Parse(idstr) + require.NoError(t, err) + require.Equal(t, id, parsed) + + err = spid_lotus.ResolveAndVerify(context.Background(), lotusAPI, parsed) + require.NoError(t, err) + }) +} + +type mockLotusClient struct { + t *testing.T + wallets map[filaddr.Address][]byte // address -> private key (bls only) + miners map[filaddr.Address]filaddr.Address // miner ID -> worker ID + workers map[filaddr.Address]filaddr.Address // worker ID -> worker key + beacons map[filabi.ChainEpoch]fil.LotusBeaconEntry +} + +func (m *mockLotusClient) ChainGetTipSetByHeight(ctx context.Context, epoch filabi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) { + be, ok := m.beacons[epoch] + if !ok { + return nil, fmt.Errorf("tipset 
for epoch %d not found", epoch) + } + bh := mockBlockHeader(m.t) + bh.BeaconEntries = []types.BeaconEntry{be} + return types.NewTipSet([]*types.BlockHeader{bh}) +} + +func (m *mockLotusClient) StateAccountKey(ctx context.Context, worker filaddr.Address, tsk types.TipSetKey) (filaddr.Address, error) { + worker, ok := m.workers[worker] + if !ok { + return filaddr.Address{}, fmt.Errorf("account key for worker %s not found", worker) + } + return worker, nil +} + +func (m *mockLotusClient) StateMinerInfo(ctx context.Context, addr filaddr.Address, tsk types.TipSetKey) (api.MinerInfo, error) { + worker, ok := m.miners[addr] + if !ok { + return api.MinerInfo{}, fmt.Errorf("info for miner %s not found", addr) + } + return api.MinerInfo{Worker: worker}, nil +} + +func (m *mockLotusClient) WalletSign(ctx context.Context, addr filaddr.Address, msg []byte) (*crypto.Signature, error) { + pk, ok := m.wallets[addr] + if !ok { + return nil, fmt.Errorf("no wallet for address %s", addr) + } + sig, err := blsgo.Sign(pk, msg) + if err != nil { + return nil, fmt.Errorf("signing message: %w", err) + } + return &crypto.Signature{Type: crypto.SigTypeBLS, Data: sig}, nil +} + +var _ spid_lotus.LotusSignatureVerifierClient = (*mockLotusClient)(nil) + +func mockBlockHeader(t testing.TB) *types.BlockHeader { + addr, err := filaddr.NewIDAddress(12512063) + require.NoError(t, err) + + c, err := cid.Decode("bafyreicmaj5hhoy5mgqvamfhgexxyergw7hdeshizghodwkjg6qmpoco7i") + require.NoError(t, err) + + return &types.BlockHeader{ + Miner: addr, + Ticket: &types.Ticket{ + VRFProof: []byte("vrf proof0000000vrf proof0000000"), + }, + ElectionProof: &types.ElectionProof{ + VRFProof: []byte("vrf proof0000000vrf proof0000000"), + }, + Parents: []cid.Cid{c, c}, + ParentMessageReceipts: c, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("boo! 
im a signature")}, + ParentWeight: types.NewInt(123125126212), + Messages: c, + Height: 85919298723, + ParentStateRoot: c, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS, Data: []byte("boo! im a signature")}, + ParentBaseFee: types.NewInt(3432432843291), + } +} diff --git a/spid/signature/bls.go b/spid/signature/bls.go new file mode 100644 index 0000000..e544fa3 --- /dev/null +++ b/spid/signature/bls.go @@ -0,0 +1,31 @@ +package signature + +import ( + "fmt" + + filaddr "github.com/filecoin-project/go-address" + blsgo "github.com/jsign/go-filsigner/bls" +) + +type BLSVerifier struct { + Addr filaddr.Address +} + +func (v BLSVerifier) Verify(msg []byte, sig []byte) error { + sigMatch, err := blsgo.Verify(v.Addr.Payload(), msg, sig) + if err != nil { + return fmt.Errorf("BLS signature verification failed: %w", err) + } + if !sigMatch { + return fmt.Errorf("BLS signature verification failed") + } + return nil +} + +type BLSSigner struct { + PrivateKey []byte +} + +func (v BLSSigner) Sign(msg []byte) ([]byte, error) { + return blsgo.Sign(v.PrivateKey, msg) +} diff --git a/spid/signature/signature.go b/spid/signature/signature.go new file mode 100644 index 0000000..e58b084 --- /dev/null +++ b/spid/signature/signature.go @@ -0,0 +1,9 @@ +package signature + +type Signer interface { + Sign(msg []byte) ([]byte, error) +} + +type Verifier interface { + Verify(msg []byte, sig []byte) error +} diff --git a/spid/spid.go b/spid/spid.go new file mode 100644 index 0000000..8fed50a --- /dev/null +++ b/spid/spid.go @@ -0,0 +1,155 @@ +package spid + +import ( + "encoding/base64" + "fmt" + "regexp" + "strconv" + "time" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + filaddr "github.com/filecoin-project/go-address" + filabi "github.com/filecoin-project/go-state-types/abi" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/storacha/spade/spid/args" + "github.com/storacha/spade/spid/signature" +) + +const ( + Scheme = `FIL-SPID-V0` + SigGraceEpochs = 5 // 30 secs per 
epoch +) + +var ( + spAuthRe = regexp.MustCompile( + `^` + Scheme + `\s+` + + // fil epoch + `([0-9]+)` + `\s*;\s*` + + // spID + `([ft]0[0-9]+)` + `\s*;\s*` + + // signature + `([^; ]+)` + + // optional signed argument + `(?:\s*\;\s*([^; ]+))?` + + `\s*$`, + ) + challengeCache, _ = lru.New[string, verifySigResult](SigGraceEpochs * 128) +) + +type verifySigResult struct { + invalidSigErr error +} + +type rawHeader struct { + epoch string + addr string + sigB64 string + arg string +} + +// Challenge is a parsed SPID authorization that can be verified. +type Challenge struct { + epoch filabi.ChainEpoch // Filecoin epoch + addr filaddr.Address // storage provider address + args args.Args // decoded signed args +} + +func New(addr filaddr.Address, epoch filabi.ChainEpoch, args args.Args) (Challenge, error) { + return Challenge{addr: addr, epoch: epoch, args: args}, nil +} + +func (c Challenge) Addr() filaddr.Address { + return c.addr +} + +func (c Challenge) Epoch() filabi.ChainEpoch { + return c.epoch +} + +// Args retrieves the signed arguments. +func (c Challenge) Args() args.Args { + return c.args +} + +func (c Challenge) String() string { + return Format(c) +} + +// Parse parses the provided Authorization header string as a SPID Challenge. +// Note: argument signature verification is not performed here. 
+func Parse(s string) (Challenge, error) { + var challenge Challenge + res := spAuthRe.FindStringSubmatch(s) + + if len(res) != 4 && len(res) != 5 { + return Challenge{}, fmt.Errorf("invalid %s auth header: %q", Scheme, s) + } + + hdr := rawHeader{} + hdr.epoch, hdr.addr, hdr.sigB64 = res[1], res[2], res[3] + if len(res) == 5 { // allow for optional argument + hdr.arg = res[4] + } + + var err error + challenge.addr, err = filaddr.NewFromString(hdr.addr) + if err != nil { + return Challenge{}, fmt.Errorf("unexpected %s auth address: %q", Scheme, hdr.addr) + } + + e, err := strconv.ParseInt(hdr.epoch, 10, 32) + if err != nil { + return Challenge{}, fmt.Errorf("unexpected %s auth epoch: %s", Scheme, hdr.epoch) + } + challenge.epoch = filabi.ChainEpoch(e) + + curFilEpoch := fil.ClockMainnet.TimeToEpoch(time.Now()) + if curFilEpoch < challenge.epoch { + return Challenge{}, fmt.Errorf("%s auth epoch is in the future: %d", Scheme, challenge.epoch) + } + if curFilEpoch-challenge.epoch > SigGraceEpochs { + return Challenge{}, fmt.Errorf("%s auth epoch is too far in the past: %d", Scheme, challenge.epoch) + } + + arg, err := base64.StdEncoding.DecodeString(hdr.arg) + if err != nil { + return Challenge{}, fmt.Errorf("unable to decode optional argument: %s", err.Error()) + } + sig, err := base64.StdEncoding.DecodeString(hdr.sigB64) + if err != nil { + return Challenge{}, fmt.Errorf("unexpected %s auth signature encoding '%s'", Scheme, hdr.sigB64) + } + challenge.args = args.New(arg, sig) + + return challenge, nil +} + +func Format(c Challenge) string { + str := fmt.Sprintf( + "%s %d;%s;%s", + Scheme, + c.Epoch(), + c.Addr().String(), + base64.StdEncoding.EncodeToString(c.Args().Signature()), + ) + if len(c.Args().Raw()) != 0 { + str += ";" + base64.StdEncoding.EncodeToString(c.Args().Raw()) + } + return str +} + +// Verify verifies the signature over the provided challenge args for the given +// address and drand beacon data. 
The addr should be the storage provider +// worker address (a BLS key) and the entropy must be the drand randomness +// beacon data corresponding to the epoch used during signing. +func Verify(addr filaddr.Address, entropy []byte, challenge Challenge) error { + var vsr verifySigResult + if maybeResult, known := challengeCache.Get(challenge.String()); known { + vsr = maybeResult + } else { + err := args.Verify(signature.BLSVerifier{Addr: addr}, entropy, challenge.Args()) + vsr.invalidSigErr = err + challengeCache.Add(challenge.String(), vsr) + } + return vsr.invalidSigErr +} diff --git a/spid/spid_test.go b/spid/spid_test.go new file mode 100644 index 0000000..462958b --- /dev/null +++ b/spid/spid_test.go @@ -0,0 +1,54 @@ +package spid_test + +import ( + "encoding/base64" + "net/url" + "testing" + "time" + + "code.riba.cloud/go/toolbox-interplanetary/fil" + filaddr "github.com/filecoin-project/go-address" + blsgo "github.com/jsign/go-filsigner/bls" + "github.com/storacha/spade/spid" + spid_args "github.com/storacha/spade/spid/args" + "github.com/storacha/spade/spid/signature" + "github.com/stretchr/testify/require" +) + +func TestSPID(t *testing.T) { + sp, err := filaddr.NewIDAddress(1000) + require.NoError(t, err) + + pk, err := base64.StdEncoding.DecodeString("DHdFHAqEZV/DJ8WlHqHkvxyEGqUfOJd78QwrkqkFrp4=") + require.NoError(t, err) + + workerKey, err := blsgo.GetPubKey(pk) + require.NoError(t, err) + + argVals := url.Values{} + argVals.Set("foo", "bar") + + entropy := []byte("test_entropy") + + t.Run("roundtrip", func(t *testing.T) { + signer := signature.BLSSigner{PrivateKey: pk} + + args, err := spid_args.NewFromValues(signer, entropy, argVals) + require.NoError(t, err) + + epoch := fil.ClockMainnet.TimeToEpoch(time.Now()) + + id, err := spid.New(sp, epoch, args) + require.NoError(t, err) + + idstr := spid.Format(id) + t.Logf("Authorization: %s", idstr) + + parsed, err := spid.Parse(idstr) + require.NoError(t, err) + require.Equal(t, id, parsed) + + err = 
spid.Verify(workerKey, entropy, parsed) + require.NoError(t, err) + }) +} diff --git a/webapi/apiSpInvoke.go b/webapi/apiSpInvoke.go index 95ae9d8..f506101 100644 --- a/webapi/apiSpInvoke.go +++ b/webapi/apiSpInvoke.go @@ -1,31 +1,29 @@ package main import ( - "context" + "errors" "fmt" - "math/bits" "net/http" "strings" "time" - "code.riba.cloud/go/toolbox-interplanetary/fil" "code.riba.cloud/go/toolbox/cmn" - filabi "github.com/filecoin-project/go-state-types/abi" - filbig "github.com/filecoin-project/go-state-types/big" - filbuiltin "github.com/filecoin-project/go-state-types/builtin" - filmarket "github.com/filecoin-project/go-state-types/builtin/v9/market" - "github.com/georgysavva/scany/pgxscan" - lru "github.com/hashicorp/golang-lru/v2" "github.com/ipfs/go-cid" - "github.com/jackc/pgx/v4" "github.com/labstack/echo/v4" "github.com/multiformats/go-multihash" "github.com/storacha/spade/apitypes" "github.com/storacha/spade/internal/app" "github.com/storacha/spade/internal/filtypes" + "github.com/storacha/spade/service" ) -func apiSpInvoke(c echo.Context) (defErr error) { +func NewSpInvokeHandler(svc service.ReservationService) echo.HandlerFunc { + return func(c echo.Context) error { + return apiSpInvoke(c, svc) + } +} + +func apiSpInvoke(c echo.Context, svc service.ReservationService) (defErr error) { ctx, ctxMeta := unpackAuthedEchoContext(c) // return retFail( @@ -37,6 +35,7 @@ func apiSpInvoke(c echo.Context) (defErr error) { if argCall := ctxMeta.signedArgs.Get("call"); argCall != "reserve_piece" { return retFail( c, + svc, apitypes.ErrInvalidRequest, "Unrecognized call '%s'", argCall, @@ -46,11 +45,12 @@ func apiSpInvoke(c echo.Context) (defErr error) { pCidArg := ctxMeta.signedArgs.Get("piece_cid") pCid, err := cid.Parse(pCidArg) if err != nil { - return retFail(c, apitypes.ErrInvalidRequest, "Requested PieceCid '%s' is not valid: %s", pCidArg, err) + return retFail(c, svc, apitypes.ErrInvalidRequest, "Requested PieceCid '%s' is not valid: %s", pCidArg, 
err) } if pCid.Prefix().Codec != cid.FilCommitmentUnsealed || pCid.Prefix().MhType != multihash.SHA2_256_TRUNC254_PADDED { return retFail( c, + svc, apitypes.ErrInvalidRequest, "Requested PieceCID '%s' does not have expected codec (%x) and multihash (%x)", pCid, @@ -63,7 +63,7 @@ func apiSpInvoke(c echo.Context) (defErr error) { if c.QueryParams().Has("tenant") { tid, err := parseUIntQueryParam(c, "tenant", 1, 1<<15) if err != nil { - return retFail(c, apitypes.ErrInvalidRequest, err.Error()) + return retFail(c, svc, apitypes.ErrInvalidRequest, err.Error()) } tenantID = int16(tid) } @@ -73,6 +73,7 @@ func apiSpInvoke(c echo.Context) (defErr error) { ctxMeta.spInfoLastPolled.Before(time.Now().Add(-1*app.PolledSPInfoStaleAfterMinutes*time.Minute)) { return retFail( c, + svc, apitypes.ErrStorageProviderInfoTooOld, "Provider has not been dialed by the polling system recently: please try again in about a minute", ) @@ -82,6 +83,7 @@ func apiSpInvoke(c echo.Context) (defErr error) { if ctxMeta.spInfo.PeerInfo == nil || len(ctxMeta.spInfo.PeerInfo.Protos) == 0 { return retFail( c, + svc, apitypes.ErrStorageProviderUndialable, strings.Join([]string{ "It appears your provider can not be libp2p-dialed over the TCP transport.", @@ -95,6 +97,7 @@ func apiSpInvoke(c echo.Context) (defErr error) { if _, canV120 := ctxMeta.spInfo.PeerInfo.Protos[filtypes.StorageProposalV120]; !canV120 { return retFail( c, + svc, apitypes.ErrStorageProviderUnsupported, strings.Join([]string{ "It appears your provider does not support %s.", @@ -104,301 +107,95 @@ func apiSpInvoke(c echo.Context) (defErr error) { ) } - errCode, err := spIneligibleErr(ctx, ctxMeta.authedActorID) + replStates, err := svc.ReservePiece( + ctx, + ctxMeta.authedActorID, + ctxMeta.spInfo, + pCid, + service.WithReservePieceRequestID(ctxMeta.requestID), + service.WithReservePieceTenantID(tenantID), + service.WithReservePieceTenantPolicy(ctxMeta.signedArgs.Get("tenant_policy")), + ) if err != nil { - return cmn.WrErr(err) - } 
else if errCode != 0 { - return retFail(c, errCode, ineligibleSpMsg(ctxMeta.authedActorID)) - } - - return ctxMeta.Db[app.DbMain].BeginFunc(ctx, func(tx pgx.Tx) error { - - _, err = tx.Exec( - ctx, - requestPieceLockStatement, - ) - if err != nil { - return cmn.WrErr(err) + if errors.Is(err, service.ErrStorageProviderSuspended) { + return retFail(c, svc, apitypes.ErrStorageProviderSuspended, ineligibleSpMsg(ctxMeta.authedActorID)) } - - type tenantEligible struct { - apitypes.TenantReplicationState - IsExclusive bool `db:"exclusive_replication"` - TenantClientID *fil.ActorID `db:"client_id_to_use"` - TenantClientAddress *string `db:"client_address_to_use"` - - PieceID int64 - PieceSizeBytes int64 - - DealDurationDays int16 - StartWithinHours int16 - RecentlyUsedStartEpoch *int64 - - TenantMeta []byte - } - - tenantsEligible := make([]tenantEligible, 0, 8) - - if err := pgxscan.Select( - ctx, - tx, - &tenantsEligible, - ` - SELECT - * - FROM spd.piece_realtime_eligibility( $1, $2 ) - WHERE - ( 0 = $3 OR tenant_id = $3) - `, - ctxMeta.authedActorID, - pCid, - tenantID, - ); err != nil { - return cmn.WrErr(err) + if errors.Is(err, service.ErrStorageProviderIneligibleToMine) { + return retFail(c, svc, apitypes.ErrStorageProviderIneligibleToMine, ineligibleSpMsg(ctxMeta.authedActorID)) } - - if len(tenantsEligible) == 0 { - return retFail(c, apitypes.ErrUnclaimedPieceCID, "Piece %s is not claimed by any selected tenant", pCid) + if errors.Is(err, service.ErrUnclaimedPiece) { + return retFail(c, svc, apitypes.ErrUnclaimedPieceCID, "Piece %s is not claimed by any tenant", pCid) } - - if tenantsEligible[0].PieceSizeBytes > 1<>30, 1<<(ctxMeta.spInfo.SectorLog2Size-30), ) } - - // count ineligibles, assemble actual return - var countNoDataCap, countAlreadyDealt, countOverReplicated, countOverPending int - var chosenTenant *tenantEligible - resp := apitypes.ResponseDealRequest{ - ReplicationStates: make([]apitypes.TenantReplicationState, len(tenantsEligible)), + if 
errors.Is(err, service.ErrProviderHasReplica) { + return retPayloadAnnotated(c, svc, http.StatusForbidden, + apitypes.ErrProviderHasReplica, + apitypes.ResponseDealRequest{ReplicationStates: replStates}, + "Provider already has proposed or active replica for %s according to all selected replication rules", pCid, + ) } - for i, te := range tenantsEligible { - if te.TenantClientID != nil { - s := te.TenantClientID.String() - te.TenantReplicationState.TenantClient = &s - } - resp.ReplicationStates[i] = te.TenantReplicationState - - var invalidated bool - - if te.TenantClient == nil { - countNoDataCap++ - invalidated = true - } - if te.DealAlreadyExists { - countAlreadyDealt++ - invalidated = true - } - if te.Total >= te.MaxTotal || - te.InOrg >= te.MaxOrg || - te.InCity >= te.MaxCity || - te.InCountry >= te.MaxCountry || - te.InContinent >= te.MaxContinent { - countOverReplicated++ - invalidated = true - } - if te.SpInFlightBytes+te.PieceSizeBytes > te.MaxInFlightBytes { - countOverPending++ - invalidated = true - } - - if !invalidated && chosenTenant == nil { - chosenTenant = &te - } + if errors.Is(err, service.ErrTenantsOutOfDatacap) { + return retPayloadAnnotated(c, svc, http.StatusForbidden, + apitypes.ErrTenantsOutOfDatacap, + apitypes.ResponseDealRequest{ReplicationStates: replStates}, + "All selected tenants with claim to %s are out of DataCap 🙀", pCid, + ) } - - // handle "no takers" here, for ease of reading further down - // this is slightly convoluted since we can have a "mixed error condition" - handled in the default: - if chosenTenant == nil { - - switch len(tenantsEligible) { - - case countAlreadyDealt: - return retPayloadAnnotated(c, http.StatusForbidden, - apitypes.ErrProviderHasReplica, - resp, - "Provider already has proposed or active replica for %s according to all selected replication rules", pCid, - ) - case countNoDataCap: - return retPayloadAnnotated(c, http.StatusForbidden, - apitypes.ErrTenantsOutOfDatacap, - resp, - "All selected tenants 
with claim to %s are out of DataCap 🙀", pCid, - ) - - case countOverReplicated: - return retPayloadAnnotated(c, http.StatusForbidden, - apitypes.ErrTooManyReplicas, - resp, - "Piece %s is over-replicated according to all selected replication rules", pCid, - ) - - case countOverPending: - return retPayloadAnnotated(c, http.StatusForbidden, - apitypes.ErrProviderAboveMaxInFlight, - resp, - "Provider has more proposals in-flight than permitted by selected tenant rules", - ) - - default: - return retPayloadAnnotated(c, http.StatusForbidden, - apitypes.ErrReplicationRulesViolation, - resp, - "None of the selected tenants would grant a deal for %s according to their individual rules", pCid, - ) - } + if errors.Is(err, service.ErrTooManyReplicas) { + return retPayloadAnnotated(c, svc, http.StatusForbidden, + apitypes.ErrProviderAboveMaxInFlight, + apitypes.ResponseDealRequest{ReplicationStates: replStates}, + "Provider has more proposals in-flight than permitted by selected tenant rules", + ) } - - if ctxMeta.signedArgs.Get("tenant_policy") != app.TEMPPolicies[chosenTenant.TenantID] { + if errors.Is(err, service.ErrProviderAboveMaxInFlight) { + return retPayloadAnnotated(c, svc, http.StatusForbidden, + apitypes.ErrProviderAboveMaxInFlight, + apitypes.ResponseDealRequest{ReplicationStates: replStates}, + "Provider has more proposals in-flight than permitted by selected tenant rules", + ) + } + if errors.Is(err, service.ErrReplicationRulesViolation) { + return retPayloadAnnotated(c, svc, http.StatusForbidden, + apitypes.ErrReplicationRulesViolation, + apitypes.ResponseDealRequest{ReplicationStates: replStates}, + "None of the selected tenants would grant a deal for %s according to their individual rules", pCid, + ) + } + if errors.Is(err, service.ErrTenantPolicyMismatch) { + if tenantID == 0 && len(replStates) > 0 { + tenantID = replStates[0].TenantID + } return retFail( c, + svc, apitypes.ErrInvalidRequest, "Incorrect policy for tenant %d", - chosenTenant.TenantID, + 
tenantID, ) } - - // - // Here, at the very end, is where we would make a tightly-timeboxed outbound call - // to check for potential external eligibility criteria - // Then either return ErrExternalReservationRefused or proceed below. - // - // We *DO* always check using our own replication rules first, and keep a lock for the duration - // ( in order to maintain a uniform "decency floor" among our esteemed SPs ;) - // - - // We got that far - let's do it! - startEpoch := fil.ClockMainnet.TimeToEpoch(time.Now().Add( - time.Hour * time.Duration(chosenTenant.StartWithinHours), - )) - - // a lot of this logic is broken / needs to be replaced by something saner. But... in another life. - if chosenTenant.RecentlyUsedStartEpoch != nil { - startEpoch = filabi.ChainEpoch(*chosenTenant.RecentlyUsedStartEpoch) - } - - // round the epoch down to a day boundary - // we *must* work with startEpoch/StartWithinHours to produce identical retry-deals - // 2h +/- because network started at 22:00 UTC - rde := ((startEpoch-app.FilDefaultLookback-(filbuiltin.EpochsInHour*filabi.ChainEpoch(chosenTenant.StartWithinHours))-240)/2880)*2880 + 240 - - // this is relatively expensive to do within the txn lock - // however we cache it and call it exactly once per day, so we should be fine - gbpce, err := providerCollateralEstimateGiB( - ctx, rde, - ) - if err != nil { - return cmn.WrErr(err) - } - - encodedLabel, err := filmarket.NewLabelFromString(pCid.String()) - if err != nil { - return cmn.WrErr(err) - } - - prop := struct { - ProposalV0 filmarket.DealProposal `json:"filmarket_proposal"` - }{ - ProposalV0: filmarket.DealProposal{ - // do not change under any circumstances: even when payments eventually happen, they will happen explicitly out of band - // ( a notable exception here would be contract-listener style interactions, but that's way off ) - StoragePricePerEpoch: filbig.Zero(), // DO NOT CHANGE - - VerifiedDeal: true, - PieceCID: pCid, - PieceSize: 
filabi.PaddedPieceSize(chosenTenant.PieceSizeBytes), - - Provider: ctxMeta.authedActorID.AsFilAddr(), - Client: chosenTenant.TenantClientID.AsFilAddr(), - - StartEpoch: startEpoch, - EndEpoch: startEpoch + filabi.ChainEpoch(chosenTenant.DealDurationDays)*filbuiltin.EpochsInDay, - Label: encodedLabel, - - ClientCollateral: filbig.Zero(), - ProviderCollateral: filbig.Rsh( - filbig.Mul(gbpce, filbig.NewInt(chosenTenant.PieceSizeBytes)), - 30, - ), - }, - } - - if _, err := tx.Exec( - ctx, - ` - INSERT INTO spd.proposals - ( proposal_uuid, piece_id, provider_id, client_id, start_epoch, end_epoch, proxied_log2_size, proposal_meta ) - VALUES ( $1, $2, $3, $4, $5, $6, $7, $8 ) - `, - c.Request().Header.Get("X-SPADE-REQUEST-UUID"), // inherit the request uuid as the proposal uuid (a uuid is a uuid is a uuid) - chosenTenant.PieceID, - ctxMeta.authedActorID, - *chosenTenant.TenantClientID, - prop.ProposalV0.StartEpoch, - prop.ProposalV0.EndEpoch, - bits.TrailingZeros64(uint64(chosenTenant.PieceSizeBytes)), - prop, - ); err != nil { - return cmn.WrErr(err) - } - - // we managed - bump the counts where applicable and return stats - for i := range tenantsEligible { - if tenantsEligible[i].IsExclusive && resp.ReplicationStates[i].TenantID != chosenTenant.TenantID { - continue - } - - resp.ReplicationStates[i].Total++ - resp.ReplicationStates[i].InOrg++ - resp.ReplicationStates[i].InCity++ - resp.ReplicationStates[i].InCountry++ - resp.ReplicationStates[i].InContinent++ - resp.ReplicationStates[i].DealAlreadyExists = true - resp.ReplicationStates[i].SpInFlightBytes += chosenTenant.PieceSizeBytes - } - - return retPayloadAnnotated( - c, - http.StatusOK, - 0, - resp, - strings.Join([]string{ - fmt.Sprintf("Deal queued for PieceCID %s", pCid), - ``, - `In about 5 minutes check the pending list:`, - " " + curlAuthedForSP(c, ctxMeta.authedActorID, "/sp/pending_proposals", nil), - }, "\n"), - ) - }) -} - -var collateralCache, _ = lru.New[filabi.ChainEpoch, filbig.Int](128) - -func 
providerCollateralEstimateGiB(ctx context.Context, sourceEpoch filabi.ChainEpoch) (filbig.Int, error) { //nolint:revive - if pc, didFind := collateralCache.Get(sourceEpoch); didFind { - return pc, nil - } - - collateralGiB, err := app.EpochMinProviderCollateralEstimateGiB(ctx, sourceEpoch) - if err != nil { - return collateralGiB, cmn.WrErr(err) + return cmn.WrErr(err) } - // make it 1.7 times larger, so that fluctuations in the state won't prevent the deal from being proposed/published later - // capped by https://github.com/filecoin-project/lotus/blob/v1.13.2-rc2/markets/storageadapter/provider.go#L267 - // and https://github.com/filecoin-project/lotus/blob/v1.13.2-rc2/markets/storageadapter/provider.go#L41 - inflatedCollateralGiB := filbig.Div( - filbig.Product( - collateralGiB, - filbig.NewInt(17), - ), - filbig.NewInt(10), + return retPayloadAnnotated( + c, + svc, + http.StatusOK, + 0, + apitypes.ResponseDealRequest{ + ReplicationStates: replStates, + }, + strings.Join([]string{ + fmt.Sprintf("Deal queued for PieceCID %s", pCid), + ``, + `In about 5 minutes check the pending list:`, + " " + curlAuthedForSP(c, ctxMeta.authedActorID, "/sp/pending_proposals", nil), + }, "\n"), ) - - collateralCache.Add(sourceEpoch, inflatedCollateralGiB) - return inflatedCollateralGiB, nil } diff --git a/webapi/apiSpListEligible.go b/webapi/apiSpListEligible.go index 00fcccc..601a67d 100644 --- a/webapi/apiSpListEligible.go +++ b/webapi/apiSpListEligible.go @@ -7,21 +7,27 @@ import ( "strings" "code.riba.cloud/go/toolbox/cmn" - "github.com/georgysavva/scany/pgxscan" "github.com/labstack/echo/v4" "github.com/storacha/spade/apitypes" "github.com/storacha/spade/internal/app" + "github.com/storacha/spade/service" ) -func apiSpListEligible(c echo.Context) error { +func NewSpListEligibleHandler(svc service.EligibilityService) echo.HandlerFunc { + return func(c echo.Context) error { + return apiSpListEligible(c, svc) + } +} + +func apiSpListEligible(c echo.Context, svc 
service.EligibilityService) error { ctx, ctxMeta := unpackAuthedEchoContext(c) - lim := uint64(listEligibleDefaultSize) + lim := uint64(service.ListEligibleDefaultSize) if c.QueryParams().Has("limit") { var err error - lim, err = parseUIntQueryParam(c, "limit", 1, listEligibleMaxSize) + lim, err = parseUIntQueryParam(c, "limit", 1, service.ListEligibleMaxSize) if err != nil { - return retFail(c, apitypes.ErrInvalidRequest, err.Error()) + return retFail(c, svc, apitypes.ErrInvalidRequest, err.Error()) } } @@ -29,40 +35,21 @@ func apiSpListEligible(c echo.Context) error { if c.QueryParams().Has("tenant") { tid, err := parseUIntQueryParam(c, "tenant", 1, 1<<15) if err != nil { - return retFail(c, apitypes.ErrInvalidRequest, err.Error()) + return retFail(c, svc, apitypes.ErrInvalidRequest, err.Error()) } tenantID = int16(tid) } - // how to list: start small, find setting below - useQueryFunc := "pieces_eligible_head" - - if c.QueryParams().Has("internal-nolateral") { // secret flag to tune this in flight / figure out optimal values - if truthyBoolQueryParam(c, "internal-nolateral") { - useQueryFunc = "pieces_eligible_full" - } - } else if lim > listEligibleDefaultSize { // deduce from requested lim - useQueryFunc = "pieces_eligible_full" - } - - orderedPieces := make([]*struct { - PieceID int64 - PieceLog2Size uint8 - Tenants []int16 `db:"tenant_ids"` - *apitypes.Piece - }, 0, lim+1) - - if err := pgxscan.Select( + pieces, more, err := svc.EligiblePieces( ctx, - ctxMeta.Db[app.DbMain], - &orderedPieces, - fmt.Sprintf("SELECT * FROM spd.%s( $1, $2, $3, $4, $5 )", useQueryFunc), ctxMeta.authedActorID, - lim+1, // ask for one extra, to disambiguate "there is more" - tenantID, - truthyBoolQueryParam(c, "include-sourceless"), - false, - ); err != nil { + service.WithEligiblePiecesLimit(lim), + service.WithEligiblePiecesTenantID(tenantID), + service.WithEligiblePiecesIncludeSourceless( + truthyBoolQueryParam(c, "include-sourceless"), + ), + ) + if err != nil { return 
cmn.WrErr(err) } @@ -76,12 +63,10 @@ func apiSpListEligible(c echo.Context) error { } // we got more than requested - indicate that this set is large - if uint64(len(orderedPieces)) > lim { - orderedPieces = orderedPieces[:lim] - + if more { exLim := lim - if exLim < listEligibleDefaultSize { - exLim = listEligibleDefaultSize + if exLim < service.ListEligibleDefaultSize { + exLim = service.ListEligibleDefaultSize } info = append( @@ -95,8 +80,8 @@ func apiSpListEligible(c echo.Context) error { ) } - ret := make(apitypes.ResponsePiecesEligible, len(orderedPieces)) - for i, p := range orderedPieces { + ret := make(apitypes.ResponsePiecesEligible, len(pieces)) + for i, p := range pieces { sa := make(url.Values, 2) sa.Add("call", "reserve_piece") sa.Add("piece_cid", p.PieceCid) @@ -105,8 +90,8 @@ func apiSpListEligible(c echo.Context) error { p.SampleReserveCmd = curlAuthedForSP(c, ctxMeta.authedActorID, "/sp/invoke", sa) p.ClaimingTenant = p.Tenants[0] p.TenantPolicyCid = app.TEMPPolicies[p.Tenants[0]] - ret[i] = p.Piece + ret[i] = &p.Piece } - return retPayloadAnnotated(c, http.StatusOK, 0, ret, strings.Join(info, "\n")) + return retPayloadAnnotated(c, svc, http.StatusOK, 0, ret, strings.Join(info, "\n")) } diff --git a/webapi/apiSpPendingReservations.go b/webapi/apiSpPendingReservations.go index adbcddf..63a93a0 100644 --- a/webapi/apiSpPendingReservations.go +++ b/webapi/apiSpPendingReservations.go @@ -9,90 +9,25 @@ import ( "code.riba.cloud/go/toolbox-interplanetary/fil" "code.riba.cloud/go/toolbox/cmn" filabi "github.com/filecoin-project/go-state-types/abi" - filbuiltin "github.com/filecoin-project/go-state-types/builtin" - "github.com/georgysavva/scany/pgxscan" "github.com/labstack/echo/v4" "github.com/storacha/spade/apitypes" - "github.com/storacha/spade/internal/app" + "github.com/storacha/spade/service" ) -func apiSpListPendingProposals(c echo.Context) error { - ctx, ctxMeta := unpackAuthedEchoContext(c) - - type pendingProposals struct { - 
apitypes.DealProposal - ClientID fil.ActorID - PieceID int64 - ProposalFailstamp int64 - Error *string - ProposalDelivered *time.Time - IsPublished bool - PieceLog2Size int8 +func NewSpListPendingProposalsHandler(svc service.ProposalService) echo.HandlerFunc { + return func(c echo.Context) error { + return apiSpListPendingProposals(c, svc) } - pending := make([]pendingProposals, 0, 4096) +} - if err := pgxscan.Select( +func apiSpListPendingProposals(c echo.Context, svc service.ProposalService) error { + ctx, ctxMeta := unpackAuthedEchoContext(c) + + pending, err := svc.PendingProposals( ctx, - ctxMeta.Db[app.DbMain], - &pending, - ` - SELECT - pr.proposal_uuid AS proposal_id, - pr.piece_id, - pr.proposal_meta->>'signed_proposal_cid' AS proposal_cid, - pr.start_epoch, - pr.client_id, - pr.proposal_delivered, - c.tenant_id, - p.piece_cid, - pr.proxied_log2_size AS piece_log2_size, - pr.proposal_failstamp, - pr.proposal_meta->>'failure' AS error, - ( EXISTS ( - SELECT 42 - FROM spd.published_deals pd - WHERE - pd.piece_id = pr.piece_id - AND - pd.provider_id = pr.provider_id - AND - pd.client_id = pr.client_id - AND - pd.status = 'published' - ) ) AS is_published, - ARRAY( - SELECT uri FROM spd.sources_uri WHERE sources_uri.piece_id = pr.piece_id - ) AS data_sources, - ( - CASE WHEN (p.piece_meta->'is_frc58_segmented')::bool THEN 'frc58' ELSE NULL END - ) AS segmentation - FROM spd.proposals pr - JOIN spd.pieces p USING ( piece_id ) - JOIN spd.clients c USING ( client_id ) - LEFT JOIN spd.mv_pieces_availability pa USING ( piece_id ) - WHERE - pr.provider_id = $1 - AND - pr.start_epoch > $2 - AND - pr.activated_deal_id is NULL - AND - ( - pr.proposal_failstamp = 0 - OR - -- show everything failed in the past N hours - pr.proposal_failstamp > ( spd.big_now() - $3::BIGINT * 3600 * 1000 * 1000 * 1000 ) - ) - ORDER BY - pr.proposal_failstamp DESC, - ( pr.start_epoch / 360 ), -- 3h sort granularity - pr.proxied_log2_size, - p.piece_cid - `, ctxMeta.authedActorID, - 
fil.ClockMainnet.TimeToEpoch(time.Now())+filbuiltin.EpochsInHour, - showRecentFailuresHours, - ); err != nil { + ) + if err != nil { return cmn.WrErr(err) } @@ -159,14 +94,14 @@ func apiSpListPendingProposals(c echo.Context) error { msg := fmt.Sprintf( ` -This is an overview of deals recently proposed to SP %s + This is an overview of deals recently proposed to SP %s -There currently are %0.2f GiB of pending deals: - % 4d deal-proposals to send out - % 4d successful proposals pending publishing - % 4d deals published on chain awaiting sector activation + There currently are %0.2f GiB of pending deals: + % 4d deal-proposals to send out + % 4d successful proposals pending publishing + % 4d deals published on chain awaiting sector activation -You can request deal proposals using API endpoints as described in the docs`, + You can request deal proposals using API endpoints as described in the docs`, ctxMeta.authedActorID, float64(outstandingBytes)/(1<<30), toPropose, @@ -175,7 +110,7 @@ You can request deal proposals using API endpoints as described in the docs`, ) if len(fails) > 0 { - msg += fmt.Sprintf("\n\nIn the past %dh there were %d proposal errors, shown in recent_failures below.", showRecentFailuresHours, len(fails)) + msg += fmt.Sprintf("\n\nIn the past %dh there were %d proposal errors, shown in recent_failures below.", service.ShowRecentFailuresHours, len(fails)) ret.RecentFailures = make([]apitypes.ProposalFailure, 0, len(fails)) for _, f := range fails { @@ -188,6 +123,7 @@ You can request deal proposals using API endpoints as described in the docs`, return retPayloadAnnotated( c, + svc, http.StatusOK, 0, ret, diff --git a/webapi/apiSpPieceManifest.go b/webapi/apiSpPieceManifest.go index 80caa0b..9570a79 100644 --- a/webapi/apiSpPieceManifest.go +++ b/webapi/apiSpPieceManifest.go @@ -2,128 +2,92 @@ package main import ( "bytes" + "errors" "net/http" "text/template" - "code.riba.cloud/go/toolbox-interplanetary/fil" "code.riba.cloud/go/toolbox/cmn" - filabi 
"github.com/filecoin-project/go-state-types/abi" - "github.com/georgysavva/scany/pgxscan" "github.com/google/uuid" - "github.com/ipfs/go-cid" "github.com/labstack/echo/v4" "github.com/storacha/spade/apitypes" - "github.com/storacha/spade/internal/app" - "golang.org/x/xerrors" + "github.com/storacha/spade/service" ) -func apiSpPieceManifest(c echo.Context) error { +type TemplateParams struct { + SegPCidV2 string +} + +func NewSpPieceManifestHandler(svc service.PieceManifestService) echo.HandlerFunc { + return func(c echo.Context) error { + return apiSpPieceManifest(c, svc) + } +} + +func apiSpPieceManifest(c echo.Context, svc service.PieceManifestService) error { ctx, ctxMeta := unpackAuthedEchoContext(c) - pu := c.QueryParams().Get("proposal") - if pu == "" { + ps := c.QueryParams().Get("proposal") + if ps == "" { return retFail( c, + svc, apitypes.ErrInvalidRequest, "A `proposal` UUID parameter must be supplied to this call", ) } - if _, err := uuid.Parse(pu); err != nil { + pu, err := uuid.Parse(ps) + if err != nil { return retFail( c, + svc, apitypes.ErrInvalidRequest, "The supplied `proposal` parameter '%s' is not a valid UUID: %s", - pu, + ps, err, ) } - pcs := make([]struct { - AggLog2Size int `db:"agg_log2size"` - AggPCidV1 string `db:"agg_pcid_v1"` - SegPCidV2 string `db:"seg_pcid_v2"` - UrlTemplate string - }, 0, 8<<10) - - if err := pgxscan.Select( - ctx, - ctxMeta.Db[app.DbMain], - &pcs, - ` - SELECT - ap.piece_cid AS agg_pcid_v1, - ap.piece_log2_size AS agg_log2size, - sp.piece_cid AS seg_pcid_v2, - t.tenant_meta->'bulk_piece_source'->>'url_template' AS url_template - FROM spd.piece_segments ps - JOIN spd.pieces ap USING ( piece_id ) - JOIN spd.pieces sp ON ( ps.segment_id = sp.piece_id ) - JOIN spd.proposals pr ON ( pr.piece_id = ps.piece_id ) - JOIN spd.clients cl USING ( client_id ) - JOIN spd.tenants t USING ( tenant_id ) - WHERE - (ap.piece_meta->'is_frc58_segmented')::bool - AND - pr.proposal_uuid = $1 - AND - -- ensure we only display SPs own 
proposals, no list-sharing - pr.provider_id = $2 - AND - -- only pending proposals - pr.proposal_delivered IS NOT NULL AND pr.proposal_failstamp = 0 AND pr.activated_deal_id IS NULL - - -- ordering is critical - ORDER BY ps.position - `, - pu, - ctxMeta.authedActorID, - ); err != nil { + manifest, err := svc.PieceManifest(ctx, ctxMeta.authedActorID, pu) + if err != nil { + if errors.Is(err, service.ErrManifestNotFound) { + return retFail( + c, + svc, + apitypes.ErrInvalidRequest, + "no results for proposal UUID '%s': either it does not exist, is too recent, does not belong to %s or is not segmented", + ps, + ctxMeta.authedActorID.AsFilAddr().String(), + ) + } return cmn.WrErr(err) } - if len(pcs) == 0 { - return retFail( - c, - apitypes.ErrInvalidRequest, - "no results for proposal UUID '%s': either it does not exist, is too recent, does not belong to %s or is not segmented", - pu, - ctxMeta.authedActorID.AsFilAddr().String(), - ) - } - - utText := pcs[0].UrlTemplate + utText := manifest.UrlTemplate if utText == "" { - return xerrors.New("do not know how to handle segments without a URL template yet...") + return errors.New("do not know how to handle segments without a URL template yet...") } ut, err := template.New("url").Parse(utText) if err != nil { return cmn.WrErr(err) } - aggCP, err := fil.CommPFromPieceInfo(filabi.PieceInfo{ - Size: 1 << pcs[0].AggLog2Size, - PieceCID: cid.MustParse(pcs[0].AggPCidV1), - }) - if err != nil { - return cmn.WrErr(err) - } - resp := apitypes.ResponsePieceManifestFR58{ - AggPCidV2: aggCP.PCidV2().String(), - Segments: make([]apitypes.Segment, len(pcs)), + AggPCidV2: manifest.PieceCid.String(), + Segments: make([]apitypes.Segment, len(manifest.SegmentCids)), } - for i := range pcs { + for i, s := range manifest.SegmentCids { u := new(bytes.Buffer) - if err := ut.Execute(u, pcs[i]); err != nil { + if err := ut.Execute(u, TemplateParams{SegPCidV2: s.String()}); err != nil { return cmn.WrErr(err) } - resp.Segments[i].PCidV2 = 
pcs[i].SegPCidV2 + resp.Segments[i].PCidV2 = s.String() resp.Segments[i].Sources = []string{u.String()} } return retPayloadAnnotated( c, + svc, http.StatusOK, 0, resp, diff --git a/webapi/apiSpStatus.go b/webapi/apiSpStatus.go index cd54fb8..815a969 100644 --- a/webapi/apiSpStatus.go +++ b/webapi/apiSpStatus.go @@ -3,13 +3,21 @@ package main import ( "github.com/labstack/echo/v4" "github.com/storacha/spade/apitypes" + "github.com/storacha/spade/service" ) -func apiSpStatus(c echo.Context) error { +func NewSpStatusHandler(svc service.StatusService) echo.HandlerFunc { + return func(c echo.Context) error { + return apiSpStatus(c, svc) + } +} + +func apiSpStatus(c echo.Context, svc service.StatusService) error { _, ctxMeta := unpackAuthedEchoContext(c) return retFail( c, + svc, apitypes.ErrSystemTemporarilyDisabled, ` Auth successful, your SP is authorized for Spade and your signature is valid. diff --git a/webapi/auth.go b/webapi/auth.go index fdb03f6..77e0f56 100644 --- a/webapi/auth.go +++ b/webapi/auth.go @@ -2,144 +2,36 @@ package main import ( "context" - "encoding/base64" - "encoding/json" - "fmt" "io" - "net/http" "net/url" - "regexp" - "strconv" "time" "code.riba.cloud/go/toolbox-interplanetary/fil" - "code.riba.cloud/go/toolbox/cmn" - filaddr "github.com/filecoin-project/go-address" - filabi "github.com/filecoin-project/go-state-types/abi" - filprovider "github.com/filecoin-project/go-state-types/builtin/v9/miner" - lru "github.com/hashicorp/golang-lru/v2" - blsgo "github.com/jsign/go-filsigner/bls" + "github.com/google/uuid" "github.com/labstack/echo/v4" "github.com/storacha/spade/apitypes" "github.com/storacha/spade/internal/app" + "github.com/storacha/spade/service" + "github.com/storacha/spade/spid" ) -const ( - sigGraceEpochs = 5 // 30 secs per epoch - authScheme = `FIL-SPID-V0` -) - -type rawHdr struct { - epoch string - addr string - sigB64 string - arg string -} -type sigChallenge struct { - authHdr string - addr filaddr.Address - epoch int64 - arg 
[]byte - hdr rawHdr -} - -type verifySigResult struct { - invalidSigErrstr string +func NewSpIDAuthMiddleware(svc service.AuthorizationService) echo.MiddlewareFunc { + return func(next echo.HandlerFunc) echo.HandlerFunc { + return spidAuth(next, svc) + } } -var ( - spAuthRe = regexp.MustCompile( - `^` + authScheme + `\s+` + - // fil epoch - `([0-9]+)` + `\s*;\s*` + - // spID - `([ft]0[0-9]+)` + `\s*;\s*` + - // signature - `([^; ]+)` + - // optional signed argument - `(?:\s*\;\s*([^; ]+))?` + - `\s*$`, - ) - challengeCache, _ = lru.New[rawHdr, verifySigResult](sigGraceEpochs * 128) - beaconCache, _ = lru.New[int64, *fil.LotusBeaconEntry](sigGraceEpochs * 4) -) - -func spidAuth(next echo.HandlerFunc) echo.HandlerFunc { +func spidAuth(next echo.HandlerFunc, svc service.AuthorizationService) echo.HandlerFunc { return func(c echo.Context) error { - - ctx, log, _, _ := app.UnpackCtx(c.Request().Context()) + ctx, _, _, _ := app.UnpackCtx(c.Request().Context()) // the SP portion does not accept body payloads if b := c.Request().Body; b != nil && c.Request().ContentLength != 0 { if _, err := b.Read(make([]byte, 1)); err != io.EOF { - return retFail(c, apitypes.ErrInvalidRequest, "spid requests witn content in the HTTP body are not supported") + return retFail(c, svc, apitypes.ErrInvalidRequest, "spid requests with content in the HTTP body are not supported") } } - var challenge sigChallenge - challenge.authHdr = c.Request().Header.Get(echo.HeaderAuthorization) - res := spAuthRe.FindStringSubmatch(challenge.authHdr) - - if len(res) == 5 { - challenge.hdr.epoch, challenge.hdr.addr, challenge.hdr.sigB64, challenge.hdr.arg = res[1], res[2], res[3], res[4] - } else { - return retAuthFail(c, "invalid/unexpected %s Authorization header '%s'", authScheme, challenge.authHdr) - } - - var err error - challenge.addr, err = filaddr.NewFromString(challenge.hdr.addr) - if err != nil { - return retAuthFail(c, "unexpected %s auth address '%s'", authScheme, challenge.hdr.addr) - } - - 
challenge.epoch, err = strconv.ParseInt(challenge.hdr.epoch, 10, 32) - if err != nil { - return retAuthFail(c, "unexpected %s auth epoch '%s'", authScheme, challenge.hdr.epoch) - } - - curFilEpoch := int64(fil.ClockMainnet.TimeToEpoch(time.Now())) - if curFilEpoch < challenge.epoch { - log.Debugw("challenge from the future", "challengeEpoch", challenge.epoch, "wallEpoch", curFilEpoch) - return retAuthFail(c, "%s auth epoch '%d' is in the future", authScheme, challenge.epoch) - } - if curFilEpoch-challenge.epoch > sigGraceEpochs { - return retAuthFail(c, "%s auth epoch '%d' is too far in the past", authScheme, challenge.epoch) - } - - challenge.arg, err = base64.StdEncoding.DecodeString(challenge.hdr.arg) - if err != nil { - return retAuthFail(c, "unable to decode optional argument: %s", err.Error()) - } - - signedArgs, err := url.ParseQuery(string(challenge.arg)) - if err != nil { - return retAuthFail(c, "unable to parse optional argument: %s", err.Error()) - } - - var vsr verifySigResult - if maybeResult, known := challengeCache.Get(challenge.hdr); known { - vsr = maybeResult - } else { - vsr, err = verifySig(ctx, challenge) - if err != nil { - return cmn.WrErr(err) - } - challengeCache.Add(challenge.hdr, vsr) - } - - if vsr.invalidSigErrstr != "" { - return retAuthFail(c, vsr.invalidSigErrstr) - } - - // set only on request object for logging, not part of response - c.Request().Header.Set("X-SPADE-LOGGED-SP", challenge.addr.String()) - - // if challenge.addr.String() == "f01" { - // challenge.addr, _ = filaddr.NewFromString("f02") - // } - - spID := fil.MustParseActorString(challenge.addr.String()) - reqCopy := c.Request().Clone(ctx) // do not need to store any IPs anywhere in the DB for _, strip := range []string{ @@ -148,86 +40,42 @@ func spidAuth(next echo.HandlerFunc) echo.HandlerFunc { delete(reqCopy.Header, strip) } - reqJ, err := json.Marshal( - struct { - Method string - Host string - Path string - Params string - ParamsSigned url.Values - Headers 
http.Header - }{ - Method: reqCopy.Method, - Host: reqCopy.Host, - Path: reqCopy.URL.Path, - Params: reqCopy.URL.Query().Encode(), - ParamsSigned: signedArgs, - Headers: reqCopy.Header, - }, - ) + auth, err := svc.Authorize(ctx, service.Request{ + Method: reqCopy.Method, + Host: reqCopy.Host, + Path: reqCopy.URL.Path, + Params: reqCopy.URL.Query(), + Headers: reqCopy.Header, + }) if err != nil { - return cmn.WrErr(err) + return retAuthFail(c, svc, "%s authorizing request: %s", spid.Scheme, err.Error()) } - spDetails := [4]int16{-1, -1, -1, -1} - var requestUUID string - var stateEpoch int64 - var spInfo apitypes.SPInfo - var spInfoLastPoll *time.Time - if err := app.GetGlobalCtx(ctx).Db[app.DbMain].QueryRow( - ctx, - ` - INSERT INTO spd.requests ( provider_id, request_dump ) - VALUES ( $1, $2 ) - RETURNING - request_uuid, - ( SELECT ( metadata->'market_state'->'epoch' )::INTEGER FROM spd.global ), - COALESCE( ( - SELECT - ARRAY[ - COALESCE( org_id, -1 ), - COALESCE( city_id, -1), - COALESCE( country_id, -1), - COALESCE( continent_id, -1) - ] - FROM spd.providers - WHERE provider_id = $1 - LIMIT 1 - ), ARRAY[-1, -1, -1, -1] ), - ( - SELECT info - FROM spd.providers_info - WHERE provider_id = $1 - ), - ( - SELECT provider_last_polled - FROM spd.providers_info - WHERE provider_id = $1 - ) - `, - spID, - reqJ, - ).Scan(&requestUUID, &stateEpoch, &spDetails, &spInfo, &spInfoLastPoll); err != nil { - return cmn.WrErr(err) - } + // set only on request object for logging, not part of response + c.Request().Header.Set("X-SPADE-LOGGED-SP", auth.ProviderID.String()) - c.Response().Header().Set("X-SPADE-FIL-SPID", challenge.addr.String()) + // if challenge.addr.String() == "f01" { + // challenge.addr, _ = filaddr.NewFromString("f02") + // } + + c.Response().Header().Set("X-SPADE-FIL-SPID", auth.ProviderID.String()) // set on both request (for logging ) and response object - c.Request().Header.Set("X-SPADE-REQUEST-UUID", requestUUID) - 
c.Response().Header().Set("X-SPADE-REQUEST-UUID", requestUUID) + c.Request().Header.Set("X-SPADE-REQUEST-UUID", auth.RequestID.String()) + c.Response().Header().Set("X-SPADE-REQUEST-UUID", auth.RequestID.String()) c.Set("♠️", metaContext{ GlobalContext: app.GetGlobalCtx(ctx), - stateEpoch: stateEpoch, - authedActorID: spID, - signedArgs: signedArgs, - spOrgID: spDetails[0], - spCityID: spDetails[1], - spCountryID: spDetails[2], - spContinentID: spDetails[3], - spInfo: spInfo, - spInfoLastPolled: spInfoLastPoll, + requestID: auth.RequestID, + stateEpoch: auth.StateEpoch, + authedActorID: auth.ProviderID, + signedArgs: auth.SignedArgs, + spOrgID: auth.ProviderDetails[0], + spCityID: auth.ProviderDetails[1], + spCountryID: auth.ProviderDetails[2], + spContinentID: auth.ProviderDetails[3], + spInfo: auth.ProviderInfo, + spInfoLastPolled: auth.LastPoll, }) return next(c) @@ -236,6 +84,7 @@ func spidAuth(next echo.HandlerFunc) echo.HandlerFunc { type metaContext struct { app.GlobalContext + requestID uuid.UUID authedActorID fil.ActorID stateEpoch int64 spInfo apitypes.SPInfo @@ -251,78 +100,3 @@ func unpackAuthedEchoContext(c echo.Context) (context.Context, metaContext) { meta, _ := c.Get("♠️").(metaContext) // ignore potential nil error on purpose return c.Request().Context(), meta } - -func verifySig(ctx context.Context, challenge sigChallenge) (verifySigResult, error) { - - sig, err := base64.StdEncoding.DecodeString(challenge.hdr.sigB64) - if err != nil { - return verifySigResult{ - invalidSigErrstr: fmt.Sprintf("unexpected %s auth signature encoding '%s'", authScheme, challenge.hdr.sigB64), - }, nil - } - - lAPI := app.GetGlobalCtx(ctx).LotusAPI - - be, didFind := beaconCache.Get(challenge.epoch) - if !didFind { - - var curChallengeTs *fil.LotusTS - var err error - - // Do it a few times because lotus is getting slower and slower to finalize 😭 - // Can't sleep too much though not to timeout the call - // spid.bash has been adjusted with a backoff to deal with this 
as well - for i := 0; i < 3; i++ { - curChallengeTs, err = lAPI.ChainGetTipSetByHeight(ctx, filabi.ChainEpoch(challenge.epoch), fil.LotusTSK{}) - if err == nil { - break - } - time.Sleep(200 * time.Millisecond) - } - - if err != nil { - // do not make slow-chain a 500 - return verifySigResult{ - invalidSigErrstr: fmt.Sprintf( - "%s signature validation failed for auth header '%s': unable to get tipset at height %d (%s): %s", - authScheme, challenge.authHdr, - challenge.epoch, fil.ClockMainnet.EpochToTime(filabi.ChainEpoch(challenge.epoch)), - err, - )}, nil - } - bev := curChallengeTs.Blocks()[0].BeaconEntries[len(curChallengeTs.Blocks()[0].BeaconEntries)-1] - be = &bev - - beaconCache.Add(challenge.epoch, be) - } - - miFinTs, err := lAPI.ChainGetTipSetByHeight(ctx, filabi.ChainEpoch(challenge.epoch)-filprovider.ChainFinality, fil.LotusTSK{}) - if err != nil { - return verifySigResult{}, cmn.WrErr(err) - } - mi, err := lAPI.StateMinerInfo(ctx, challenge.addr, miFinTs.Key()) - if err != nil { - return verifySigResult{}, cmn.WrErr(err) - } - workerAddr, err := lAPI.StateAccountKey(ctx, mi.Worker, miFinTs.Key()) - if err != nil { - return verifySigResult{}, cmn.WrErr(err) - } - - // worker keys are always BLS - sigMatch, err := blsgo.Verify( - workerAddr.Payload(), - append(append([]byte{0x20, 0x20, 0x20}, be.Data...), challenge.arg...), - []byte(sig), - ) - if err != nil { - return verifySigResult{}, cmn.WrErr(err) - } - - if !sigMatch { - return verifySigResult{ - invalidSigErrstr: fmt.Sprintf("%s signature validation failed for auth header '%s'", authScheme, challenge.authHdr), - }, nil - } - return verifySigResult{}, nil -} diff --git a/webapi/constants.go b/webapi/constants.go deleted file mode 100644 index 3a04f41..0000000 --- a/webapi/constants.go +++ /dev/null @@ -1,10 +0,0 @@ -package main - -const ( - listEligibleDefaultSize = 500 - listEligibleMaxSize = 2 << 20 - - showRecentFailuresHours = 24 - - requestPieceLockStatement = `SELECT PG_ADVISORY_XACT_LOCK( 
1234567890111 )` -) diff --git a/webapi/main.go b/webapi/main.go index 275bb9a..868bc1b 100644 --- a/webapi/main.go +++ b/webapi/main.go @@ -15,10 +15,11 @@ import ( "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" "github.com/storacha/spade/internal/app" + "github.com/storacha/spade/service/pg" "golang.org/x/sys/unix" ) -func setup() *echo.Echo { +func setup(ctx context.Context) *echo.Echo { // // Server setup e := echo.New() @@ -34,15 +35,22 @@ func setup() *echo.Echo { }, )) + // [app.GlobalInit] will have run by this point so main DB should be available + // on context. + _, _, db, gCtx := app.UnpackCtx(ctx) + svc := pg.New(db, gCtx.LotusAPI) + // routes - registerRoutes(e) + registerRoutes(e, svc) // // Housekeeping e.HideBanner = true e.HidePort = true e.JSONSerializer = new(rawJSONSerializer) - e.Any("*", retInvalidRoute) + e.Any("*", func(c echo.Context) error { + return retInvalidRoute(c, svc) + }) return e } @@ -93,7 +101,7 @@ func main() { // we will still catch the failure-to-write either way signal.Ignore(unix.SIGPIPE) - e = setup() + e = setup(cctx.Context) e.Server.BaseContext = func(net.Listener) context.Context { return cctx.Context } return e.Start(cctx.String("webapi-listen-address")) }, diff --git a/webapi/routes.go b/webapi/routes.go index 6c34858..83f0f7e 100644 --- a/webapi/routes.go +++ b/webapi/routes.go @@ -1,18 +1,21 @@ package main -import "github.com/labstack/echo/v4" +import ( + "github.com/labstack/echo/v4" + "github.com/storacha/spade/service" +) // This lists in one place all recognized routes & parameters // FIXME - we should make an openapi or something for this... 
-func registerRoutes(e *echo.Echo) { - spRoutes := e.Group("/sp", spidAuth) +func registerRoutes(e *echo.Echo, svc service.SpadeService) { + spRoutes := e.Group("/sp", NewSpIDAuthMiddleware(svc)) // // /status produces human and machine readable information about the system and the currently-authenticated SP // // Recognized parameters: none // - spRoutes.GET("/status", apiSpStatus) + spRoutes.GET("/status", NewSpStatusHandler(svc)) // // /eligible_pieces produces a listing of PieceCIDs that a storage provider is eligible to receive a deal for. @@ -31,14 +34,14 @@ func registerRoutes(e *echo.Echo) { // - include-sourceless = // When true the result includes eligible pieces without any known sources. Such pieces are omitted by default. // - spRoutes.GET("/eligible_pieces", apiSpListEligible) + spRoutes.GET("/eligible_pieces", NewSpListEligibleHandler(svc)) // // /pending_proposals produces a list of current outstanding reservations, recent errors and various statistics. // // Recognized parameters: none // - spRoutes.GET("/pending_proposals", apiSpListPendingProposals) + spRoutes.GET("/pending_proposals", NewSpListPendingProposalsHandler(svc)) // // /piece_manifest produces a manifest for a segmented piece. You need a reservation proposal UUID to call this. @@ -47,7 +50,7 @@ func registerRoutes(e *echo.Echo) { // // - proposal = // - spRoutes.GET("/piece_manifest", apiSpPieceManifest) + spRoutes.GET("/piece_manifest", NewSpPieceManifestHandler(svc)) // // /invoke is the sole mutating (POST) method, with several recognized RPC-calls: @@ -58,6 +61,8 @@ func registerRoutes(e *echo.Echo) { // delivered to the SP by a periodic task, executed outside of this webapp. 
// // - spRoutes.POST("/invoke", apiSpInvoke) - spRoutes.GET("/invoke", retInvalidRoute) + spRoutes.POST("/invoke", NewSpInvokeHandler(svc)) + spRoutes.GET("/invoke", func(c echo.Context) error { + return retInvalidRoute(c, svc) + }) } diff --git a/webapi/util.go b/webapi/util.go index 7bc434f..c9fd1c9 100644 --- a/webapi/util.go +++ b/webapi/util.go @@ -1,8 +1,6 @@ package main import ( - "context" - "encoding/json" "fmt" "net/http" "net/url" @@ -13,11 +11,10 @@ import ( "code.riba.cloud/go/toolbox-interplanetary/fil" "code.riba.cloud/go/toolbox/cmn" - "github.com/dgraph-io/ristretto" - "github.com/jackc/pgx/v4" "github.com/labstack/echo/v4" "github.com/storacha/spade/apitypes" - "github.com/storacha/spade/internal/app" + "github.com/storacha/spade/service" + "github.com/storacha/spade/spid" "golang.org/x/xerrors" ) @@ -44,7 +41,7 @@ func parseUIntQueryParam(c echo.Context, pname string, min, max uint64) (uint64, return val, nil } -func retPayloadAnnotated(c echo.Context, httpCode int, errCode apitypes.APIErrorCode, payload apitypes.ResponsePayload, fmsg string, args ...interface{}) error { +func retPayloadAnnotated(c echo.Context, log service.ErrorLogger, httpCode int, errCode apitypes.APIErrorCode, payload apitypes.ResponsePayload, fmsg string, args ...interface{}) error { ctx, ctxMeta := unpackAuthedEchoContext(c) msg := fmt.Sprintf(fmsg, args...) 
@@ -93,31 +90,10 @@ func retPayloadAnnotated(c echo.Context, httpCode int, errCode apitypes.APIError c.Request().Header.Set("X-SPADE-FAILURE-SLUG", r.ErrSlug) // set on *request* so that echo can log it if r.RequestID != "" && (msg != "" || errCode != 0) { - jPayload, err := json.Marshal(payload) + err := log.RequestError(ctx, ctxMeta.requestID, errCode, msg, payload) if err != nil { return cmn.WrErr(err) } - if _, err := ctxMeta.Db[app.DbMain].Exec( - ctx, - ` - UPDATE spd.requests SET - request_meta = JSONB_STRIP_NULLS( request_meta || JSONB_BUILD_OBJECT( - 'error', $1::TEXT, - 'error_code', $2::INTEGER, - 'error_slug', $3::TEXT, - 'payload', $4::JSONB - ) ) - WHERE - request_uuid = $5 - `, - msg, - r.ErrCode, - r.ErrSlug, - jPayload, - r.RequestID, - ); err != nil { - return cmn.WrErr(err) - } } } @@ -149,9 +125,10 @@ func curlAuthedForSP(c echo.Context, spID fil.ActorID, path string, sigArgs url. ) } -func retFail(c echo.Context, errCode apitypes.APIErrorCode, fMsg string, args ...interface{}) error { +func retFail(c echo.Context, log service.ErrorLogger, errCode apitypes.APIErrorCode, fMsg string, args ...interface{}) error { return retPayloadAnnotated( c, + log, http.StatusForbidden, // DO NOT use 400: we rewrite that on the nginx level to normalize a class of transport errors errCode, nil, @@ -159,10 +136,11 @@ func retFail(c echo.Context, errCode apitypes.APIErrorCode, fMsg string, args .. 
) } -func retAuthFail(c echo.Context, f string, args ...interface{}) error { - c.Response().Header().Set(echo.HeaderWWWAuthenticate, authScheme) +func retAuthFail(c echo.Context, log service.ErrorLogger, f string, args ...interface{}) error { + c.Response().Header().Set(echo.HeaderWWWAuthenticate, spid.Scheme) return retPayloadAnnotated( c, + log, http.StatusUnauthorized, apitypes.ErrUnauthorizedAccess, nil, @@ -171,9 +149,10 @@ func retAuthFail(c echo.Context, f string, args ...interface{}) error { ) } -func retInvalidRoute(c echo.Context) error { +func retInvalidRoute(c echo.Context, log service.ErrorLogger) error { return retFail( c, + log, apitypes.ErrInvalidRequest, "invalid route request: %s %s", c.Request().Method, @@ -181,13 +160,6 @@ func retInvalidRoute(c echo.Context) error { ) } -// using ristretto here because of SetWithTTL() below -var providerEligibleCache, _ = ristretto.NewCache(&ristretto.Config{ - NumCounters: 1e7, BufferItems: 64, - MaxCost: 1024, - Cost: func(interface{}) int64 { return 1 }, -}) - func ineligibleSpMsg(spID fil.ActorID) string { return fmt.Sprintf( ` @@ -201,63 +173,9 @@ Make sure that you: - Have not faulted in the past 48h If the problem persists, or you believe this is a spurious error: please contact the API -administrators in #spade-sp over at the Fil Slack https://filecoin.io/slack -( direct link: https://filecoinproject.slack.com/archives/C0377FJCG1L ) +administrators in #♠-spade-sp-♠ over at the Storacha Discord https://discord.gg/pqa6Dn6RnP. 
+( direct link: https://discord.com/channels/1247475892435816553/1365086771347587072 ) `, spID, ) } - -func spIneligibleErr(ctx context.Context, spID fil.ActorID) (defIneligibleCode apitypes.APIErrorCode, defErr error) { - _, _, db, gctx := app.UnpackCtx(ctx) - - // do not cache chain-independent factors - var ignoreChainEligibility bool - err := db.QueryRow( - ctx, - ` - SELECT COALESCE( ( provider_meta->'ignore_chain_eligibility' )::BOOL, false ) - FROM spd.providers - WHERE - NOT COALESCE( ( provider_meta->'globally_inactivated' )::BOOL, false ) - AND - provider_id = $1 - `, - spID, - ).Scan(&ignoreChainEligibility) - if err == pgx.ErrNoRows { - return apitypes.ErrStorageProviderSuspended, nil - } else if err != nil { - return 0, cmn.WrErr(err) - } else if ignoreChainEligibility { - return 0, nil - } - - defer func() { - if defErr != nil { - providerEligibleCache.Del(uint64(spID)) - defIneligibleCode = 0 - } else { - providerEligibleCache.SetWithTTL(uint64(spID), defIneligibleCode, 1, time.Minute) - } - }() - - if protoReason, found := providerEligibleCache.Get(uint64(spID)); found { - return protoReason.(apitypes.APIErrorCode), nil - } - - curTipset, err := app.DefaultLookbackTipset(ctx) - if err != nil { - return 0, cmn.WrErr(err) - } - - mbi, err := gctx.LotusAPI.MinerGetBaseInfo(ctx, spID.AsFilAddr(), curTipset.Height(), curTipset.Key()) - if err != nil { - return 0, cmn.WrErr(err) - } - if mbi == nil || !mbi.EligibleForMining { - return apitypes.ErrStorageProviderIneligibleToMine, nil - } - - return 0, nil -}