Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Test

on:
push:
branches: ['main']
pull_request:
branches: ['main']

jobs:
test:
name: Run tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.23'

- name: Test
run: go test -v -failfast ./...
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ require (
github.com/jackc/pgx/v4 v4.18.3
github.com/jsign/go-filsigner v0.4.1
github.com/labstack/echo/v4 v4.11.4
github.com/libp2p/go-libp2p v0.42.0
github.com/libp2p/go-yamux/v4 v4.0.2
github.com/mattn/go-isatty v0.0.20
github.com/multiformats/go-multiaddr v0.16.1
github.com/multiformats/go-multihash v0.2.3
github.com/stretchr/testify v1.10.0
github.com/whyrusleeping/cbor-gen v0.3.1
golang.org/x/sync v0.16.0
golang.org/x/sys v0.35.0
Expand Down Expand Up @@ -70,6 +73,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect
github.com/daaku/go.zipexe v1.0.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/dchest/blake2b v1.0.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
Expand Down Expand Up @@ -150,13 +154,11 @@ require (
github.com/lib/pq v1.10.9 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.2.0 // indirect
github.com/libp2p/go-libp2p v0.42.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-libp2p-pubsub v0.13.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-netroute v0.2.2 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.2 // indirect
github.com/libp2p/go-yamux/v5 v5.0.1 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down Expand Up @@ -200,6 +202,7 @@ require (
github.com/pion/turn/v4 v4.0.2 // indirect
github.com/pion/webrtc/v4 v4.1.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.22.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
Expand Down
7 changes: 7 additions & 0 deletions service/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package service

const (
ListEligibleDefaultSize = 500
ListEligibleMaxSize = 2 << 20
ShowRecentFailuresHours = 24
)
119 changes: 119 additions & 0 deletions service/lotus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package service

import (
"context"
"fmt"
"time"

"code.riba.cloud/go/toolbox-interplanetary/fil"
"code.riba.cloud/go/toolbox/cmn"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
filabi "github.com/filecoin-project/go-state-types/abi"
filbig "github.com/filecoin-project/go-state-types/big"
filbuiltin "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/storacha/spade/internal/app"
)

// AuthorizationLotusClient defines the minimal Filecoin client interface
// required to support SP authorization.
type AuthorizationLotusClient interface {
// ChainGetTipSetByHeight looks back for a tipset at the specified epoch.
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
// StateAccountKey retrieves the key address for an account at a given tipset.
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
// StateMinerInfo retrieves miner info at a given tipset.
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
}

// LookbackLotusClient defines the minimal Filecoin client interface required to
// support lookback tipset retrieval.
type LookbackLotusClient interface {
// ChainHead returns the current head of the chain.
ChainHead(context.Context) (*types.TipSet, error)
// ChainGetTipSetByHeight looks back for a tipset at the specified epoch.
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
}

// ReservationLotusClient defines the minimal Filecoin client interface required
// to support reservation eligibility checks and related operations.
type ReservationLotusClient interface {
LookbackLotusClient
// MinerGetBaseInfo retrieves mining base info for a miner at a given tipset.
MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*api.MiningBaseInfo, error)
}

// SpadeLotusClient defines the minimal Lotus client interface required to
// support all Spade service operations.
type SpadeLotusClient interface {
AuthorizationLotusClient
ReservationLotusClient
}

var collateralCache, _ = lru.New[filabi.ChainEpoch, filbig.Int](128)

func ProviderCollateralEstimateGiB(ctx context.Context, sourceEpoch filabi.ChainEpoch) (filbig.Int, error) {
if pc, didFind := collateralCache.Get(sourceEpoch); didFind {
return pc, nil
}

collateralGiB, err := app.EpochMinProviderCollateralEstimateGiB(ctx, sourceEpoch)
if err != nil {
return collateralGiB, cmn.WrErr(err)
}

// make it 1.7 times larger, so that fluctuations in the state won't prevent the deal from being proposed/published later
// capped by https://github.com/filecoin-project/lotus/blob/v1.13.2-rc2/markets/storageadapter/provider.go#L267
// and https://github.com/filecoin-project/lotus/blob/v1.13.2-rc2/markets/storageadapter/provider.go#L41
inflatedCollateralGiB := filbig.Div(
filbig.Product(
collateralGiB,
filbig.NewInt(17),
),
filbig.NewInt(10),
)

collateralCache.Add(sourceEpoch, inflatedCollateralGiB)
return inflatedCollateralGiB, nil
}

// GetTipset retrieves the tipset at the specified lookback epoch. It is a
// copy of [fil.GetTipset] adjusted to use the minimal interface
// [LookbackLotusClient].
func GetTipset(ctx context.Context, lapi LookbackLotusClient, lookback uint) (*fil.LotusTS, error) {
latestHead, err := lapi.ChainHead(ctx)
if err != nil {
return nil, fmt.Errorf("failed getting chain head: %w", err)
}

wallUnix := time.Now().Unix()
filUnix := int64(latestHead.Blocks()[0].Timestamp)

if wallUnix < filUnix-3 || // allow few seconds clock-drift tolerance
wallUnix > filUnix+int64(
fil.PropagationDelaySecs+(fil.APIMaxTipsetsBehind*filbuiltin.EpochDurationSeconds),
) {
return nil, fmt.Errorf(
"lotus API out of sync: chainHead reports unixtime %d (height: %d) while walltime is %d (delta: %s)",
filUnix,
latestHead.Height(),
wallUnix,
time.Second*time.Duration(wallUnix-filUnix),
)
}

if lookback == 0 {
return latestHead, nil
}

latestHeight := latestHead.Height()
tipsetAtLookback, err := lapi.ChainGetTipSetByHeight(ctx, latestHeight-filabi.ChainEpoch(lookback), latestHead.Key())
if err != nil {
return nil, fmt.Errorf("determining target tipset %d epochs ago failed: %w", lookback, err)
}

return tipsetAtLookback, nil
}
113 changes: 113 additions & 0 deletions service/pg/authorization.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package pg

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"code.riba.cloud/go/toolbox-interplanetary/fil"
"github.com/google/uuid"
"github.com/storacha/spade/apitypes"
"github.com/storacha/spade/service"
"github.com/storacha/spade/spid"
spid_lotus "github.com/storacha/spade/spid/lotus"
)

func (p *PgLotusSpadeService) Authorize(ctx context.Context, req service.Request) (service.Authorization, error) {
challenge, err := spid.Parse(req.Headers.Get("Authorization"))
if err != nil {
return service.Authorization{}, fmt.Errorf("parsing authorization header: %w", err)
}

err = spid_lotus.ResolveAndVerify(ctx, p.lotusAPI, challenge)
if err != nil {
return service.Authorization{}, fmt.Errorf("verifying authorization signature: %w", err)
}

sp := fil.MustParseActorString(challenge.Addr().String())
signedArgs, err := challenge.Args().Values()
if err != nil {
return service.Authorization{}, fmt.Errorf("getting signed args values: %w", err)
}

reqJ, err := json.Marshal(
struct {
Method string
Host string
Path string
Params string
ParamsSigned url.Values
Headers http.Header
}{
Method: req.Method,
Host: req.Host,
Path: req.Path,
Params: req.Params.Encode(),
ParamsSigned: signedArgs,
Headers: req.Headers,
},
)
Comment on lines +36 to +52
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels kinda weird to have this as an anonymous struct, your call.

if err != nil {
return service.Authorization{}, err
}

spDetails := [4]int16{-1, -1, -1, -1}
var requestUUID string
var stateEpoch int64
var spInfo apitypes.SPInfo
var spInfoLastPoll *time.Time
if err := p.db.QueryRow(
ctx,
`
INSERT INTO spd.requests ( provider_id, request_dump )
VALUES ( $1, $2 )
RETURNING
request_uuid,
( SELECT ( metadata->'market_state'->'epoch' )::INTEGER FROM spd.global ),
COALESCE( (
SELECT
ARRAY[
COALESCE( org_id, -1 ),
COALESCE( city_id, -1),
COALESCE( country_id, -1),
COALESCE( continent_id, -1)
]
FROM spd.providers
WHERE provider_id = $1
LIMIT 1
), ARRAY[-1, -1, -1, -1] ),
(
SELECT info
FROM spd.providers_info
WHERE provider_id = $1
),
(
SELECT provider_last_polled
FROM spd.providers_info
WHERE provider_id = $1
)
`,
sp,
reqJ,
).Scan(&requestUUID, &stateEpoch, &spDetails, &spInfo, &spInfoLastPoll); err != nil {
return service.Authorization{}, err
}

reqID, err := uuid.Parse(requestUUID)
if err != nil {
return service.Authorization{}, fmt.Errorf("parsing UUID: %w", err)
}

return service.Authorization{
RequestID: reqID,
StateEpoch: stateEpoch,
SignedArgs: signedArgs,
ProviderID: sp,
ProviderDetails: spDetails,
ProviderInfo: spInfo,
LastPoll: spInfoLastPoll,
}, nil
}
47 changes: 47 additions & 0 deletions service/pg/eligibility.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package pg

import (
"context"
"fmt"

"code.riba.cloud/go/toolbox-interplanetary/fil"
"github.com/georgysavva/scany/pgxscan"
"github.com/storacha/spade/service"
)

func (p *PgLotusSpadeService) EligiblePieces(ctx context.Context, sp fil.ActorID, options ...service.EligiblePiecesOption) ([]service.EligiblePiece, bool, error) {
cfg := service.EligiblePiecesConfig{Limit: service.ListEligibleDefaultSize}
for _, opt := range options {
opt(&cfg)
}

lim := cfg.Limit
tenantID := cfg.TenantID
// how to list: start small, find setting below
useQueryFunc := "pieces_eligible_head"
if lim > service.ListEligibleDefaultSize { // deduce from requested lim
useQueryFunc = "pieces_eligible_full"
}

orderedPieces := make([]service.EligiblePiece, 0, lim+1)
if err := pgxscan.Select(
ctx,
p.db,
&orderedPieces,
fmt.Sprintf("SELECT * FROM spd.%s( $1, $2, $3, $4, $5 )", useQueryFunc),
sp,
lim+1, // ask for one extra, to disambiguate "there is more"
tenantID,
cfg.IncludeSourceless,
false,
); err != nil {
return nil, false, err
}

var more bool
if uint64(len(orderedPieces)) > lim {
orderedPieces = orderedPieces[:lim]
more = true
}
return orderedPieces, more, nil
}
36 changes: 36 additions & 0 deletions service/pg/errorlogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package pg

import (
"context"
"encoding/json"

"github.com/google/uuid"
"github.com/storacha/spade/apitypes"
)

func (s *PgLotusSpadeService) RequestError(ctx context.Context, requestID uuid.UUID, code apitypes.APIErrorCode, message string, payload any) error {
jPayload, err := json.Marshal(payload)
if err != nil {
return err
}
_, err = s.db.Exec(
ctx,
`
UPDATE spd.requests SET
request_meta = JSONB_STRIP_NULLS( request_meta || JSONB_BUILD_OBJECT(
'error', $1::TEXT,
'error_code', $2::INTEGER,
'error_slug', $3::TEXT,
'payload', $4::JSONB
) )
WHERE
request_uuid = $5
`,
message,
code,
code.String(),
jPayload,
requestID.String(),
)
return err
}
Loading
Loading