@@ -3,6 +3,7 @@ package net
3
3
import (
4
4
"bytes"
5
5
"context"
6
+ "errors"
6
7
"fmt"
7
8
"io"
8
9
"net/http"
@@ -119,7 +120,7 @@ type ConcurrencyLimit struct {
119
120
Limit int // 需要大于0
120
121
}
121
122
122
- var ErrExceedMaxConcurrency = fmt . Errorf ("ExceedMaxConcurrency" )
123
+ var ErrExceedMaxConcurrency = errors . New ("ExceedMaxConcurrency" )
123
124
124
125
func (l * ConcurrencyLimit ) sub () error {
125
126
l ._m .Lock ()
@@ -279,10 +280,9 @@ func (d *downloader) interrupt() error {
279
280
err := fmt .Errorf ("interrupted" )
280
281
d .err = err
281
282
}
282
- if d .chunkChannel != nil {
283
+ close (d .chunkChannel )
284
+ if d .bufs != nil {
283
285
d .cancel (err )
284
- close (d .chunkChannel )
285
- d .chunkChannel = nil
286
286
for _ , buf := range d .bufs {
287
287
buf .Close ()
288
288
}
@@ -291,8 +291,6 @@ func (d *downloader) interrupt() error {
291
291
d .concurrency = - d .concurrency
292
292
}
293
293
log .Debugf ("maxConcurrency:%d" , d .cfg .Concurrency + d .concurrency )
294
- } else {
295
- log .Debug ("close of closed channel" )
296
294
}
297
295
return err
298
296
}
@@ -314,31 +312,35 @@ func (d *downloader) finishBuf(id int) (isLast bool, nextBuf *Buf) {
314
312
// downloadPart is an individual goroutine worker reading from the ch channel
315
313
// and performing Http request on the data with a given byte range.
316
314
func (d * downloader ) downloadPart () {
317
- // defer d.wg.Done ()
315
+ defer d .concurrencyFinish ()
318
316
for {
319
- c , ok := <- d .chunkChannel
320
- if ! ok {
321
- break
322
- }
323
- if d .getErr () != nil {
324
- // Drain the channel if there is an error, to prevent deadlocking
325
- // of download producer.
326
- break
327
- }
328
- if err := d .downloadChunk (& c ); err != nil {
329
- if err == errCancelConcurrency {
330
- break
317
+ select {
318
+ case <- d .ctx .Done ():
319
+ return
320
+ case c , ok := <- d .chunkChannel :
321
+ if ! ok {
322
+ return
331
323
}
332
- if err == context .Canceled {
333
- if e := context .Cause (d .ctx ); e != nil {
334
- err = e
324
+ if d .getErr () != nil {
325
+ // Drain the channel if there is an error, to prevent deadlocking
326
+ // of download producer.
327
+ return
328
+ }
329
+ if err := d .downloadChunk (& c ); err != nil {
330
+ if err == errCancelConcurrency {
331
+ return
332
+ }
333
+ if err == context .Canceled {
334
+ if e := context .Cause (d .ctx ); e != nil {
335
+ err = e
336
+ }
335
337
}
338
+ d .setErr (err )
339
+ d .cancel (err )
340
+ return
336
341
}
337
- d .setErr (err )
338
- d .cancel (err )
339
342
}
340
343
}
341
- d .concurrencyFinish ()
342
344
}
343
345
344
346
// downloadChunk downloads the chunk
@@ -390,8 +392,8 @@ func (d *downloader) downloadChunk(ch *chunk) error {
390
392
return err
391
393
}
392
394
393
- var errCancelConcurrency = fmt . Errorf ("cancel concurrency" )
394
- var errInfiniteRetry = fmt . Errorf ("infinite retry" )
395
+ var errCancelConcurrency = errors . New ("cancel concurrency" )
396
+ var errInfiniteRetry = errors . New ("infinite retry" )
395
397
396
398
func (d * downloader ) tryDownloadChunk (params * HttpRequestParams , ch * chunk ) (int64 , error ) {
397
399
resp , err := d .cfg .HttpClient (d .ctx , params )
0 commit comments