diff --git a/api/types/types.go b/api/types/types.go index 1326b29a6..267d88309 100644 --- a/api/types/types.go +++ b/api/types/types.go @@ -1,6 +1,9 @@ package types -import "time" +import ( + "database/sql" + "time" +) type NodeInfo struct { ID int @@ -11,9 +14,9 @@ type NodeInfo struct { LastContact time.Time Unschedulable bool - Name string - StartupTime time.Time - Tasks string - Layers string - Miners string + Name sql.NullString // Can be NULL from harmony_machine_details + StartupTime sql.NullTime // Can be NULL from harmony_machine_details + Tasks sql.NullString // Can be NULL from harmony_machine_details + Layers sql.NullString // Can be NULL from harmony_machine_details + Miners sql.NullString // Can be NULL from harmony_machine_details } diff --git a/build/openrpc/curio.json b/build/openrpc/curio.json index 0d5352000..e9ced4b59 100644 --- a/build/openrpc/curio.json +++ b/build/openrpc/curio.json @@ -423,11 +423,26 @@ "HostPort": "string value", "LastContact": "0001-01-01T00:00:00Z", "Unschedulable": true, - "Name": "string value", - "StartupTime": "0001-01-01T00:00:00Z", - "Tasks": "string value", - "Layers": "string value", - "Miners": "string value" + "Name": { + "String": "string value", + "Valid": true + }, + "StartupTime": { + "Time": "0001-01-01T00:00:00Z", + "Valid": true + }, + "Tasks": { + "String": "string value", + "Valid": true + }, + "Layers": { + "String": "string value", + "Valid": true + }, + "Miners": { + "String": "string value", + "Valid": true + } } ], "additionalProperties": false, @@ -451,24 +466,69 @@ "type": "string" }, "Layers": { - "type": "string" + "additionalProperties": false, + "properties": { + "String": { + "type": "string" + }, + "Valid": { + "type": "boolean" + } + }, + "type": "object" }, "Miners": { - "type": "string" + "additionalProperties": false, + "properties": { + "String": { + "type": "string" + }, + "Valid": { + "type": "boolean" + } + }, + "type": "object" }, "Name": { - "type": "string" + "additionalProperties": false, + "properties": { + "String": { + "type": "string" + }, + "Valid": { + "type": "boolean" + } + }, + "type": "object" }, "RAM": { "title": "number", "type": "number" }, "StartupTime": { - "format": "date-time", - "type": "string" + "additionalProperties": false, + "properties": { + "Time": { + "format": "date-time", + "type": "string" + }, + "Valid": { + "type": "boolean" + } + }, + "type": "object" }, "Tasks": { - "type": "string" + "additionalProperties": false, + "properties": { + "String": { + "type": "string" + }, + "Valid": { + "type": "boolean" + } + }, + "type": "object" }, "Unschedulable": { "type": "boolean" diff --git a/cmd/curio/info.go b/cmd/curio/info.go index 93477e3cb..8466d92a0 100644 --- a/cmd/curio/info.go +++ b/cmd/curio/info.go @@ -26,17 +26,35 @@ var infoCmd = &cli.Command{ } fmt.Printf("Node Info:\n") fmt.Printf("ID: %d\n", info.ID) - fmt.Printf("Name: %s\n", info.Name) + if info.Name.Valid { + fmt.Printf("Name: %s\n", info.Name.String) + } fmt.Printf("CPU: %d\n", info.CPU) fmt.Printf("RAM: %s\n", humanize.Bytes(uint64(info.RAM))) fmt.Printf("GPU: %.2f\n", info.GPU) fmt.Printf("Schedulable: %t\n", !info.Unschedulable) fmt.Printf("HostPort: %s\n", info.HostPort) - fmt.Printf("Tasks: %s\n", info.Tasks) - fmt.Printf("Layers: %s\n", info.Layers) - fmt.Printf("Miners: %s\n", info.Miners) + if info.Tasks.Valid { + fmt.Printf("Tasks: %s\n", info.Tasks.String) + } else { + fmt.Printf("Tasks: None\n") + } + if info.Layers.Valid { + fmt.Printf("Layers: %s\n", info.Layers.String) + } else { + 
fmt.Printf("Layers: None\n") + } + if info.Miners.Valid { + fmt.Printf("Miners: %s\n", info.Miners.String) + } else { + fmt.Printf("Miners: None\n") + } fmt.Printf("LastContact: %s\n", info.LastContact) - fmt.Printf("StartupTime: %s\n", info.StartupTime) + if info.StartupTime.Valid { + fmt.Printf("StartupTime: %s\n", info.StartupTime.Time) + } else { + fmt.Printf("StartupTime: N/A\n") + } return nil }, } diff --git a/deps/stats/wallet_exporter.go b/deps/stats/wallet_exporter.go index d698769ef..d69cb8737 100644 --- a/deps/stats/wallet_exporter.go +++ b/deps/stats/wallet_exporter.go @@ -140,10 +140,14 @@ func attoToNano(atto types.BigInt) int64 { func StartWalletExporter(ctx context.Context, db *harmonydb.DB, api api.FullNode, spIDs []address.Address) { go func() { + ticker := time.NewTicker(WalletExporterInterval) + defer ticker.Stop() + for { select { case <-ctx.Done(): - case <-time.After(WalletExporterInterval): + return + case <-ticker.C: walletExporterCycle(ctx, db, api, spIDs) } } diff --git a/documentation/en/api.md b/documentation/en/api.md index 68808cadf..524974a08 100644 --- a/documentation/en/api.md +++ b/documentation/en/api.md @@ -143,11 +143,26 @@ Response: "HostPort": "string value", "LastContact": "0001-01-01T00:00:00Z", "Unschedulable": true, - "Name": "string value", - "StartupTime": "0001-01-01T00:00:00Z", - "Tasks": "string value", - "Layers": "string value", - "Miners": "string value" + "Name": { + "String": "string value", + "Valid": true + }, + "StartupTime": { + "Time": "0001-01-01T00:00:00Z", + "Valid": true + }, + "Tasks": { + "String": "string value", + "Valid": true + }, + "Layers": { + "String": "string value", + "Valid": true + }, + "Miners": { + "String": "string value", + "Valid": true + } } ``` diff --git a/harmony/harmonytask/harmonytask.go b/harmony/harmonytask/harmonytask.go index c03aa3e5b..951e0e700 100644 --- a/harmony/harmonytask/harmonytask.go +++ b/harmony/harmonytask/harmonytask.go @@ -288,15 +288,19 @@ func (e *TaskEngine) GracefullyTerminate() { func (e *TaskEngine) poller() { nextWait := POLL_NEXT_DURATION + timer := time.NewTimer(nextWait) + defer timer.Stop() + for { stats.Record(context.Background(), TaskMeasures.PollerIterations.M(1)) select { - case <-time.After(nextWait): // Find work periodically + case <-timer.C: // Find work periodically + nextWait = POLL_DURATION + timer.Reset(nextWait) case <-e.ctx.Done(): ///////////////////// Graceful exit return } - nextWait = POLL_DURATION // Check if the machine is schedulable schedulable, err := e.checkNodeFlags() @@ -310,6 +314,7 @@ func (e *TaskEngine) poller() { accepted := e.pollerTryAllWork(schedulable) if accepted { nextWait = POLL_NEXT_DURATION + timer.Reset(nextWait) } if !schedulable { diff --git a/lib/dealdata/dealdata.go b/lib/dealdata/dealdata.go index 6b3e8ecc1..0627eac95 100644 --- a/lib/dealdata/dealdata.go +++ b/lib/dealdata/dealdata.go @@ -2,6 +2,7 @@ package dealdata import ( "context" + "database/sql" "encoding/json" "io" "net/http" @@ -33,9 +34,9 @@ type dealMetadata struct { PieceCID string `db:"piece_cid"` PieceSize int64 `db:"piece_size"` - DataUrl *string `db:"data_url"` - DataHeaders []byte `db:"data_headers"` - DataRawSize *int64 `db:"data_raw_size"` + DataUrl sql.NullString `db:"data_url"` + DataHeaders []byte `db:"data_headers"` + DataRawSize sql.NullInt64 `db:"data_raw_size"` DataDelOnFinalize bool `db:"data_delete_on_finalize"` } @@ -173,8 +174,8 @@ func getDealMetadata(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, s // make pieceReader if !commDOnly { 
- if p.DataUrl != nil { - dataUrl := *p.DataUrl + if p.DataUrl.Valid { + dataUrl := p.DataUrl.String goUrl, err := url.Parse(dataUrl) if err != nil { @@ -215,10 +216,10 @@ func getDealMetadata(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, s closers = append(closers, pr) - reader, _ := padreader.New(pr, uint64(*p.DataRawSize)) + reader, _ := padreader.New(pr, uint64(p.DataRawSize.Int64)) pieceReaders = append(pieceReaders, reader) } else { - reader, _ := padreader.New(NewUrlReader(nil, dataUrl, hdrs, *p.DataRawSize), uint64(*p.DataRawSize)) + reader, _ := padreader.New(NewUrlReader(nil, dataUrl, hdrs, p.DataRawSize.Int64), uint64(p.DataRawSize.Int64)) pieceReaders = append(pieceReaders, reader) } diff --git a/lib/ffi/cunative/decode_snap.go b/lib/ffi/cunative/decode_snap.go index e7ad8dc0e..ab34e312d 100644 --- a/lib/ffi/cunative/decode_snap.go +++ b/lib/ffi/cunative/decode_snap.go @@ -241,6 +241,7 @@ func workerSnap(wg *sync.WaitGroup, jobs <-chan jobSnap, results chan<- resultSn pool.Put(j.rbuf) pool.Put(j.kbuf) + pool.Put(rhoInvsBytes) // Return rhoInvsBytes to pool results <- resultSnap{obuf, j.size, j.chunkID} } diff --git a/lib/paths/db_index.go b/lib/paths/db_index.go index 1a4f03701..b3d15f421 100644 --- a/lib/paths/db_index.go +++ b/lib/paths/db_index.go @@ -75,7 +75,8 @@ func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storifa } // skip sector info for storage paths with no sectors - if !entry.MinerId.Valid { + // All sector_location fields must be valid (they come from LEFT JOIN) + if !entry.MinerId.Valid || !entry.SectorNum.Valid || !entry.SectorFiletype.Valid { continue } @@ -591,8 +592,6 @@ func (dbi *DBIndex) StorageFindSector(ctx context.Context, sector abi.SectorID, func (dbi *DBIndex) findSectorUncached(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) { - var result []storiface.SectorStorageInfo - allowList := make(map[string]struct{}) storageWithSector := map[string]bool{} @@ -638,11 +637,15 @@ func (dbi *DBIndex) findSectorUncached(ctx context.Context, s abi.SectorID, ft s return nil, xerrors.Errorf("Finding sector storage from DB fails with err: %w", err) } + result := make([]storiface.SectorStorageInfo, 0, len(rows)) + for _, row := range rows { // Parse all urls - var urls, burls []string - for _, u := range splitString(row.Urls) { + splitUrls := splitString(row.Urls) + urls := make([]string, 0, len(splitUrls)) + burls := make([]string, 0, len(splitUrls)) + for _, u := range splitUrls { rl, err := url.Parse(u) if err != nil { return nil, xerrors.Errorf("failed to parse url: %w", err) @@ -762,8 +765,10 @@ func (dbi *DBIndex) findSectorUncached(ctx context.Context, s abi.SectorID, ft s } } - var urls, burls []string - for _, u := range splitString(row.Urls) { + splitUrls := splitString(row.Urls) + urls := make([]string, 0, len(splitUrls)) + burls := make([]string, 0, len(splitUrls)) + for _, u := range splitUrls { rl, err := url.Parse(u) if err != nil { return nil, xerrors.Errorf("failed to parse url: %w", err) @@ -893,7 +898,7 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec return nil, xerrors.Errorf("Querying for best storage sectors fails with err %w: ", err) } - var result []storiface.StorageInfo + result := make([]storiface.StorageInfo, 0, len(rows)) for _, row := range rows { // Matching with 0 as a workaround to avoid having minerID @@ -1090,6 +1095,9 @@ func (dbi *DBIndex) StorageLock(ctx 
context.Context, sector abi.SectorID, read s lockUuid := uuid.New() // retry with exponential backoff and block until lock is acquired + timer := time.NewTimer(time.Duration(waitTime) * time.Second) + defer timer.Stop() + for { locked, err := dbi.lock(ctx, sector, read, write, lockUuid) // if err is not nil and is not because we cannot acquire lock, retry @@ -1105,10 +1113,11 @@ func (dbi *DBIndex) StorageLock(ctx context.Context, sector abi.SectorID, read s } select { - case <-time.After(time.Duration(waitTime) * time.Second): + case <-timer.C: if waitTime < maxWaitTime { waitTime *= 2 } + timer.Reset(time.Duration(waitTime) * time.Second) case <-ctx.Done(): return ctx.Err() } diff --git a/lib/paths/local.go b/lib/paths/local.go index 6c875c29a..11fda882a 100644 --- a/lib/paths/local.go +++ b/lib/paths/local.go @@ -118,9 +118,9 @@ type path struct { // which will make it take into account existing sectors when calculating // available space for new reservations type statExistingSectorForReservation struct { - id abi.SectorID - ft storiface.SectorFileType - overhead int64 + id abi.SectorID // 16 bytes - used with ft in sectorPath (line 137-138) + ft storiface.SectorFileType // Used with id (line 137-138) + overhead int64 // Used separately in calculations } func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservation) (stat fsutil.FsStat, newResvOnDisk int64, err error) { @@ -549,14 +549,19 @@ func (st *Local) reportHealth(ctx context.Context) { // randomize interval by ~10% interval := (HeartbeatInterval*100_000 + time.Duration(rand.Int63n(10_000))) / 100_000 + timer := time.NewTimer(interval) + defer timer.Stop() + for { select { - case <-time.After(interval): + case <-timer.C: + st.reportStorage(ctx) + // Update interval for next iteration (randomize again) + interval = (HeartbeatInterval*100_000 + time.Duration(rand.Int63n(10_000))) / 100_000 + timer.Reset(interval) case <-ctx.Done(): return } - - st.reportStorage(ctx) } } diff --git a/lib/proofsvc/provictl.go b/lib/proofsvc/provictl.go index 3f471bcda..cdeae8939 100644 --- a/lib/proofsvc/provictl.go +++ b/lib/proofsvc/provictl.go @@ -76,10 +76,12 @@ func retryWithBackoff[T any](ctx context.Context, f func() (T, error)) (T, error lastErr = err log.Warnw("operation failed, retrying", "error", err, "backoff", backoff) + timer := time.NewTimer(backoff) select { case <-ctx.Done(): + timer.Stop() return zero, xerrors.Errorf("context canceled during backoff: %w (last error: %v)", ctx.Err(), lastErr) - case <-time.After(backoff): + case <-timer.C: } // Exponential backoff with a maximum diff --git a/market/indexstore/indexstore.go b/market/indexstore/indexstore.go index 24b6f4f2c..603c4017f 100644 --- a/market/indexstore/indexstore.go +++ b/market/indexstore/indexstore.go @@ -278,10 +278,12 @@ func (i *IndexStore) executeBatchWithRetry(ctx context.Context, batch *gocql.Bat } // Sleep for backoff duration before retrying + timer := time.NewTimer(backoff) select { case <-ctx.Done(): + timer.Stop() return ctx.Err() - case <-time.After(backoff): + case <-timer.C: } // Exponential backoff diff --git a/market/ipni/chunker/serve-chunker.go b/market/ipni/chunker/serve-chunker.go index c938ed9d8..f0cddfcef 100644 --- a/market/ipni/chunker/serve-chunker.go +++ b/market/ipni/chunker/serve-chunker.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "database/sql" "encoding/hex" "errors" "fmt" @@ -137,13 +138,13 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated PieceCIDv2 string `db:"piece_cid"` 
FromCar bool `db:"from_car"` - FirstCID *string `db:"first_cid"` - StartOffset *int64 `db:"start_offset"` - NumBlocks int64 `db:"num_blocks"` + FirstCID sql.NullString `db:"first_cid"` + StartOffset sql.NullInt64 `db:"start_offset"` + NumBlocks int64 `db:"num_blocks"` IsPDP bool `db:"is_pdp"` - PrevCID *string `db:"prev_cid"` + PrevCID sql.NullString `db:"prev_cid"` } var ipniChunks []ipniChunk @@ -232,8 +233,8 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated p.noSkipCache.Add(pieceCidv2, time.Now().Add(NoSkipCacheTTL)) var next ipld.Link - if chunk.PrevCID != nil { - prevChunk, err = cid.Parse(*chunk.PrevCID) + if chunk.PrevCID.Valid { + prevChunk, err = cid.Parse(chunk.PrevCID.String) if err != nil { return nil, xerrors.Errorf("parsing previous CID: %w", err) } @@ -245,15 +246,15 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated if chunk.NumBlocks != 1 { return nil, xerrors.Errorf("Expected 1 block for PDP piece announcement, got %d", chunk.NumBlocks) } - if chunk.PrevCID != nil { - return nil, xerrors.Errorf("Expected no previous chunk for PDP piece announcement, got %s", *chunk.PrevCID) + if chunk.PrevCID.Valid { + return nil, xerrors.Errorf("Expected no previous chunk for PDP piece announcement, got %s", chunk.PrevCID.String) } - if chunk.FirstCID == nil { + if !chunk.FirstCID.Valid { return nil, xerrors.Errorf("chunk does not have first CID") } - cb, err := hex.DecodeString(*chunk.FirstCID) + cb, err := hex.DecodeString(chunk.FirstCID.String) if err != nil { return nil, xerrors.Errorf("decoding first CID: %w", err) } @@ -289,11 +290,11 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated } if !chunk.FromCar { - if chunk.FirstCID == nil { + if !chunk.FirstCID.Valid { return nil, xerrors.Errorf("chunk does not have first CID") } - cb, err := hex.DecodeString(*chunk.FirstCID) + cb, err := hex.DecodeString(chunk.FirstCID.String) if err != nil { return nil, xerrors.Errorf("decoding first CID: %w", err) } @@ -303,7 +304,11 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated return p.reconstructChunkFromDB(ctx, block, pieceCidv2, firstHash, next, chunk.NumBlocks, speculated) } - return p.reconstructChunkFromCar(ctx, block, pieceCidv2, *chunk.StartOffset, next, chunk.NumBlocks, speculated) + if !chunk.StartOffset.Valid { + return nil, xerrors.Errorf("chunk does not have start offset") + } + + return p.reconstructChunkFromCar(ctx, block, pieceCidv2, chunk.StartOffset.Int64, next, chunk.NumBlocks, speculated) } // reconstructChunkFromCar reconstructs a chunk from a car file. 
diff --git a/market/mk12/mk12_utils.go b/market/mk12/mk12_utils.go index 84a7e8f9c..5e8711f03 100644 --- a/market/mk12/mk12_utils.go +++ b/market/mk12/mk12_utils.go @@ -3,6 +3,7 @@ package mk12 import ( "bytes" "context" + "database/sql" "encoding/json" "fmt" @@ -367,7 +368,7 @@ func cidOrNil(c cid.Cid) *cid.Cid { func getSealedDealStatus(ctx context.Context, db *harmonydb.DB, id string, onChain bool) (dealInfo, error) { var dealInfos []struct { Offline bool `db:"offline"` - Error *string `db:"error"` + Error sql.NullString `db:"error"` Proposal json.RawMessage `db:"proposal"` SignedProposalCID string `db:"signed_proposal_cid"` Label []byte `db:"label"` @@ -414,13 +415,13 @@ func getSealedDealStatus(ctx context.Context, db *harmonydb.DB, id string, onCha return dealInfo{}, xerrors.Errorf("failed to parse signed proposal CID: %w", err) } - if di.Error == nil { - di.Error = new(string) + if !di.Error.Valid { + di.Error = sql.NullString{} } ret := dealInfo{ Offline: di.Offline, - Error: *di.Error, + Error: di.Error.String, Proposal: prop, SignedProposalCID: spc, ChainDealID: abi.DealID(0), diff --git a/market/storageingest/deal_ingest_seal.go b/market/storageingest/deal_ingest_seal.go index d91f08ec5..67b5ef034 100644 --- a/market/storageingest/deal_ingest_seal.go +++ b/market/storageingest/deal_ingest_seal.go @@ -2,6 +2,7 @@ package storageingest import ( "context" + "database/sql" "encoding/json" "fmt" "math" @@ -443,7 +444,7 @@ type pieceDetails struct { StartEpoch abi.ChainEpoch `db:"deal_start_epoch"` EndEpoch abi.ChainEpoch `db:"deal_end_epoch"` Index int64 `db:"piece_index"` - CreatedAt *time.Time `db:"created_at"` + CreatedAt sql.NullTime `db:"created_at"` } func (p *PieceIngester) getOpenSectors(tx *harmonydb.Tx, mid int64) ([]*openSector, error) { @@ -484,8 +485,9 @@ func (p *PieceIngester) getOpenSectors(tx *harmonydb.Tx, mid int64) ([]*openSect } getOpenedAt := func(piece pieceDetails, cur *time.Time) *time.Time { - if piece.CreatedAt.Before(*cur) { - return piece.CreatedAt + if piece.CreatedAt.Valid && (cur == nil || piece.CreatedAt.Time.Before(*cur)) { + t := piece.CreatedAt.Time + return &t } return cur } @@ -501,7 +503,7 @@ func (p *PieceIngester) getOpenSectors(tx *harmonydb.Tx, mid int64) ([]*openSect currentSize: pi.Size, earliestStartEpoch: getStartEpoch(pi.StartEpoch, 0), index: pi.Index, - openedAt: pi.CreatedAt, + openedAt: &pi.CreatedAt.Time, latestEndEpoch: getEndEpoch(pi.EndEpoch, 0), pieces: []pieceInfo{ { diff --git a/market/storageingest/deal_ingest_snap.go b/market/storageingest/deal_ingest_snap.go index 3e20a4c68..059c9ddb8 100644 --- a/market/storageingest/deal_ingest_snap.go +++ b/market/storageingest/deal_ingest_snap.go @@ -600,8 +600,8 @@ func (p *PieceIngesterSnap) getOpenSectors(tx *harmonydb.Tx, mid int64) ([]*open } getOpenedAt := func(piece pieceDetails, cur *time.Time) *time.Time { - if piece.CreatedAt.Before(*cur) { - return piece.CreatedAt + if piece.CreatedAt.Time.Before(*cur) { + return &piece.CreatedAt.Time } return cur } @@ -617,7 +617,7 @@ func (p *PieceIngesterSnap) getOpenSectors(tx *harmonydb.Tx, mid int64) ([]*open currentSize: pi.Size, earliestStartEpoch: getStartEpoch(pi.StartEpoch, 0), index: pi.Index, - openedAt: pi.CreatedAt, + openedAt: &pi.CreatedAt.Time, latestEndEpoch: getEndEpoch(pi.EndEpoch, 0), pieces: []pieceInfo{ { diff --git a/tasks/gc/storage_endpoint_gc.go b/tasks/gc/storage_endpoint_gc.go index 600631f7a..e3964f02b 100644 --- a/tasks/gc/storage_endpoint_gc.go +++ b/tasks/gc/storage_endpoint_gc.go @@ -2,6 +2,7 @@ package gc 
import ( "context" + "database/sql" "strings" "sync" "time" @@ -56,7 +57,7 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool var pathRefs []struct { StorageID storiface.ID `db:"storage_id"` Urls string `db:"urls"` - LastHeartbeat *time.Time `db:"last_heartbeat"` + LastHeartbeat sql.NullTime `db:"last_heartbeat"` } err = s.db.Select(ctx, &pathRefs, `SELECT storage_id, urls, last_heartbeat FROM storage_path`) diff --git a/tasks/gc/storage_gc_sweep.go b/tasks/gc/storage_gc_sweep.go index da9864370..1fb0a655f 100644 --- a/tasks/gc/storage_gc_sweep.go +++ b/tasks/gc/storage_gc_sweep.go @@ -2,6 +2,7 @@ package gc import ( "context" + "database/sql" "time" "github.com/samber/lo" @@ -47,8 +48,8 @@ func (s *StorageGCSweep) Do(taskID harmonytask.TaskID, stillOwned func() bool) ( FileType int64 `db:"sector_filetype"` StorageID string `db:"storage_id"` - CreatedAt time.Time `db:"created_at"` - ApprovedAt *time.Time `db:"approved_at"` + CreatedAt time.Time `db:"created_at"` + ApprovedAt sql.NullTime `db:"approved_at"` } err := s.db.Select(ctx, &marks, `SELECT sp_id, sector_num, sector_filetype, storage_id, created_at, approved_at FROM storage_removal_marks WHERE approved = true ORDER BY created_at LIMIT 1`) @@ -60,7 +61,7 @@ func (s *StorageGCSweep) Do(taskID harmonytask.TaskID, stillOwned func() bool) ( } mark := marks[0] - if mark.ApprovedAt == nil { + if !mark.ApprovedAt.Valid { return false, xerrors.Errorf("approved file approved_at was nil") } diff --git a/tasks/indexing/task_check_indexes.go b/tasks/indexing/task_check_indexes.go index 1d720f4a3..617802149 100644 --- a/tasks/indexing/task_check_indexes.go +++ b/tasks/indexing/task_check_indexes.go @@ -384,11 +384,11 @@ func (c *CheckIndexesTask) checkIPNI(ctx context.Context, taskID harmonytask.Tas SpID int64 `db:"sp_id"` PieceSize abi.PaddedPieceSize `db:"piece_size"` - UUID string `db:"uuid"` - Offline bool `db:"offline"` - URL *string `db:"url"` - Headers []byte `db:"url_headers"` - CreatedAt time.Time `db:"created_at"` + UUID string `db:"uuid"` + Offline bool `db:"offline"` + URL sql.NullString `db:"url"` + Headers []byte `db:"url_headers"` + CreatedAt time.Time `db:"created_at"` } err = c.db.Select(ctx, &toCheck, `SELECT DISTINCT piece_cid, sp_id, piece_size, uuid, offline, url, url_headers, created_at diff --git a/tasks/indexing/task_indexing.go b/tasks/indexing/task_indexing.go index 75906c11f..6baf5345d 100644 --- a/tasks/indexing/task_indexing.go +++ b/tasks/indexing/task_indexing.go @@ -71,23 +71,27 @@ func NewIndexingTask(db *harmonydb.DB, sc *ffi.SealCalls, indexStore *indexstore } type itask struct { - UUID string `db:"uuid"` - SpID int64 `db:"sp_id"` - Sector abi.SectorNumber `db:"sector"` - Proof abi.RegisteredSealProof `db:"reg_seal_proof"` - PieceCid string `db:"piece_cid"` - Size abi.PaddedPieceSize `db:"piece_size"` - Offset int64 `db:"sector_offset"` - RawSize int64 `db:"raw_size"` - Url sql.NullString `db:"url"` - ShouldIndex bool `db:"should_index"` - IndexingCreatedAt time.Time `db:"indexing_created_at"` - Announce bool `db:"announce"` - ChainDealId abi.DealID `db:"chain_deal_id"` - IsDDO bool `db:"is_ddo"` - Mk20 bool `db:"mk20"` - IsRM bool `db:"is_rm"` - PieceRef int64 + // Cache line 1 (bytes 0-64): Hot path - piece identification, checked early + UUID string `db:"uuid"` // 16 bytes (0-16) - checked early (line 169, 226, 582) + PieceCid string `db:"piece_cid"` // 16 bytes (16-32) - checked early (line 161, 226, 231, 236, 582) + SpID int64 `db:"sp_id"` // 8 bytes (32-40) - used with 
Sector (line 226, 250-256, 582) + Sector abi.SectorNumber `db:"sector"` // 8 bytes (40-48) - used with SpID (line 226, 250-256, 582) + Size abi.PaddedPieceSize `db:"piece_size"` // 8 bytes (48-56) - used with PieceCid (line 161, 236, 256, 582) + // Cache line 2 (bytes 64-128): Sector operations and deal processing + RawSize sql.NullInt64 `db:"raw_size"` // 16 bytes (56-72, overlaps) - used with Size (line 236, 582) + Proof abi.RegisteredSealProof `db:"reg_seal_proof"` // 8 bytes (72-80) - used with SpID/Sector (line 255, 582) + Offset int64 `db:"sector_offset"` // 8 bytes (80-88) - used with Size/RawSize (line 256, 582) + ChainDealId abi.DealID `db:"chain_deal_id"` // 8 bytes (88-96) - used in deal processing (line 582) + PieceRef int64 // 8 bytes (96-104) - used with Mk20 (line 217, 582) + Url sql.NullString `db:"url"` // 24 bytes (104-128) - used conditionally (line 199-217) + // Cache line 3 (bytes 128+): Less frequently accessed + IndexingCreatedAt time.Time `db:"indexing_created_at"` // 24 bytes (128-152) - used for ordering + // Bools: frequently accessed first, rare ones at end + Mk20 bool `db:"mk20"` // used early and frequently (line 169, 243, 286, 580, 596, 616) + ShouldIndex bool `db:"should_index"` // used early (line 221) + IsDDO bool `db:"is_ddo"` // used with Mk20 in deal processing (line 582) + Announce bool `db:"announce"` // used less frequently + IsRM bool `db:"is_rm"` // used less frequently } func (i *IndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { @@ -231,7 +235,13 @@ func (i *IndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do return false, xerrors.Errorf("parsing piece CID: %w", err) } - pc2, err := commcid.PieceCidV2FromV1(pieceCid, uint64(task.RawSize)) + // Validate raw_size is present (required for PieceCID v2 calculation) + if !task.RawSize.Valid { + return false, xerrors.Errorf("raw_size is required but NULL for piece %s (uuid: %s)", task.PieceCid, task.UUID) + } + + pc2, err := commcid.PieceCidV2FromV1(pieceCid, uint64(task.RawSize.Int64)) + if err != nil { return false, xerrors.Errorf("getting piece commP: %w", err) } @@ -575,15 +585,18 @@ func IndexAggregate(pieceCid cid.Cid, // recordCompletion add the piece metadata and piece deal to the DB and // records the completion of an indexing task in the database func (i *IndexingTask) recordCompletion(ctx context.Context, task itask, taskID harmonytask.TaskID, indexed bool) error { + // Extract raw_size value (should be valid at this point since we validated earlier) + rawSize := task.RawSize.Int64 + if task.Mk20 { _, err := i.db.Exec(ctx, `SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`, - task.UUID, task.PieceCid, !task.IsDDO, task.SpID, task.Sector, task.Offset, task.Size, task.RawSize, indexed, task.PieceRef, false, task.ChainDealId) + task.UUID, task.PieceCid, !task.IsDDO, task.SpID, task.Sector, task.Offset, task.Size, rawSize, indexed, task.PieceRef, false, task.ChainDealId) if err != nil { return xerrors.Errorf("failed to update piece metadata and piece deal for deal %s: %w", task.UUID, err) } } else { _, err := i.db.Exec(ctx, `SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`, - task.UUID, task.PieceCid, !task.IsDDO, task.SpID, task.Sector, task.Offset, task.Size, task.RawSize, indexed, nil, false, task.ChainDealId) + task.UUID, task.PieceCid, !task.IsDDO, task.SpID, task.Sector, task.Offset, task.Size, rawSize, indexed, nil, false, task.ChainDealId) if err != nil { return 
xerrors.Errorf("failed to update piece metadata and piece deal for deal %s: %w", task.UUID, err) } @@ -647,14 +660,14 @@ func (i *IndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T Indexing bool `db:"indexing"` } - var tasks []*task + var tasks []task indIDs := make([]int64, len(ids)) for x, id := range ids { indIDs[x] = int64(id) } - var mk20tasks []*task + var mk20tasks []task if storiface.FTPiece != 32 { panic("storiface.FTPiece != 32") } @@ -664,13 +677,13 @@ func (i *IndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T return nil, xerrors.Errorf("getting mk20 urls: %w", err) } - for _, t := range mk20tasks { + for idx := range mk20tasks { - if !t.Indexing { + if !mk20tasks[idx].Indexing { continue } - goUrl, err := url.Parse(t.Url) + goUrl, err := url.Parse(mk20tasks[idx].Url) if err != nil { return nil, xerrors.Errorf("parsing data URL: %w", err) } @@ -699,7 +712,7 @@ func (i *IndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T return nil, xerrors.Errorf("failed to get storage location from DB: %w", err) } - t.StorageID = sLocation + mk20tasks[idx].StorageID = sLocation } } @@ -708,7 +721,7 @@ func (i *IndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T panic("storiface.FTUnsealed != 1") } - var mk12tasks []*task + var mk12tasks []task err = i.db.Select(ctx, &mk12tasks, `SELECT dp.indexing_task_id, dp.should_index AS indexing, dp.sp_id, dp.sector, l.storage_id FROM market_mk12_deal_pipeline dp @@ -730,12 +743,12 @@ func (i *IndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T localStorageMap[string(l.ID)] = true } - for _, t := range tasks { - if !t.Indexing { - return &t.TaskID, nil + for idx := range tasks { + if !tasks[idx].Indexing { + return &tasks[idx].TaskID, nil } - if found, ok := localStorageMap[t.StorageID]; ok && found { - return &t.TaskID, nil + if found, ok := localStorageMap[tasks[idx].StorageID]; ok && found { + return &tasks[idx].TaskID, nil } } diff --git a/tasks/indexing/task_pdp_indexing.go b/tasks/indexing/task_pdp_indexing.go index ee199504a..69505d4a8 100644 --- a/tasks/indexing/task_pdp_indexing.go +++ b/tasks/indexing/task_pdp_indexing.go @@ -259,7 +259,7 @@ func (P *PDPIndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytas indIDs[x] = int64(id) } - var tasks []*task + var tasks []task if storiface.FTPiece != 32 { panic("storiface.FTPiece != 32") } @@ -269,9 +269,9 @@ func (P *PDPIndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytas return nil, xerrors.Errorf("getting PDP indexing details: %w", err) } - for _, t := range tasks { + for idx := range tasks { - if !t.Indexing { + if !tasks[idx].Indexing { continue } @@ -284,12 +284,12 @@ func (P *PDPIndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytas AND sl.miner_id = 0 AND sl.sector_filetype = 32 WHERE ppr.ref_id = $1 - `, t.PieceRef).Scan(&sLocation) + `, tasks[idx].PieceRef).Scan(&sLocation) if err != nil { return nil, xerrors.Errorf("getting storage_id: %w", err) } - t.StorageID = sLocation + tasks[idx].StorageID = sLocation } ls, err := P.sc.LocalStorage(ctx) @@ -302,12 +302,12 @@ func (P *PDPIndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytas localStorageMap[string(l.ID)] = true } - for _, t := range tasks { - if !t.Indexing { - return &t.TaskID, nil + for idx := range tasks { + if !tasks[idx].Indexing { + return &tasks[idx].TaskID, nil } - if found, ok := localStorageMap[t.StorageID]; ok && found { - return &t.TaskID, nil + if found, ok := 
localStorageMap[tasks[idx].StorageID]; ok && found { + return &tasks[idx].TaskID, nil } } diff --git a/tasks/message/sender_eth.go b/tasks/message/sender_eth.go index 8a68562a9..2575c7ebd 100644 --- a/tasks/message/sender_eth.go +++ b/tasks/message/sender_eth.go @@ -2,6 +2,7 @@ package message import ( "context" + "database/sql" "fmt" "math/big" "time" @@ -42,14 +43,14 @@ func (s *SendTaskETH) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don // Get transaction from the database var dbTx struct { - FromAddress string `db:"from_address"` - ToAddress string `db:"to_address"` - UnsignedTx []byte `db:"unsigned_tx"` - UnsignedHash string `db:"unsigned_hash"` - Nonce *uint64 `db:"nonce"` - SignedTx []byte `db:"signed_tx"` - SendSuccess *bool `db:"send_success"` - SendError *string `db:"send_error"` + FromAddress string `db:"from_address"` + ToAddress string `db:"to_address"` + UnsignedTx []byte `db:"unsigned_tx"` + UnsignedHash string `db:"unsigned_hash"` + Nonce sql.NullInt64 `db:"nonce"` + SignedTx []byte `db:"signed_tx"` + SendSuccess sql.NullBool `db:"send_success"` + SendError sql.NullString `db:"send_error"` } err = s.db.QueryRow(ctx, @@ -112,7 +113,7 @@ func (s *SendTaskETH) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don var signedTx *types.Transaction - if dbTx.Nonce == nil { + if !dbTx.Nonce.Valid { // Get the latest nonce pendingNonce, err := s.client.PendingNonceAt(ctx, fromAddress) if err != nil { @@ -367,9 +368,9 @@ func (s *SenderETH) Send(ctx context.Context, fromAddress common.Address, tx *ty for { var dbTx struct { - SignedHash *string `db:"signed_hash"` - SendSuccess *bool `db:"send_success"` - SendError *string `db:"send_error"` + SignedHash sql.NullString `db:"signed_hash"` + SendSuccess sql.NullBool `db:"send_success"` + SendError sql.NullString `db:"send_error"` } err := s.db.QueryRow(ctx, @@ -379,7 +380,7 @@ func (s *SenderETH) Send(ctx context.Context, fromAddress common.Address, tx *ty return common.Hash{}, xerrors.Errorf("getting send status for task: %w", err) } - if dbTx.SendSuccess == nil { + if !dbTx.SendSuccess.Valid { time.Sleep(pollInterval) pollLoops++ pollInterval *= time.Duration(pollIntervalMul) @@ -389,14 +390,14 @@ func (s *SenderETH) Send(ctx context.Context, fromAddress common.Address, tx *ty continue } - if dbTx.SignedHash == nil || dbTx.SendError == nil { + if !dbTx.SignedHash.Valid || !dbTx.SendError.Valid { return common.Hash{}, xerrors.Errorf("unexpected null values in send status") } - if !*dbTx.SendSuccess { - sendErr = xerrors.Errorf("send error: %s", *dbTx.SendError) + if !dbTx.SendSuccess.Bool { + sendErr = xerrors.Errorf("send error: %s", dbTx.SendError.String) } else { - signedHash = common.HexToHash(*dbTx.SignedHash) + signedHash = common.HexToHash(dbTx.SignedHash.String) } break diff --git a/tasks/pdp/notify_task.go b/tasks/pdp/notify_task.go index 94f0be92a..f72317a87 100644 --- a/tasks/pdp/notify_task.go +++ b/tasks/pdp/notify_task.go @@ -3,6 +3,7 @@ package pdp import ( "bytes" "context" + "database/sql" "encoding/json" "fmt" "net/http" @@ -33,13 +34,13 @@ func (t *PDPNotifyTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (d // Fetch the pdp_piece_uploads entry associated with the taskID var upload struct { - ID string `db:"id" json:"id"` - Service string `db:"service" json:"service"` - PieceCID *string `db:"piece_cid" json:"piece_cid"` - NotifyURL string `db:"notify_url" json:"notify_url"` - PieceRef int64 `db:"piece_ref" json:"piece_ref"` - CheckHashCodec string `db:"check_hash_codec" 
json:"check_hash_codec"` - CheckHash []byte `db:"check_hash" json:"check_hash"` + ID string `db:"id" json:"id"` + Service string `db:"service" json:"service"` + PieceCID sql.NullString `db:"piece_cid" json:"piece_cid"` + NotifyURL string `db:"notify_url" json:"notify_url"` + PieceRef int64 `db:"piece_ref" json:"piece_ref"` + CheckHashCodec string `db:"check_hash_codec" json:"check_hash_codec"` + CheckHash []byte `db:"check_hash" json:"check_hash"` } err = t.db.QueryRow(ctx, ` SELECT id, service, piece_cid, notify_url, piece_ref, check_hash_codec, check_hash diff --git a/tasks/pdp/task_commp.go b/tasks/pdp/task_commp.go index aa2c244c3..b64e456e1 100644 --- a/tasks/pdp/task_commp.go +++ b/tasks/pdp/task_commp.go @@ -2,6 +2,7 @@ package pdp import ( "context" + "database/sql" "errors" "io" "net/url" @@ -156,7 +157,7 @@ func (c *PDPCommpTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T var tasks []struct { TaskID harmonytask.TaskID `db:"commp_task_id"` StorageID string `db:"storage_id"` - Url *string `db:"url"` + Url sql.NullString `db:"url"` } indIDs := make([]int64, len(ids)) @@ -192,8 +193,8 @@ func (c *PDPCommpTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T } for _, task := range tasks { - if task.Url != nil { - goUrl, err := url.Parse(*task.Url) + if task.Url.Valid { + goUrl, err := url.Parse(task.Url.String) if err != nil { return false, xerrors.Errorf("parsing data URL: %w", err) } diff --git a/tasks/proofshare/task_client_poll.go b/tasks/proofshare/task_client_poll.go index e719b2f34..bcd22d94d 100644 --- a/tasks/proofshare/task_client_poll.go +++ b/tasks/proofshare/task_client_poll.go @@ -2,6 +2,7 @@ package proofshare import ( "context" + "database/sql" "errors" "math/rand" "time" @@ -55,21 +56,22 @@ func (s *SectorInfo) SectorID() abi.SectorID { // ClientRequest holds the client request information type ClientRequest struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_num"` - - RequestCID *string `db:"request_cid"` - RequestUploaded bool `db:"request_uploaded"` - RequestPartitionCost int64 `db:"request_partition_cost"` - RequestType string `db:"request_type"` - - PaymentWallet *int64 `db:"payment_wallet"` - PaymentNonce *int64 `db:"payment_nonce"` - - RequestSent bool `db:"request_sent"` - ResponseData []byte `db:"response_data"` - - Done bool `db:"done"` + // Cache line 1 (0-64 bytes): Hot path - sector identification and early checks + SpID int64 `db:"sp_id"` // 8 bytes - used with SectorNumber (line 140, 265) + SectorNumber int64 `db:"sector_num"` // 8 bytes - used with SpID (line 140, 265) + RequestPartitionCost int64 `db:"request_partition_cost"` // 8 bytes - used together + // Cache line 2 (64+ bytes): sql.Null* types (16 bytes each) + RequestCID sql.NullString `db:"request_cid"` // 16 bytes - used with RequestUploaded + PaymentWallet sql.NullInt64 `db:"payment_wallet"` // 16 bytes - used with PaymentNonce + PaymentNonce sql.NullInt64 `db:"payment_nonce"` // 16 bytes - used with PaymentWallet + // Strings (16 bytes) + RequestType string `db:"request_type"` // 16 bytes - used less frequently + // Slices (24 bytes) + ResponseData []byte `db:"response_data"` // 24 bytes - used less frequently + // Cache line 3 (128+ bytes): Bools grouped together for early checks + RequestUploaded bool `db:"request_uploaded"` // Used in early check + RequestSent bool `db:"request_sent"` // Used in early check + Done bool `db:"done"` // Used in early check } type TaskClientPoll struct { @@ -154,11 +156,14 @@ func (t *TaskClientPoll) Do(taskID 
harmonytask.TaskID, stillOwned func() bool) ( const pollInterval = 10 * time.Second defer ownedCancel() + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + for { select { case <-ctx.Done(): return - case <-time.After(pollInterval): + case <-ticker.C: if !stillOwned() { // close the owned context return @@ -236,7 +241,10 @@ func NewTaskClientPoll(db *harmonydb.DB, api ClientServiceAPI) *TaskClientPoll { // pollForProof polls for the proof status func pollForProof(ctx context.Context, db *harmonydb.DB, taskID harmonytask.TaskID, clientRequest *ClientRequest) (bool, []byte, error) { // Parse the request CID - requestCid, err := cid.Parse(*clientRequest.RequestCID) + if !clientRequest.RequestCID.Valid { + return false, nil, xerrors.Errorf("request CID is null") + } + requestCid, err := cid.Parse(clientRequest.RequestCID.String) if err != nil { return false, nil, xerrors.Errorf("failed to parse request CID: %w", err) } diff --git a/tasks/proofshare/task_client_upload_porep.go b/tasks/proofshare/task_client_upload_porep.go index 086c06553..0e81665e8 100644 --- a/tasks/proofshare/task_client_upload_porep.go +++ b/tasks/proofshare/task_client_upload_porep.go @@ -3,6 +3,7 @@ package proofshare import ( "bytes" "context" + "database/sql" "encoding/json" "time" @@ -57,9 +58,9 @@ func (t *TaskClientUpload) adderPorep(add harmonytask.AddTaskFunc) { // claim [sectors] pipeline entries var sectors []struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - TaskIDPorep *int64 `db:"task_id_porep"` + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + TaskIDPorep sql.NullInt64 `db:"task_id_porep"` } cutoffTime := time.Now().Add(-time.Duration(minPendingSeconds) * time.Second) @@ -91,11 +92,11 @@ func (t *TaskClientUpload) adderPorep(add harmonytask.AddTaskFunc) { return false, xerrors.Errorf("failed to update sector: %w", err) } - if sectors[0].TaskIDPorep != nil { - log.Infow("TaskClientUpload.adderPorep() deleting old task", "oldTaskID", *sectors[0].TaskIDPorep, "newTaskID", taskID) - _, err := tx.Exec(`DELETE FROM harmony_task WHERE id = $1`, *sectors[0].TaskIDPorep) + if sectors[0].TaskIDPorep.Valid { + log.Infow("TaskClientUpload.adderPorep() deleting old task", "oldTaskID", sectors[0].TaskIDPorep.Int64, "newTaskID", taskID) + _, err := tx.Exec(`DELETE FROM harmony_task WHERE id = $1`, sectors[0].TaskIDPorep.Int64) if err != nil { - log.Errorw("TaskClientUpload.adderPorep() failed to delete old task", "error", err, "oldTaskID", *sectors[0].TaskIDPorep) + log.Errorw("TaskClientUpload.adderPorep() failed to delete old task", "error", err, "oldTaskID", sectors[0].TaskIDPorep.Int64) return false, xerrors.Errorf("deleting old task: %w", err) } } diff --git a/tasks/proofshare/task_client_upload_snap.go b/tasks/proofshare/task_client_upload_snap.go index f9b43ccdb..860bdf7dc 100644 --- a/tasks/proofshare/task_client_upload_snap.go +++ b/tasks/proofshare/task_client_upload_snap.go @@ -2,6 +2,7 @@ package proofshare import ( "context" + "database/sql" "encoding/json" "time" @@ -73,9 +74,9 @@ func (t *TaskClientUpload) adderSnap(add harmonytask.AddTaskFunc) { // claim [sectors] pipeline entries var sectors []struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - TaskIDProve *int64 `db:"task_id_prove"` + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + TaskIDProve sql.NullInt64 `db:"task_id_prove"` } cutoffTime := time.Now().Add(-time.Duration(minPendingSeconds) * time.Second) @@ -107,11 +108,11 @@ func (t 
*TaskClientUpload) adderSnap(add harmonytask.AddTaskFunc) { return false, xerrors.Errorf("failed to update sector: %w", err) } - if sectors[0].TaskIDProve != nil { - log.Infow("TaskClientUpload.adderSnap() deleting old task", "oldTaskID", *sectors[0].TaskIDProve, "newTaskID", taskID) - _, err := tx.Exec(`DELETE FROM harmony_task WHERE id = $1`, *sectors[0].TaskIDProve) + if sectors[0].TaskIDProve.Valid { + log.Infow("TaskClientUpload.adderSnap() deleting old task", "oldTaskID", sectors[0].TaskIDProve.Int64, "newTaskID", taskID) + _, err := tx.Exec(`DELETE FROM harmony_task WHERE id = $1`, sectors[0].TaskIDProve.Int64) if err != nil { - log.Errorw("TaskClientUpload.adderSnap() failed to delete old task", "error", err, "oldTaskID", *sectors[0].TaskIDProve) + log.Errorw("TaskClientUpload.adderSnap() failed to delete old task", "error", err, "oldTaskID", sectors[0].TaskIDProve.Int64) return false, xerrors.Errorf("deleting old task: %w", err) } } diff --git a/tasks/seal/poller.go b/tasks/seal/poller.go index c2e00ca63..3bbcce1d7 100644 --- a/tasks/seal/poller.go +++ b/tasks/seal/poller.go @@ -123,56 +123,45 @@ NOTE: TaskIDs are ONLY set while the tasks are executing or waiting to execute. */ type pollTask struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - RegisteredSealProof abi.RegisteredSealProof `db:"reg_seal_proof"` - - TicketEpoch *int64 `db:"ticket_epoch"` - - TaskSDR *int64 `db:"task_id_sdr"` - AfterSDR bool `db:"after_sdr"` - - TaskTreeD *int64 `db:"task_id_tree_d"` - AfterTreeD bool `db:"after_tree_d"` - - TaskTreeC *int64 `db:"task_id_tree_c"` - AfterTreeC bool `db:"after_tree_c"` - - TaskTreeR *int64 `db:"task_id_tree_r"` - AfterTreeR bool `db:"after_tree_r"` - - TaskSynth *int64 `db:"task_id_synth"` - AfterSynth bool `db:"after_synth"` - - PreCommitReadyAt *time.Time `db:"precommit_ready_at"` - - TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` - AfterPrecommitMsg bool `db:"after_precommit_msg"` - - AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` - SeedEpoch *int64 `db:"seed_epoch"` - - TaskPoRep *int64 `db:"task_id_porep"` - PoRepProof []byte `db:"porep_proof"` - AfterPoRep bool `db:"after_porep"` - - TaskFinalize *int64 `db:"task_id_finalize"` - AfterFinalize bool `db:"after_finalize"` - - TaskMoveStorage *int64 `db:"task_id_move_storage"` - AfterMoveStorage bool `db:"after_move_storage"` - - CommitReadyAt *time.Time `db:"commit_ready_at"` - - TaskCommitMsg *int64 `db:"task_id_commit_msg"` - AfterCommitMsg bool `db:"after_commit_msg"` - - AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` - - Failed bool `db:"failed"` - FailedReason string `db:"failed_reason"` - - StartEpoch sql.NullInt64 `db:"start_epoch"` + // Cache line 1 (bytes 0-64): Hot path - identification and early checks + SpID int64 `db:"sp_id"` // 8 bytes (0-8) - used in all UPDATE statements + SectorNumber int64 `db:"sector_number"` // 8 bytes (8-16) - used in all UPDATE statements + RegisteredSealProof abi.RegisteredSealProof `db:"reg_seal_proof"` // 8 bytes (16-24) + TaskSDR sql.NullInt64 `db:"task_id_sdr"` // 16 bytes (25-41, some padding) + TaskTreeD sql.NullInt64 `db:"task_id_tree_d"` // 16 bytes (crosses into cache line 2) + Failed bool `db:"failed"` // 1 byte (24-25) - checked first in line 215 filter + AfterSDR bool `db:"after_sdr"` // 1 byte - checked with TaskSDR + AfterTreeD bool `db:"after_tree_d"` // 1 byte + // Cache line 2 (bytes 64-128): Tree stage task IDs (checked in sequence) + TaskTreeC sql.NullInt64 `db:"task_id_tree_c"` // 16 bytes + 
TaskTreeR sql.NullInt64 `db:"task_id_tree_r"` // 16 bytes + TaskSynth sql.NullInt64 `db:"task_id_synth"` // 16 bytes + TicketEpoch sql.NullInt64 `db:"ticket_epoch"` // 16 bytes + AfterTreeC bool `db:"after_tree_c"` // 1 byte + AfterTreeR bool `db:"after_tree_r"` // 1 byte + AfterSynth bool `db:"after_synth"` // 1 byte + // Cache line 3 (bytes 128-192): PreCommit and timing + TaskPrecommitMsg sql.NullInt64 `db:"task_id_precommit_msg"` // 16 bytes + AfterPrecommitMsg bool `db:"after_precommit_msg"` // 1 byte + AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` // 1 byte + SeedEpoch sql.NullInt64 `db:"seed_epoch"` // 16 bytes + PreCommitReadyAt sql.NullTime `db:"precommit_ready_at"` // 32 bytes (crosses into cache line 4) + // Cache line 4 (bytes 192-256): PoRep and Finalize stages + TaskPoRep sql.NullInt64 `db:"task_id_porep"` // 16 bytes + TaskFinalize sql.NullInt64 `db:"task_id_finalize"` // 16 bytes + StartEpoch sql.NullInt64 `db:"start_epoch"` // 16 bytes + AfterPoRep bool `db:"after_porep"` // 1 byte + AfterFinalize bool `db:"after_finalize"` // 1 byte + // Cache line 5 (bytes 256-320): Commit and storage stages + TaskMoveStorage sql.NullInt64 `db:"task_id_move_storage"` // 16 bytes + CommitReadyAt sql.NullTime `db:"commit_ready_at"` // 32 bytes + TaskCommitMsg sql.NullInt64 `db:"task_id_commit_msg"` // 16 bytes + AfterMoveStorage bool `db:"after_move_storage"` // 1 byte + AfterCommitMsg bool `db:"after_commit_msg"` // 1 byte + AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` // 1 byte + // Larger fields at end + PoRepProof []byte `db:"porep_proof"` // 24 bytes - only used in specific stages + FailedReason string `db:"failed_reason"` // 16 bytes - only used when Failed=true } func (s *SealPoller) poll(ctx context.Context) error { @@ -252,7 +241,7 @@ func (s *SealPoller) poll(ctx context.Context) error { } func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) { - if !task.AfterSDR && task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() { + if !task.AfterSDR && !task.TaskSDR.Valid && s.pollers[pollerSDR].IsSet() { s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_sdr IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { @@ -272,7 +261,7 @@ func (t pollTask) afterSDR() bool { } func (s *SealPoller) pollStartSDRTreeD(ctx context.Context, task pollTask) { - if !task.AfterTreeD && task.TaskTreeD == nil && s.pollers[pollerTreeD].IsSet() && task.afterSDR() { + if !task.AfterTreeD && !task.TaskTreeD.Valid && s.pollers[pollerTreeD].IsSet() && task.afterSDR() { s.pollers[pollerTreeD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1 WHERE sp_id = $2 AND sector_number = $3 AND after_sdr = TRUE AND task_id_tree_d IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { @@ -292,7 +281,7 @@ func (t pollTask) afterTreeD() bool { } func (s *SealPoller) pollStartSDRTreeRC(ctx context.Context, task pollTask) { - if !task.AfterTreeC && !task.AfterTreeR && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTreeRC].IsSet() && task.afterTreeD() { + if !task.AfterTreeC && !task.AfterTreeR && !task.TaskTreeC.Valid && !task.TaskTreeR.Valid && s.pollers[pollerTreeRC].IsSet() && task.afterTreeD() { s.pollers[pollerTreeRC].Val(ctx)(func(id 
harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_c = $1, task_id_tree_r = $1 WHERE sp_id = $2 AND sector_number = $3 AND after_tree_d = TRUE AND task_id_tree_c IS NULL AND task_id_tree_r IS NULL`, id, task.SpID, task.SectorNumber) @@ -313,7 +302,7 @@ func (t pollTask) afterTreeRC() bool { } func (s *SealPoller) pollStartSynth(ctx context.Context, task pollTask) { - if !task.AfterSynth && task.TaskSynth == nil && s.pollers[pollerSyntheticProofs].IsSet() && task.AfterTreeR && task.afterTreeRC() { + if !task.AfterSynth && !task.TaskSynth.Valid && s.pollers[pollerSyntheticProofs].IsSet() && task.AfterTreeR && task.afterTreeRC() { s.pollers[pollerSyntheticProofs].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_synth = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_synth IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { @@ -341,9 +330,9 @@ func (t pollTask) afterPrecommitMsgSuccess() bool { } func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *types.TipSet) { - if s.pollers[pollerPoRep].IsSet() && task.afterPrecommitMsgSuccess() && task.SeedEpoch != nil && - task.TaskPoRep == nil && !task.AfterPoRep && - ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { + if s.pollers[pollerPoRep].IsSet() && task.afterPrecommitMsgSuccess() && task.SeedEpoch.Valid && + !task.TaskPoRep.Valid && !task.AfterPoRep && + ts.Height() >= abi.ChainEpoch(task.SeedEpoch.Int64+seedEpochConfidence) { s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_porep IS NULL`, id, task.SpID, task.SectorNumber) @@ -364,7 +353,7 @@ func (t pollTask) afterPoRep() bool { } func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *types.TipSet) { - if s.pollers[pollerFinalize].IsSet() && task.afterPoRep() && !task.AfterFinalize && task.TaskFinalize == nil { + if s.pollers[pollerFinalize].IsSet() && task.afterPoRep() && !task.AfterFinalize && !task.TaskFinalize.Valid { s.pollers[pollerFinalize].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_finalize IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { @@ -384,7 +373,7 @@ func (t pollTask) afterFinalize() bool { } func (s *SealPoller) pollStartMoveStorage(ctx context.Context, task pollTask) { - if s.pollers[pollerMoveStorage].IsSet() && task.afterFinalize() && !task.AfterMoveStorage && task.TaskMoveStorage == nil { + if s.pollers[pollerMoveStorage].IsSet() && task.afterFinalize() && !task.AfterMoveStorage && !task.TaskMoveStorage.Valid { s.pollers[pollerMoveStorage].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_move_storage = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_move_storage IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { diff --git a/tasks/seal/task_submit_commit.go b/tasks/seal/task_submit_commit.go index 6bfe76569..6dafb048f 100644 --- a/tasks/seal/task_submit_commit.go +++ b/tasks/seal/task_submit_commit.go 
@@ -161,11 +161,13 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) params := miner.ProveCommitSectors3Params{ RequireActivationSuccess: s.cfg.RequireActivationSuccess, RequireNotificationSuccess: s.cfg.RequireNotificationSuccess, + SectorActivations: make([]miner.SectorActivationManifest, 0, len(sectorParamsArr)), + SectorProofs: make([][]byte, 0, len(sectorParamsArr)), } collateral := big.Zero() - var infos []proof.AggregateSealVerifyInfo - var sectors []int64 + infos := make([]proof.AggregateSealVerifyInfo, 0, len(sectorParamsArr)) + sectors := make([]int64, 0, len(sectorParamsArr)) for _, sectorParams := range sectorParamsArr { sectorParams := sectorParams @@ -217,7 +219,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) var verifiedSize abi.PaddedPieceSize - var pams []miner.PieceActivationManifest + pams := make([]miner.PieceActivationManifest, 0, len(pieces)) var sectorFailed bool diff --git a/tasks/seal/task_submit_precommit.go b/tasks/seal/task_submit_precommit.go index d2323ada8..1c4a861f7 100644 --- a/tasks/seal/task_submit_precommit.go +++ b/tasks/seal/task_submit_precommit.go @@ -3,6 +3,7 @@ package seal import ( "bytes" "context" + "database/sql" "fmt" "github.com/ipfs/go-cid" @@ -93,7 +94,7 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo SpID int64 `db:"sp_id"` SectorNumber int64 `db:"sector_number"` RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"` - UserSectorDurationEpochs *int64 `db:"user_sector_duration_epochs"` + UserSectorDurationEpochs sql.NullInt64 `db:"user_sector_duration_epochs"` TicketEpoch abi.ChainEpoch `db:"ticket_epoch"` SealedCID string `db:"tree_r_cid"` UnsealedCID string `db:"tree_d_cid"` @@ -139,7 +140,9 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo //never commit P2 message before, check ticket expiration ticketEarliest := head.Height() - policy.MaxPreCommitRandomnessLookback - params := miner.PreCommitSectorBatchParams2{} + params := miner.PreCommitSectorBatchParams2{ + Sectors: make([]miner.SectorPreCommitInfo, 0, len(sectorParamsArr)), + } collateral := big.Zero() // 2. 
Prepare preCommit info and PreCommitSectorBatchParams @@ -194,8 +197,8 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo } expiration := sectorParams.TicketEpoch + miner12.MaxSectorExpirationExtension - if sectorParams.UserSectorDurationEpochs != nil { - expiration = sectorParams.TicketEpoch + abi.ChainEpoch(*sectorParams.UserSectorDurationEpochs) + if sectorParams.UserSectorDurationEpochs.Valid { + expiration = sectorParams.TicketEpoch + abi.ChainEpoch(sectorParams.UserSectorDurationEpochs.Int64) } var pieces []struct { diff --git a/tasks/sealsupra/task_supraseal.go b/tasks/sealsupra/task_supraseal.go index dc946e157..751e3d21c 100644 --- a/tasks/sealsupra/task_supraseal.go +++ b/tasks/sealsupra/task_supraseal.go @@ -2,6 +2,7 @@ package sealsupra import ( "context" + "database/sql" "encoding/hex" "encoding/json" "fmt" @@ -535,9 +536,9 @@ func (s *SupraSeal) Adder(taskFunc harmonytask.AddTaskFunc) { } type sectorClaim struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - TaskIDSDR *int64 `db:"task_id_sdr"` + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + TaskIDSDR sql.NullInt64 `db:"task_id_sdr"` } func (s *SupraSeal) schedule(taskFunc harmonytask.AddTaskFunc) error { @@ -585,9 +586,9 @@ func (s *SupraSeal) schedule(taskFunc harmonytask.AddTaskFunc) error { return false, xerrors.Errorf("updating task id: %w", err) } - if t.TaskIDSDR != nil { + if t.TaskIDSDR.Valid { // sdr task exists, remove it from the task engine - _, err := tx.Exec(`DELETE FROM harmony_task WHERE id = $1`, *t.TaskIDSDR) + _, err := tx.Exec(`DELETE FROM harmony_task WHERE id = $1`, t.TaskIDSDR.Int64) if err != nil { return false, xerrors.Errorf("deleting old task: %w", err) } @@ -673,7 +674,7 @@ func (s *SupraSeal) claimsFromCCScheduler(tx *harmonydb.Tx, toSeal int64) ([]sec outClaims = append(outClaims, sectorClaim{ SpID: schedule.SpID, SectorNumber: int64(sectorNum), - TaskIDSDR: nil, // New sector, no existing task + TaskIDSDR: sql.NullInt64{}, // New sector, no existing task }) userDuration := int64(schedule.DurationDays) * builtin.EpochsInDay diff --git a/tasks/snap/task_submit.go b/tasks/snap/task_submit.go index f7caa2f63..47027334a 100644 --- a/tasks/snap/task_submit.go +++ b/tasks/snap/task_submit.go @@ -3,6 +3,7 @@ package snap import ( "bytes" "context" + "database/sql" "encoding/json" "fmt" "math" @@ -180,6 +181,8 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done AggregateProofType: nil, RequireActivationSuccess: s.cfg.RequireActivationSuccess, RequireNotificationSuccess: s.cfg.RequireNotificationSuccess, + SectorUpdates: make([]miner13.SectorUpdateManifest, 0, len(tasks)), + SectorProofs: make([][]byte, 0, len(tasks)), } collateral := big.Zero() @@ -279,7 +282,7 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } // Process pieces, prepare PAMs - var pams []miner.PieceActivationManifest + pams := make([]miner.PieceActivationManifest, 0, len(pieces)) var verifiedSize int64 pieceCheckFailed := false for _, piece := range pieces { @@ -527,11 +530,11 @@ func (s *SubmitTask) schedule(ctx context.Context, addTaskFunc harmonytask.AddTa // 1) Gather candidate tasks to schedule //---------------------------------- var rawRows []struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - UpgradeProof int64 `db:"upgrade_proof"` - UpdateReadyAt *time.Time `db:"update_ready_at"` - StartEpoch int64 `db:"smallest_direct_start_epoch"` + SpID int64 
`db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + UpgradeProof int64 `db:"upgrade_proof"` + UpdateReadyAt sql.NullTime `db:"update_ready_at"` + StartEpoch int64 `db:"smallest_direct_start_epoch"` } err := tx.Select(&rawRows, ` @@ -585,11 +588,13 @@ func (s *SubmitTask) schedule(ctx context.Context, addTaskFunc harmonytask.AddTa upMap = make(map[int64][]rowInfo) batchMap[row.SpID] = upMap } - upMap[row.UpgradeProof] = append(upMap[row.UpgradeProof], rowInfo{ - SectorNumber: row.SectorNumber, - UpdateReadyAt: row.UpdateReadyAt, - StartEpoch: row.StartEpoch, - }) + if row.UpdateReadyAt.Valid { + upMap[row.UpgradeProof] = append(upMap[row.UpgradeProof], rowInfo{ + SectorNumber: row.SectorNumber, + UpdateReadyAt: &row.UpdateReadyAt.Time, + StartEpoch: row.StartEpoch, + }) + } } //---------------------------------- diff --git a/tasks/storage-market/mk20.go b/tasks/storage-market/mk20.go index ef216e879..a1440f07e 100644 --- a/tasks/storage-market/mk20.go +++ b/tasks/storage-market/mk20.go @@ -2,6 +2,7 @@ package storage_market import ( "context" + "database/sql" "encoding/json" "errors" "fmt" @@ -36,41 +37,41 @@ import ( ) type MK20PipelinePiece struct { - ID string `db:"id"` - SPID int64 `db:"sp_id"` - Client string `db:"client"` - Contract string `db:"contract"` - PieceCIDV2 string `db:"piece_cid_v2"` - PieceCID string `db:"piece_cid"` - PieceSize int64 `db:"piece_size"` - RawSize int64 `db:"raw_size"` - Offline bool `db:"offline"` - URL *string `db:"url"` // Nullable fields use pointers - Indexing bool `db:"indexing"` - Announce bool `db:"announce"` - AllocationID *int64 `db:"allocation_id"` // Nullable fields use pointers - Duration *int64 `db:"duration"` // Nullable fields use pointers - PieceAggregation int `db:"piece_aggregation"` + ID string `db:"id"` + SPID int64 `db:"sp_id"` + Client string `db:"client"` + Contract string `db:"contract"` + PieceCIDV2 string `db:"piece_cid_v2"` + PieceCID string `db:"piece_cid"` + PieceSize int64 `db:"piece_size"` + RawSize int64 `db:"raw_size"` + Offline bool `db:"offline"` + URL sql.NullString `db:"url"` + Indexing bool `db:"indexing"` + Announce bool `db:"announce"` + AllocationID sql.NullInt64 `db:"allocation_id"` + Duration sql.NullInt64 `db:"duration"` + PieceAggregation int `db:"piece_aggregation"` Started bool `db:"started"` Downloaded bool `db:"downloaded"` - CommTaskID *int64 `db:"commp_task_id"` - AfterCommp bool `db:"after_commp"` + CommTaskID sql.NullInt64 `db:"commp_task_id"` + AfterCommp bool `db:"after_commp"` - DealAggregation int `db:"deal_aggregation"` - AggregationIndex int64 `db:"aggr_index"` - AggregationTaskID *int64 `db:"agg_task_id"` - Aggregated bool `db:"aggregated"` + DealAggregation int `db:"deal_aggregation"` + AggregationIndex int64 `db:"aggr_index"` + AggregationTaskID sql.NullInt64 `db:"agg_task_id"` + Aggregated bool `db:"aggregated"` - Sector *int64 `db:"sector"` // Nullable fields use pointers - RegSealProof *int `db:"reg_seal_proof"` // Nullable fields use pointers - SectorOffset *int64 `db:"sector_offset"` // Nullable fields use pointers + Sector sql.NullInt64 `db:"sector"` + RegSealProof sql.NullInt64 `db:"reg_seal_proof"` + SectorOffset sql.NullInt64 `db:"sector_offset"` - IndexingCreatedAt *time.Time `db:"indexing_created_at"` // Nullable fields use pointers - IndexingTaskID *int64 `db:"indexing_task_id"` - Indexed bool `db:"indexed"` + IndexingCreatedAt sql.NullTime `db:"indexing_created_at"` + IndexingTaskID sql.NullInt64 `db:"indexing_task_id"` + Indexed bool `db:"indexed"` } func (d 
*CurioStorageDealMarket) processMK20Deals(ctx context.Context) { @@ -837,7 +838,7 @@ func (d *CurioStorageDealMarket) findOfflineURLMk20Deal(ctx context.Context, pie // createCommPMk20Piece handles the creation of a CommP task for an MK20 pipeline piece, updating its status based on piece attributes. func (d *CurioStorageDealMarket) createCommPMk20Piece(ctx context.Context, piece MK20PipelinePiece) error { - if piece.Downloaded && !piece.AfterCommp && piece.CommTaskID == nil { + if piece.Downloaded && !piece.AfterCommp && !piece.CommTaskID.Valid { // Skip commP is configured to do so if d.cfg.Market.StorageMarketConfig.MK20.SkipCommP { _, err := d.db.Exec(ctx, `UPDATE market_mk20_pipeline SET after_commp = TRUE, commp_task_id = NULL @@ -889,7 +890,7 @@ func (d *CurioStorageDealMarket) createCommPMk20Piece(ctx context.Context, piece func (d *CurioStorageDealMarket) addDealOffset(ctx context.Context, piece MK20PipelinePiece) error { // Get the deal offset if sector has started sealing - if piece.Sector != nil && piece.RegSealProof != nil && piece.SectorOffset == nil { + if piece.Sector.Valid && piece.RegSealProof.Valid && !piece.SectorOffset.Valid { _, err := d.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { type pieces struct { Cid string `db:"piece_cid"` @@ -908,7 +909,7 @@ func (d *CurioStorageDealMarket) addDealOffset(ctx context.Context, piece MK20Pi FROM sectors_snap_initial_pieces WHERE sp_id = $1 AND sector_number = $2 - ORDER BY piece_index ASC;`, piece.SPID, piece.Sector) + ORDER BY piece_index ASC;`, piece.SPID, piece.Sector.Int64) if err != nil { return false, xerrors.Errorf("getting pieces for sector: %w", err) } @@ -924,7 +925,7 @@ func (d *CurioStorageDealMarket) addDealOffset(ctx context.Context, piece MK20Pi _, padLength := proofs.GetRequiredPadding(offset.Padded(), p.Size) offset += padLength.Unpadded() if p.Cid == piece.PieceCID && p.Size == abi.PaddedPieceSize(piece.PieceSize) { - n, err := tx.Exec(`UPDATE market_mk20_pipeline SET sector_offset = $1 WHERE id = $2 AND sector = $3 AND sector_offset IS NULL`, offset.Padded(), piece.ID, piece.Sector) + n, err := tx.Exec(`UPDATE market_mk20_pipeline SET sector_offset = $1 WHERE id = $2 AND sector = $3 AND sector_offset IS NULL`, offset.Padded(), piece.ID, piece.Sector.Int64) if err != nil { return false, xerrors.Errorf("updating deal offset: %w", err) } @@ -999,16 +1000,16 @@ func (d *CurioStorageDealMarket) processMK20DealIngestion(ctx context.Context) { } var deals []struct { - ID string `db:"id"` - SPID int64 `db:"sp_id"` - Client string `db:"client"` - PieceCID string `db:"piece_cid"` - PieceSize int64 `db:"piece_size"` - RawSize int64 `db:"raw_size"` - AllocationID *int64 `db:"allocation_id"` - Duration int64 `db:"duration"` - Url string `db:"url"` - Count int `db:"unassigned_count"` + ID string `db:"id"` + SPID int64 `db:"sp_id"` + Client string `db:"client"` + PieceCID string `db:"piece_cid"` + PieceSize int64 `db:"piece_size"` + RawSize int64 `db:"raw_size"` + AllocationID sql.NullInt64 `db:"allocation_id"` + Duration int64 `db:"duration"` + Url string `db:"url"` + Count int `db:"unassigned_count"` } err = d.db.Select(ctx, &deals, `SELECT @@ -1072,8 +1073,8 @@ func (d *CurioStorageDealMarket) processMK20DealIngestion(ctx context.Context) { start := head.Height() + 2*builtin.EpochsInDay end := start + abi.ChainEpoch(deal.Duration) var vak *miner.VerifiedAllocationKey - if deal.AllocationID != nil { - alloc, err := d.api.StateGetAllocation(ctx, client, verifreg.AllocationId(*deal.AllocationID), 
types.EmptyTSK) + if deal.AllocationID.Valid { + alloc, err := d.api.StateGetAllocation(ctx, client, verifreg.AllocationId(deal.AllocationID.Int64), types.EmptyTSK) if err != nil { log.Errorw("failed to get allocation", "deal", deal, "error", err) continue @@ -1089,7 +1090,7 @@ func (d *CurioStorageDealMarket) processMK20DealIngestion(ctx context.Context) { end = start + alloc.TermMin vak = &miner.VerifiedAllocationKey{ Client: abi.ActorID(clientId), - ID: verifreg13.AllocationId(*deal.AllocationID), + ID: verifreg13.AllocationId(deal.AllocationID.Int64), } } diff --git a/tasks/storage-market/storage_market.go b/tasks/storage-market/storage_market.go index 69929eeae..ca14de12f 100644 --- a/tasks/storage-market/storage_market.go +++ b/tasks/storage-market/storage_market.go @@ -88,28 +88,28 @@ type MK12Pipeline struct { PieceSize abi.PaddedPieceSize `db:"piece_size"` Offline bool `db:"offline"` // data is not downloaded before starting the deal RawSize sql.NullInt64 `db:"raw_size"` - URL *string `db:"url"` + URL sql.NullString `db:"url"` Headers json.RawMessage `db:"headers"` //DDO IsDDO bool `db:"is_ddo"` // commP task - CommTaskID *int64 `db:"commp_task_id"` - AfterCommp bool `db:"after_commp"` + CommTaskID sql.NullInt64 `db:"commp_task_id"` + AfterCommp bool `db:"after_commp"` // PSD task - PSDWaitTime *time.Time `db:"psd_wait_time"` // set in commp to now - PSDTaskID *int64 `db:"psd_task_id"` - AfterPSD bool `db:"after_psd"` + PSDWaitTime sql.NullTime `db:"psd_wait_time"` // set in commp to now + PSDTaskID sql.NullInt64 `db:"psd_task_id"` + AfterPSD bool `db:"after_psd"` // Find Deal task (just looks at the chain for the deal ID) - FindDealTaskID *int64 `db:"find_deal_task_id"` - AfterFindDeal bool `db:"after_find_deal"` + FindDealTaskID sql.NullInt64 `db:"find_deal_task_id"` + AfterFindDeal bool `db:"after_find_deal"` // Sector the deal was assigned into - Sector *int64 `db:"sector"` - Offset *int64 `db:"sector_offset"` + Sector sql.NullInt64 `db:"sector"` + Offset sql.NullInt64 `db:"sector_offset"` } func NewCurioStorageDealMarket(miners *config.Dynamic[[]address.Address], db *harmonydb.DB, cfg *config.CurioConfig, ethClient *ethclient.Client, si paths.SectorIndex, mapi storageMarketAPI, as *multictladdr.MultiAddressSelector, sc *ffi.SealCalls) *CurioStorageDealMarket { @@ -313,8 +313,8 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P // Try to mark the deal as started if !deal.Started { // Check if download is finished and update the deal state in DB - if deal.URL != nil && *deal.URL != "" { - goUrl, err := url.Parse(*deal.URL) + if deal.URL.Valid && deal.URL.String != "" { + goUrl, err := url.Parse(deal.URL.String) if err != nil { return xerrors.Errorf("UUID: %s parsing data URL: %w", deal.UUID, err) } @@ -356,7 +356,7 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P } // Create commP task - if deal.Started && !deal.AfterCommp && deal.CommTaskID == nil { + if deal.Started && !deal.AfterCommp && !deal.CommTaskID.Valid { // Skip commP is configured to do so if d.cfg.Market.StorageMarketConfig.MK12.SkipCommP { _, err := d.db.Exec(ctx, `UPDATE market_mk12_deal_pipeline SET after_commp = TRUE, psd_wait_time = NOW(), commp_task_id = NULL WHERE uuid = $1`, deal.UUID) @@ -395,7 +395,7 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P } // Create Find Deal task - if deal.Started && deal.AfterCommp && deal.AfterPSD && !deal.AfterFindDeal && deal.FindDealTaskID == nil { + if deal.Started && 
deal.AfterCommp && deal.AfterPSD && !deal.AfterFindDeal && !deal.FindDealTaskID.Valid { var executed bool err := d.db.QueryRow(ctx, `SELECT EXISTS(SELECT TRUE FROM market_mk12_deals d INNER JOIN message_waits mw ON mw.signed_message_cid = d.publish_cid @@ -424,7 +424,7 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P } // If on chain deal ID is present, we should add the deal to a sector - if deal.AfterFindDeal && deal.Sector == nil { + if deal.AfterFindDeal && !deal.Sector.Valid { err := d.ingestDeal(ctx, deal) if err != nil { return xerrors.Errorf("ingest deal: %w", err) @@ -432,7 +432,7 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P } // Get the deal offset if sector has started sealing - if deal.AfterFindDeal && deal.Sector != nil && deal.Offset == nil { + if deal.AfterFindDeal && deal.Sector.Valid && !deal.Offset.Valid { type pieces struct { Cid string `db:"piece_cid"` Size abi.PaddedPieceSize `db:"piece_size"` @@ -450,13 +450,13 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P FROM sectors_snap_initial_pieces WHERE sp_id = $1 AND sector_number = $2 - ORDER BY piece_index ASC;`, deal.SpID, deal.Sector) + ORDER BY piece_index ASC;`, deal.SpID, deal.Sector.Int64) if err != nil { return xerrors.Errorf("UUID: %s: getting pieces for sector: %w", deal.UUID, err) } if len(pieceList) == 0 { - return xerrors.Errorf("UUID: %s: no pieces found for the sector %d", deal.UUID, *deal.Sector) + return xerrors.Errorf("UUID: %s: no pieces found for the sector %d", deal.UUID, deal.Sector.Int64) } var offset abi.UnpaddedPieceSize @@ -465,7 +465,7 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P _, padLength := proofs.GetRequiredPadding(offset.Padded(), p.Size) offset += padLength.Unpadded() if p.Cid == deal.PieceCid && p.Size == deal.PieceSize { - n, err := d.db.Exec(ctx, `UPDATE market_mk12_deal_pipeline SET sector_offset = $1 WHERE uuid = $2 AND sector = $3 AND sector_offset IS NULL`, offset.Padded(), deal.UUID, deal.Sector) + n, err := d.db.Exec(ctx, `UPDATE market_mk12_deal_pipeline SET sector_offset = $1 WHERE uuid = $2 AND sector = $3 AND sector_offset IS NULL`, offset.Padded(), deal.UUID, deal.Sector.Int64) if err != nil { return xerrors.Errorf("UUID: %s: updating deal pipeline with sector offset: %w", deal.UUID, err) } @@ -482,25 +482,25 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P } type MarketMK12Deal struct { - UUID string `db:"uuid"` - CreatedAt time.Time `db:"created_at"` - SignedProposalCid string `db:"signed_proposal_cid"` - ProposalSignature []byte `db:"proposal_signature"` - Proposal []byte `db:"proposal"` - PieceCid string `db:"piece_cid"` - PieceSize int64 `db:"piece_size"` - Offline bool `db:"offline"` - Verified bool `db:"verified"` - SpID int64 `db:"sp_id"` - StartEpoch int64 `db:"start_epoch"` - EndEpoch int64 `db:"end_epoch"` - ClientPeerID string `db:"client_peer_id"` - ChainDealID int64 `db:"chain_deal_id"` - PublishCid string `db:"publish_cid"` - FastRetrieval bool `db:"fast_retrieval"` - AnnounceToIpni bool `db:"announce_to_ipni"` - Error *string `db:"error"` - Label []byte `db:"label"` + UUID string `db:"uuid"` + CreatedAt time.Time `db:"created_at"` + SignedProposalCid string `db:"signed_proposal_cid"` + ProposalSignature []byte `db:"proposal_signature"` + Proposal []byte `db:"proposal"` + PieceCid string `db:"piece_cid"` + PieceSize int64 `db:"piece_size"` + Offline bool `db:"offline"` + Verified 
bool `db:"verified"` + SpID int64 `db:"sp_id"` + StartEpoch int64 `db:"start_epoch"` + EndEpoch int64 `db:"end_epoch"` + ClientPeerID string `db:"client_peer_id"` + ChainDealID int64 `db:"chain_deal_id"` + PublishCid string `db:"publish_cid"` + FastRetrieval bool `db:"fast_retrieval"` + AnnounceToIpni bool `db:"announce_to_ipni"` + Error sql.NullString `db:"error"` + Label []byte `db:"label"` } func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, deal string, pcid string) error { @@ -817,7 +817,7 @@ func (d *CurioStorageDealMarket) ingestDeal(ctx context.Context, deal MK12Pipeli return false, xerrors.Errorf("UUID: %s: %w", deal.UUID, err) } - dealUrl, err := url.Parse(*deal.URL) + dealUrl, err := url.Parse(deal.URL.String) if err != nil { return false, xerrors.Errorf("UUID: %s: %w", deal.UUID, err) } diff --git a/tasks/storage-market/task_commp.go b/tasks/storage-market/task_commp.go index c5b8607cd..7da854ac2 100644 --- a/tasks/storage-market/task_commp.go +++ b/tasks/storage-market/task_commp.go @@ -2,6 +2,7 @@ package storage_market import ( "context" + "database/sql" "encoding/json" "fmt" "io" @@ -53,7 +54,7 @@ func (c *CommpTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done Pcid string `db:"piece_cid"` Psize int64 `db:"piece_size"` RawSize int64 `db:"raw_size"` - URL *string `db:"url"` + URL sql.NullString `db:"url"` Headers json.RawMessage `db:"headers"` ID string `db:"id"` SpID int64 `db:"sp_id"` @@ -110,8 +111,8 @@ func (c *CommpTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } } - if piece.URL != nil { - dataUrl := *piece.URL + if piece.URL.Valid { + dataUrl := piece.URL.String goUrl, err := url.Parse(dataUrl) if err != nil { @@ -276,7 +277,7 @@ func (c *CommpTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.Task var tasks []struct { TaskID harmonytask.TaskID `db:"commp_task_id"` StorageID string `db:"storage_id"` - Url *string `db:"url"` + Url sql.NullString `db:"url"` } indIDs := make([]int64, len(ids)) @@ -312,8 +313,8 @@ func (c *CommpTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.Task } for _, task := range tasks { - if task.Url != nil { - goUrl, err := url.Parse(*task.Url) + if task.Url.Valid { + goUrl, err := url.Parse(task.Url.String) if err != nil { return false, xerrors.Errorf("parsing data URL: %w", err) } diff --git a/web/api/webrpc/harmony_stats.go b/web/api/webrpc/harmony_stats.go index b92e10ff0..0c166bfc2 100644 --- a/web/api/webrpc/harmony_stats.go +++ b/web/api/webrpc/harmony_stats.go @@ -2,6 +2,7 @@ package webrpc import ( "context" + "database/sql" "time" "github.com/filecoin-project/go-address" @@ -75,9 +76,9 @@ type HarmonyTaskHistory struct { Result bool `db:"result"` Err string `db:"err"` - CompletedBy string `db:"completed_by_host_and_port"` - CompletedById *int64 `db:"completed_by_machine"` - CompletedByName *string `db:"completed_by_machine_name"` + CompletedBy string `db:"completed_by_host_and_port"` + CompletedById sql.NullInt64 `db:"completed_by_machine"` + CompletedByName sql.NullString `db:"completed_by_machine_name"` Events []*SectorEvent `db:"-"` } @@ -104,13 +105,13 @@ func (a *WebRPC) HarmonyTaskHistory(ctx context.Context, taskName string, fails // HarmonyTask represents the current state of a task. 
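// Editor's note (illustration only, not part of the patch): the recurring change in this patch is replacing
// nullable pointer fields (*int64, *string, *time.Time) with database/sql Null* types, so NULL handling
// becomes an explicit .Valid test instead of a nil check when rows are scanned. A minimal read-side sketch
// with hypothetical names (taskRow, describeOwner, and the column names are illustrative only):
package example

import (
	"database/sql"
	"fmt"
)

type taskRow struct {
	ID        int64          `db:"id"`
	OwnerID   sql.NullInt64  `db:"owner_id"`   // NULL while the task is unclaimed
	OwnerName sql.NullString `db:"owner_name"` // NULL if the owning machine row is gone
}

func describeOwner(t taskRow) string {
	if !t.OwnerID.Valid {
		return fmt.Sprintf("task %d: unclaimed", t.ID)
	}
	name := "unknown"
	if t.OwnerName.Valid {
		name = t.OwnerName.String
	}
	return fmt.Sprintf("task %d: owned by machine %d (%s)", t.ID, t.OwnerID.Int64, name)
}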
type HarmonyTask struct { - ID int64 `db:"id"` - Name string `db:"name"` - UpdateTime time.Time `db:"update_time"` - PostedTime time.Time `db:"posted_time"` - OwnerID *int64 `db:"owner_id"` - OwnerAddr *string `db:"owner_addr"` - OwnerName *string `db:"owner_name"` + ID int64 `db:"id"` + Name string `db:"name"` + UpdateTime time.Time `db:"update_time"` + PostedTime time.Time `db:"posted_time"` + OwnerID sql.NullInt64 `db:"owner_id"` + OwnerAddr sql.NullString `db:"owner_addr"` + OwnerName sql.NullString `db:"owner_name"` } // HarmonyTaskDetails returns the current state of a task by ID. diff --git a/web/api/webrpc/ipni.go b/web/api/webrpc/ipni.go index dd1e1b31d..3bf08321f 100644 --- a/web/api/webrpc/ipni.go +++ b/web/api/webrpc/ipni.go @@ -343,11 +343,11 @@ type EntryInfo struct { PieceCID string `db:"piece_cid"` FromCar bool `db:"from_car"` - FirstCID *string `db:"first_cid"` - StartOffset *int64 `db:"start_offset"` - NumBlocks int64 `db:"num_blocks"` + FirstCID sql.NullString `db:"first_cid"` + StartOffset sql.NullInt64 `db:"start_offset"` + NumBlocks int64 `db:"num_blocks"` - PrevCID *string `db:"prev_cid"` + PrevCID sql.NullString `db:"prev_cid"` Err *string Size int64 diff --git a/web/api/webrpc/market.go b/web/api/webrpc/market.go index 84dc38e36..27895ac5b 100644 --- a/web/api/webrpc/market.go +++ b/web/api/webrpc/market.go @@ -95,30 +95,36 @@ func (a *WebRPC) SetStorageAsk(ctx context.Context, ask *StorageAsk) error { } type MK12Pipeline struct { - UUID string `db:"uuid" json:"uuid"` - SpID int64 `db:"sp_id" json:"sp_id"` - Started bool `db:"started" json:"started"` - PieceCid string `db:"piece_cid" json:"piece_cid"` - PieceSize int64 `db:"piece_size" json:"piece_size"` - PieceCidV2 string `db:"-" json:"piece_cid_v2"` - RawSize sql.NullInt64 `db:"raw_size" json:"raw_size"` - Offline bool `db:"offline" json:"offline"` - URL *string `db:"url" json:"url"` - Headers []byte `db:"headers" json:"headers"` - CommTaskID *int64 `db:"commp_task_id" json:"commp_task_id"` - AfterCommp bool `db:"after_commp" json:"after_commp"` - PSDTaskID *int64 `db:"psd_task_id" json:"psd_task_id"` - AfterPSD bool `db:"after_psd" json:"after_psd"` - PSDWaitTime *time.Time `db:"psd_wait_time" json:"psd_wait_time"` - FindDealTaskID *int64 `db:"find_deal_task_id" json:"find_deal_task_id"` - AfterFindDeal bool `db:"after_find_deal" json:"after_find_deal"` - Sector *int64 `db:"sector" json:"sector"` - Offset *int64 `db:"sector_offset" json:"sector_offset"` - CreatedAt time.Time `db:"created_at" json:"created_at"` - Indexed bool `db:"indexed" json:"indexed"` - Announce bool `db:"announce" json:"announce"` - Complete bool `db:"complete" json:"complete"` - Miner string `json:"miner"` + // Cache line 1 (bytes 0-64): Hot path - piece identification and early checks + UUID string `db:"uuid" json:"uuid"` // 16 bytes (0-16) + SpID int64 `db:"sp_id" json:"sp_id"` // 8 bytes (16-24) + PieceCid string `db:"piece_cid" json:"piece_cid"` // 16 bytes (24-40) + PieceSize int64 `db:"piece_size" json:"piece_size"` // 8 bytes (40-48) + Offline bool `db:"offline" json:"offline"` // 1 byte (48-49) - checked early for download decisions + Started bool `db:"started" json:"started"` // 1 byte (49-50) - checked early + // Cache line 2 (bytes 64-128): Task IDs and stage tracking (sql.NullInt64 = 16 bytes) + CommTaskID sql.NullInt64 `db:"commp_task_id" json:"commp_task_id"` // 16 bytes + PSDTaskID sql.NullInt64 `db:"psd_task_id" json:"psd_task_id"` // 16 bytes + FindDealTaskID sql.NullInt64 `db:"find_deal_task_id" json:"find_deal_task_id"` 
// 16 bytes + AfterCommp bool `db:"after_commp" json:"after_commp"` // 1 byte + AfterPSD bool `db:"after_psd" json:"after_psd"` // 1 byte + AfterFindDeal bool `db:"after_find_deal" json:"after_find_deal"` // 1 byte + // Cache line 3 (bytes 128-192): Sector placement and sizing (sql.NullInt64 = 16 bytes) + RawSize sql.NullInt64 `db:"raw_size" json:"raw_size"` // 16 bytes + Sector sql.NullInt64 `db:"sector" json:"sector"` // 16 bytes + Offset sql.NullInt64 `db:"sector_offset" json:"sector_offset"` // 16 bytes + // Cache line 4 (bytes 192-256): Timing information + PSDWaitTime sql.NullTime `db:"psd_wait_time" json:"psd_wait_time"` // 32 bytes (sql.NullTime) + CreatedAt time.Time `db:"created_at" json:"created_at"` // 24 bytes + // Cache line 5+ (bytes 256+): Data URL and larger fields (sql.NullString = 24 bytes) + URL sql.NullString `db:"url" json:"url"` // 24 bytes - only for online deals + PieceCidV2 string `db:"-" json:"piece_cid_v2"` // 16 bytes - computed field + Miner string `json:"miner"` // 16 bytes - display field + Headers []byte `db:"headers" json:"headers"` // 24 bytes - only for online deals + // Status bools: rarely checked ones at end + Indexed bool `db:"indexed" json:"indexed"` // checked for indexing + Announce bool `db:"announce" json:"announce"` // checked for IPNI announce + Complete bool `db:"complete" json:"complete"` // checked for completion } func (a *WebRPC) GetMK12DealPipelines(ctx context.Context, limit int, offset int) ([]*MK12Pipeline, error) { @@ -538,28 +544,35 @@ func (a *WebRPC) MoveBalanceToEscrow(ctx context.Context, miner string, amount s } type PieceDeal struct { - ID string `db:"id" json:"id"` - BoostDeal bool `db:"boost_deal" json:"boost_deal"` - LegacyDeal bool `db:"legacy_deal" json:"legacy_deal"` - SpId int64 `db:"sp_id" json:"sp_id"` - ChainDealId int64 `db:"chain_deal_id" json:"chain_deal_id"` - Sector int64 `db:"sector_num" json:"sector"` - Offset sql.NullInt64 `db:"piece_offset" json:"offset"` - Length int64 `db:"piece_length" json:"length"` - RawSize int64 `db:"raw_size" json:"raw_size"` - Miner string `json:"miner"` - MK20 bool `db:"-" json:"mk20"` + // Cache line 1 (0-64 bytes): Hot path - identification fields used together + ID string `db:"id" json:"id"` // 16 bytes - used with SpId (line 621-623) + SpId int64 `db:"sp_id" json:"sp_id"` // 8 bytes - checked early (line 614), used with ID (line 617) + Miner string `json:"miner"` // 16 bytes - set based on SpId (line 615, 625) + // Cache line 2: Additional 8-byte types grouped together + ChainDealId int64 `db:"chain_deal_id" json:"chain_deal_id"` // 8 bytes + Sector int64 `db:"sector_num" json:"sector"` // 8 bytes + Length int64 `db:"piece_length" json:"length"` // 8 bytes + RawSize int64 `db:"raw_size" json:"raw_size"` // 8 bytes + // sql.NullInt64 (16 bytes) + Offset sql.NullInt64 `db:"piece_offset" json:"offset"` // 16 bytes + // Cache line 3 (64+ bytes): Bools grouped together at the end to minimize padding + MK20 bool `db:"-" json:"mk20"` // Used with ID check (line 621-623) - hot path + BoostDeal bool `db:"boost_deal" json:"boost_deal"` // Less frequently accessed + LegacyDeal bool `db:"legacy_deal" json:"legacy_deal"` // Less frequently accessed } type PieceInfo struct { - PieceCidv2 string `json:"piece_cid_v2"` - PieceCid string `json:"piece_cid"` - Size int64 `json:"size"` - CreatedAt time.Time `json:"created_at"` - Indexed bool `json:"indexed"` - IndexedAT time.Time `json:"indexed_at"` - IPNIAd []string `json:"ipni_ads"` - Deals []*PieceDeal `json:"deals"` + // Cache line 1 (0-64 
bytes): Hot path - piece identification used together + PieceCidv2 string `json:"piece_cid_v2"` // 16 bytes - used together with PieceCid (line 584) + PieceCid string `json:"piece_cid"` // 16 bytes - used with PieceCidv2 (line 584) + Size int64 `json:"size"` // 8 bytes - used with PieceCid (line 587, 604) + IPNIAd []string `json:"ipni_ads"` // 24 bytes - used for results + // Cache line 2 (64+ bytes): Display + CreatedAt time.Time `json:"created_at"` // 24 bytes - used for display + IndexedAT time.Time `json:"indexed_at"` // 24 bytes - used for display + Indexed bool `json:"indexed"` // Used for display + // Cache line 3 + Deals []PieceDeal `json:"deals"` // 24 bytes - used for results } func (a *WebRPC) PieceInfo(ctx context.Context, pieceCid string) (*PieceInfo, error) { @@ -590,7 +603,7 @@ func (a *WebRPC) PieceInfo(ctx context.Context, pieceCid string) (*PieceInfo, er return nil, xerrors.Errorf("failed to get piece metadata: %w", err) } - pieceDeals := []*PieceDeal{} + pieceDeals := []PieceDeal{} err = a.deps.DB.Select(ctx, &pieceDeals, `SELECT id, diff --git a/web/api/webrpc/message.go b/web/api/webrpc/message.go index 0f1d45f3a..9c65a764c 100644 --- a/web/api/webrpc/message.go +++ b/web/api/webrpc/message.go @@ -2,8 +2,8 @@ package webrpc import ( "context" + "database/sql" "encoding/json" - "time" "github.com/filecoin-project/lotus/chain/types" ) @@ -15,21 +15,21 @@ type MessageDetail struct { SendTaskID int64 `db:"send_task_id" json:"send_task_id"` UnsignedData []byte `db:"unsigned_data" json:"unsigned_data"` UnsignedCID string `db:"unsigned_cid" json:"unsigned_cid"` - Nonce *int64 `db:"nonce" json:"nonce"` + Nonce sql.NullInt64 `db:"nonce" json:"nonce"` SignedData []byte `db:"signed_data" json:"signed_data"` SignedJSON json.RawMessage `db:"signed_json" json:"signed_json"` SignedCID string `db:"signed_cid" json:"signed_cid"` - SendTime *time.Time `db:"send_time" json:"send_time"` - SendSuccess *bool `db:"send_success" json:"send_success"` - SendError *string `db:"send_error" json:"send_error"` - WaiterMachineID *int64 `db:"waiter_machine_id" json:"waiter_machine_id"` - ExecutedTSKCID *string `db:"executed_tsk_cid" json:"executed_tsk_cid"` - ExecutedTSKEpoch *int64 `db:"executed_tsk_epoch" json:"executed_tsk_epoch"` - ExecutedMsgCID *string `db:"executed_msg_cid" json:"executed_msg_cid"` + SendTime sql.NullTime `db:"send_time" json:"send_time"` + SendSuccess sql.NullBool `db:"send_success" json:"send_success"` + SendError sql.NullString `db:"send_error" json:"send_error"` + WaiterMachineID sql.NullInt64 `db:"waiter_machine_id" json:"waiter_machine_id"` + ExecutedTSKCID sql.NullString `db:"executed_tsk_cid" json:"executed_tsk_cid"` + ExecutedTSKEpoch sql.NullInt64 `db:"executed_tsk_epoch" json:"executed_tsk_epoch"` + ExecutedMsgCID sql.NullString `db:"executed_msg_cid" json:"executed_msg_cid"` ExecutedMsgData json.RawMessage `db:"executed_msg_data" json:"executed_msg_data"` - ExecutedReceiptExitCode *int64 `db:"executed_rcpt_exitcode" json:"executed_rcpt_exitcode"` + ExecutedReceiptExitCode sql.NullInt64 `db:"executed_rcpt_exitcode" json:"executed_rcpt_exitcode"` ExecutedReceiptReturn []byte `db:"executed_rcpt_return" json:"executed_rcpt_return"` - ExecutedReceiptGasUsed *int64 `db:"executed_rcpt_gas_used" json:"executed_rcpt_gas_used"` + ExecutedReceiptGasUsed sql.NullInt64 `db:"executed_rcpt_gas_used" json:"executed_rcpt_gas_used"` ValueStr string `db:"-" json:"value_str"` FeeStr string `db:"-" json:"fee_str"` diff --git a/web/api/webrpc/pipeline_porep.go 
b/web/api/webrpc/pipeline_porep.go index 064bb0ff9..c443e0af9 100644 --- a/web/api/webrpc/pipeline_porep.go +++ b/web/api/webrpc/pipeline_porep.go @@ -2,6 +2,7 @@ package webrpc import ( "context" + "database/sql" "time" "github.com/snadrus/must" @@ -17,69 +18,63 @@ import ( ) type PipelineTask struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - - CreateTime time.Time `db:"create_time"` - - TaskSDR *int64 `db:"task_id_sdr"` - AfterSDR bool `db:"after_sdr"` - StartedSDR bool `db:"started_sdr"` - - TaskTreeD *int64 `db:"task_id_tree_d"` - AfterTreeD bool `db:"after_tree_d"` - StartedTreeD bool `db:"started_tree_d"` - TreeD *string `db:"tree_d_cid"` - - TaskTreeC *int64 `db:"task_id_tree_c"` - AfterTreeC bool `db:"after_tree_c"` - StartedTreeRC bool `db:"started_tree_rc"` - - TaskTreeR *int64 `db:"task_id_tree_r"` - AfterTreeR bool `db:"after_tree_r"` - TreeR *string `db:"tree_r_cid"` - - TaskSynthetic *int64 `db:"task_id_synth"` - AfterSynthetic bool `db:"after_synth"` - StartedSynthetic bool `db:"started_synthetic"` - - PreCommitReadyAt *time.Time `db:"precommit_ready_at"` - - TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` - AfterPrecommitMsg bool `db:"after_precommit_msg"` - StartedPrecommitMsg bool `db:"started_precommit_msg"` - - AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` - PreCommitMsgCid *string `db:"precommit_msg_cid"` - SeedEpoch *int64 `db:"seed_epoch"` - - TaskPoRep *int64 `db:"task_id_porep"` - PoRepProof []byte `db:"porep_proof"` - AfterPoRep bool `db:"after_porep"` - StartedPoRep bool `db:"started_porep"` - - TaskFinalize *int64 `db:"task_id_finalize"` - AfterFinalize bool `db:"after_finalize"` - StartedFinalize bool `db:"started_finalize"` - - TaskMoveStorage *int64 `db:"task_id_move_storage"` - AfterMoveStorage bool `db:"after_move_storage"` - StartedMoveStorage bool `db:"started_move_storage"` - - CommitReadyAt *time.Time `db:"commit_ready_at"` - - TaskCommitMsg *int64 `db:"task_id_commit_msg"` - AfterCommitMsg bool `db:"after_commit_msg"` - StartedCommitMsg bool `db:"started_commit_msg"` - - AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` - CommitMsgCid *string `db:"commit_msg_cid"` - - Failed bool `db:"failed"` - FailedReason string `db:"failed_reason"` - - MissingTasks []int64 `db:"missing_tasks"` - AllTasks []int64 `db:"all_tasks"` + // Cache line 1 (bytes 0-64): Hot path - identification + SpID int64 `db:"sp_id"` // 8 bytes (0-8) + SectorNumber int64 `db:"sector_number"` // 8 bytes (8-16) + CreateTime time.Time `db:"create_time"` // 24 bytes (16-40) + Failed bool `db:"failed"` // 1 byte (40-41) - checked early + // Early stage task IDs (checked together) + TaskSDR sql.NullInt64 `db:"task_id_sdr"` // 16 bytes (41-57, with padding) + AfterSDR bool `db:"after_sdr"` // 1 byte + StartedSDR bool `db:"started_sdr"` // 1 byte + // Cache line 2 (bytes 64-128): Tree stages (accessed together) + TaskTreeD sql.NullInt64 `db:"task_id_tree_d"` // 16 bytes + TreeD sql.NullString `db:"tree_d_cid"` // 24 bytes + TaskTreeC sql.NullInt64 `db:"task_id_tree_c"` // 16 bytes + AfterTreeD bool `db:"after_tree_d"` // 1 byte + StartedTreeD bool `db:"started_tree_d"` // 1 byte + AfterTreeC bool `db:"after_tree_c"` // 1 byte + StartedTreeRC bool `db:"started_tree_rc"` // 1 byte + // Cache line 3 (bytes 128-192): TreeR and Synthetic stages + TaskTreeR sql.NullInt64 `db:"task_id_tree_r"` // 16 bytes + TreeR sql.NullString `db:"tree_r_cid"` // 24 bytes + TaskSynthetic sql.NullInt64 `db:"task_id_synth"` // 16 bytes + AfterTreeR bool 
`db:"after_tree_r"` // 1 byte + AfterSynthetic bool `db:"after_synth"` // 1 byte + StartedSynthetic bool `db:"started_synthetic"` // 1 byte + // Cache line 4 (bytes 192-256): PreCommit stage + PreCommitReadyAt sql.NullTime `db:"precommit_ready_at"` // 32 bytes + TaskPrecommitMsg sql.NullInt64 `db:"task_id_precommit_msg"` // 16 bytes + AfterPrecommitMsg bool `db:"after_precommit_msg"` // 1 byte + StartedPrecommitMsg bool `db:"started_precommit_msg"` // 1 byte + AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` // 1 byte + // Cache line 5 (bytes 256-320): PreCommit CID and SeedEpoch + PreCommitMsgCid sql.NullString `db:"precommit_msg_cid"` // 24 bytes + SeedEpoch sql.NullInt64 `db:"seed_epoch"` // 16 bytes + // PoRep stage (accessed together) + TaskPoRep sql.NullInt64 `db:"task_id_porep"` // 16 bytes + AfterPoRep bool `db:"after_porep"` // 1 byte + StartedPoRep bool `db:"started_porep"` // 1 byte + // Cache line 6 (bytes 320-384): Finalize and MoveStorage stages + TaskFinalize sql.NullInt64 `db:"task_id_finalize"` // 16 bytes + TaskMoveStorage sql.NullInt64 `db:"task_id_move_storage"` // 16 bytes + AfterFinalize bool `db:"after_finalize"` // 1 byte + StartedFinalize bool `db:"started_finalize"` // 1 byte + AfterMoveStorage bool `db:"after_move_storage"` // 1 byte + StartedMoveStorage bool `db:"started_move_storage"` // 1 byte + // Commit stage (accessed together) + CommitReadyAt sql.NullTime `db:"commit_ready_at"` // 32 bytes + // Cache line 7 (bytes 384-448): Commit message stage + TaskCommitMsg sql.NullInt64 `db:"task_id_commit_msg"` // 16 bytes + CommitMsgCid sql.NullString `db:"commit_msg_cid"` // 24 bytes + AfterCommitMsg bool `db:"after_commit_msg"` // 1 byte + StartedCommitMsg bool `db:"started_commit_msg"` // 1 byte + AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` // 1 byte + // Larger fields at end (rarely accessed or only when needed) + FailedReason string `db:"failed_reason"` // 16 bytes - only used when Failed=true + PoRepProof []byte `db:"porep_proof"` // 24 bytes - only used in PoRep stage + MissingTasks []int64 `db:"missing_tasks"` // 24 bytes - computed field + AllTasks []int64 `db:"all_tasks"` // 24 bytes - computed field } type sectorListEntry struct { @@ -307,7 +302,7 @@ func (a *WebRPC) PipelinePorepSectors(ctx context.Context) ([]sectorListEntry, e minerBitfieldCache[addr] = mbf } - afterSeed := task.SeedEpoch != nil && *task.SeedEpoch <= int64(epoch) + afterSeed := task.SeedEpoch.Valid && task.SeedEpoch.Int64 <= int64(epoch) sectorList = append(sectorList, sectorListEntry{ PipelineTask: task, diff --git a/web/api/webrpc/proofshare.go b/web/api/webrpc/proofshare.go index 683cb82a3..5b6985082 100644 --- a/web/api/webrpc/proofshare.go +++ b/web/api/webrpc/proofshare.go @@ -28,22 +28,22 @@ import ( // ProofShareMeta holds the data from the proofshare_meta table. type ProofShareMeta struct { - Enabled bool `db:"enabled" json:"enabled"` - Wallet *string `db:"wallet" json:"wallet"` - RequestTaskID *int64 `db:"request_task_id" json:"request_task_id"` - Price string `db:"pprice" json:"price"` + Enabled bool `db:"enabled" json:"enabled"` + Wallet sql.NullString `db:"wallet" json:"wallet"` + RequestTaskID sql.NullInt64 `db:"request_task_id" json:"request_task_id"` + Price string `db:"pprice" json:"price"` } // ProofShareQueueItem represents each row in proofshare_queue. 
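// Editor's note (illustration only, not part of the patch): unlike *string, sql.NullString has no custom
// JSON marshalling, so json-tagged fields such as Wallet above serialize as a {"String": ..., "Valid": ...}
// object rather than a plain string or null. A small runnable demonstration (the meta struct is hypothetical):
package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
)

func main() {
	type meta struct {
		Wallet sql.NullString `json:"wallet"`
	}
	out, _ := json.Marshal(meta{Wallet: sql.NullString{String: "f1abc", Valid: true}})
	fmt.Println(string(out)) // {"wallet":{"String":"f1abc","Valid":true}}

	out, _ = json.Marshal(meta{}) // NULL wallet
	fmt.Println(string(out))      // {"wallet":{"String":"","Valid":false}}
}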
type ProofShareQueueItem struct { - ServiceID string `db:"service_id" json:"service_id"` - ObtainedAt time.Time `db:"obtained_at" json:"obtained_at"` - ComputeTaskID *int64 `db:"compute_task_id" json:"compute_task_id"` - ComputeDone bool `db:"compute_done" json:"compute_done"` - SubmitTaskID *int64 `db:"submit_task_id" json:"submit_task_id"` - SubmitDone bool `db:"submit_done" json:"submit_done"` - WasPoW bool `db:"was_pow" json:"was_pow"` - PaymentAmount string `json:"payment_amount"` + ServiceID string `db:"service_id" json:"service_id"` + ObtainedAt time.Time `db:"obtained_at" json:"obtained_at"` + ComputeTaskID sql.NullInt64 `db:"compute_task_id" json:"compute_task_id"` + ComputeDone bool `db:"compute_done" json:"compute_done"` + SubmitTaskID sql.NullInt64 `db:"submit_task_id" json:"submit_task_id"` + SubmitDone bool `db:"submit_done" json:"submit_done"` + WasPoW bool `db:"was_pow" json:"was_pow"` + PaymentAmount string `json:"payment_amount"` } // PSGetMeta returns the current meta row from proofshare_meta (always a single row). @@ -108,11 +108,11 @@ func (a *WebRPC) PSListAsks(ctx context.Context) ([]common.WorkAsk, error) { return nil, xerrors.Errorf("PSListAsks: failed to query proofshare_meta: %w", err) } - if meta.Wallet == nil { + if !meta.Wallet.Valid { return nil, nil } - work, err := proofsvc.PollWork(*meta.Wallet) + work, err := proofsvc.PollWork(meta.Wallet.String) if err != nil { return nil, xerrors.Errorf("failed to poll work: %w", err) } @@ -324,13 +324,13 @@ func (a *WebRPC) addMessageTrackingProvider(ctx context.Context, messageCid cid. // ProofShareClientSettings model // Matches proofshare_client_settings table columns type ProofShareClientSettings struct { - SpID int64 `db:"sp_id" json:"sp_id"` - Enabled bool `db:"enabled" json:"enabled"` - Wallet *string `db:"wallet" json:"wallet"` - MinimumPendingSecs int64 `db:"minimum_pending_seconds" json:"minimum_pending_seconds"` - DoPoRep bool `db:"do_porep" json:"do_porep"` - DoSnap bool `db:"do_snap" json:"do_snap"` - Price string `db:"pprice"` + SpID int64 `db:"sp_id" json:"sp_id"` + Enabled bool `db:"enabled" json:"enabled"` + Wallet sql.NullString `db:"wallet" json:"wallet"` + MinimumPendingSecs int64 `db:"minimum_pending_seconds" json:"minimum_pending_seconds"` + DoPoRep bool `db:"do_porep" json:"do_porep"` + DoSnap bool `db:"do_snap" json:"do_snap"` + Price string `db:"pprice"` Address string `db:"-" json:"address"` FilPerP string `db:"-" json:"price"` @@ -382,8 +382,8 @@ func (a *WebRPC) PSClientSet(ctx context.Context, s ProofShareClientSettings) er } var walletAddr *address.Address - if s.Wallet != nil { - addr, err := address.NewFromString(*s.Wallet) + if s.Wallet.Valid { + addr, err := address.NewFromString(s.Wallet.String) if err != nil { return xerrors.Errorf("PSClientSet: invalid address: %w", err) } @@ -421,18 +421,18 @@ func (a *WebRPC) PSClientSet(ctx context.Context, s ProofShareClientSettings) er // ProofShareClientRequest model type ProofShareClientRequest struct { - TaskID int64 `db:"task_id" json:"task_id"` - SpID int64 `db:"sp_id"` - SectorNum int64 `db:"sector_num" json:"sector_num"` - RequestCID *string `db:"request_cid" json:"request_cid,omitempty"` - RequestUploaded bool `db:"request_uploaded" json:"request_uploaded"` - PaymentWallet *int64 `db:"payment_wallet" json:"payment_wallet,omitempty"` - PaymentNonce *int64 `db:"payment_nonce" json:"payment_nonce,omitempty"` - RequestSent bool `db:"request_sent" json:"request_sent"` - ResponseData []byte `db:"response_data" json:"response_data,omitempty"` 
- Done bool `db:"done" json:"done"` - CreatedAt time.Time `db:"created_at" json:"created_at"` - DoneAt sql.NullTime `db:"done_at" json:"done_at,omitempty"` + TaskID int64 `db:"task_id" json:"task_id"` + SpID int64 `db:"sp_id"` + SectorNum int64 `db:"sector_num" json:"sector_num"` + RequestCID sql.NullString `db:"request_cid" json:"request_cid,omitempty"` + RequestUploaded bool `db:"request_uploaded" json:"request_uploaded"` + PaymentWallet sql.NullInt64 `db:"payment_wallet" json:"payment_wallet,omitempty"` + PaymentNonce sql.NullInt64 `db:"payment_nonce" json:"payment_nonce,omitempty"` + RequestSent bool `db:"request_sent" json:"request_sent"` + ResponseData []byte `db:"response_data" json:"response_data,omitempty"` + Done bool `db:"done" json:"done"` + CreatedAt time.Time `db:"created_at" json:"created_at"` + DoneAt sql.NullTime `db:"done_at" json:"done_at,omitempty"` PaymentAmount *string `db:"-" json:"payment_amount"` SpIDStr string `db:"-" json:"sp_id"` @@ -458,13 +458,13 @@ func (a *WebRPC) PSClientRequests(ctx context.Context, spId int64) ([]*ProofShar } for _, r := range rows { - if r.PaymentWallet == nil || r.PaymentNonce == nil { + if !r.PaymentWallet.Valid || !r.PaymentNonce.Valid { // No payment info, so no payment amount to calculate continue } - walletID := *r.PaymentWallet - currentNonce := *r.PaymentNonce + walletID := r.PaymentWallet.Int64 + currentNonce := r.PaymentNonce.Int64 var currentCumulativeAmountStr string err := a.deps.DB.QueryRow(ctx, ` diff --git a/web/api/webrpc/sector.go b/web/api/webrpc/sector.go index 73e9353a6..268eda956 100644 --- a/web/api/webrpc/sector.go +++ b/web/api/webrpc/sector.go @@ -2,6 +2,7 @@ package webrpc import ( "context" + "database/sql" "strconv" "strings" "time" @@ -62,35 +63,35 @@ type sectorSnapListEntry struct { } type SnapPipelineTask struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - StartTime time.Time `db:"start_time"` - - UpgradeProof int `db:"upgrade_proof"` - DataAssigned bool `db:"data_assigned"` - - UpdateUnsealedCID *string `db:"update_unsealed_cid"` - UpdateSealedCID *string `db:"update_sealed_cid"` - - TaskEncode *int64 `db:"task_id_encode"` - AfterEncode bool `db:"after_encode"` - TaskProve *int64 `db:"task_id_prove"` - AfterProve bool `db:"after_prove"` - TaskSubmit *int64 `db:"task_id_submit"` - AfterSubmit bool `db:"after_submit"` - AfterProveMsgSuccess bool `db:"after_prove_msg_success"` - ProveMsgTsk []byte `db:"prove_msg_tsk"` - UpdateMsgCid *string `db:"prove_msg_cid"` - - TaskMoveStorage *int64 `db:"task_id_move_storage"` - AfterMoveStorage bool `db:"after_move_storage"` - - Failed bool `db:"failed"` - FailedAt *time.Time `db:"failed_at"` - FailedReason string `db:"failed_reason"` - FailedReasonMsg string `db:"failed_reason_msg"` - - SubmitAfter *time.Time `db:"submit_after"` + // Cache line 1 (bytes 0-64): Hot path - identification and early checks + SpID int64 `db:"sp_id"` // 8 bytes (0-8) + SectorNumber int64 `db:"sector_number"` // 8 bytes (8-16) + StartTime time.Time `db:"start_time"` // 24 bytes (16-40) + UpgradeProof int `db:"upgrade_proof"` // 8 bytes (40-48) + Failed bool `db:"failed"` // 1 byte (48-49) - checked early + DataAssigned bool `db:"data_assigned"` // 1 byte (49-50) - checked with sector number + // Cache line 2 (bytes 64-128): Encode and Prove stages (accessed together) + TaskEncode sql.NullInt64 `db:"task_id_encode"` // 16 bytes + AfterEncode bool `db:"after_encode"` // 1 byte + UpdateUnsealedCID sql.NullString `db:"update_unsealed_cid"` // 24 bytes + TaskProve 
sql.NullInt64 `db:"task_id_prove"` // 16 bytes + AfterProve bool `db:"after_prove"` // 1 byte + // Cache line 3 (bytes 128-192): Submit and message stages + UpdateSealedCID sql.NullString `db:"update_sealed_cid"` // 24 bytes + TaskSubmit sql.NullInt64 `db:"task_id_submit"` // 16 bytes + AfterSubmit bool `db:"after_submit"` // 1 byte + AfterProveMsgSuccess bool `db:"after_prove_msg_success"` // 1 byte + UpdateMsgCid sql.NullString `db:"prove_msg_cid"` // 24 bytes (crosses into cache line 4) + // Cache line 4 (bytes 192-256): Storage and timing + TaskMoveStorage sql.NullInt64 `db:"task_id_move_storage"` // 16 bytes + AfterMoveStorage bool `db:"after_move_storage"` // 1 byte + SubmitAfter sql.NullTime `db:"submit_after"` // 32 bytes + // Failure info (only accessed when Failed=true) + FailedAt sql.NullTime `db:"failed_at"` // 32 bytes (crosses into cache line 5) + // Rarely accessed fields at end + FailedReason string `db:"failed_reason"` // 16 bytes - only when Failed=true + FailedReasonMsg string `db:"failed_reason_msg"` // 16 bytes - only when Failed=true + ProveMsgTsk []byte `db:"prove_msg_tsk"` // 24 bytes - only in specific stages } type SectorInfoTaskSummary struct { Name string @@ -100,53 +101,51 @@ type SectorInfoTaskSummary struct { } type TaskHistory struct { - PipelineTaskID int64 `db:"pipeline_task_id"` - Name *string `db:"name"` - CompletedBy *string `db:"completed_by_host_and_port"` - Result *bool `db:"result"` - Err *string `db:"err"` - WorkStart *time.Time `db:"work_start"` - WorkEnd *time.Time `db:"work_end"` - - // display - Took string `db:"-"` + // Cache line 1 (bytes 0-64): Identification and key timing + PipelineTaskID int64 `db:"pipeline_task_id"` // 8 bytes (0-8) + WorkStart sql.NullTime `db:"work_start"` // 32 bytes (8-40) + WorkEnd sql.NullTime `db:"work_end"` // 32 bytes (40-72, crosses to cache line 2) + // Cache line 2 (bytes 64-128): Task details + Name sql.NullString `db:"name"` // 24 bytes + CompletedBy sql.NullString `db:"completed_by_host_and_port"` // 24 bytes + Result sql.NullBool `db:"result"` // 2 bytes + // Cache line 3 (bytes 128+): Error info and display fields (only accessed when needed) + Err sql.NullString `db:"err"` // 24 bytes - only accessed when Result is false + Took string `db:"-"` // 16 bytes - display only, computed field } // Pieces type SectorPieceMeta struct { - PieceIndex int64 `db:"piece_index"` - PieceCid string `db:"piece_cid"` - PieceSize int64 `db:"piece_size"` - PieceCidV2 string `db:"-"` - - DealID *string `db:"deal_id"` - DataUrl *string `db:"data_url"` - DataRawSize *int64 `db:"data_raw_size"` - DeleteOnFinalize *bool `db:"data_delete_on_finalize"` - - F05PublishCid *string `db:"f05_publish_cid"` - F05DealID *int64 `db:"f05_deal_id"` - - DDOPam *string `db:"direct_piece_activation_manifest"` - - // display - StrPieceSize string `db:"-"` - StrDataRawSize string `db:"-"` - - // piece park - IsParkedPiece bool `db:"-"` - IsParkedPieceFound bool `db:"-"` - PieceParkID int64 `db:"-"` - PieceParkDataUrl string `db:"-"` - PieceParkCreatedAt time.Time `db:"-"` - PieceParkComplete bool `db:"-"` - PieceParkTaskID *int64 `db:"-"` - PieceParkCleanupTaskID *int64 `db:"-"` - - IsSnapPiece bool `db:"is_snap"` - - MK12Deal *bool `db:"boost_deal"` - LegacyDeal *bool `db:"legacy_deal"` + // Cache line 1 (bytes 0-64): Hot path - piece identification and size + PieceIndex int64 `db:"piece_index"` // 8 bytes (0-8) + PieceSize int64 `db:"piece_size"` // 8 bytes (8-16) + PieceCid string `db:"piece_cid"` // 16 bytes (16-32) + PieceCidV2 string 
`db:"-"` // 16 bytes (32-48) - computed field + DataRawSize sql.NullInt64 `db:"data_raw_size"` // 16 bytes (48-64) + // Cache line 2 (bytes 64-128): Deal identification + F05DealID sql.NullInt64 `db:"f05_deal_id"` // 16 bytes + DealID sql.NullString `db:"deal_id"` // 24 bytes + IsSnapPiece bool `db:"is_snap"` // 1 byte - frequently checked with PieceIndex + // Cache line 3 (bytes 128-192): Data access and F05 info + DataUrl sql.NullString `db:"data_url"` // 24 bytes + F05PublishCid sql.NullString `db:"f05_publish_cid"` // 24 bytes + // Cache line 4 (bytes 192-256): DDO and display fields + DDOPam sql.NullString `db:"direct_piece_activation_manifest"` // 24 bytes + StrPieceSize string `db:"-"` // 16 bytes - display only + StrDataRawSize string `db:"-"` // 16 bytes - display only + // Piece park fields (rarely accessed, only for parked pieces) + PieceParkDataUrl string `db:"-"` // 16 bytes + PieceParkCreatedAt time.Time `db:"-"` // 24 bytes + PieceParkID int64 `db:"-"` // 8 bytes + PieceParkTaskID *int64 `db:"-"` // 8 bytes - still pointer (not from DB) + PieceParkCleanupTaskID *int64 `db:"-"` // 8 bytes - still pointer (not from DB) + // Bools: frequently checked first, rare ones at end (sql.NullBool = 2 bytes each) + MK12Deal sql.NullBool `db:"boost_deal"` // 2 bytes - checked often + LegacyDeal sql.NullBool `db:"legacy_deal"` // 2 bytes - checked often + DeleteOnFinalize sql.NullBool `db:"data_delete_on_finalize"` // 2 bytes - checked during finalize + IsParkedPiece bool `db:"-"` // rare - only for UI display + IsParkedPieceFound bool `db:"-"` // rare - only for UI display + PieceParkComplete bool `db:"-"` // rare - only for parked pieces } type FileLocations struct { @@ -165,23 +164,22 @@ type LocationTable struct { } type SectorMeta struct { - OrigUnsealedCid string `db:"orig_unsealed_cid"` - OrigSealedCid string `db:"orig_sealed_cid"` - - UpdatedUnsealedCid string `db:"cur_unsealed_cid"` - UpdatedSealedCid string `db:"cur_sealed_cid"` - - PreCommitCid string `db:"msg_cid_precommit"` - CommitCid string `db:"msg_cid_commit"` - UpdateCid *string `db:"msg_cid_update"` - - IsCC *bool `db:"is_cc"` - ExpirationEpoch *int64 `db:"expiration_epoch"` - - Deadline *int64 `db:"deadline"` - Partition *int64 `db:"partition"` - - UnsealedState *bool `db:"target_unseal_state"` + // Cache line 1 (bytes 0-64): Original and updated CIDs (accessed together for sector comparison) + OrigUnsealedCid string `db:"orig_unsealed_cid"` // 16 bytes (0-16) + OrigSealedCid string `db:"orig_sealed_cid"` // 16 bytes (16-32) + UpdatedUnsealedCid string `db:"cur_unsealed_cid"` // 16 bytes (32-48) + UpdatedSealedCid string `db:"cur_sealed_cid"` // 16 bytes (48-64) + // Cache line 2 (bytes 64-128): Message CIDs (accessed for on-chain tracking) + PreCommitCid string `db:"msg_cid_precommit"` // 16 bytes (64-80) + CommitCid string `db:"msg_cid_commit"` // 16 bytes (80-96) + UpdateCid sql.NullString `db:"msg_cid_update"` // 24 bytes (96-120) - null for non-snap sectors + // Cache line 3 (bytes 128-192): On-chain metadata (sql.NullInt64 = 16 bytes each) + ExpirationEpoch sql.NullInt64 `db:"expiration_epoch"` // 16 bytes + Deadline sql.NullInt64 `db:"deadline"` // 16 bytes + Partition sql.NullInt64 `db:"partition"` // 16 bytes + // Bools (sql.NullBool = 2 bytes each) + IsCC sql.NullBool `db:"is_cc"` // 2 bytes + UnsealedState sql.NullBool `db:"target_unseal_state"` // 2 bytes } func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*SectorInfo, error) { @@ -262,36 +260,36 @@ func (a *WebRPC) SectorInfo(ctx 
context.Context, sp string, intid int64) (*Secto var sle *sectorListEntry if len(tasks) > 0 { task := tasks[0] - if task.PreCommitMsgCid != nil { - si.PreCommitMsg = *task.PreCommitMsgCid + if task.PreCommitMsgCid.Valid { + si.PreCommitMsg = task.PreCommitMsgCid.String } else { si.PreCommitMsg = "" } - if task.CommitMsgCid != nil { - si.CommitMsg = *task.CommitMsgCid + if task.CommitMsgCid.Valid { + si.CommitMsg = task.CommitMsgCid.String } else { si.CommitMsg = "" } - if task.TreeD != nil { - si.UnsealedCid = *task.TreeD - si.UpdatedUnsealedCid = *task.TreeD + if task.TreeD.Valid { + si.UnsealedCid = task.TreeD.String + si.UpdatedUnsealedCid = task.TreeD.String } else { si.UnsealedCid = "" si.UpdatedUnsealedCid = "" } - if task.TreeR != nil { - si.SealedCid = *task.TreeR - si.UpdatedSealedCid = *task.TreeR + if task.TreeR.Valid { + si.SealedCid = task.TreeR.String + si.UpdatedSealedCid = task.TreeR.String } else { si.SealedCid = "" si.UpdatedSealedCid = "" } sle = &sectorListEntry{ PipelineTask: tasks[0], - AfterSeed: task.SeedEpoch != nil && *task.SeedEpoch <= int64(epoch), + AfterSeed: task.SeedEpoch.Valid && task.SeedEpoch.Int64 <= int64(epoch), ChainAlloc: must.One(mbf.alloc.IsSet(uint64(task.SectorNumber))), ChainSector: must.One(mbf.sectorSet.IsSet(uint64(task.SectorNumber))), @@ -305,18 +303,18 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto var sleSnap *sectorSnapListEntry if len(snapTasks) > 0 { task := snapTasks[0] - if task.UpdateUnsealedCID != nil { - si.UpdatedUnsealedCid = *task.UpdateUnsealedCID + if task.UpdateUnsealedCID.Valid { + si.UpdatedUnsealedCid = task.UpdateUnsealedCID.String } else { si.UpdatedUnsealedCid = "" } - if task.UpdateUnsealedCID != nil { - si.UpdatedSealedCid = *task.UpdateUnsealedCID + if task.UpdateSealedCID.Valid { + si.UpdatedSealedCid = task.UpdateSealedCID.String } else { si.UpdatedSealedCid = "" } - if task.UpdateMsgCid != nil { - si.UpdateMsg = *task.UpdateMsgCid + if task.UpdateMsgCid.Valid { + si.UpdateMsg = task.UpdateMsgCid.String } else { si.UpdateMsg = "" } @@ -429,27 +427,31 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto si.UpdatedSealedCid = sectormeta.UpdatedSealedCid si.PreCommitMsg = sectormeta.PreCommitCid si.CommitMsg = sectormeta.CommitCid - if sectormeta.UpdateCid != nil { - si.UpdateMsg = *sectormeta.UpdateCid + if sectormeta.UpdateCid.Valid { + si.UpdateMsg = sectormeta.UpdateCid.String } - if sectormeta.IsCC != nil { - si.IsSnap = !*sectormeta.IsCC + if sectormeta.IsCC.Valid { + si.IsSnap = !sectormeta.IsCC.Bool } else { si.IsSnap = false } - if sectormeta.ExpirationEpoch != nil { - si.ExpirationEpoch = sectormeta.ExpirationEpoch + if sectormeta.ExpirationEpoch.Valid { + e := sectormeta.ExpirationEpoch.Int64 + si.ExpirationEpoch = &e } - if sectormeta.Deadline != nil { - d := *sectormeta.Deadline + if sectormeta.Deadline.Valid { + d := sectormeta.Deadline.Int64 si.Deadline = &d } - if sectormeta.Partition != nil { - p := *sectormeta.Partition + if sectormeta.Partition.Valid { + p := sectormeta.Partition.Int64 si.Partition = &p } - si.UnsealedState = sectormeta.UnsealedState + if sectormeta.UnsealedState.Valid { + u := sectormeta.UnsealedState.Bool + si.UnsealedState = &u + } } var pieces []SectorPieceMeta @@ -514,15 +516,20 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto for i := range pieces { pieces[i].StrPieceSize = types.SizeStr(types.NewInt(uint64(pieces[i].PieceSize))) - pieces[i].StrDataRawSize =
types.SizeStr(types.NewInt(uint64(derefOrZero(pieces[i].DataRawSize)))) + rawSize := int64(0) + if pieces[i].DataRawSize.Valid { + rawSize = pieces[i].DataRawSize.Int64 + } + pieces[i].StrDataRawSize = types.SizeStr(types.NewInt(uint64(rawSize))) pcid, err := cid.Parse(pieces[i].PieceCid) if err != nil { return nil, xerrors.Errorf("failed to parse piece cid: %w", err) } - if pieces[i].DataRawSize != nil { - pcid2, err := commcid.PieceCidV2FromV1(pcid, uint64(*pieces[i].DataRawSize)) + if pieces[i].DataRawSize.Valid { + pcid2, err := commcid.PieceCidV2FromV1(pcid, uint64(pieces[i].DataRawSize.Int64)) + if err != nil { return nil, xerrors.Errorf("failed to generate piece cid v2: %w", err) } @@ -530,7 +537,11 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto pieces[i].PieceCidV2 = pcid2.String() } - id, isPiecePark := strings.CutPrefix(derefOrZero(pieces[i].DataUrl), "pieceref:") + dataUrl := "" + if pieces[i].DataUrl.Valid { + dataUrl = pieces[i].DataUrl.String + } + id, isPiecePark := strings.CutPrefix(dataUrl, "pieceref:") if !isPiecePark { continue } @@ -547,10 +558,10 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto DataUrl string `db:"data_url"` // parked_pieces - CreatedAt time.Time `db:"created_at"` - Complete bool `db:"complete"` - ParkTaskID *int64 `db:"task_id"` - CleanupTaskID *int64 `db:"cleanup_task_id"` + CreatedAt time.Time `db:"created_at"` + Complete bool `db:"complete"` + ParkTaskID sql.NullInt64 `db:"task_id"` + CleanupTaskID sql.NullInt64 `db:"cleanup_task_id"` } err = a.deps.DB.Select(ctx, &parkedPiece, `SELECT ppr.piece_id, ppr.data_url, pp.created_at, pp.complete, pp.task_id, pp.cleanup_task_id FROM parked_piece_refs ppr @@ -572,41 +583,47 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto pieces[i].PieceParkDataUrl = parkedPiece[0].DataUrl pieces[i].PieceParkCreatedAt = parkedPiece[0].CreatedAt.Local() pieces[i].PieceParkComplete = parkedPiece[0].Complete - pieces[i].PieceParkTaskID = parkedPiece[0].ParkTaskID - pieces[i].PieceParkCleanupTaskID = parkedPiece[0].CleanupTaskID + if parkedPiece[0].ParkTaskID.Valid { + t := parkedPiece[0].ParkTaskID.Int64 + pieces[i].PieceParkTaskID = &t + } + if parkedPiece[0].CleanupTaskID.Valid { + c := parkedPiece[0].CleanupTaskID.Int64 + pieces[i].PieceParkCleanupTaskID = &c + } } // TaskIDs var htasks []SectorInfoTaskSummary taskIDs := map[int64]struct{}{} - appendNonNil := func(id *int64) { - if id != nil { - taskIDs[*id] = struct{}{} + appendNullInt64 := func(n sql.NullInt64) { + if n.Valid { + taskIDs[n.Int64] = struct{}{} } } // Append PoRep task IDs if len(tasks) > 0 { task := tasks[0] - appendNonNil(task.TaskSDR) - appendNonNil(task.TaskTreeD) - appendNonNil(task.TaskTreeC) - appendNonNil(task.TaskTreeR) - appendNonNil(task.TaskPrecommitMsg) - appendNonNil(task.TaskPoRep) - appendNonNil(task.TaskFinalize) - appendNonNil(task.TaskMoveStorage) - appendNonNil(task.TaskCommitMsg) + appendNullInt64(task.TaskSDR) + appendNullInt64(task.TaskTreeD) + appendNullInt64(task.TaskTreeC) + appendNullInt64(task.TaskTreeR) + appendNullInt64(task.TaskPrecommitMsg) + appendNullInt64(task.TaskPoRep) + appendNullInt64(task.TaskFinalize) + appendNullInt64(task.TaskMoveStorage) + appendNullInt64(task.TaskCommitMsg) } // Append SnapDeals task IDs if len(snapTasks) > 0 { task := snapTasks[0] - appendNonNil(task.TaskEncode) - appendNonNil(task.TaskProve) - appendNonNil(task.TaskSubmit) - appendNonNil(task.TaskMoveStorage) + 
appendNullInt64(task.TaskEncode) + appendNullInt64(task.TaskProve) + appendNullInt64(task.TaskSubmit) + appendNullInt64(task.TaskMoveStorage) } if len(taskIDs) > 0 { @@ -671,14 +688,14 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto } for i := range th { - if th[i].WorkStart != nil && th[i].WorkEnd != nil { - th[i].Took = th[i].WorkEnd.Sub(*th[i].WorkStart).Round(time.Second).String() + if th[i].WorkStart.Valid && th[i].WorkEnd.Valid { + th[i].Took = th[i].WorkEnd.Time.Sub(th[i].WorkStart.Time).Round(time.Second).String() } } var taskState []struct { - PipelineID int64 `db:"pipeline_id"` - HarmonyTaskID *int64 `db:"harmony_task_id"` + PipelineID int64 `db:"pipeline_id"` + HarmonyTaskID sql.NullInt64 `db:"harmony_task_id"` } err = a.deps.DB.Select(ctx, &taskState, `WITH task_ids AS ( SELECT unnest(get_sdr_pipeline_tasks($1, $2)) AS task_id @@ -694,7 +711,7 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto var hasAnyStuckTask bool for _, ts := range taskState { - if ts.HarmonyTaskID == nil { + if !ts.HarmonyTaskID.Valid { hasAnyStuckTask = true break } @@ -734,8 +751,8 @@ func (a *WebRPC) SectorInfo(ctx context.Context, sp string, intid int64) (*Secto } si.ActivationEpoch = onChainInfo.Activation - if si.ExpirationEpoch == nil || *si.ExpirationEpoch != int64(onChainInfo.Expiration) { - expr := int64(onChainInfo.Expiration) + expr := int64(onChainInfo.Expiration) + if si.ExpirationEpoch == nil || *si.ExpirationEpoch != expr { si.ExpirationEpoch = &expr } si.DealWeight = dealWeight @@ -925,10 +942,3 @@ func (a *WebRPC) SectorCCSchedulerDelete(ctx context.Context, sp string) error { } return nil } - -func derefOrZero[T any](a *T) T { - if a == nil { - return *new(T) - } - return *a -} diff --git a/web/api/webrpc/storage_stats.go b/web/api/webrpc/storage_stats.go index d1c1de08b..5dc85cd45 100644 --- a/web/api/webrpc/storage_stats.go +++ b/web/api/webrpc/storage_stats.go @@ -2,6 +2,7 @@ package webrpc import ( "context" + "database/sql" "net/url" "sort" "strings" @@ -249,9 +250,9 @@ type StorageGCMark struct { FileType int64 `db:"sector_filetype"` StorageID string `db:"storage_id"` - CreatedAt time.Time `db:"created_at"` - Approved bool `db:"approved"` - ApprovedAt *time.Time `db:"approved_at"` + CreatedAt time.Time `db:"created_at"` + Approved bool `db:"approved"` + ApprovedAt sql.NullTime `db:"approved_at"` CanSeal bool `db:"can_seal"` CanStore bool `db:"can_store"` diff --git a/web/api/webrpc/upgrade.go b/web/api/webrpc/upgrade.go index 37b11cdc1..f36f403c8 100644 --- a/web/api/webrpc/upgrade.go +++ b/web/api/webrpc/upgrade.go @@ -2,6 +2,7 @@ package webrpc import ( "context" + "database/sql" "time" "golang.org/x/xerrors" @@ -18,21 +19,21 @@ type UpgradeSector struct { SpID uint64 `db:"sp_id"` SectorNum uint64 `db:"sector_number"` - TaskIDEncode *uint64 `db:"task_id_encode"` - AfterEncode bool `db:"after_encode"` + TaskIDEncode sql.NullInt64 `db:"task_id_encode"` + AfterEncode bool `db:"after_encode"` - TaskIDProve *uint64 `db:"task_id_prove"` - AfterProve bool `db:"after_prove"` + TaskIDProve sql.NullInt64 `db:"task_id_prove"` + AfterProve bool `db:"after_prove"` - UpdateReadyAt *time.Time `db:"update_ready_at"` + UpdateReadyAt sql.NullTime `db:"update_ready_at"` - TaskIDSubmit *uint64 `db:"task_id_submit"` - AfterSubmit bool `db:"after_submit"` + TaskIDSubmit sql.NullInt64 `db:"task_id_submit"` + AfterSubmit bool `db:"after_submit"` AfterProveSuccess bool `db:"after_prove_msg_success"` - TaskIDMoveStorage *uint64 
`db:"task_id_move_storage"` - AfterMoveStorage bool `db:"after_move_storage"` + TaskIDMoveStorage sql.NullInt64 `db:"task_id_move_storage"` + AfterMoveStorage bool `db:"after_move_storage"` Failed bool `db:"failed"` FailedReason string `db:"failed_reason"` diff --git a/web/api/webrpc/win_stats.go b/web/api/webrpc/win_stats.go index 23cfc42b8..0ab4de86c 100644 --- a/web/api/webrpc/win_stats.go +++ b/web/api/webrpc/win_stats.go @@ -2,6 +2,7 @@ package webrpc import ( "context" + "database/sql" "fmt" "time" @@ -9,15 +10,15 @@ import ( ) type WinStats struct { - Actor int64 `db:"sp_id"` - Epoch int64 `db:"epoch"` - Block string `db:"mined_cid"` - TaskID int64 `db:"task_id"` - SubmittedAt *time.Time `db:"submitted_at"` - Included *bool `db:"included"` + Actor int64 `db:"sp_id"` + Epoch int64 `db:"epoch"` + Block string `db:"mined_cid"` + TaskID int64 `db:"task_id"` + SubmittedAt sql.NullTime `db:"submitted_at"` + Included sql.NullBool `db:"included"` - BaseComputeTime *time.Time `db:"base_compute_time"` - MinedAt *time.Time `db:"mined_at"` + BaseComputeTime sql.NullTime `db:"base_compute_time"` + MinedAt sql.NullTime `db:"mined_at"` SubmittedAtStr string `db:"-"` TaskSuccess string `db:"-"` @@ -34,22 +35,22 @@ func (a *WebRPC) WinStats(ctx context.Context) ([]WinStats, error) { return nil, err } for i := range marks { - if marks[i].SubmittedAt == nil { + if !marks[i].SubmittedAt.Valid { marks[i].SubmittedAtStr = "Not Submitted" } else { - marks[i].SubmittedAtStr = marks[i].SubmittedAt.Format(time.RFC822) + marks[i].SubmittedAtStr = marks[i].SubmittedAt.Time.Format(time.RFC822) } - if marks[i].Included == nil { + if !marks[i].Included.Valid { marks[i].IncludedStr = "Not Checked" - } else if *marks[i].Included { + } else if marks[i].Included.Bool { marks[i].IncludedStr = "Included" } else { marks[i].IncludedStr = "Not Included" } - if marks[i].BaseComputeTime != nil && marks[i].MinedAt != nil { - marks[i].ComputeTime = marks[i].MinedAt.Sub(*marks[i].BaseComputeTime).Truncate(10 * time.Millisecond).String() + if marks[i].BaseComputeTime.Valid && marks[i].MinedAt.Valid { + marks[i].ComputeTime = marks[i].MinedAt.Time.Sub(marks[i].BaseComputeTime.Time).Truncate(10 * time.Millisecond).String() } var taskRes []struct {
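// Editor's note (illustration only, not part of the patch): the sql.Null* types also implement
// driver.Valuer, so they can be passed directly as query arguments; a zero value such as
// sql.NullInt64{} is written as NULL, which is what "TaskIDSDR: sql.NullInt64{}" earlier in this
// patch relies on to mean "no existing SDR task". Minimal write-side sketch with a hypothetical table:
package example

import (
	"context"
	"database/sql"
)

func recordClaim(ctx context.Context, db *sql.DB, spID, sectorNum int64, sdrTask sql.NullInt64) error {
	// When sdrTask.Valid is false the driver sends NULL for task_id_sdr.
	_, err := db.ExecContext(ctx,
		`INSERT INTO example_claims (sp_id, sector_number, task_id_sdr) VALUES ($1, $2, $3)`,
		spID, sectorNum, sdrTask)
	return err
}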