Skip to content

Commit 19a04e6

Browse files
author
cyk
committed
fix(google_drive): 更新Put方法以支持可重复读取流和不可重复读取流的MD5校验
1 parent 25ad4cc commit 19a04e6

File tree

2 files changed

+60
-12
lines changed

2 files changed

+60
-12
lines changed

.github/workflows/test_docker.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ on:
66
- main
77
pull_request:
88
branches:
9-
- fix # 👈 允许你的 fix 分支触发
9+
- copy
10+
1011

1112
concurrency:
1213
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}

drivers/google_drive/driver.go

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"github.com/OpenListTeam/OpenList/v4/internal/driver"
1111
"github.com/OpenListTeam/OpenList/v4/internal/errs"
1212
"github.com/OpenListTeam/OpenList/v4/internal/model"
13+
"github.com/OpenListTeam/OpenList/v4/internal/stream"
14+
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
1315
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
1416
"github.com/go-resty/resty/v2"
1517
)
@@ -111,8 +113,44 @@ func (d *GoogleDrive) Remove(ctx context.Context, obj model.Obj) error {
111113
return err
112114
}
113115

114-
func (d *GoogleDrive) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
115-
obj := stream.GetExist()
116+
func (d *GoogleDrive) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error {
117+
// 1. 准备MD5(用于完整性校验)
118+
md5Hash := file.GetHash().GetHash(utils.MD5)
119+
120+
// 检查是否是可重复读取的流
121+
_, isSeekable := file.(*stream.SeekableStream)
122+
123+
if isSeekable {
124+
// 可重复读取的流,使用 RangeRead 计算 hash,不缓存
125+
if len(md5Hash) != utils.MD5.Width {
126+
var err error
127+
md5Hash, err = stream.StreamHashFile(file, utils.MD5, 10, &up)
128+
if err != nil {
129+
return err
130+
}
131+
_ = md5Hash // MD5用于后续完整性校验(Google Drive会自动校验)
132+
}
133+
} else {
134+
// 不可重复读取的流(如 HTTP body)
135+
if len(md5Hash) != utils.MD5.Width {
136+
// 缓存整个文件并计算 MD5
137+
var err error
138+
_, md5Hash, err = stream.CacheFullAndHash(file, &up, utils.MD5)
139+
if err != nil {
140+
return err
141+
}
142+
_ = md5Hash // MD5用于后续完整性校验
143+
} else if file.GetFile() == nil {
144+
// 有 MD5 但没有缓存,需要缓存以支持后续 RangeRead
145+
_, err := file.CacheFullAndWriter(&up, nil)
146+
if err != nil {
147+
return err
148+
}
149+
}
150+
}
151+
152+
// 2. 初始化可恢复上传会话
153+
obj := file.GetExist()
116154
var (
117155
e Error
118156
url string
@@ -125,16 +163,16 @@ func (d *GoogleDrive) Put(ctx context.Context, dstDir model.Obj, stream model.Fi
125163
data = base.Json{}
126164
} else {
127165
data = base.Json{
128-
"name": stream.GetName(),
166+
"name": file.GetName(),
129167
"parents": []string{dstDir.GetID()},
130168
}
131169
url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable&supportsAllDrives=true"
132170
}
133171
req := base.NoRedirectClient.R().
134172
SetHeaders(map[string]string{
135173
"Authorization": "Bearer " + d.AccessToken,
136-
"X-Upload-Content-Type": stream.GetMimetype(),
137-
"X-Upload-Content-Length": strconv.FormatInt(stream.GetSize(), 10),
174+
"X-Upload-Content-Type": file.GetMimetype(),
175+
"X-Upload-Content-Length": strconv.FormatInt(file.GetSize(), 10),
138176
}).
139177
SetError(&e).SetBody(data).SetContext(ctx)
140178
if obj != nil {
@@ -151,20 +189,29 @@ func (d *GoogleDrive) Put(ctx context.Context, dstDir model.Obj, stream model.Fi
151189
if err != nil {
152190
return err
153191
}
154-
return d.Put(ctx, dstDir, stream, up)
192+
return d.Put(ctx, dstDir, file, up)
155193
}
156194
return fmt.Errorf("%s: %v", e.Error.Message, e.Error.Errors)
157195
}
196+
197+
// 3. 上传文件内容
158198
putUrl := res.Header().Get("location")
159-
if stream.GetSize() < d.ChunkSize*1024*1024 {
199+
if file.GetSize() < d.ChunkSize*1024*1024 {
200+
// 小文件上传:使用 RangeRead 读取整个文件(避免消费已计算hash的stream)
201+
reader, err := file.RangeRead(http_range.Range{Start: 0, Length: file.GetSize()})
202+
if err != nil {
203+
return err
204+
}
205+
160206
_, err = d.request(putUrl, http.MethodPut, func(req *resty.Request) {
161-
req.SetHeader("Content-Length", strconv.FormatInt(stream.GetSize(), 10)).
162-
SetBody(driver.NewLimitedUploadStream(ctx, stream))
207+
req.SetHeader("Content-Length", strconv.FormatInt(file.GetSize(), 10)).
208+
SetBody(driver.NewLimitedUploadStream(ctx, reader))
163209
}, nil)
210+
return err
164211
} else {
165-
err = d.chunkUpload(ctx, stream, putUrl, up)
212+
// 大文件分片上传
213+
return d.chunkUpload(ctx, file, putUrl, up)
166214
}
167-
return err
168215
}
169216

170217
func (d *GoogleDrive) GetDetails(ctx context.Context) (*model.StorageDetails, error) {

0 commit comments

Comments
 (0)