Commit c36d3c5

author: cyk (committed)
feat(upload): enhance hash calculation and upload logic for various stream types

1 parent: ee99853

9 files changed: +332 -70 lines

CLAUDE.md

Lines changed: 49 additions & 0 deletions

````diff
@@ -2,6 +2,12 @@
 
 This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
 
+## Core Development Principles
+
+1. **最小代码改动原则** (Minimum code changes): Make the smallest change necessary to achieve the goal
+2. **不缓存整个文件原则** (No full file caching for seekable streams): For SeekableStream, use RangeRead instead of caching the entire file
+3. **必要情况下可以多遍上传原则** (Multi-pass upload when necessary): If rapid upload fails, fall back to normal upload
+
 ## Build and Development Commands
 
 ```bash
````
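Principles 2 and 3 are the ones the driver changes in this commit revolve around. Below is a minimal, self-contained sketch of both patterns; the `rangeReadable` interface and the `hashPrefix` / `uploadWithFallback` helpers are illustrative stand-ins, not the project's actual stream/model API.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"strings"
)

// rangeReadable is a stand-in for a seekable stream that can hand out
// readers for arbitrary byte ranges without consuming a sequential Reader.
type rangeReadable interface {
	RangeRead(start, length int64) (io.Reader, error)
}

// hashPrefix hashes only the first n bytes through a range read, so the
// stream is never cached in full (principle 2).
func hashPrefix(s rangeReadable, n int64) (string, error) {
	r, err := s.RangeRead(0, n)
	if err != nil {
		return "", err
	}
	h := sha1.New()
	if _, err := io.Copy(h, r); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

var errRapidRejected = errors.New("rapid upload rejected")

// uploadWithFallback tries rapid upload first and falls back to a normal
// upload when the server rejects it (principle 3).
func uploadWithFallback(rapid, normal func() error) error {
	err := rapid()
	if err == nil {
		return nil
	}
	if !errors.Is(err, errRapidRejected) {
		return err
	}
	return normal()
}

// memStream is a toy in-memory implementation of rangeReadable.
type memStream struct{ data string }

func (m memStream) RangeRead(start, length int64) (io.Reader, error) {
	end := start + length
	if end > int64(len(m.data)) {
		end = int64(len(m.data))
	}
	return strings.NewReader(m.data[start:end]), nil
}

func main() {
	sum, _ := hashPrefix(memStream{data: "hello world"}, 5) // SHA-1 of "hello"
	fmt.Println(sum)

	_ = uploadWithFallback(
		func() error { return errRapidRejected },
		func() error { fmt.Println("falling back to normal upload"); return nil },
	)
}
```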
````diff
@@ -166,6 +172,49 @@ Response (JSON / Proxy / Redirect)
 - Downloads parts in parallel
 - Assembles final stream
 
+**Stream Types and Reader Management**:
+
+⚠️ **CRITICAL**: SeekableStream.Reader must NEVER be created early!
+
+- **FileStream**: One-time sequential stream (e.g., HTTP body)
+  - `Reader` is set at creation and consumed sequentially
+  - Cannot be rewound or re-read
+
+- **SeekableStream**: Reusable stream with RangeRead capability
+  - Has `rangeReader` for creating new readers on-demand
+  - `Reader` should ONLY be created when actually needed for sequential reading
+  - **DO NOT create Reader early** - use lazy initialization via `generateReader()`
+
+**Common Pitfall - Early Reader Creation**:
+```go
+// ❌ WRONG: Creating Reader early
+if _, ok := rr.(*model.FileRangeReader); ok {
+	rc, _ := rr.RangeRead(ctx, http_range.Range{Length: -1})
+	fs.Reader = rc // This will be consumed by intermediate operations!
+}
+
+// ✅ CORRECT: Let generateReader() create it on-demand
+// Reader will be created only when Read() is called
+return &SeekableStream{FileStream: fs, rangeReader: rr}, nil
+```
+
+**Why This Matters**:
+- Hash calculation uses `StreamHashFile()`, which reads the file via RangeRead
+- If Reader is created early, it may be at EOF when the HTTP upload actually needs it
+- Result: `http: ContentLength=X with Body length 0` error
+
+**Hash Calculation for Uploads**:
+```go
+// For SeekableStream: use RangeRead to avoid consuming Reader
+if _, ok := file.(*SeekableStream); ok {
+	hash, err = stream.StreamHashFile(file, utils.MD5, 40, &up)
+	// StreamHashFile uses RangeRead internally, Reader remains unused
+}
+
+// For FileStream: must cache first, then calculate hash
+_, hash, err = stream.CacheFullAndHash(file, &up, utils.MD5)
+```
+
 **Link Refresh Pattern**:
 ```go
 // In op.Link(), a refresher is automatically attached
````
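The guidance above leans on lazy creation of `Reader` via `generateReader()`. The sketch below is a simplified stand-alone illustration of that idea; the `lazyReader` type is an assumption for demonstration, not the repository's `SeekableStream` implementation.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// lazyReader defers creating its underlying reader until the first Read call,
// mirroring the "do not create Reader early" rule: preparation steps such as
// hash calculation can rely on range reads without ever touching the
// sequential reader.
type lazyReader struct {
	generate func() (io.Reader, error) // e.g. a full-range read on the real stream
	r        io.Reader
}

func (l *lazyReader) Read(p []byte) (int, error) {
	if l.r == nil {
		r, err := l.generate()
		if err != nil {
			return 0, err
		}
		l.r = r
	}
	return l.r.Read(p)
}

func main() {
	created := false
	lr := &lazyReader{generate: func() (io.Reader, error) {
		created = true
		return strings.NewReader("payload"), nil
	}}

	// Hashing or other preparation would happen here without touching lr;
	// the sequential reader still does not exist.
	fmt.Println("reader created before upload?", created) // false

	body, _ := io.ReadAll(lr) // the upload consumes lr; reader is created on demand
	fmt.Println(string(body), created)
}
```

The upload body only comes into existence when something actually reads it, so it cannot already be at EOF by the time the HTTP request is sent.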

drivers/115_open/driver.go

Lines changed: 87 additions & 18 deletions

```diff
@@ -226,28 +226,97 @@ func (d *Open115) Put(ctx context.Context, dstDir model.Obj, file model.FileStre
 	if err != nil {
 		return err
 	}
+
 	sha1 := file.GetHash().GetHash(utils.SHA1)
-	if len(sha1) != utils.SHA1.Width {
-		// Compute SHA1 by streaming
-		sha1, err = stream.StreamHashFile(file, utils.SHA1, 100, &up)
+	sha1128k := file.GetHash().GetHash(utils.SHA1_128K)
+
+	// Check whether the stream is re-readable (seekable)
+	_, isSeekable := file.(*stream.SeekableStream)
+
+	// If precomputed hashes are available, try rapid upload first
+	if len(sha1) == utils.SHA1.Width && len(sha1128k) == utils.SHA1_128K.Width {
+		resp, err := d.client.UploadInit(ctx, &sdk.UploadInitReq{
+			FileName: file.GetName(),
+			FileSize: file.GetSize(),
+			Target:   dstDir.GetID(),
+			FileID:   strings.ToUpper(sha1),
+			PreID:    strings.ToUpper(sha1128k),
+		})
 		if err != nil {
 			return err
 		}
+		if resp.Status == 2 {
+			up(100)
+			return nil
+		}
+		// Rapid upload failed; continue with the normal flow
 	}
-	const PreHashSize int64 = 128 * utils.KB
-	hashSize := PreHashSize
-	if file.GetSize() < PreHashSize {
-		hashSize = file.GetSize()
-	}
-	reader, err := file.RangeRead(http_range.Range{Start: 0, Length: hashSize})
-	if err != nil {
-		return err
-	}
-	sha1128k, err := utils.HashReader(utils.SHA1, reader)
-	if err != nil {
-		return err
+
+	if isSeekable {
+		// Re-readable stream: compute hashes via RangeRead, do not cache
+		if len(sha1) != utils.SHA1.Width {
+			sha1, err = stream.StreamHashFile(file, utils.SHA1, 100, &up)
+			if err != nil {
+				return err
+			}
+		}
+		// Compute sha1_128k (if not precomputed)
+		if len(sha1128k) != utils.SHA1_128K.Width {
+			const PreHashSize int64 = 128 * utils.KB
+			hashSize := PreHashSize
+			if file.GetSize() < PreHashSize {
+				hashSize = file.GetSize()
+			}
+			reader, err := file.RangeRead(http_range.Range{Start: 0, Length: hashSize})
+			if err != nil {
+				return err
+			}
+			sha1128k, err = utils.HashReader(utils.SHA1, reader)
+			if err != nil {
+				return err
+			}
+		}
+	} else {
+		// One-shot stream (e.g. HTTP body)
+		// If precomputed hashes were present, rapid upload was already attempted above
+		if len(sha1) == utils.SHA1.Width && len(sha1128k) == utils.SHA1_128K.Width {
+			// Rapid upload failed; cache the file for the actual upload
+			_, err = file.CacheFullAndWriter(&up, nil)
+			if err != nil {
+				return err
+			}
+		} else {
+			// No precomputed hash; cache the whole file and compute it
+			if len(sha1) != utils.SHA1.Width {
+				_, sha1, err = stream.CacheFullAndHash(file, &up, utils.SHA1)
+				if err != nil {
+					return err
+				}
+			} else if file.GetFile() == nil {
+				// SHA1 is known but nothing is cached; cache so later RangeRead calls work
+				_, err = file.CacheFullAndWriter(&up, nil)
+				if err != nil {
+					return err
+				}
+			}
+			// Compute sha1_128k
+			const PreHashSize int64 = 128 * utils.KB
+			hashSize := PreHashSize
+			if file.GetSize() < PreHashSize {
+				hashSize = file.GetSize()
+			}
+			reader, err := file.RangeRead(http_range.Range{Start: 0, Length: hashSize})
+			if err != nil {
+				return err
+			}
+			sha1128k, err = utils.HashReader(utils.SHA1, reader)
+			if err != nil {
+				return err
+			}
+		}
 	}
-	// 1. Init
+
+	// 1. Init (SeekableStream or a cached FileStream)
 	resp, err := d.client.UploadInit(ctx, &sdk.UploadInitReq{
 		FileName: file.GetName(),
 		FileSize: file.GetSize(),
@@ -273,11 +342,11 @@ func (d *Open115) Put(ctx context.Context, dstDir model.Obj, file model.FileStre
 	if err != nil {
 		return err
 	}
-	signReader, err := file.RangeRead(http_range.Range{Start: start, Length: end - start + 1})
+	signReader, err := file.RangeRead(http_range.Range{Start: start, Length: end - start + 1})
 	if err != nil {
 		return err
 	}
-	signVal, err := utils.HashReader(utils.SHA1, reader)
+	signVal, err := utils.HashReader(utils.SHA1, signReader)
 	if err != nil {
 		return err
 	}
```
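Condensed, the new `Put` flow above chooses a strategy from two facts: whether both hashes were precomputed and whether the stream is seekable. The sketch below only summarizes that decision table; `uploadPlan` and `planFor` are illustrative names, not code from the driver.

```go
package main

import "fmt"

// uploadPlan summarizes the branches in Open115.Put above (illustration only).
type uploadPlan struct {
	TryRapidFirst bool // both SHA1 and SHA1-128K are already known
	HashViaRange  bool // seekable: hash with StreamHashFile / RangeRead, no caching
	CacheFull     bool // one-shot stream: cache the whole file before uploading
}

func planFor(haveSHA1, haveSHA1128K, seekable bool) uploadPlan {
	return uploadPlan{
		TryRapidFirst: haveSHA1 && haveSHA1128K,
		HashViaRange:  seekable,
		CacheFull:     !seekable,
	}
}

func main() {
	// Precomputed hashes on a seekable stream: try rapid upload, hash by range reads.
	fmt.Printf("%+v\n", planFor(true, true, true))
	// No hashes on a one-shot HTTP body: cache the full file, then hash and upload.
	fmt.Printf("%+v\n", planFor(false, false, false))
}
```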

drivers/123_open/driver.go

Lines changed: 30 additions & 19 deletions

```diff
@@ -184,35 +184,46 @@ func (d *Open123) Put(ctx context.Context, dstDir model.Obj, file model.FileStre
 
 	// etag: file MD5
 	etag := file.GetHash().GetHash(utils.MD5)
+
+	// Check whether the stream is re-readable (seekable)
+	_, isSeekable := file.(*stream.SeekableStream)
+
+	// If a precomputed hash is available, try rapid upload first
 	if len(etag) >= utils.MD5.Width {
-		// When an etag is available, try rapid upload first
 		createResp, err := d.create(parentFileId, file.GetName(), etag, file.GetSize(), 2, false)
 		if err != nil {
 			return nil, err
 		}
-		// Rapid upload?
-		if createResp.Data.Reuse {
-			// A valid FileID is returned only when rapid upload succeeds; otherwise it is 0
-			if createResp.Data.FileID != 0 {
-				return File{
-					FileName: file.GetName(),
-					Size:     file.GetSize(),
-					FileId:   createResp.Data.FileID,
-					Type:     2,
-					Etag:     etag,
-				}, nil
-			}
+		if createResp.Data.Reuse && createResp.Data.FileID != 0 {
+			return File{
+				FileName: file.GetName(),
+				Size:     file.GetSize(),
+				FileId:   createResp.Data.FileID,
+				Type:     2,
+				Etag:     etag,
+			}, nil
 		}
-		// Rapid upload failed; the etag may be unreliable, keep streaming to compute the real MD5
+		// Rapid upload failed; continue with the normal flow
 	}
 
-	// Streaming MD5 calculation
-	etag, err = stream.StreamHashFile(file, utils.MD5, 40, &up)
-	if err != nil {
-		return nil, err
+	if isSeekable {
+		// Re-readable stream: compute the hash via RangeRead, do not cache
+		if len(etag) < utils.MD5.Width {
+			etag, err = stream.StreamHashFile(file, utils.MD5, 40, &up)
+			if err != nil {
+				return nil, err
+			}
+		}
+	} else {
+		// One-shot stream (e.g. HTTP body)
+		// Rapid upload failed or no hash available: cache the whole file and compute its MD5
+		_, etag, err = stream.CacheFullAndHash(file, &up, utils.MD5)
+		if err != nil {
+			return nil, err
+		}
 	}
 
-	// 2. Create the upload task
+	// 2. Create the upload task (or retry rapid upload)
 	createResp, err := d.create(parentFileId, file.GetName(), etag, file.GetSize(), 2, false)
 	if err != nil {
 		return nil, err
```

drivers/aliyundrive_open/upload.go

Lines changed: 29 additions & 12 deletions

```diff
@@ -163,21 +163,29 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
 	}
 	count := int(math.Ceil(float64(stream.GetSize()) / float64(partSize)))
 	createData["part_info_list"] = makePartInfos(count)
+
+	// Check whether the stream is re-readable (seekable)
+	_, isSeekable := stream.(*streamPkg.SeekableStream)
+
 	// rapid upload
 	rapidUpload := !stream.IsForceStreamUpload() && stream.GetSize() > 100*utils.KB && d.RapidUpload
 	if rapidUpload {
 		log.Debugf("[aliyundrive_open] start cal pre_hash")
-		// read 1024 bytes to calculate pre hash
-		reader, err := stream.RangeRead(http_range.Range{Start: 0, Length: 1024})
-		if err != nil {
-			return nil, err
-		}
-		hash, err := utils.HashReader(utils.SHA1, reader)
-		if err != nil {
-			return nil, err
+		// Prefer a precomputed pre_hash
+		preHash := stream.GetHash().GetHash(utils.PRE_HASH)
+		if len(preHash) != utils.PRE_HASH.Width {
+			// No precomputed pre_hash; compute it via RangeRead
+			reader, err := stream.RangeRead(http_range.Range{Start: 0, Length: 1024})
+			if err != nil {
+				return nil, err
+			}
+			preHash, err = utils.HashReader(utils.SHA1, reader)
+			if err != nil {
+				return nil, err
+			}
 		}
 		createData["size"] = stream.GetSize()
-		createData["pre_hash"] = hash
+		createData["pre_hash"] = preHash
 	}
 	var createResp CreateResp
 	_, err, e := d.requestReturnErrResp(ctx, limiterOther, "/adrive/v1.0/openFile/create", http.MethodPost, func(req *resty.Request) {
```
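The pre_hash change above follows a pattern of preferring a precomputed value and only reading the first 1024 bytes of the stream when none is available. A minimal sketch of that pattern, with a plain map standing in for the project's hash-info container (illustrative only):

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

// preHashFor returns a precomputed pre-hash when one is present in the
// supplied hash set, and otherwise hashes the first 1024 bytes of the stream.
func preHashFor(known map[string]string, openPrefix func() (io.Reader, error)) (string, error) {
	if h, ok := known["pre_hash"]; ok && h != "" {
		return h, nil // precomputed: no read of the stream is needed at all
	}
	r, err := openPrefix() // e.g. a RangeRead(0, 1024) on the real stream
	if err != nil {
		return "", err
	}
	sum := sha1.New()
	if _, err := io.Copy(sum, io.LimitReader(r, 1024)); err != nil {
		return "", err
	}
	return hex.EncodeToString(sum.Sum(nil)), nil
}

func main() {
	// With a precomputed value, the stream is never touched.
	h1, _ := preHashFor(map[string]string{"pre_hash": "cafebabe"}, nil)
	fmt.Println(h1)

	// Without one, only the first 1024 bytes are read and hashed.
	h2, _ := preHashFor(nil, func() (io.Reader, error) {
		return strings.NewReader(strings.Repeat("x", 4096)), nil
	})
	fmt.Println(h2)
}
```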
```diff
@@ -191,9 +199,18 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
 
 	hash := stream.GetHash().GetHash(utils.SHA1)
 	if len(hash) != utils.SHA1.Width {
-		_, hash, err = streamPkg.CacheFullAndHash(stream, &up, utils.SHA1)
-		if err != nil {
-			return nil, err
+		if isSeekable {
+			// Re-readable stream: use StreamHashFile (RangeRead), do not cache
+			hash, err = streamPkg.StreamHashFile(stream, utils.SHA1, 50, &up)
+			if err != nil {
+				return nil, err
+			}
+		} else {
+			// One-shot stream: cache and compute
+			_, hash, err = streamPkg.CacheFullAndHash(stream, &up, utils.SHA1)
+			if err != nil {
+				return nil, err
+			}
 		}
 	}
 
```