diff --git a/cmd/pdptool/main.go b/cmd/pdptool/main.go index b8b5eaa92..aa1812c34 100644 --- a/cmd/pdptool/main.go +++ b/cmd/pdptool/main.go @@ -25,6 +25,7 @@ import ( "github.com/minio/sha256-simd" "github.com/schollz/progressbar/v3" "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" "github.com/filecoin-project/go-commp-utils/nonffi" commcid "github.com/filecoin-project/go-fil-commcid" @@ -66,6 +67,8 @@ func main() { uploadFileCmd, // upload a file to a pdp service in many chunks downloadFileCmd, // download a file from curio + streamingPieceUploadCmd, // upload a piece to a pdp service in streaming mode + createDataSetCmd, // create a new data set on the PDP service getDataSetStatusCmd, // get the status of a data set creation on the PDP service getDataSetCmd, // retrieve the details of a data set from the PDP service @@ -1505,3 +1508,255 @@ var removePiecesCmd = &cli.Command{ return nil }, } + +var streamingPieceUploadCmd = &cli.Command{ + Name: "upload", + Usage: "Upload a piece to a PDP service", + ArgsUsage: "", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "service-url", + Usage: "URL of the PDP service", + Required: true, + }, + &cli.StringFlag{ + Name: "jwt-token", + Usage: "JWT token for authentication (optional if --service-name is provided)", + }, + &cli.StringFlag{ + Name: "service-name", + Usage: "Service Name to include in the JWT token (used if --jwt-token is not provided)", + }, + &cli.StringFlag{ + Name: "notify-url", + Usage: "Notification URL", + Required: false, + }, + &cli.StringFlag{ + Name: "hash-type", + Usage: "Hash type to use for verification (sha256 or commp)", + Value: "commp", + }, + &cli.BoolFlag{ + Name: "local-notif-wait", + Usage: "Wait for server notification by spawning a temporary local HTTP server", + }, + }, + Action: func(cctx *cli.Context) error { + inputFile := cctx.Args().Get(0) + if inputFile == "" { + return fmt.Errorf("input file is required") + } + + serviceURL := cctx.String("service-url") + jwtToken := 
cctx.String("jwt-token")
+		notifyURL := cctx.String("notify-url")
+		serviceName := cctx.String("service-name")
+		hashType := cctx.String("hash-type")
+		localNotifWait := cctx.Bool("local-notif-wait")
+
+		if jwtToken == "" {
+			if serviceName == "" {
+				return fmt.Errorf("either --jwt-token or --service-name must be provided")
+			}
+			var err error
+			jwtToken, err = getJWTTokenForService(serviceName)
+			if err != nil {
+				return err
+			}
+		}
+
+		// NOTE(review): hashType is validated but otherwise unused in this streaming
+		// path — commP is always computed below. Confirm whether sha256 mode is
+		// intended to be supported here.
+		if hashType != "sha256" && hashType != "commp" {
+			return fmt.Errorf("invalid hash type: %s", hashType)
+		}
+
+		if localNotifWait && notifyURL != "" {
+			return fmt.Errorf("cannot specify both --notify-url and --local-notif-wait")
+		}
+
+		var notifyReceived chan struct{}
+		var err error
+
+		if localNotifWait {
+			notifyURL, notifyReceived, err = startLocalNotifyServer()
+			if err != nil {
+				return fmt.Errorf("failed to start local HTTP server: %v", err)
+			}
+		}
+
+		// Open the input file
+		file, err := os.Open(inputFile)
+		if err != nil {
+			return fmt.Errorf("failed to open input file: %v", err)
+		}
+		defer func() {
+			_ = file.Close()
+		}()
+
+		// Get the piece size
+		fi, err := file.Stat()
+		if err != nil {
+			return fmt.Errorf("failed to stat input file: %v", err)
+		}
+		rawSize := fi.Size()
+
+		client := &http.Client{}
+
+		// Ask the service for a streaming-upload URL (201 Created + Location header).
+		req, err := http.NewRequest("GET", serviceURL+"/pdp/piece/uploads", nil)
+		if err != nil {
+			return fmt.Errorf("failed to create upload request: %v", err)
+		}
+		if jwtToken != "" {
+			req.Header.Set("Authorization", "Bearer "+jwtToken)
+		}
+		req.Header.Set("Content-Type", "application/json")
+
+		resp, err := client.Do(req)
+		if err != nil {
+			return fmt.Errorf("failed to send request: %v", err)
+		}
+		// FIX: capture the body value — resp is reassigned by the finalize request
+		// below, so a closure over resp would close the wrong body at function
+		// return and leak this response's body.
+		defer func(b io.ReadCloser) {
+			_ = b.Close()
+		}(resp.Body)
+
+		if resp.StatusCode != http.StatusCreated {
+			ret, err := io.ReadAll(resp.Body)
+			if err != nil {
+				return fmt.Errorf("failed to get upload URL, status code %d, failed to read body %s", resp.StatusCode, err.Error())
+			}
+			return fmt.Errorf("failed to create upload, status code %d: %s", resp.StatusCode, string(ret))
+		}
+
+		location := resp.Header.Get("Location")
+		if location == "" {
+			return fmt.Errorf("failed to get upload URL, status code %d, no Location header", resp.StatusCode)
+		}
+
+		cp := commp.Calc{}
+
+		pipeReader, pipeWriter := io.Pipe()
+
+		// Set up a MultiWriter to write to both cp and the pipe
+		multiWriter := io.MultiWriter(&cp, pipeWriter)
+
+		// Create an error group to handle goroutines
+		var g errgroup.Group
+
+		// Start goroutine to read the file and write to the MultiWriter
+		g.Go(func() error {
+			defer func() {
+				_ = pipeWriter.Close() // Ensure the pipeWriter is closed
+			}()
+			n, err := io.Copy(multiWriter, file)
+			if err != nil {
+				return fmt.Errorf("failed to copy data to multiwriter: %v", err)
+			}
+			if n != rawSize {
+				return fmt.Errorf("failed to copy all data to multiwriter, only copied %d/%d bytes", n, rawSize)
+			}
+			return nil
+		})
+
+		// Start a goroutine to handle the HTTP request
+		g.Go(func() error {
+			defer func() {
+				_ = pipeReader.Close() // Ensure the pipeReader is closed
+			}()
+			// Prepare the HTTP request for file upload
+			req, err := http.NewRequest("PUT", serviceURL+location, pipeReader)
+			if err != nil {
+				return fmt.Errorf("failed to create upload request: %v", err)
+			}
+			if jwtToken != "" {
+				req.Header.Set("Authorization", "Bearer "+jwtToken)
+			}
+			req.Header.Set("Content-Type", "application/octet-stream")
+
+			// Execute the request
+			resp, err := client.Do(req)
+			if err != nil {
+				return fmt.Errorf("failed to send upload request: %v", err)
+			}
+			defer func() {
+				_ = resp.Body.Close()
+			}()
+
+			if resp.StatusCode != http.StatusNoContent {
+				ret, err := io.ReadAll(resp.Body)
+				if err != nil {
+					return fmt.Errorf("failed to upload, status code %d, failed to read body %s", resp.StatusCode, err.Error())
+				}
+				return fmt.Errorf("upload failed, status code %d: %s", resp.StatusCode, string(ret))
+			}
+			return nil
+		})
+
+		// Wait for all goroutines to complete
+		if err := g.Wait(); err != nil {
+			return fmt.Errorf("upload process failed: %v", err)
+		}
+
+		digest, _, err := cp.Digest()
+		if err != nil {
+			return fmt.Errorf("failed to calculate digest: %v", err)
+		}
+
+		pcid2, err := commcid.DataCommitmentToPieceCidv2(digest, uint64(rawSize))
+		if err != nil {
+			return fmt.Errorf("failed to compute piece CID: %v", err)
+		}
+
+		// At this point, the commp calculation is complete
+		fmt.Printf("CommP: %s\n", pcid2.String())
+
+		type finalize struct {
+			PieceCID string `json:"pieceCid"`
+			Notify   string `json:"notify,omitempty"`
+		}
+
+		bd := finalize{
+			PieceCID: pcid2.String(),
+		}
+
+		if notifyURL != "" {
+			bd.Notify = notifyURL
+		}
+
+		bodyBytes, err := json.Marshal(bd)
+		if err != nil {
+			return fmt.Errorf("failed to marshal finalize request body: %v", err)
+		}
+
+		// Finalize the upload by POSTing the computed piece CID back to the service.
+		req, err = http.NewRequest("POST", serviceURL+location, bytes.NewBuffer(bodyBytes))
+		if err != nil {
+			return fmt.Errorf("failed to create finalize request: %v", err)
+		}
+		if jwtToken != "" {
+			req.Header.Set("Authorization", "Bearer "+jwtToken)
+		}
+		req.Header.Set("Content-Type", "application/json")
+
+		resp, err = client.Do(req)
+		if err != nil {
+			return fmt.Errorf("failed to send finalize request: %v", err)
+		}
+		defer func() {
+			_ = resp.Body.Close()
+		}()
+
+		if resp.StatusCode != http.StatusOK {
+			ret, err := io.ReadAll(resp.Body)
+			if err != nil {
+				return fmt.Errorf("failed to finalize, status code %d, failed to read body %s", resp.StatusCode, err.Error())
+			}
+			return fmt.Errorf("failed to finalize, status code %d: %s", resp.StatusCode, string(ret))
+		}
+
+		fmt.Println("Piece uploaded successfully.")
+		if localNotifWait {
+			fmt.Println("Waiting for server notification...")
+			<-notifyReceived
+		}
+		return nil
+	},
+}
diff --git a/cuhttp/server.go b/cuhttp/server.go
index ca092d3d9..7f0636489 100644
--- a/cuhttp/server.go
+++ b/cuhttp/server.go
@@ -295,7 +295,7 @@ func attachRouters(ctx context.Context, r *chi.Mux, d *deps.Deps, sd *ServiceDep
 	libp2p.Router(r, rd)
 	if sd.EthSender != 
nil { - pdsvc := pdp.NewPDPService(d.DB, d.LocalStore, must.One(d.EthClient.Get()), d.Chain, sd.EthSender) + pdsvc := pdp.NewPDPService(ctx, d.DB, d.LocalStore, must.One(d.EthClient.Get()), d.Chain, sd.EthSender) pdp.Routes(r, pdsvc) } diff --git a/harmony/harmonydb/sql/20250930-streaming-upload.sql b/harmony/harmonydb/sql/20250930-streaming-upload.sql new file mode 100644 index 000000000..875522c4f --- /dev/null +++ b/harmony/harmonydb/sql/20250930-streaming-upload.sql @@ -0,0 +1,14 @@ +CREATE TABLE pdp_piece_streaming_uploads ( + id UUID PRIMARY KEY NOT NULL, + service TEXT NOT NULL, -- pdp_services.id + + piece_cid TEXT, -- piece cid v1 + piece_size BIGINT, + raw_size BIGINT, + + piece_ref BIGINT, -- packed_piece_refs.ref_id + + created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()), + complete bool, + completed_at TIMESTAMPTZ +); \ No newline at end of file diff --git a/lib/proof/merkle_sha254_memtree.go b/lib/proof/merkle_sha254_memtree.go index fec3daecd..1e127a65c 100644 --- a/lib/proof/merkle_sha254_memtree.go +++ b/lib/proof/merkle_sha254_memtree.go @@ -12,7 +12,7 @@ import ( "github.com/filecoin-project/lotus/storage/sealer/fr32" ) -const MaxMemtreeSize = 256 << 20 +const MaxMemtreeSize = 1 << 30 // BuildSha254Memtree builds a sha256 memtree from the input data // Returned slice should be released to the pool after use diff --git a/pdp/handlers.go b/pdp/handlers.go index 76918bef4..4fe17430d 100644 --- a/pdp/handlers.go +++ b/pdp/handlers.go @@ -12,6 +12,7 @@ import ( "path" "strconv" "strings" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -52,8 +53,8 @@ type PDPServiceNodeApi interface { } // NewPDPService creates a new instance of PDPService with the provided stores -func NewPDPService(db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client, fc PDPServiceNodeApi, sn *message.SenderETH) *PDPService { - return &PDPService{ +func NewPDPService(ctx context.Context, db *harmonydb.DB, stor 
paths.StashStore, ec *ethclient.Client, fc PDPServiceNodeApi, sn *message.SenderETH) *PDPService { + p := &PDPService{ Auth: &NullAuth{}, db: db, storage: stor, @@ -62,6 +63,9 @@ func NewPDPService(db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client ethClient: ec, filClient: fc, } + + go p.cleanup(ctx) + return p } // Routes registers the HTTP routes with the provided router @@ -113,6 +117,15 @@ func Routes(r *chi.Mux, p *PDPService) { // PUT /pdp/piece/upload/{uploadUUID} r.Put(path.Join(PDPRoutePath, "/piece/upload/{uploadUUID}"), p.handlePieceUpload) + + // POST /pdp/piece/uploads + r.Post(path.Join(PDPRoutePath, "/piece/uploads"), p.handleStreamingUploadURL) + + // PUT /pdp/piece/uploads/{uploadUUID} + r.Put(path.Join(PDPRoutePath, "/piece/uploads/{uploadUUID}"), p.handleStreamingUpload) + + // POST /pdp/piece/uploads/{uploadUUID} + r.Post(path.Join(PDPRoutePath, "/piece/uploads/{uploadUUID}"), p.handleFinalizeStreamingUpload) } // Handler functions @@ -908,6 +921,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ } if height > 50 { http.Error(w, "Invalid height", http.StatusBadRequest) + return } // Get raw size by summing up the sizes of subPieces @@ -1590,3 +1604,36 @@ func asPieceCIDv2(cidStr string, size uint64) (cid.Cid, uint64, error) { return cid.Undef, 0, fmt.Errorf("unsupported piece CID type: %d", pieceCid.Prefix().MhType) } } + +func (p *PDPService) cleanup(ctx context.Context) { + rm := func(ctx context.Context, db *harmonydb.DB) { + + var RefIDs []int64 + + err := db.QueryRow(ctx, `SELECT COALESCE(array_agg(ref_id), '{}') AS ref_ids + FROM pdp_piece_streaming_uploads + WHERE complete = TRUE + AND completed_at <= TIMEZONE('UTC', NOW()) - INTERVAL '60 minutes';`).Scan(&RefIDs) + if err != nil { + log.Errorw("failed to get non-finalized uploads", "error", err) + } + + if len(RefIDs) > 0 { + _, err := db.Exec(ctx, `DELETE FROM parked_piece_refs WHERE ref_id = ANY($1);`, RefIDs) + if err != nil { + 
log.Errorw("failed to delete non-finalized uploads", "error", err) + } + } + } + + ticker := time.NewTicker(time.Minute * 5) + defer ticker.Stop() + for { + select { + case <-ticker.C: + rm(ctx, p.db) + case <-ctx.Done(): + return + } + } +} diff --git a/pdp/handlers_upload.go b/pdp/handlers_upload.go index f6e38d712..9e1f78015 100644 --- a/pdp/handlers_upload.go +++ b/pdp/handlers_upload.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "path" + "time" "github.com/go-chi/chi/v5" "github.com/google/uuid" @@ -393,3 +394,336 @@ func (p *PDPService) handleFindPiece(w http.ResponseWriter, r *http.Request) { return } } + +func (p *PDPService) handleStreamingUploadURL(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + // Verify that the request is authorized using ECDSA JWT + serviceID, err := p.AuthService(r) + if err != nil { + http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) + return + } + + uploadUUID := uuid.New() + uploadURL := path.Join(PDPRoutePath, "/piece/uploads", uploadUUID.String()) + + n, err := p.db.Exec(r.Context(), `INSERT INTO pdp_piece_streaming_uploads (id, service) VALUES ($1, $2)`, uploadUUID.String(), serviceID) + if err != nil { + log.Errorw("Failed to create upload request in database", "error", err) + http.Error(w, "Failed to create upload request", http.StatusInternalServerError) + return + } + if n != 1 { + log.Errorf("Failed to create upload request in database: expected 1 row but got %d", n) + http.Error(w, "Failed to create upload request", http.StatusInternalServerError) + return + } + + w.Header().Set("Location", uploadURL) + w.WriteHeader(http.StatusCreated) +} + +func (p *PDPService) handleStreamingUpload(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPut { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + // Verify that the request is authorized using 
ECDSA JWT + serviceID, err := p.AuthService(r) + if err != nil { + http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) + return + } + + uploadUUIDStr := chi.URLParam(r, "uploadUUID") + uploadUUID, err := uuid.Parse(uploadUUIDStr) + if err != nil { + http.Error(w, "Invalid upload UUID", http.StatusBadRequest) + return + } + + ctx := r.Context() + + var exists bool + err = p.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM pdp_piece_streaming_uploads WHERE id = $1 AND service = $2)`, uploadUUID.String(), serviceID).Scan(&exists) + if err != nil { + log.Errorw("Failed to query pdp_piece_streaming_uploads", "error", err) + http.Error(w, "Database error", http.StatusInternalServerError) + return + } + if !exists { + http.NotFound(w, r) + return + } + + reader := NewTimeoutLimitReader(r.Body, 5*time.Second) + cp := &commp.Calc{} + readSize := int64(0) + + // Function to write data into StashStore and calculate commP + writeFunc := func(f *os.File) error { + multiWriter := io.MultiWriter(cp, f) + + // Copy data from limitedReader to multiWriter + n, err := io.Copy(multiWriter, reader) + if err != nil { + return fmt.Errorf("failed to read and write piece data: %w", err) + } + + if n > UploadSizeLimit { + return fmt.Errorf("piece data exceeds the maximum allowed size") + } + + readSize = n + + return nil + } + + // Upload into StashStore + stashID, err := p.storage.StashCreate(ctx, UploadSizeLimit, writeFunc) + if err != nil { + if err.Error() == "piece data exceeds the maximum allowed size" { + http.Error(w, err.Error(), http.StatusRequestEntityTooLarge) + return + } else { + log.Errorw("Failed to store piece data in StashStore", "error", err) + http.Error(w, "Failed to store piece data", http.StatusInternalServerError) + return + } + } + + // Finalize the commP calculation + digest, paddedPieceSize, err := cp.Digest() + if err != nil { + log.Errorw("Failed to finalize commP calculation", "error", err) + // Remove the stash file as the data is invalid + _ = 
p.storage.StashRemove(ctx, stashID) + http.Error(w, "Failed to finalize commP calculation", http.StatusInternalServerError) + return + } + + pcid, err := commcid.DataCommitmentV1ToCID(digest) + if err != nil { + log.Errorw("Failed to calculate PieceCIDV2", "error", err) + _ = p.storage.StashRemove(ctx, stashID) + http.Error(w, "Failed to calculate PieceCIDV2", http.StatusInternalServerError) + return + } + + didCommit, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + // 1. Create a long-term parked piece entry + var parkedPieceID int64 + err := tx.QueryRow(` + INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size, long_term) + VALUES ($1, $2, $3, TRUE) RETURNING id + `, pcid.String(), paddedPieceSize, readSize).Scan(&parkedPieceID) + if err != nil { + return false, fmt.Errorf("failed to create parked_pieces entry: %w", err) + } + + // 2. Create a piece ref with data_url being "stashstore://" + // Get StashURL + stashURL, err := p.storage.StashURL(stashID) + if err != nil { + return false, fmt.Errorf("failed to get stash URL: %w", err) + } + + // Change scheme to "custore" + stashURL.Scheme = dealdata.CustoreScheme + dataURL := stashURL.String() + + var pieceRefID int64 + err = tx.QueryRow(` + INSERT INTO parked_piece_refs (piece_id, data_url, long_term) + VALUES ($1, $2, TRUE) RETURNING ref_id + `, parkedPieceID, dataURL).Scan(&pieceRefID) + if err != nil { + return false, fmt.Errorf("failed to create parked_piece_refs entry: %w", err) + } + + // 3. 
Update the pdp_piece_streaming_uploads entry + _, err = tx.Exec(` + UPDATE pdp_piece_streaming_uploads SET piece_ref = $1, piece_cid = $2, piece_size = $3, raw_size = $4, complete = TRUE, completed_at = NOW() AT TIME ZONE 'UTC' WHERE id = $5 and service = $6 + `, pieceRefID, pcid.String(), paddedPieceSize, readSize, uploadUUID.String(), serviceID) + if err != nil { + return false, fmt.Errorf("failed to update pdp_piece_streaming_uploads: %w", err) + } + + return true, nil // Commit the transaction + }, harmonydb.OptionRetry()) + + if err != nil || !didCommit { + // Remove the stash file as the transaction failed + if err != nil { + log.Errorw("Failed to process piece upload", "error", err) + } else { + log.Errorw("Failed to process piece upload", "error", "failed to commit transaction") + } + _ = p.storage.StashRemove(ctx, stashID) + http.Error(w, "Failed to process piece upload", http.StatusInternalServerError) + return + } + + // Respond with 204 No Content + w.WriteHeader(http.StatusNoContent) + +} + +func (p *PDPService) handleFinalizeStreamingUpload(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + // Verify that the request is authorized using ECDSA JWT + serviceID, err := p.AuthService(r) + if err != nil { + http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized) + return + } + + uploadUUIDStr := chi.URLParam(r, "uploadUUID") + uploadUUID, err := uuid.Parse(uploadUUIDStr) + if err != nil { + http.Error(w, "Invalid upload UUID", http.StatusBadRequest) + return + } + + ctx := r.Context() + + var exists bool + err = p.db.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM pdp_piece_streaming_uploads WHERE id = $1 AND service = $2)`, uploadUUID.String(), serviceID).Scan(&exists) + if err != nil { + log.Errorw("Failed to query pdp_piece_streaming_uploads", "error", err) + http.Error(w, "Database error", http.StatusInternalServerError) + return + } + 
+	if !exists {
+		http.NotFound(w, r)
+		return
+	}
+
+	var req struct {
+		PieceCID string `json:"pieceCid"`
+		Notify   string `json:"notify,omitempty"`
+	}
+	if err = json.NewDecoder(r.Body).Decode(&req); err != nil {
+		http.Error(w, "Invalid request body: "+err.Error(), http.StatusBadRequest)
+		return
+	}
+	pcid, err := cid.Parse(req.PieceCID)
+	if err != nil {
+		http.Error(w, "Invalid request body: invalid pieceCid", http.StatusBadRequest)
+		return
+	}
+
+	digest, err := commcid.CIDToDataCommitmentV1(pcid)
+	if err != nil {
+		http.Error(w, "Invalid request body: invalid pieceCid", http.StatusBadRequest)
+		return
+	}
+
+	var dPcidStr string
+	var pSize, pref int64
+
+	// FIX: Scan needs one destination per selected column; piece_ref was not
+	// scanned, so the query errored (or left pref unset) before finalize could run.
+	err = p.db.QueryRow(ctx, `SELECT piece_cid, piece_size, piece_ref FROM pdp_piece_streaming_uploads WHERE id = $1 AND service = $2 AND complete = TRUE`, uploadUUID.String(), serviceID).Scan(&dPcidStr, &pSize, &pref)
+	if err != nil {
+		log.Errorw("Failed to query pdp_piece_streaming_uploads", "error", err)
+		http.Error(w, "Database error", http.StatusInternalServerError)
+		return
+	}
+
+	dPcid, err := cid.Parse(dPcidStr)
+	if err != nil {
+		log.Errorw("Failed to parse pieceCid", "error", err)
+		http.Error(w, "Database error", http.StatusInternalServerError)
+		return
+	}
+
+	if !pcid.Equals(dPcid) {
+		http.Error(w, "Invalid request body: pieceCid does not match the calculated pieceCid for the uploaded piece", http.StatusBadRequest)
+		return
+	}
+
+	// Hand the completed streaming upload over to the regular pdp_piece_uploads
+	// pipeline and drop the streaming bookkeeping row, atomically.
+	comm, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
+		// FIX: the column list has eight entries, so VALUES needs $1..$8; the
+		// original stopped at $7 while still passing eight arguments.
+		n, err := tx.Exec(`
+			INSERT INTO pdp_piece_uploads (id, service, piece_cid, notify_url, check_hash_codec, check_hash, check_size, piece_ref)
+			VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+		`, uploadUUID.String(), serviceID, pcid.String(), req.Notify, multicodec.Sha2_256Trunc254Padded.String(), digest, pSize, pref)
+		if err != nil {
+			return false, fmt.Errorf("failed to store upload request in database: %w", err)
+		}
+		if n != 1 {
+			return false, fmt.Errorf("failed to store upload request in database: expected 1 row but got %d", n)
+		}
+
+		_, err = tx.Exec(`DELETE FROM pdp_piece_streaming_uploads WHERE id = $1 AND service = $2 AND complete = TRUE`, uploadUUID.String(), serviceID)
+		if err != nil {
+			return false, fmt.Errorf("failed to delete pdp_piece_streaming_uploads entry: %w", err)
+		}
+		return true, nil
+	}, harmonydb.OptionRetry())
+	if err != nil {
+		log.Errorw("Failed to process piece upload", "error", err)
+		http.Error(w, "Failed to process piece upload", http.StatusInternalServerError)
+		return
+	}
+	if !comm {
+		log.Errorw("Failed to process piece upload", "error", "failed to commit transaction")
+		http.Error(w, "Failed to process piece upload", http.StatusInternalServerError)
+		return
+	}
+	w.WriteHeader(http.StatusOK)
+}
+
+// TimeoutLimitReader wraps an io.Reader, failing a Read that makes no progress
+// within timeout and rejecting streams that exceed UploadSizeLimit in total.
+type TimeoutLimitReader struct {
+	r          io.Reader
+	timeout    time.Duration
+	totalBytes int64
+}
+
+func NewTimeoutLimitReader(r io.Reader, timeout time.Duration) *TimeoutLimitReader {
+	return &TimeoutLimitReader{
+		r:          r,
+		timeout:    timeout,
+		totalBytes: 0,
+	}
+}
+
+const UploadSizeLimit = int64(1065353216) // 1 GiB.Unpadded()
+
+func (t *TimeoutLimitReader) Read(p []byte) (int, error) {
+	deadline := time.Now().Add(t.timeout)
+	for {
+		// Attempt to read
+		n, err := t.r.Read(p)
+		t.totalBytes += int64(n)
+		if t.totalBytes > UploadSizeLimit {
+			// FIX: report n so callers account for the bytes already consumed
+			// (io.Reader contract); the original returned 0 and dropped them.
+			return n, fmt.Errorf("upload size limit exceeded: %d bytes", UploadSizeLimit)
+		}
+
+		if err != nil {
+			return n, err
+		}
+
+		if n > 0 {
+			// Otherwise return byte read and no error
+			return n, err
+		}
+
+		// Timeout: If we hit the deadline without making progress, return a timeout error
+		if time.Now().After(deadline) {
+			return 0, fmt.Errorf("upload timeout: no progress (duration: %f Seconds)", t.timeout.Seconds())
+		}
+
+		// Avoid tight loop by adding a tiny sleep
+		time.Sleep(100 * time.Millisecond) // Small pause to avoid busy-waiting
+	}
+}
diff --git a/tasks/pdp/task_prove.go b/tasks/pdp/task_prove.go
index 7347b3223..3c268e85e 100644
--- 
a/tasks/pdp/task_prove.go +++ b/tasks/pdp/task_prove.go @@ -758,7 +758,7 @@ func (p *ProveTask) TypeDetails() harmonytask.TaskTypeDetails { Cost: resources.Resources{ Cpu: 1, Gpu: 0, - Ram: 256 << 20, // 256 MB + Ram: 1 << 30, // 1 GiB }, MaxFailures: 5, }