diff --git a/go.mod b/go.mod
index 82a778d02..5e350acf6 100644
--- a/go.mod
+++ b/go.mod
@@ -190,7 +190,7 @@ require (
 	github.com/blevesearch/zapx/v15 v15.4.2 // indirect
 	github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect
 	github.com/bytedance/sonic v1.13.3 // indirect
-	github.com/cespare/xxhash/v2 v2.3.0 // indirect
+	github.com/cespare/xxhash/v2 v2.3.0
 	github.com/coreos/go-semver v0.3.1 // indirect
 	github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect
 	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
diff --git a/go.sum b/go.sum
index 029e5d514..97ac10348 100644
--- a/go.sum
+++ b/go.sum
@@ -398,8 +398,6 @@ github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7Fsg
 github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
 github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
-github.com/halalcloud/golang-sdk-lite v0.0.0-20251006164234-3c629727c499 h1:4ovnBdiGDFi8putQGxhipuuhXItAgh4/YnzufPYkZkQ=
-github.com/halalcloud/golang-sdk-lite v0.0.0-20251006164234-3c629727c499/go.mod h1:8x1h4rm3s8xMcTyJrq848sQ6BJnKzl57mDY4CNshdPM=
 github.com/halalcloud/golang-sdk-lite v0.0.0-20251105081800-78cbb6786c38 h1:lsK2GVgI2Ox0NkRpQnN09GBOH7jtsjFK5tcIgxXlLr0=
 github.com/halalcloud/golang-sdk-lite v0.0.0-20251105081800-78cbb6786c38/go.mod h1:8x1h4rm3s8xMcTyJrq848sQ6BJnKzl57mDY4CNshdPM=
 github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
diff --git a/internal/bootstrap/config.go b/internal/bootstrap/config.go
index 116b4cd35..62cb54cf6 100644
--- a/internal/bootstrap/config.go
+++ b/internal/bootstrap/config.go
@@ -5,6 +5,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"github.com/OpenListTeam/OpenList/v4/cmd/flags"
 	"github.com/OpenListTeam/OpenList/v4/drivers/base"
@@ -184,6 +185,31 @@ func CleanTempDir() {
 	}
 }
 
+func CleanStaleChunks() {
+	chunkDir := filepath.Join(conf.Conf.TempDir, "chunks")
+	if !utils.Exists(chunkDir) {
+		return
+	}
+	files, err := os.ReadDir(chunkDir)
+	if err != nil {
+		log.Errorln("failed to list chunks: ", err)
+		return
+	}
+	now := time.Now()
+	for _, file := range files {
+		info, err := file.Info()
+		if err != nil {
+			continue
+		}
+		// Clean up chunks older than 24 hours
+		if now.Sub(info.ModTime()) > 24*time.Hour {
+			if err := os.RemoveAll(filepath.Join(chunkDir, file.Name())); err != nil {
+				log.Errorln("failed to delete stale chunk: ", err)
+			}
+		}
+	}
+}
+
 // validateProxyConfig validates proxy configuration and displays status at startup
 func validateProxyConfig() {
 	if conf.Conf.ProxyAddress != "" {
diff --git a/internal/bootstrap/data/setting.go b/internal/bootstrap/data/setting.go
index 7bff851de..b6fdc4309 100644
--- a/internal/bootstrap/data/setting.go
+++ b/internal/bootstrap/data/setting.go
@@ -242,6 +242,15 @@ func InitialSettings() []model.SettingItem {
 		{Key: conf.StreamMaxClientUploadSpeed, Value: "-1", Type: conf.TypeNumber, Group: model.TRAFFIC, Flag: model.PRIVATE},
 		{Key: conf.StreamMaxServerDownloadSpeed, Value: "-1", Type: conf.TypeNumber, Group: model.TRAFFIC, Flag: model.PRIVATE},
 		{Key: conf.StreamMaxServerUploadSpeed, Value: "-1", Type: conf.TypeNumber, Group: model.TRAFFIC, Flag: model.PRIVATE},
+
+		// HTTP server configuration (for large file transfers)
+		{Key: conf.HTTPServerReadTimeout, Value: "0", Type: conf.TypeNumber, Group: model.TRAFFIC, Flag: model.PRIVATE, Help: "HTTP request read timeout in seconds; 0 means no limit. 0 is recommended so large file uploads are not cut off."},
+		{Key: conf.HTTPServerWriteTimeout, Value: "0", Type: conf.TypeNumber, Group: model.TRAFFIC, Flag: model.PRIVATE, Help: "HTTP response write timeout in seconds; 0 means no limit. 0 is recommended so large file downloads are not cut off."},
+		{Key: conf.HTTPServerIdleTimeout, Value: "120", Type: conf.TypeNumber, Group: model.TRAFFIC, Flag: model.PRIVATE, Help: "HTTP idle connection timeout in seconds. 120 is recommended to tolerate brief network fluctuations."},
+		{Key: conf.HTTPServerReadHeaderTimeout, Value: "30", Type: conf.TypeNumber, Group: model.TRAFFIC, Flag: model.PRIVATE, Help: "HTTP header read timeout in seconds, guarding against slow-request (slowloris) attacks. 30 is recommended."},
+		{Key: conf.HTTPServerMaxHeaderBytes, Value: "1048576", Type: conf.TypeNumber, Group: model.TRAFFIC, Flag: model.PRIVATE, Help: "Maximum size of HTTP request headers in bytes; defaults to 1 MB (1048576)."},
+		// Chunked upload configuration (to bypass Cloudflare CDN limits)
+		{Key: conf.ChunkedUploadChunkSize, Value: "95", Type: conf.TypeNumber, Group: model.TRAFFIC, Flag: model.PUBLIC, Help: "Chunk size in MB; files larger than this are uploaded in chunks. 95 is recommended to stay under Cloudflare's 100 MB per-request limit."},
 	}
 
 	additionalSettingItems := tool.Tools.Items() // fixed order
diff --git a/internal/bootstrap/task.go b/internal/bootstrap/task.go
index 47e0b59eb..c09fa56fc 100644
--- a/internal/bootstrap/task.go
+++ b/internal/bootstrap/task.go
@@ -1,12 +1,15 @@
 package bootstrap
 
 import (
+	"time"
+
 	"github.com/OpenListTeam/OpenList/v4/internal/conf"
 	"github.com/OpenListTeam/OpenList/v4/internal/db"
 	"github.com/OpenListTeam/OpenList/v4/internal/fs"
 	"github.com/OpenListTeam/OpenList/v4/internal/offline_download/tool"
 	"github.com/OpenListTeam/OpenList/v4/internal/op"
 	"github.com/OpenListTeam/OpenList/v4/internal/setting"
+	"github.com/OpenListTeam/OpenList/v4/pkg/cron"
 	"github.com/OpenListTeam/tache"
 )
@@ -49,4 +52,11 @@ func InitTaskManager() {
 	op.RegisterSettingChangingCallback(func() {
 		fs.ArchiveContentUploadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskDecompressUploadThreadsNum, conf.Conf.Tasks.DecompressUpload.Workers)))
 	})
+
+	// Start a background task to clean stale chunks every 10 minutes
+	go func() {
+		c := cron.NewCron(10 * time.Minute)
+		c.Do(CleanStaleChunks)
+	}()
 }
+
diff --git a/internal/conf/const.go b/internal/conf/const.go
index b99d8849c..ae901ae45 100644
--- a/internal/conf/const.go
+++ b/internal/conf/const.go
@@ -77,9 +77,6 @@ const (
 	// 115
 	Pan115TempDir = "115_temp_dir"
 
-	// 123
-	Pan123TempDir = "123_temp_dir"
-
 	// 115_open
 	Pan115OpenTempDir = "115_open_temp_dir"
 
@@ -139,6 +136,7 @@ const (
 	// 123 open offline download
 	Pan123OpenOfflineDownloadCallbackUrl = "123_open_callback_url"
 	Pan123OpenTempDir                    = "123_open_temp_dir"
+	Pan123TempDir                        = "123_temp_dir"
 
 	// ftp
 	FTPPublicHost = "ftp_public_host"
@@ -161,6 +159,16 @@ const (
 	StreamMaxClientUploadSpeed   = "max_client_upload_speed"
 	StreamMaxServerDownloadSpeed = "max_server_download_speed"
 	StreamMaxServerUploadSpeed   = "max_server_upload_speed"
+
+	// HTTP server timeouts (transfer timeout configuration)
+	HTTPServerReadTimeout       = "http_server_read_timeout"        // request read timeout in seconds; 0 means no limit
+	HTTPServerWriteTimeout      = "http_server_write_timeout"       // response write timeout in seconds; 0 means no limit
+	HTTPServerIdleTimeout       = "http_server_idle_timeout"        // idle connection timeout in seconds
+	HTTPServerReadHeaderTimeout = "http_server_read_header_timeout" // header read timeout in seconds
+	HTTPServerMaxHeaderBytes    = "http_server_max_header_bytes"    // maximum header size in bytes
+
+	// Chunked upload configuration
+	ChunkedUploadChunkSize = "chunked_upload_chunk_size" // chunk size in MB; files larger than this are automatically uploaded in chunks
 )
 
 const (
diff --git a/pkg/utils/hash.go b/pkg/utils/hash.go
index 596e61e54..d64545a2b 100644
--- a/pkg/utils/hash.go
+++ b/pkg/utils/hash.go
@@ -9,10 +9,13 @@ import (
 	"encoding/json"
 	"errors"
 	"hash"
+	"hash/crc32"
+	"hash/crc64"
 	"io"
 	"iter"
 
 	"github.com/OpenListTeam/OpenList/v4/internal/errs"
+	"github.com/cespare/xxhash/v2"
 	log "github.com/sirupsen/logrus"
 )
@@ -90,6 +93,15 @@ var (
 
 	// SHA256 indicates SHA-256 support
 	SHA256 = RegisterHash("sha256", "SHA-256", 64, sha256.New)
+
+	// CRC32 indicates CRC-32 support (IEEE polynomial)
+	CRC32 = RegisterHash("crc32", "CRC-32", 8, func() hash.Hash { return crc32.NewIEEE() })
+
+	// CRC64 indicates CRC-64 support (ECMA polynomial)
+	CRC64 = RegisterHash("crc64", "CRC-64", 16, func() hash.Hash { return crc64.New(crc64.MakeTable(crc64.ECMA)) })
+
+	// XXH64 indicates xxHash64 support
+	XXH64 = RegisterHash("xxh64", "XXH64", 16, func() hash.Hash { return xxhash.New() })
 )
 
 // HashData get hash of one hashType
@@ -242,3 +254,4 @@ func (hi HashInfo) All() iter.Seq2[*HashType, string] {
 		}
 	}
 }
+
diff --git a/server/handles/fsup.go b/server/handles/fsup.go
index 0f46398cd..86c5e0f2b 100644
--- a/server/handles/fsup.go
+++ b/server/handles/fsup.go
@@ -1,8 +1,11 @@
 package handles
 
 import (
+	"context"
+	"fmt"
 	"io"
 	"net/url"
+	"os"
 	stdpath "path"
 	"strconv"
 	"time"
@@ -112,7 +115,7 @@ func FsStream(c *gin.Context) {
 	if asTask {
 		t, err = fs.PutAsTask(c.Request.Context(), dir, s)
 	} else {
-		err = fs.PutDirectly(c.Request.Context(), dir, s)
+		err = fs.PutDirectly(c.Request.Context(), dir, s, true)
 	}
 	if err != nil {
 		common.ErrorResp(c, err, 500)
@@ -212,7 +215,7 @@ func FsForm(c *gin.Context) {
 		}{f}
 		t, err = fs.PutAsTask(c.Request.Context(), dir, s)
 	} else {
-		err = fs.PutDirectly(c.Request.Context(), dir, s)
+		err = fs.PutDirectly(c.Request.Context(), dir, s, true)
 	}
 	if err != nil {
 		common.ErrorResp(c, err, 500)
@@ -226,3 +229,316 @@ func FsForm(c *gin.Context) {
 		"task": getTaskInfo(t),
 	})
 }
+
+// FsChunkUpload handles uploading a single chunk of a large file
+func FsChunkUpload(c *gin.Context) {
+	uploadId := c.Query("upload_id")
+	indexStr := c.Query("index")
+	if uploadId == "" || indexStr == "" {
+		common.ErrorStrResp(c, "upload_id and index are required", 400)
+		return
+	}
+
+	if _, err := strconv.Atoi(indexStr); err != nil {
+		common.ErrorResp(c, err, 400)
+		return
+	}
+
+	// Get the chunk file from the form
+	file, err := c.FormFile("file")
+	if err != nil {
+		common.ErrorResp(c, err, 400)
+		return
+	}
+
+	// Create the chunk directory
+	chunkDir := stdpath.Join(conf.Conf.TempDir, "chunks", uploadId)
+	if err := os.MkdirAll(chunkDir, 0755); err != nil {
+		common.ErrorResp(c, err, 500)
+		return
+	}
+
+	// Save chunk to file
+	chunkPath := stdpath.Join(chunkDir, indexStr)
+	// Get the expected CRC32 from the header
+	expectedCRC32 := c.GetHeader("X-Chunk-CRC32")
+
+	// Save the uploaded file temporarily
+	if err := c.SaveUploadedFile(file, chunkPath); err != nil {
+		common.ErrorResp(c, err, 500)
+		return
+	}
+
+	// Always calculate CRC32 of the saved chunk for verification and response
+	f, err := os.Open(chunkPath)
+	if err != nil {
+		common.ErrorResp(c, err, 500)
+		return
+	}
+	defer f.Close()
+
+	actualCRC32, err := utils.HashReader(utils.CRC32, f)
+	if err != nil {
+		os.Remove(chunkPath) // Clean up
+		common.ErrorResp(c, err, 500)
+		return
+	}
+
+	// Verify CRC32 if provided
+	if expectedCRC32 != "" {
+		if actualCRC32 != expectedCRC32 {
+			os.Remove(chunkPath) // Clean up
+			common.ErrorStrResp(c, fmt.Sprintf("chunk CRC32 mismatch: client=%s, server=%s", expectedCRC32, actualCRC32), 400)
+			return
+		}
+	}
+
+	common.SuccessResp(c, gin.H{
+		"crc32": actualCRC32,
+	})
+}
+
+// FsChunkMerge merges all chunks into a single file and uploads it
+func FsChunkMerge(c *gin.Context) {
+	var req struct {
+		UploadId     string `json:"upload_id"`
+		Path         string `json:"path"`
+		TotalChunks  int    `json:"total_chunks"`
+		AsTask       bool   `json:"as_task"`
+		Overwrite    bool   `json:"overwrite"`
+		LastModified int64  `json:"last_modified"`
+		Hash         string `json:"hash"`
+	}
+
+	if err := c.ShouldBindJSON(&req); err != nil {
+		common.ErrorResp(c, err, 400)
+		return
+	}
+
+	user := c.Request.Context().Value(conf.UserKey).(*model.User)
+	path, err := user.JoinPath(req.Path)
+	if err != nil {
+		common.ErrorResp(c, err, 403)
+		return
+	}
+
+	// Check if the file already exists when not overwriting
+	if !req.Overwrite {
+		if res, _ := fs.Get(c.Request.Context(), path, &fs.GetArgs{NoLog: true}); res != nil {
+			common.ErrorStrResp(c, "file exists", 403)
+			return
+		}
+	}
+
+	chunkDir := stdpath.Join(conf.Conf.TempDir, "chunks", req.UploadId)
+
+	// Check that all chunks exist (quick check, no heavy I/O)
+	for i := 0; i < req.TotalChunks; i++ {
+		chunkPath := stdpath.Join(chunkDir, strconv.Itoa(i))
+		if _, err := os.Stat(chunkPath); os.IsNotExist(err) {
+			common.ErrorStrResp(c, "chunk "+strconv.Itoa(i)+" not found", 400)
+			return
+		}
+	}
+
+	dir, name := stdpath.Split(path)
+
+	// Check if the file is a system file that should be ignored
+	if shouldIgnoreSystemFile(name) {
+		os.RemoveAll(chunkDir)
+		common.ErrorStrResp(c, errs.IgnoredSystemFile.Error(), 403)
+		return
+	}
+
+	lastModified := time.Now()
+	if req.LastModified > 0 {
+		lastModified = time.UnixMilli(req.LastModified)
+	}
+
+	// For as_task=true (large files), return immediately and process in the background
+	if req.AsTask {
+		// Generate a simple task ID for tracking
+		taskId := fmt.Sprintf("merge-%s", req.UploadId)
+
+		// Start a background goroutine for the merge
+		go func() {
+			utils.Log.Infof("[ChunkMerge] Starting background merge for %s", path)
+
+			// Create the merged file
+			mergedPath := stdpath.Join(chunkDir, "merged")
+			mergedFile, err := os.Create(mergedPath)
+			if err != nil {
+				utils.Log.Errorf("[ChunkMerge] Failed to create merged file: %v", err)
+				return
+			}
+
+			// Merge all chunks while computing the hash
+			var totalSize int64
+			hasher := utils.NewMultiHasher([]*utils.HashType{utils.XXH64, utils.CRC64})
+			multiWriter := io.MultiWriter(mergedFile, hasher)
+			for i := 0; i < req.TotalChunks; i++ {
+				chunkPath := stdpath.Join(chunkDir, strconv.Itoa(i))
+				chunk, err := os.Open(chunkPath)
+				if err != nil {
+					mergedFile.Close()
+					utils.Log.Errorf("[ChunkMerge] Failed to open chunk %d: %v", i, err)
+					return
+				}
+				n, err := io.Copy(multiWriter, chunk)
+				chunk.Close()
+				if err != nil {
+					mergedFile.Close()
+					utils.Log.Errorf("[ChunkMerge] Failed to copy chunk %d: %v", i, err)
+					return
+				}
+				totalSize += n
+			}
+			mergedFile.Close()
+
+			hashInfo := hasher.GetHashInfo()
+			hashMap := hashInfo.Export()
+
+			// Verify the client-provided hash (xxHash64)
+			if req.Hash != "" {
+				for ht, hashValue := range hashMap {
+					if ht.Name == "xxh64" && hashValue != req.Hash {
+						os.RemoveAll(chunkDir)
+						utils.Log.Errorf("[ChunkMerge] Hash mismatch: Client=%s, Server=%s", req.Hash, hashValue)
+						return
+					}
+				}
+			}
+
+			utils.Log.Infof("[ChunkMerge] Merge complete. Size: %d bytes. Uploading to storage...", totalSize)
+
+			// Open the merged file for upload
+			mergedReader, err := os.Open(mergedPath)
+			if err != nil {
+				utils.Log.Errorf("[ChunkMerge] Failed to open merged file: %v", err)
+				return
+			}
+
+			s := &stream.FileStream{
+				Obj: &model.Object{
+					Name:     name,
+					Size:     totalSize,
+					Modified: lastModified,
+				},
+				Reader:       mergedReader,
+				Mimetype:     utils.GetMimeType(name),
+				WebPutAsTask: true,
+			}
+			s.Closers.Add(utils.CloseFunc(func() error {
+				mergedReader.Close()
+				os.RemoveAll(chunkDir)
+				return nil
+			}))
+
+			// Use a background context since the original request context is gone
+			ctx := context.Background()
+			_, err = fs.PutAsTask(ctx, dir, s)
+			if err != nil {
+				utils.Log.Errorf("[ChunkMerge] Failed to put as task: %v", err)
+				return
+			}
+			utils.Log.Infof("[ChunkMerge] Successfully queued upload task for %s", path)
+		}()
+
+		// Immediately return success with task info
+		common.SuccessResp(c, gin.H{
+			"task": gin.H{
+				"id":      taskId,
+				"status":  "processing",
+				"message": "Merge started in background. Check Tasks page for progress.",
+			},
+		})
+		return
+	}
+
+	// For as_task=false (small files or direct upload), merge synchronously
+	mergedPath := stdpath.Join(chunkDir, "merged")
+	mergedFile, err := os.Create(mergedPath)
+	if err != nil {
+		common.ErrorResp(c, err, 500)
+		return
+	}
+
+	// Merge all chunks while computing the hash
+	var totalSize int64
+	hasher := utils.NewMultiHasher([]*utils.HashType{utils.XXH64, utils.CRC64})
+	multiWriter := io.MultiWriter(mergedFile, hasher)
+	for i := 0; i < req.TotalChunks; i++ {
+		chunkPath := stdpath.Join(chunkDir, strconv.Itoa(i))
+		chunk, err := os.Open(chunkPath)
+		if err != nil {
+			mergedFile.Close()
+			common.ErrorResp(c, err, 500)
+			return
+		}
+		n, err := io.Copy(multiWriter, chunk)
+		chunk.Close()
+		if err != nil {
+			mergedFile.Close()
+			common.ErrorResp(c, err, 500)
+			return
+		}
+		totalSize += n
+	}
+	mergedFile.Close()
+	hashInfo := hasher.GetHashInfo()
+	hashMap := hashInfo.Export()
+	// Prepare the hash map for the response
+	hashResponse := make(map[string]string)
+	for ht, hashValue := range hashMap {
+		hashResponse[ht.Name] = hashValue
+	}
+
+	// Verify the client-provided hash (xxHash64)
+	if req.Hash != "" {
+		if serverHash, ok := hashResponse["xxh64"]; ok {
+			if serverHash != req.Hash {
+				// Hash mismatch!
+				os.Remove(mergedPath)
+				common.ErrorStrResp(c, fmt.Sprintf("Hash mismatch: Client=%s, Server=%s", req.Hash, serverHash), 400)
+				return
+			}
+		}
+	}
+
+	// Open the merged file for upload
+	mergedReader, err := os.Open(mergedPath)
+	if err != nil {
+		common.ErrorResp(c, err, 500)
+		return
+	}
+
+	s := &stream.FileStream{
+		Obj: &model.Object{
+			Name:     name,
+			Size:     totalSize,
+			Modified: lastModified,
+		},
+		Reader:       mergedReader,
+		Mimetype:     utils.GetMimeType(name),
+		WebPutAsTask: false,
+	}
+	s.Closers.Add(utils.CloseFunc(func() error {
+		mergedReader.Close()
+		os.RemoveAll(chunkDir)
+		return nil
+	}))
+
+	err = fs.PutDirectly(c.Request.Context(), dir, s, true)
+	mergedReader.Close()
+	os.RemoveAll(chunkDir)
+
+	if err != nil {
+		common.ErrorResp(c, err, 500)
+		return
+	}
+
+	common.SuccessResp(c, gin.H{
+		"hash": hashResponse,
+	})
+}
+
diff --git a/server/router.go b/server/router.go
index 57d1166ae..0ebe6d234 100644
--- a/server/router.go
+++ b/server/router.go
@@ -160,8 +160,8 @@ func admin(g *gin.RouterGroup) {
 		setting.POST("/set_transmission", handles.SetTransmission)
 		setting.POST("/set_115", handles.Set115)
 		setting.POST("/set_115_open", handles.Set115Open)
-		setting.POST("/set_123_pan", handles.Set123Pan)
 		setting.POST("/set_123_open", handles.Set123Open)
+		setting.POST("/set_123_pan", handles.Set123Pan)
 		setting.POST("/set_pikpak", handles.SetPikPak)
 		setting.POST("/set_thunder", handles.SetThunder)
 		setting.POST("/set_thunderx", handles.SetThunderX)
@@ -211,6 +211,8 @@ func _fs(g *gin.RouterGroup) {
 	uploadLimiter := middlewares.UploadRateLimiter(stream.ClientUploadLimit)
 	g.PUT("/put", middlewares.FsUp, uploadLimiter, handles.FsStream)
 	g.PUT("/form", middlewares.FsUp, uploadLimiter, handles.FsForm)
+	g.PUT("/put/chunk", handles.FsChunkUpload)
+	g.POST("/put/chunk/merge", handles.FsChunkMerge)
 	g.POST("/link", middlewares.AuthAdmin, handles.Link)
 	// g.POST("/add_aria2", handles.AddOfflineDownload)
 	// g.POST("/add_qbit", handles.AddQbittorrent)
@@ -248,3 +250,4 @@ func InitS3(e *gin.Engine) {
 	Cors(e)
 	S3Server(e.Group("/"))
 }
+
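
Reviewer note: below is a minimal client-side sketch of the flow the new /fs/put/chunk and /fs/put/chunk/merge routes expect. It is illustrative only: the /api/fs prefix assumes the usual API mount point, authentication is omitted, uploadChunked and putChunk are hypothetical helper names, and the CRC32 header format assumes utils.HashReader returns lowercase hex (consistent with the width of 8 registered for CRC32 in hash.go).

// Hypothetical client sketch (not part of this patch).
package chunkclient

import (
	"bytes"
	"fmt"
	"hash/crc32"
	"io"
	"mime/multipart"
	"net/http"
	"os"
	"strings"
)

const chunkSize = 95 << 20 // matches the chunked_upload_chunk_size default (95 MB)

// putChunk PUTs one chunk to /api/fs/put/chunk with an optional CRC32 header.
func putChunk(base, uploadId string, index int, data []byte) error {
	var body bytes.Buffer
	mw := multipart.NewWriter(&body)
	part, err := mw.CreateFormFile("file", fmt.Sprintf("%d", index))
	if err != nil {
		return err
	}
	if _, err := part.Write(data); err != nil {
		return err
	}
	if err := mw.Close(); err != nil {
		return err
	}

	url := fmt.Sprintf("%s/api/fs/put/chunk?upload_id=%s&index=%d", base, uploadId, index)
	req, err := http.NewRequest(http.MethodPut, url, &body)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", mw.FormDataContentType())
	// The server recomputes this via utils.HashReader(utils.CRC32, ...) and rejects mismatches.
	req.Header.Set("X-Chunk-CRC32", fmt.Sprintf("%08x", crc32.ChecksumIEEE(data)))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("chunk %d: %s", index, resp.Status)
	}
	return nil
}

// uploadChunked splits a local file into chunks, uploads them, then requests the merge.
func uploadChunked(base, uploadId, localPath, remotePath string) error {
	f, err := os.Open(localPath)
	if err != nil {
		return err
	}
	defer f.Close()

	buf := make([]byte, chunkSize)
	total := 0
	for {
		n, err := io.ReadFull(f, buf)
		if n > 0 {
			if err := putChunk(base, uploadId, total, buf[:n]); err != nil {
				return err
			}
			total++
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			break
		}
		if err != nil {
			return err
		}
	}

	merge := fmt.Sprintf(`{"upload_id":%q,"path":%q,"total_chunks":%d,"as_task":true,"overwrite":true}`,
		uploadId, remotePath, total)
	resp, err := http.Post(base+"/api/fs/put/chunk/merge", "application/json", strings.NewReader(merge))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("merge: %s", resp.Status)
	}
	return nil
}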
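
The optional "hash" field of the merge request is compared against a server-computed xxHash64 (the xxh64 check in FsChunkMerge). A sketch of how a client could compute it, using the same github.com/cespare/xxhash/v2 module that go.mod promotes to a direct dependency; the %016x formatting assumes the server's hex encoding, consistent with the width of 16 registered for XXH64 in hash.go:

// Hypothetical helper (not part of this patch).
package chunkclient

import (
	"fmt"
	"io"
	"os"

	"github.com/cespare/xxhash/v2"
)

// fileXXH64 returns the hex-encoded xxHash64 of a file, suitable for the
// "hash" field of the /fs/put/chunk/merge request body.
func fileXXH64(path string) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()

	h := xxhash.New()
	if _, err := io.Copy(h, f); err != nil {
		return "", err
	}
	return fmt.Sprintf("%016x", h.Sum64()), nil
}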
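
The setting.go and const.go hunks only register the http_server_* keys and their defaults; the code that applies them to the listener is outside this diff. For context, a sketch of what that wiring could look like: newHTTPServer is a hypothetical helper, while setting.GetInt is used with the same signature as in InitTaskManager above.

// Hypothetical wiring (not part of this patch).
package server

import (
	"net/http"
	"time"

	"github.com/OpenListTeam/OpenList/v4/internal/conf"
	"github.com/OpenListTeam/OpenList/v4/internal/setting"
)

func newHTTPServer(addr string, handler http.Handler) *http.Server {
	secs := func(key string, def int) time.Duration {
		return time.Duration(setting.GetInt(key, def)) * time.Second
	}
	return &http.Server{
		Addr:    addr,
		Handler: handler,
		// A zero value leaves the corresponding limit disabled, matching the Help text above.
		ReadTimeout:       secs(conf.HTTPServerReadTimeout, 0),
		WriteTimeout:      secs(conf.HTTPServerWriteTimeout, 0),
		IdleTimeout:       secs(conf.HTTPServerIdleTimeout, 120),
		ReadHeaderTimeout: secs(conf.HTTPServerReadHeaderTimeout, 30),
		MaxHeaderBytes:    setting.GetInt(conf.HTTPServerMaxHeaderBytes, 1<<20),
	}
}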