Skip to content

Commit 5b98d05

Browse files
authored
Merge pull request #439 from pkg/feature/ReadFromWithConcurrency
Export a ReadFromWithConcurrency function that permits ensuring concurrency usage.
2 parents de44fbb + 61f5f29 commit 5b98d05

File tree

1 file changed

+23
-10
lines changed

1 file changed

+23
-10
lines changed

client.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,8 +1539,13 @@ func (f *File) WriteAt(b []byte, off int64) (written int, err error) {
15391539
return len(b), nil
15401540
}
15411541

1542-
// readFromConcurrent implements ReaderFrom, but works concurrently rather than sequentially.
1543-
func (f *File) readFromConcurrent(r io.Reader, remain int64) (read int64, err error) {
1542+
// ReadFromWithConcurrency implements ReaderFrom,
1543+
// but uses the given concurrency to issue multiple requests at the same time.
1544+
//
1545+
// Giving a concurrency of less than one will default to the Client’s max concurrency.
1546+
//
1547+
// Otherwise, the given concurrency will be capped by the Client's max concurrency.
1548+
func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) {
15441549
// Split the write into multiple maxPacket sized concurrent writes.
15451550
// This allows writes with a suitably large reader
15461551
// to transfer data at a much faster rate due to overlapping round trip times.
@@ -1560,12 +1565,9 @@ func (f *File) readFromConcurrent(r io.Reader, remain int64) (read int64, err er
15601565
}
15611566
errCh := make(chan rwErr)
15621567

1563-
concurrency64 := remain/int64(f.c.maxPacket) + 1 // a bad guess, but better than no guess
1564-
if concurrency64 > int64(f.c.maxConcurrentRequests) || concurrency64 < 1 {
1565-
concurrency64 = int64(f.c.maxConcurrentRequests)
1568+
if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
1569+
concurrency = f.c.maxConcurrentRequests
15661570
}
1567-
// Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
1568-
concurrency := int(concurrency64)
15691571

15701572
pool := newBufPool(concurrency, f.c.maxPacket)
15711573

@@ -1694,12 +1696,23 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) {
16941696
}
16951697

16961698
if remain < 0 {
1697-
remain = math.MaxInt64
1699+
// We can strongly assert that we want default max concurrency here.
1700+
return f.ReadFromWithConcurrency(r, f.c.maxConcurrentRequests)
16981701
}
16991702

17001703
if remain > int64(f.c.maxPacket) {
1701-
// Only use concurrency, if it would be at least two read/writes.
1702-
return f.readFromConcurrent(r, remain)
1704+
// Otherwise, only use concurrency, if it would be at least two packets.
1705+
1706+
// This is the best reasonable guess we can make.
1707+
concurrency64 := remain/int64(f.c.maxPacket) + 1
1708+
1709+
// We need to cap this value to an `int` size value to avoid overflow on 32-bit machines.
1710+
// So, we may as well pre-cap it to `f.c.maxConcurrentRequests`.
1711+
if concurrency64 > int64(f.c.maxConcurrentRequests) {
1712+
concurrency64 = int64(f.c.maxConcurrentRequests)
1713+
}
1714+
1715+
return f.ReadFromWithConcurrency(r, int(concurrency64))
17031716
}
17041717
}
17051718

0 commit comments

Comments
 (0)