Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions common/atomicmorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type AtomicMorpherInt32 func(startVal int32) (val int32, morphResult interface{}
// AtomicMorph atomically morphs target in to new value (and result) as indicated by the AtomicMorpher callback function.
func AtomicMorphInt32(target *int32, morpher AtomicMorpherInt32) interface{} {
if target == nil || morpher == nil {
panic("target and morpher mut not be nil")
panic("target and morpher must not be nil")
}
for {
currentVal := atomic.LoadInt32(target)
Expand All @@ -29,7 +29,7 @@ type AtomicMorpherUint32 func(startVal uint32) (val uint32, morphResult interfac
// AtomicMorph atomically morphs target into a new value (and result) as indicated by the AtomicMorpher callback function.
func AtomicMorphUint32(target *uint32, morpher AtomicMorpherUint32) interface{} {
if target == nil || morpher == nil {
panic("target and morpher mut not be nil")
panic("target and morpher must not be nil")
}
for {
currentVal := atomic.LoadUint32(target)
Expand All @@ -48,7 +48,7 @@ type AtomicMorpherInt64 func(startVal int64) (val int64, morphResult interface{}
// AtomicMorph atomically morphs target into a new value (and result) as indicated by the AtomicMorpher callback function.
func AtomicMorphInt64(target *int64, morpher AtomicMorpherInt64) interface{} {
if target == nil || morpher == nil {
panic("target and morpher mut not be nil")
panic("target and morpher must not be nil")
}
for {
currentVal := atomic.LoadInt64(target)
Expand All @@ -67,7 +67,7 @@ type AtomicMorpherUint64 func(startVal uint64) (val uint64, morphResult interfac
// AtomicMorph atomically morphs target into a new value (and result) as indicated by the AtomicMorpher callback function.
func AtomicMorphUint64(target *uint64, morpher AtomicMorpherUint64) interface{} {
if target == nil || morpher == nil {
panic("target and morpher mut not be nil")
panic("target and morpher must not be nil")
}
for {
currentVal := atomic.LoadUint64(target)
Expand All @@ -77,3 +77,25 @@ func AtomicMorphUint64(target *uint64, morpher AtomicMorpherUint64) interface{}
}
}
}

type AtomicValue[T any] interface {
Store(T)
Load() T
CompareAndSwap(old, new T) bool
}

type AtomicMorpher[T any, O any] func(startValue T) (val T, morphResult O)

func AtomicMorph[T any, O any](target AtomicValue[T], morpher AtomicMorpher[T, O]) O {
if target == nil || morpher == nil {
panic("target and morpher must not be nil")
}

for {
currentVal := target.Load()
desiredVal, morphResult := morpher(currentVal)
if target.CompareAndSwap(currentVal, desiredVal) {
return morphResult
}
}
}
10 changes: 5 additions & 5 deletions jobsAdmin/JobsAdmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sync/atomic"
"time"

"github.com/Azure/azure-storage-azcopy/v10/pacer"
"github.com/Azure/azure-storage-azcopy/v10/ste"

"github.com/Azure/azure-storage-azcopy/v10/common"
Expand Down Expand Up @@ -115,14 +116,13 @@ func initJobsAdmin(appCtx context.Context, concurrency ste.ConcurrencySettings,

// use the "networking mega" (based on powers of 10, not powers of 2, since that's what mega means in networking context)
targetRateInBytesPerSec := int64(targetRateInMegaBitsPerSec * 1000 * 1000 / 8)
unusedExpectedCoarseRequestByteCount := int64(0)
pacer := ste.NewTokenBucketPacer(targetRateInBytesPerSec, unusedExpectedCoarseRequestByteCount)
requestPacer := pacer.New(pacer.NewBandwidthRecorder(targetRateInBytesPerSec, pacer.DefaultBandwidthRecorderWindowSeconds, appCtx), appCtx)
// Note: as at July 2019, we don't currently have a shutdown method/event on JobsAdmin where this pacer
// could be shut down. But, it's global anyway, so we just leave it running until application exit.
ja := &jobsAdmin{
concurrency: concurrency,
jobIDToJobMgr: newJobIDToJobMgr(),
pacer: pacer,
pacer: requestPacer,
slicePool: common.NewMultiSizeSlicePool(common.MaxBlockBlobBlockSize),
cacheLimiter: common.NewCacheLimiter(maxRamBytesToUse),
fileCountLimiter: common.NewCacheLimiter(int64(concurrency.MaxOpenDownloadFiles)),
Expand Down Expand Up @@ -231,7 +231,7 @@ type jobsAdmin struct {
jobIDToJobMgr jobIDToJobMgr // Thread-safe map from each JobID to its JobInfo
// Other global state can be stored in more fields here...
appCtx context.Context
pacer ste.PacerAdmin
pacer pacer.Interface
slicePool common.ByteSlicePooler
cacheLimiter common.CacheLimiter
fileCountLimiter common.CacheLimiter
Expand Down Expand Up @@ -313,7 +313,7 @@ func (ja *jobsAdmin) UpdateTargetBandwidth(newTarget int64) {
if newTarget < 0 {
return
}
ja.pacer.UpdateTargetBytesPerSecond(newTarget)
ja.pacer.RequestHardLimit(newTarget)
}

/*
Expand Down
75 changes: 75 additions & 0 deletions pacer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# pacer

The `pacer` package aims to solve three problems...

1) How do we prevent requests from timing out when we have a high number of concurrent routines and low bandwidth?
2) What if the bandwidth is intentionally, artificially low?
3) What if it's a mix? The user has requested a target bandwidth, but they cannot hit it.

The answer is simple at face value, but pretty complex in reality.

## Bandwidth detection

This is relevant for #1, and #3. Until we _know_ what our bandwidth looks like, we should fire as many requests as our routines will allow, and try to pick up the bandwidth by recording the throughput and averaging it over the last block of time (i.e. average 1s chunks over 30s), then scale down in [Bandwidth Allocation](#bandwidth-allocation).

To do this, our pacer must implement a throughput recorder which will report the average (or target throughput). This is done by `interface.go`'s `ThroughputRecorder` interface:

```go
type ThroughputRecorder interface {
// RecordBytes records the number of bytes that have made it through the pipe.
// We don't care if the request ends in success or failure,
// because we're trying to observe the available bandwidth.
RecordBytes(count uint64)

// SetObservationPeriod sets the observation period that RecordBytes averages over.
SetObservationPeriod(seconds uint)
// ObservationPeriod gets the observation period that RecordBytes averages over.
ObservationPeriod() (seconds uint)

// Bandwidth returns the observed (or requested) bandwidth.
Bandwidth() (bytesPerSecond uint64)
// HardLimit indicates that the user has requested a hard limit.
HardLimit() (requested bool, bytesPerSecond uint64)

// RequestHardLimit indicates that the user is requesting a hard limit.
RequestHardLimit(bytesPerSecond uint64)
// ClearHardLimit indicates that the user has cancelled the hard limit request.
ClearHardLimit()
}
```

## Bandwidth allocation

Consider the following few scenarios under the above situations.

1) We are processing abnormally large requests, i.e. chunks of several gigabytes
2) We are processing abnormally small requests, i.e. tiny chunks of a handful of kilobytes.
3) We are processing average sized requests, i.e. a handful of megabytes

In case 1, we can probably just allocate everyone a standard division of the hard cap, or as much as they like if we're in detection-only mode. For flighting requests, we should check if allocating a new request would drop everyone below Azure's minimum throughput (with some healthy overage), henceforth `azureMinimumSpeed` based upon the hard cap, or the observed cap (if it is lower).

In case 2, these requests may complete in one iteration, and won't consume the entire `azureMinimumSpeed` on their own. We should watch out for this, because if we don't, we may wind up with much lower real throughput than we intend.

In case 3, it's something in between 1 and 2. It could bounce between. Ergo, we have to handle them like case 2.

To achieve this, keeping our observed throughput and hard-cap in mind, we should, for requests larger than `azureMinimumSpeed`, allocate them as consuming `azureMinimumSpeed`, and for requests smaller, allocate them as consuming their real size.

If the new request's allocated throughput would go over either our hard cap or our observed throughput, we typically shouldn't allocate it.

### On the subject of S2S requests...

These are a headache. We technically _have_ to support them, because previous pacer implementations did, but with a huge bug due to the caveat of trying to pace them. It's, fundamentally, not our bandwidth. We can't control the speed at which an S2S request moves, just the fact that it, in its entirety, moves at all.

This means the most granularity we get, is the full chunk. We have to allocate them as the full chunk, and subtract the desired throughput from the chunk size until it is satisfied. This in effect, averages out to the desired speed, but not really.

Unfortunately, these have to be handled fundamentally differently than upload or download requests.

## Pauses in transfers and other delays that may present sub-optimal bandwidth observations...

1) Many small files are known to cause issues with bandwidth, as they absorb mostly IOPS, _not_ actual bandwidth.
2) Certain methods of enumeration can be increasingly slow, causing delays between blocks of 10k files. This leads to pauses in any real throughput, and consequently, sends observed bandwidth to 0.
3) What if the user's internet disconnects temporarily or something?
4) What if we get a really large string of retries? That'll send bandwidth to near 0.

Unfortunately, situations like this are where our strategy starts to fall apart a little bit more. We _could_ keep track of average request timing, or we could just fall back to 1MB/s throughput as a theoretical "minimum" to prevent getting totally locked out.

189 changes: 189 additions & 0 deletions pacer/bandwidth_recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package pacer

import (
"context"
"math"
"sync"
"sync/atomic"
)

// Compile-time check that *bandwidthRecorder satisfies the package's
// BandwidthRecorder interface (declared elsewhere in this package).
var _ BandwidthRecorder = &bandwidthRecorder{}

// bandwidthRecorder observes throughput in one-second buckets and remembers
// any user-requested hard bandwidth cap.
type bandwidthRecorder struct {
	// NOTE(review): lock is not used by any method visible in this file —
	// confirm what it is meant to guard.
	lock sync.RWMutex

	// control carries start/pause messages to the background worker goroutine.
	control chan bandwidthRecorderMessage

	// hardLimit is the user-requested cap in bytes/sec; 0 means "no limit".
	hardLimit atomic.Int64
	// totalTraffic is the lifetime byte count recorded via RecordBytes.
	totalTraffic atomic.Int64
	// buckets is the rolling window used to compute observed bandwidth.
	buckets *bucketRotator
}

// NewBandwidthRecorder creates a BandwidthRecorder that averages observed
// throughput over a rolling window of observationSeconds one-second buckets
// and optionally enforces a user-requested hard limit (0 = no limit).
// A background worker goroutine is started and runs until ctx is cancelled.
//
// NOTE(review): ctx would conventionally be the first parameter; kept last
// here to preserve the existing public signature.
func NewBandwidthRecorder(hardLimit int64, observationSeconds uint64, ctx context.Context) BandwidthRecorder {
	// Zero values suffice for lock and the atomic counters, so only the
	// fields that need real initialization are set explicitly.
	rec := &bandwidthRecorder{
		control: make(chan bandwidthRecorderMessage),
		buckets: newBucketRotator(observationSeconds),
	}
	rec.hardLimit.Store(hardLimit)

	go rec.worker(ctx)

	return rec
}

// GetTotalTraffic returns the lifetime number of bytes recorded via
// RecordBytes since this recorder was created.
func (b *bandwidthRecorder) GetTotalTraffic() int64 {
	return b.totalTraffic.Load()
}

// BandwidthRecorderConfig bundles recorder settings.
// NOTE(review): not referenced by any code visible in this file, and
// HardLimit is int while the recorder stores an int64 — confirm intended use.
type BandwidthRecorderConfig struct {
	ObservationPeriodSeconds uint
	HardLimit                int
}

// StartObservation signals the background worker to begin (or resume)
// observing throughput. The control channel is unbuffered, so this blocks
// until the worker receives the message.
func (b *bandwidthRecorder) StartObservation() {
	b.control <- bandwidthRecorderMessage{messageType: pacerMessageStart}
}

// PauseObservation signals the background worker to suspend observation
// (e.g. during pauses where zero throughput would skew the average).
// Blocks until the worker receives the message.
func (b *bandwidthRecorder) PauseObservation() {
	b.control <- bandwidthRecorderMessage{messageType: pacerMessagePause}
}

// RecordBytes records count bytes of observed traffic; success or failure of
// the underlying request is irrelevant since we measure available bandwidth.
func (b *bandwidthRecorder) RecordBytes(count int) {
	// The lifetime total accepts negative counts, but the rolling window
	// clamps to zero.
	// NOTE(review): that asymmetry means a negative count reduces the total
	// but never the observed bandwidth average — confirm this is intended.
	b.totalTraffic.Add(int64(count))
	b.buckets.AddToCurrentValue(uint64(max(count, 0)))
}

// SetObservationPeriod sets the rolling-window length, in seconds, over which
// bandwidth is averaged.
func (b *bandwidthRecorder) SetObservationPeriod(seconds uint) {
	b.buckets.SetSize(seconds)
}

// ObservationPeriod returns the current rolling-window length in seconds.
func (b *bandwidthRecorder) ObservationPeriod() (seconds uint) {
	return b.buckets.Size()
}

// Bandwidth returns the observed average throughput in bytes/sec and whether
// that average covers a full observation window (fullAverage == false means
// the window has not filled yet, so the figure may be unrepresentative).
func (b *bandwidthRecorder) Bandwidth() (bytesPerSecond int64, fullAverage bool) {
	// The bucket average is uint64; clamp before converting so an enormous
	// value cannot wrap to a negative int64.
	return int64(min(b.buckets.GetAverage(), math.MaxInt64)), b.buckets.AverageReady()
}

// HardLimit reports whether the user has requested a hard bandwidth cap and,
// if so, the cap in bytes per second. A stored value of zero means no cap.
func (b *bandwidthRecorder) HardLimit() (requested bool, bytesPerSecond int64) {
	// Load the atomic exactly once: the original loaded it twice, so a
	// concurrent RequestHardLimit/ClearHardLimit could make the returned
	// flag disagree with the returned value.
	limit := b.hardLimit.Load()
	return limit != 0, limit
}

// RequestHardLimit sets a user-requested bandwidth cap in bytes per second.
// Passing 0 is equivalent to ClearHardLimit.
func (b *bandwidthRecorder) RequestHardLimit(bytesPerSecond int64) {
	b.hardLimit.Store(bytesPerSecond)
}

// ClearHardLimit removes any user-requested bandwidth cap.
func (b *bandwidthRecorder) ClearHardLimit() {
	b.RequestHardLimit(0)
}

// bucketRotator is a ring of per-second traffic counters: one in-progress
// bucket (the write head) plus Size() completed buckets used for averaging.
type bucketRotator struct {
	// bucketLock guards the slice header, currentBucket and availableValues.
	// Individual bucket values are atomics, so writers to a bucket only need
	// the read lock.
	bucketLock *sync.RWMutex
	// availableValues counts how many completed buckets hold real data
	// (ramps up from 0 until the window is full).
	availableValues uint
	buckets         []atomic.Uint64
	// currentBucket indexes the in-progress bucket being written to.
	currentBucket uint
}

// newBucketRotator creates a rotator whose averaging window spans `size`
// one-second buckets. One extra bucket is allocated as the in-progress write
// head, which is never included in the average.
func newBucketRotator(size uint64) *bucketRotator {
	size++ // +1 for the write head

	return &bucketRotator{
		bucketLock:    &sync.RWMutex{},
		buckets:       make([]atomic.Uint64, size),
		currentBucket: 0,
	}
}

// Rotate closes out the current one-second bucket and starts a fresh one;
// intended to be invoked once per second while observation is running.
func (b *bucketRotator) Rotate() {
	b.bucketLock.Lock()
	defer b.bucketLock.Unlock()

	// One more completed bucket becomes available for averaging, capped at
	// the window size.
	if b.availableValues < b.Size() {
		b.availableValues++
	}

	next := (b.currentBucket + 1) % uint(len(b.buckets))
	b.buckets[next].Store(0) // the new write head must start empty
	b.currentBucket = next
}

// AddToCurrentValue adds val bytes to the in-progress bucket. The read lock
// is sufficient because the bucket value itself is atomic; the lock only
// protects the slice header and currentBucket index against a concurrent
// Rotate or SetSize.
func (b *bucketRotator) AddToCurrentValue(val uint64) {
	b.bucketLock.RLock()
	defer b.bucketLock.RUnlock()

	b.buckets[b.currentBucket].Add(val)
}

// AverageReady reports whether a full window of completed buckets has been
// observed, i.e. GetAverage now reflects a complete observation period.
func (b *bucketRotator) AverageReady() bool {
	// availableValues is a plain uint mutated under the write lock by
	// Rotate/SetSize, so it must be read under the read lock; the original
	// read it with no lock at all, which is a data race under -race.
	b.bucketLock.RLock()
	defer b.bucketLock.RUnlock()

	return b.availableValues == b.Size() && // we must have a full set of buckets,
		b.Size() > 0 // and a size > 0
}

// GetAverage returns the mean bytes/sec across the completed buckets observed
// so far (0 if nothing has completed yet). The in-progress write-head bucket
// is deliberately excluded because it does not cover a full second.
func (b *bucketRotator) GetAverage() uint64 {
	b.bucketLock.RLock()
	defer b.bucketLock.RUnlock()

	// if we have nothing to average, we have 0. oops!
	if b.availableValues == 0 {
		return 0
	}

	var sum uint64
	// Walk backwards from the write head over the availableValues most
	// recently completed buckets.
	for idx := range int(b.availableValues) {
		// idx is not actually a raw index, but needs to be turned into one. It is, instead, the distance, in the reverse direction of rotations, from the current index.
		idx++ // first, since the index we're reading is always going to be one back (the current value isn't realized), we need to step back one more
		idx = int(b.currentBucket) - idx // step backwards
		if idx < 0 {
			idx += len(b.buckets) // wrap around
		}

		// then, add the value of that bucket.
		sum += b.buckets[idx].Load()
	}

	var divisor = uint64(b.availableValues)

	return sum / divisor
}

func (b *bucketRotator) SetSize(size uint) {
b.bucketLock.Lock()
defer b.bucketLock.Unlock()

size += 1 // 1 for current element, there should never be 0 elements

if size > uint(len(b.buckets)) {
// insert between us and the tail
var out []atomic.Uint64
diff := size - uint(len(b.buckets))

out = b.buckets[:b.currentBucket+1]
out = append(out, make([]atomic.Uint64, diff)...)
out = append(out, b.buckets[b.currentBucket+1:]...)

b.buckets = out
} else if size < uint(len(b.buckets)) {
// We want to trim items from the list. Immediately after the write head is "stale", so we want to cut the least recent items.
staleEnd := b.buckets[b.currentBucket+1:]
freshEnd := b.buckets[:b.currentBucket]
removing := uint(len(b.buckets)) - size

var out []atomic.Uint64
if removing < uint(len(staleEnd)) {
out = append(freshEnd, b.buckets[b.currentBucket])

Check failure on line 173 in pacer/bandwidth_recorder.go

View workflow job for this annotation

GitHub Actions / lint (1.24.13, ubuntu-latest)

copylocks: call of append copies lock value: sync/atomic.Uint64 contains sync/atomic.noCopy (govet)
out = append(out, staleEnd[removing:]...)
} else {
removing -= uint(len(staleEnd))

out = append(freshEnd[removing:], b.buckets[b.currentBucket])

Check failure on line 178 in pacer/bandwidth_recorder.go

View workflow job for this annotation

GitHub Actions / lint (1.24.13, ubuntu-latest)

copylocks: call of append copies lock value: sync/atomic.Uint64 contains sync/atomic.noCopy (govet)
b.currentBucket -= removing
}

b.availableValues = min(b.availableValues, uint(len(out)-1))
b.buckets = out
}
}

// Size returns the number of completed buckets in the averaging window (the
// allocated slice always holds one extra in-progress bucket).
// NOTE(review): this reads the slice header without taking bucketLock;
// callers already holding the lock are safe, but a bare call can race with a
// concurrent SetSize — confirm call sites.
func (b *bucketRotator) Size() uint {
	return uint(len(b.buckets)) - 1
}
Loading
Loading