diff --git a/common/atomicmorph.go b/common/atomicmorph.go index 426d5e5f00..e3e10dc7bb 100644 --- a/common/atomicmorph.go +++ b/common/atomicmorph.go @@ -10,7 +10,7 @@ type AtomicMorpherInt32 func(startVal int32) (val int32, morphResult interface{} // AtomicMorph atomically morphs target in to new value (and result) as indicated by the AtomicMorpher callback function. func AtomicMorphInt32(target *int32, morpher AtomicMorpherInt32) interface{} { if target == nil || morpher == nil { - panic("target and morpher mut not be nil") + panic("target and morpher must not be nil") } for { currentVal := atomic.LoadInt32(target) @@ -29,7 +29,7 @@ type AtomicMorpherUint32 func(startVal uint32) (val uint32, morphResult interfac // AtomicMorph atomically morphs target in to new value (and result) as indicated bythe AtomicMorpher callback function. func AtomicMorphUint32(target *uint32, morpher AtomicMorpherUint32) interface{} { if target == nil || morpher == nil { - panic("target and morpher mut not be nil") + panic("target and morpher must not be nil") } for { currentVal := atomic.LoadUint32(target) @@ -48,7 +48,7 @@ type AtomicMorpherInt64 func(startVal int64) (val int64, morphResult interface{} // AtomicMorph atomically morphs target in to new value (and result) as indicated bythe AtomicMorpher callback function. func AtomicMorphInt64(target *int64, morpher AtomicMorpherInt64) interface{} { if target == nil || morpher == nil { - panic("target and morpher mut not be nil") + panic("target and morpher must not be nil") } for { currentVal := atomic.LoadInt64(target) @@ -67,7 +67,7 @@ type AtomicMorpherUint64 func(startVal uint64) (val uint64, morphResult interfac // AtomicMorph atomically morphs target in to new value (and result) as indicated bythe AtomicMorpher callback function. 
func AtomicMorphUint64(target *uint64, morpher AtomicMorpherUint64) interface{} { if target == nil || morpher == nil { - panic("target and morpher mut not be nil") + panic("target and morpher must not be nil") } for { currentVal := atomic.LoadUint64(target) @@ -77,3 +77,25 @@ func AtomicMorphUint64(target *uint64, morpher AtomicMorpherUint64) interface{} } } } + +type AtomicValue[T any] interface { + Store(T) + Load() T + CompareAndSwap(old, new T) bool +} + +type AtomicMorpher[T any, O any] func(startValue T) (val T, morphResult O) + +func AtomicMorph[T any, O any](target AtomicValue[T], morpher AtomicMorpher[T, O]) O { + if target == nil || morpher == nil { + panic("target and morpher must not be nil") + } + + for { + currentVal := target.Load() + desiredVal, morphResult := morpher(currentVal) + if target.CompareAndSwap(currentVal, desiredVal) { + return morphResult + } + } +} diff --git a/jobsAdmin/JobsAdmin.go b/jobsAdmin/JobsAdmin.go index 11b5cea2c8..471bcdbc26 100755 --- a/jobsAdmin/JobsAdmin.go +++ b/jobsAdmin/JobsAdmin.go @@ -34,6 +34,7 @@ import ( "sync/atomic" "time" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/ste" "github.com/Azure/azure-storage-azcopy/v10/common" @@ -115,14 +116,13 @@ func initJobsAdmin(appCtx context.Context, concurrency ste.ConcurrencySettings, // use the "networking mega" (based on powers of 10, not powers of 2, since that's what mega means in networking context) targetRateInBytesPerSec := int64(targetRateInMegaBitsPerSec * 1000 * 1000 / 8) - unusedExpectedCoarseRequestByteCount := int64(0) - pacer := ste.NewTokenBucketPacer(targetRateInBytesPerSec, unusedExpectedCoarseRequestByteCount) + requestPacer := pacer.New(pacer.NewBandwidthRecorder(targetRateInBytesPerSec, pacer.DefaultBandwidthRecorderWindowSeconds, appCtx), appCtx) // Note: as at July 2019, we don't currently have a shutdown method/event on JobsAdmin where this pacer // could be shut down. 
But, it's global anyway, so we just leave it running until application exit. ja := &jobsAdmin{ concurrency: concurrency, jobIDToJobMgr: newJobIDToJobMgr(), - pacer: pacer, + pacer: requestPacer, slicePool: common.NewMultiSizeSlicePool(common.MaxBlockBlobBlockSize), cacheLimiter: common.NewCacheLimiter(maxRamBytesToUse), fileCountLimiter: common.NewCacheLimiter(int64(concurrency.MaxOpenDownloadFiles)), @@ -231,7 +231,7 @@ type jobsAdmin struct { jobIDToJobMgr jobIDToJobMgr // Thread-safe map from each JobID to its JobInfo // Other global state can be stored in more fields here... appCtx context.Context - pacer ste.PacerAdmin + pacer pacer.Interface slicePool common.ByteSlicePooler cacheLimiter common.CacheLimiter fileCountLimiter common.CacheLimiter @@ -313,7 +313,7 @@ func (ja *jobsAdmin) UpdateTargetBandwidth(newTarget int64) { if newTarget < 0 { return } - ja.pacer.UpdateTargetBytesPerSecond(newTarget) + ja.pacer.RequestHardLimit(newTarget) } /* diff --git a/pacer/README.md b/pacer/README.md new file mode 100644 index 0000000000..9f89046b5e --- /dev/null +++ b/pacer/README.md @@ -0,0 +1,75 @@ +# pacer + +The `pacer` package aims to solve three problems... + +1) How do we prevent timing out requests when we have high routines and low bandwidth +2) What if the bandwidth is intentionally, artificially low? +3) What if it's a mix? The user has requested a target bandwidth, but they cannot hit it. + +The answer is simple at face value, but pretty complex in reality. + +## Bandwidth detection + +This is relevant for #1, and #3. Until we _know_ what our bandwidth looks like, we should fire as many requests as our routines will allow, and try to pick up the bandwidth by recording the throughput and averaging it over the last block of time (i.e. average 1s chunks over 30s), then scale down in [Bandwidth Allocation](#bandwidth-allocation). + +To do this, our pacer must implement a throughput recorder which will report the average (or target throughput).
This is done by `interface.go`'s `ThroughputRecorder` interface: + +```go +type ThroughputRecorder interface { + // RecordBytes records the number of bytes that have made it through the pipe. + // We don't care if the request ends in success or failure, + // because we're trying to observe the available bandwidth. + RecordBytes(count uint64) + + // SetObservationPeriod sets the observation period that RecordBytes averages over. + SetObservationPeriod(seconds uint) + // ObservationPeriod gets the observation period that RecordBytes averages over. + ObservationPeriod() (seconds uint) + + // Bandwidth returns the observed (or requested) bandwidth. + Bandwidth() (bytesPerSecond uint64) + // HardLimit indicates that the user has requested a hard limit. + HardLimit() (requested bool, bytesPerSecond uint64) + + // RequestHardLimit indicates that the user is requesting a hard limit. + RequestHardLimit(bytesPerSecond uint64) + // ClearHardLimit indicates that the user has cancelled the hard limit request. + ClearHardLimit() +} +``` + +## Bandwidth allocation + +Consider the following few scenarios under the above situations. + +1) We are processing abnormally large requests, i.e. chunks of several gigabytes +2) We are processing abnormally small requests, i.e. tiny chunks of a handful of kilobytes. +3) We are processing average sized requests, i.e. a handful of megabytes + +In case 1, we can probably just allocate everyone a standard division of the hard cap, or as much as they like if we're in detection-only mode. For flighting requests, we should check if allocating a new request would drop everyone below Azure's minimum throughput (with some healthy overage), henceforth `azureMinimumSpeed` based upon the hard cap, or the observed cap (if it is lower). + +In case 2, these requests may complete in one iteration, and won't consume the entire `azureMinimumSpeed` on their own. 
We should watch out for this, because if we don't, we may wind up with much lower real throughput than we intend. + +In case 3, it's something in between 1 and 2. It could bounce between. Ergo, we have to handle them like case 2. + +To achieve this, keeping our observed throughput and hard-cap in mind, we should, for requests larger than `azureMinimumSpeed`, allocate them as consuming `azureMinimumSpeed`, and for requests smaller, allocate them as consuming their real size. + +If the new request's allocated throughput would go over either our hard cap or our observed throughput, we typically shouldn't allocate it. + +### On the subject of S2S requests... + +These are a headache. We technically _have_ to support them, because previous pacer implementations did, but with a huge bug due to the caveat of trying to pace them. It's, fundamentally, not our bandwidth. We can't control the speed at which an S2S request moves, just the fact that it, in its entirety, moves at all. + +This means the most granularity we get, is the full chunk. We have to allocate them as the full chunk, and subtract the desired throughput from the chunk size until it is satisfied. This in effect, averages out to the desired speed, but not really. + +Unfortunately, these have to be handled fundamentally differently than upload or download requests. + +## Pauses in transfers and other delays that may present sub-optimal bandwidth observations... + +1) Many small files are known to cause issues with bandwidth, as they absorb mostly IOPS, _not_ actual bandwidth. +2) Certain methods of enumeration can be increasingly slow, causing delays between blocks of 10k files. This leads to pauses in any real throughput, and consequently, sends observed bandwidth to 0. +3) What if the user's internet disconnects temporarily or something? +4) What if we get a really large string of retries? That'll send bandwidth to near 0.
+ +Unfortunately, situations like this are where our strategy starts to fall apart a little bit more. We _could_ keep track of average request timing, or we could just fall back to 1MB/s throughput as a theoretical "minimum" to prevent getting totally locked out. + diff --git a/pacer/bandwidth_recorder.go b/pacer/bandwidth_recorder.go new file mode 100644 index 0000000000..9efb5b15a2 --- /dev/null +++ b/pacer/bandwidth_recorder.go @@ -0,0 +1,189 @@ +package pacer + +import ( + "context" + "math" + "sync" + "sync/atomic" +) + +var _ BandwidthRecorder = &bandwidthRecorder{} + +type bandwidthRecorder struct { + lock sync.RWMutex + + control chan bandwidthRecorderMessage + + hardLimit atomic.Int64 + totalTraffic atomic.Int64 + buckets *bucketRotator +} + +func NewBandwidthRecorder(hardLimit int64, observationSeconds uint64, ctx context.Context) BandwidthRecorder { + out := &bandwidthRecorder{ + lock: sync.RWMutex{}, + control: make(chan bandwidthRecorderMessage), + hardLimit: atomic.Int64{}, + totalTraffic: atomic.Int64{}, + buckets: newBucketRotator(observationSeconds), + } + + out.hardLimit.Store(hardLimit) + + go out.worker(ctx) + + return out +} + +func (b *bandwidthRecorder) GetTotalTraffic() int64 { + return b.totalTraffic.Load() +} + +type BandwidthRecorderConfig struct { + ObservationPeriodSeconds uint + HardLimit int +} + +func (b *bandwidthRecorder) StartObservation() { + b.control <- bandwidthRecorderMessage{messageType: pacerMessageStart} +} + +func (b *bandwidthRecorder) PauseObservation() { + b.control <- bandwidthRecorderMessage{messageType: pacerMessagePause} +} + +func (b *bandwidthRecorder) RecordBytes(count int) { + b.totalTraffic.Add(int64(count)) + b.buckets.AddToCurrentValue(uint64(max(count, 0))) +} + +func (b *bandwidthRecorder) SetObservationPeriod(seconds uint) { + b.buckets.SetSize(seconds) +} + +func (b *bandwidthRecorder) ObservationPeriod() (seconds uint) { + return b.buckets.Size() +} + +func (b *bandwidthRecorder) Bandwidth() 
(bytesPerSecond int64, fullAverage bool) { + return int64(min(b.buckets.GetAverage(), math.MaxInt64)), b.buckets.AverageReady() +} + +func (b *bandwidthRecorder) HardLimit() (requested bool, bytesPerSecond int64) { + return b.hardLimit.Load() != 0, b.hardLimit.Load() +} + +func (b *bandwidthRecorder) RequestHardLimit(bytesPerSecond int64) { + b.hardLimit.Store(bytesPerSecond) +} + +func (b *bandwidthRecorder) ClearHardLimit() { + b.RequestHardLimit(0) +} + +type bucketRotator struct { + bucketLock *sync.RWMutex + availableValues uint + buckets []atomic.Uint64 + currentBucket uint +} + +func newBucketRotator(size uint64) *bucketRotator { + size++ + + return &bucketRotator{ + bucketLock: &sync.RWMutex{}, + buckets: make([]atomic.Uint64, size), + currentBucket: 0, + } +} + +func (b *bucketRotator) Rotate() { + b.bucketLock.Lock() + defer b.bucketLock.Unlock() + + b.availableValues = min(b.Size(), b.availableValues+1) + b.currentBucket = (b.currentBucket + 1) % uint(len(b.buckets)) + b.buckets[b.currentBucket].Store(0) +} + +func (b *bucketRotator) AddToCurrentValue(val uint64) { + b.bucketLock.RLock() + defer b.bucketLock.RUnlock() + + b.buckets[b.currentBucket].Add(val) +} + +func (b *bucketRotator) AverageReady() bool { + return b.availableValues == b.Size() && // we must have a full set of buckets, + b.Size() > 0 // and a size > 0 +} + +func (b *bucketRotator) GetAverage() uint64 { + b.bucketLock.RLock() + defer b.bucketLock.RUnlock() + + // if we have nothing to average, we have 0. oops! + if b.availableValues == 0 { + return 0 + } + + var sum uint64 + for idx := range int(b.availableValues) { + // idx is not actually a raw index, but needs to be turned into one. It is, instead, the distance, in the reverse direction of rotations, from the current index. 
+ idx++ // first, since the index we're reading is always going to be one back (the current value isn't realized), we need to step back one more + idx = int(b.currentBucket) - idx // step backwards + if idx < 0 { + idx += len(b.buckets) // wrap around + } + + // then, add the value of that bucket. + sum += b.buckets[idx].Load() + } + + var divisor = uint64(b.availableValues) + + return sum / divisor +} + +func (b *bucketRotator) SetSize(size uint) { + b.bucketLock.Lock() + defer b.bucketLock.Unlock() + + size += 1 // 1 for current element, there should never be 0 elements + + if size > uint(len(b.buckets)) { + // insert between us and the tail + var out []atomic.Uint64 + diff := size - uint(len(b.buckets)) + + out = b.buckets[:b.currentBucket+1] + out = append(out, make([]atomic.Uint64, diff)...) + out = append(out, b.buckets[b.currentBucket+1:]...) + + b.buckets = out + } else if size < uint(len(b.buckets)) { + // We want to trim items from the list. Immediately after the write head is "stale", so we want to cut the least recent items. + staleEnd := b.buckets[b.currentBucket+1:] + freshEnd := b.buckets[:b.currentBucket] + removing := uint(len(b.buckets)) - size + + var out []atomic.Uint64 + if removing < uint(len(staleEnd)) { + out = append(freshEnd, b.buckets[b.currentBucket]) + out = append(out, staleEnd[removing:]...) 
+ } else { + removing -= uint(len(staleEnd)) + + out = append(freshEnd[removing:], b.buckets[b.currentBucket]) + b.currentBucket -= removing + } + + b.availableValues = min(b.availableValues, uint(len(out)-1)) + b.buckets = out + } +} + +func (b *bucketRotator) Size() uint { + return uint(len(b.buckets)) - 1 +} diff --git a/pacer/bandwidth_recorder_ctl.go b/pacer/bandwidth_recorder_ctl.go new file mode 100644 index 0000000000..55ab0cceea --- /dev/null +++ b/pacer/bandwidth_recorder_ctl.go @@ -0,0 +1,60 @@ +package pacer + +import ( + "context" + "time" +) + +const ( + pacerMessageStart = 1 + pacerMessagePause = 2 +) + +type bandwidthRecorderMessage struct { + messageType uint + params []any +} + +type bandwidthRecorderWorkerState struct { + ticker *time.Ticker + live bool +} + +func (b *bandwidthRecorder) worker(ctx context.Context) { + state := &bandwidthRecorderWorkerState{ + ticker: time.NewTicker(time.Second), + live: false, + } + + //state.ticker.Stop() + + for { + select { + case <-state.ticker.C: + b.buckets.Rotate() + case msg := <-b.control: + b.handleMessage(&msg, state) + case <-ctx.Done(): + return + } + } +} + +func (b *bandwidthRecorder) handleMessage(msg *bandwidthRecorderMessage, state *bandwidthRecorderWorkerState) { + switch msg.messageType { + case pacerMessageStart: + if state.live { + return // no-op + } + + state.ticker.Reset(time.Second) + state.live = true + case pacerMessagePause: + if !state.live { + return // no-op + } + + state.ticker.Stop() + state.live = false + } +} diff --git a/pacer/bucket_rotator_test.go b/pacer/bucket_rotator_test.go new file mode 100644 index 0000000000..4d2a0d1e7b --- /dev/null +++ b/pacer/bucket_rotator_test.go @@ -0,0 +1,104 @@ +package pacer + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +// TestBucketRotator tests the facilities of the bucket rotator to +// 1) accurately maintain whether or not the average is ready through the start, inflation, and deflation steps +// 2) accept 
"overflowing" values and maintain a correct average +// 3) correctly handle a state where averaging isn't possible +func TestBucketRotator(t *testing.T) { + const ( + ROTATOR_SIZE = 30 + ) + + a := assert.New(t) + + rot := newBucketRotator(ROTATOR_SIZE) + + // test averaging + var sum uint64 + for idx := range uint64(ROTATOR_SIZE) { + a.Equal(false, rot.AverageReady(), fmt.Sprintf("average ready on idx %d", idx)) // average should not be ready yet. + if idx > 0 { + a.Equal(int(sum/idx), int(rot.GetAverage()), "early average should match logic") // idx, not idx+1, because we haven't added the current value yet. + } else { + a.Equal(0, int(rot.GetAverage()), "0th element avg should be 0") + } + + sum += idx + 1 + rot.AddToCurrentValue(idx + 1) + rot.Rotate() + } + + a.Equal(ROTATOR_SIZE, int(rot.Size())) + a.Equal(ROTATOR_SIZE, int(rot.availableValues)) + a.Equal(true, rot.AverageReady(), "Average is expected to be ready after ROTATOR_SIZE rotations") + a.Equal(int(sum/uint64(ROTATOR_SIZE)), int(rot.GetAverage())) + + // test deflation + rot.SetSize(ROTATOR_SIZE / 2) + sum = 0 + for idx := range uint64(ROTATOR_SIZE / 2) { + sum += idx + (ROTATOR_SIZE / 2) + 1 // re-adjust our results, since it's going to be the more recent half + } + + a.Equal(ROTATOR_SIZE/2, int(rot.Size())) + a.Equal(ROTATOR_SIZE/2, int(rot.availableValues)) + a.Equal(true, rot.AverageReady()) + a.Equal(int(sum/(ROTATOR_SIZE/2)), int(rot.GetAverage())) + + // test inflation + rot.SetSize(ROTATOR_SIZE) + + // we shouldn't be ready to provide an average, but, if we pull one anyway the result should be the same. + a.Equal(ROTATOR_SIZE, int(rot.Size())) + a.Equal(false, rot.AverageReady()) + a.Equal(ROTATOR_SIZE/2, int(rot.availableValues)) + a.Equal(int(sum/(ROTATOR_SIZE/2)), int(rot.GetAverage())) + + // if we count back up, then we should get our original average. 
+ for idx := range uint64(ROTATOR_SIZE / 2) { + a.Equal(int(sum/((ROTATOR_SIZE/2)+idx)), int(rot.GetAverage()), "early average should match logic") // idx, not idx+1, because we haven't added the current value yet. + + sum += idx + 1 + rot.AddToCurrentValue(idx + 1) + rot.Rotate() + } + + // we should be ready to average again. + a.Equal(ROTATOR_SIZE, int(rot.Size())) + a.Equal(true, rot.AverageReady()) + a.Equal(ROTATOR_SIZE, int(rot.availableValues)) + a.Equal(int(sum/uint64(ROTATOR_SIZE)), int(rot.GetAverage())) + + // if we push over by 1, it should delete the first value ever placed in the table. + rot.AddToCurrentValue(ROTATOR_SIZE + 1) + sum -= (ROTATOR_SIZE / 2) + 1 // remove the proceeding value, which should be the first value after the halfway mark. + sum += ROTATOR_SIZE + 1 // add the new value, which is 1 over our cap. + rot.Rotate() + + // validate our new average matches our expectations. + a.Equal(ROTATOR_SIZE, int(rot.Size())) + a.Equal(true, rot.AverageReady()) + a.Equal(ROTATOR_SIZE, int(rot.availableValues)) + a.Equal(int(sum/uint64(ROTATOR_SIZE)), int(rot.GetAverage())) + + // deflate to 0. + rot.SetSize(0) + + a.Equal(0, int(rot.Size())) + a.Equal(0, int(rot.availableValues)) + a.Equal(false, rot.AverageReady()) + a.Equal(0, int(rot.GetAverage())) + + // if we try to push a value and rotate the only value will be 0. + rot.AddToCurrentValue(5) + a.Equal(5, int(rot.buckets[0])) + rot.Rotate() + a.Equal(0, int(rot.buckets[0])) +} diff --git a/pacer/interface.go b/pacer/interface.go new file mode 100644 index 0000000000..fc9f82c724 --- /dev/null +++ b/pacer/interface.go @@ -0,0 +1,95 @@ +package pacer + +import ( + "context" + "io" + + "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/google/uuid" +) + +const ( + DefaultBandwidthRecorderWindowSeconds = 10 +) + +type BandwidthRecorder interface { + // RecordBytes records the number of bytes that have made it through the pipe. 
+ // We don't care if the request ends in success or failure, + // because we're trying to observe the available bandwidth. + RecordBytes(count int) + + // StartObservation must be called after the bandwidth recorder is created. If already started, no-ops. + StartObservation() + // PauseObservation should be called whenever transfers aren't actively in progress (i.e. when waiting for another job plan part), followed by StartObservation. If already paused, no-ops. + PauseObservation() + + // SetObservationPeriod sets the observation period that RecordBytes averages over. + SetObservationPeriod(seconds uint) + // ObservationPeriod gets the observation period that RecordBytes averages over. + ObservationPeriod() (seconds uint) + + // Bandwidth returns the observed (or requested) bandwidth. Requests should be gated based upon this, if it is below a requested hard limit. + Bandwidth() (bytesPerSecond int64, fullAverage bool) + // HardLimit indicates that the user has requested a hard limit. Bandwidth should be allocated evenly amongst this if requested. + HardLimit() (requested bool, bytesPerSecond int64) + + GetTotalTraffic() int64 + + // RequestHardLimit indicates that the user is requesting a hard limit. + RequestHardLimit(bytesPerSecond int64) + // ClearHardLimit indicates that the user has cancelled the hard limit request. + ClearHardLimit() +} + +// Interface implements the meat and potatoes of the pacer package, recording bandwidth and managing request initiation. +// Think of it like air traffic control. You don't want too many planes in the air, otherwise you're going to have a crash. +// One BandwidthRecorder is attached, intended for measuring the primary transfer direction. +type Interface interface { + BandwidthRecorder + + // InitiateRequest asks to initiate a request with a specific size. The request should _not_ be on the wire yet. + // Do not call directly, favor InjectPacer. 
+ initiateRequest(bodySizeBytes int64, ctx context.Context) <-chan Request + + // InitiateUnpaceable asks to initiate a request which *cannot* be paced and has to act by sheer average (i.e. like S2S). + // This comes with a very large caveat! Bandwidth is _not_ recorded, and the hard limit is observed "raw". + // This is incompatible with the pacing of InitiateRequest. + // This shouldn't wind up being combined, since we don't do S2S transfers at the same time as up/downloads, + // But word to the wise: HERE BE DRAGONS. + InitiateUnpaceable(bodySizeBytes int64, ctx context.Context) <-chan error + + // InjectPacer updates the context with the key required to inject the pacer. + // NewPacerInjectPolicy should be added to the pipeline, following the retry policy, as close to the request execution as possible. + // Without it, InjectPacer will be ineffectual. + InjectPacer(bodySizeBytes int64, fromTo common.FromTo, ctx context.Context) (context.Context, error) + + discardRequest(request Request) + reinitiateRequest(req Request) <-chan any +} + +// Request is a lifecycle manager for a single request. It is _not_ idempotent, multiple threads should _not_ be acting upon it at the same time. +// One request, on one thread, that is already on the wire. +type Request interface { + ID() uuid.UUID + + // RemainingAllocations is how much left this request has to get allocated. + RemainingAllocations() int + // RemainingReads is how much left this request has to read. Should always be larger than RemainingAllocations(). + RemainingReads() int + + // WrapRequestBody wraps a request body. This, or WrapResponseBody should only be called once. + WrapRequestBody(reader io.ReadSeekCloser) io.ReadSeekCloser + // WrapResponseBody wraps a response body. This, or WrapRequestBody should only be called once. + WrapResponseBody(reader io.ReadCloser) io.ReadCloser + + // issueBytes hands a request bytes to use.
They don't _have_ to be used, and there's intentionally no mechanism to return them, because it complicates the logic. + issueBytes(size int) (remaining int64) + // informSeek alters the amount read and allocated (if less has been allocated) and reinitiates the request. + informSeek(newLoc int64) + // requestUse should be called with the size of the buffer. It may be larger than Remaining. An amount of bytes [0, Remaining] will be returned. 0 is only returned if Remaining is 0. + requestUse(size int) (allocated int, err error) + // confirmUse should be called after reading (or writing) as much as possible from the allocated value of RequestUse. If recordBandwidth is true, it is written to the BandwidthRecorder. + confirmUse(size int, recordBandwidth bool) + // Discard indicates that the Request will probably never be used again, and that we should discard the bandwidth allocation. + Discard() +} diff --git a/pacer/pacer_ctl.go b/pacer/pacer_ctl.go new file mode 100644 index 0000000000..07253cbfed --- /dev/null +++ b/pacer/pacer_ctl.go @@ -0,0 +1,6 @@ +package pacer + +type requestQueueEntry struct { + req Request + readyCh chan any +} diff --git a/pacer/pacer_impl.go b/pacer/pacer_impl.go new file mode 100644 index 0000000000..7b3ee8726f --- /dev/null +++ b/pacer/pacer_impl.go @@ -0,0 +1,145 @@ +package pacer + +import ( + "context" + "errors" + "time" + + "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/google/uuid" +) + +const ( + /* + https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-blob-service-operations + Azure Blob storage defines downloads as 2 minutes per megabyte, and 10 minutes per megabyte for upload. + + https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-file-service-operations + Azure Files defines it's minimum throughput as 2 minutes per megabytes. + + As such, we'll safely define our understanding at double that-- one megabyte per minute. 
+ */ + azureMinimumBandwidth = common.MegaByte / 60 + + bandwidthSaturationTarget float64 = 0.35 + pretendMinimumRequestCount = 10 + + allocatorTickrate = time.Second +) + +var ( + // pretendBps exists to fill in the lulls (i.e. what if the FE is just enumerating and we have nothing to do?) + pretendBps = float64(azureMinimumBandwidth*pretendMinimumRequestCount) * (float64(1) / bandwidthSaturationTarget) +) + +type impl struct { + BandwidthRecorder + + appCtx context.Context + ticker *time.Ticker + + queue chan requestQueueEntry + reliveQueue chan requestQueueEntry + discardQueue chan Request + live map[uuid.UUID]Request +} + +func New(recorder BandwidthRecorder, appCtx context.Context) Interface { + out := &impl{ + BandwidthRecorder: recorder, + appCtx: appCtx, + ticker: time.NewTicker(allocatorTickrate), + queue: make(chan requestQueueEntry, 300), + reliveQueue: make(chan requestQueueEntry, 300), + discardQueue: make(chan Request, 300), + live: make(map[uuid.UUID]Request), + } + + go out.worker() + + return out +} + +func (i *impl) InjectPacer(bodySizeBytes int64, fromTo common.FromTo, ctx context.Context) (context.Context, error) { + if !fromTo.IsUpload() && !fromTo.IsDownload() { + return ctx, errors.New("call InjectPacer only on upload and download; For S2S, call InitiateUnpaceable") + } + + return context.WithValue(ctx, pacerInjectKey, pacerInjectValue{ + pacer: i, + wrapMode: common.Iff(fromTo.IsUpload(), pacerInjectWrapModeRequest, pacerInjectWrapModeResponse), // request on upload, response on download + expectedBodySize: bodySizeBytes, + }), nil +} + +func (i *impl) initiateRequest(bodySizeBytes int64, ctx context.Context) <-chan Request { + out := make(chan Request, 1) + go func() { + startCh := make(chan any, 1) + req := newRequest(i, bodySizeBytes, ctx) + + i.queue <- requestQueueEntry{ + req: req, + readyCh: startCh, + } + + <-startCh + out <- req + }() + + return out +} + +func (i *impl) InitiateUnpaceable(bodySizeBytes int64, ctx context.Context) 
<-chan error { + out := make(chan error, 1) + + go func() { + req := <-i.initiateRequest(bodySizeBytes, ctx) + + for req.RemainingReads() > 0 { + allocated, err := req.requestUse(req.RemainingReads()) + + if err != nil { + req.Discard() + out <- err + } + + req.confirmUse(allocated, false) + } + + req.Discard() + + out <- nil + }() + + return out +} + +func (i *impl) reinitiateRequest(req Request) <-chan any { + out := make(chan any, 1) + + i.reliveQueue <- requestQueueEntry{ + req: req, + readyCh: out, + } + + return out +} + +func (i *impl) discardRequest(request Request) { + i.discardQueue <- request +} + +func (i *impl) worker() { + for { + select { + case req := <-i.discardQueue: + // sometimes we discard mid-tick, that's OK, just ignore it. + if _, ok := i.live[req.ID()]; ok { + delete(i.live, req.ID()) + } + case <-i.ticker.C: + i.tick() + } + } +} diff --git a/pacer/pacer_policy.go b/pacer/pacer_policy.go new file mode 100644 index 0000000000..f15fc6a1ac --- /dev/null +++ b/pacer/pacer_policy.go @@ -0,0 +1,90 @@ +package pacer + +import ( + "fmt" + "net/http" + "sync" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-storage-azcopy/v10/common" +) + +type ( + // pacerInjectKeyType is used to define pacerInjectKey to have a unique value for the context key + pacerInjectKeyType struct{} + // pacerInjectValue is the expected type of value behind pacerInjectKey. + pacerInjectValue struct { + pacer Interface + wrapMode pacerInjectWrapMode + expectedBodySize int64 + } + // pacerInjectWrapMode defines how the pacer should be injected ( + pacerInjectWrapMode uint +) + +const ( + // pacerInjectWrapModeNil is an invalid state, and will ignore the action if set, or pacer is nil. + pacerInjectWrapModeNil pacerInjectWrapMode = iota + // pacerInjectWrapModeRequest wraps the request body, and gates the request. + pacerInjectWrapModeRequest + // pacerInjectWrapModeResponse wraps the response body, and gates the request. 
+ pacerInjectWrapModeResponse +) + +var ( + // pacerInjectKey see pacerInjectKeyType description + pacerInjectKey = &pacerInjectKeyType{} + // pacerInjectWarnOnce warns on LCM that something isn't right... + pacerInjectWarnOnce = &sync.Once{} +) + +type pacerInjectPolicy struct { +} + +// NewPacerInjectPolicy creates a new policy (which should be added after retry policy). This policy, on it's own, does not wrap a request, but relies upon the pacer being injected. +func NewPacerInjectPolicy() policy.Policy { + return &pacerInjectPolicy{} +} + +func (p *pacerInjectPolicy) warn(warningText string) { + pacerInjectWarnOnce.Do(func() { + common.GetLifecycleMgr().Warn(warningText) + common.AzcopyCurrentJobLogger.Log(common.LogWarning, warningText) + }) +} + +func (p *pacerInjectPolicy) Do(req *policy.Request) (*http.Response, error) { + if injectData, ok := req.Raw().Context().Value(pacerInjectKey).(pacerInjectValue); ok { + + if injectData.pacer == nil { + p.warn("Sanity Check: Pacer inject key found, but pacer was nil. File a bug on AzCopy's github page if you see this.") + return req.Next() + } + + if injectData.wrapMode != pacerInjectWrapModeResponse && injectData.wrapMode != pacerInjectWrapModeRequest { + p.warn("Sanity Check: Pacer inject key found, but wrap mode was undefined. File a bug on AzCopy's github page if you see this.") + return req.Next() + } + + pacerRequest := <-injectData.pacer.initiateRequest(injectData.expectedBodySize, req.Raw().Context()) + + // for uploads, we wrap the request body. + if injectData.wrapMode == pacerInjectWrapModeRequest { + err := req.SetBody(pacerRequest.WrapRequestBody(req.Body()), req.Raw().Header.Get("Content-Type")) + if err != nil { + return nil, fmt.Errorf("error while wrapping body: %w", err) + } + } + + resp, err := req.Next() + + // for downloads, wrap the response body. Do not do this if the request failed. 
+ if err == nil && injectData.wrapMode == pacerInjectWrapModeResponse { + resp.Body = pacerRequest.WrapResponseBody(resp.Body) + } + + return resp, err + } + + return req.Next() +} diff --git a/pacer/pacer_ticker.go b/pacer/pacer_ticker.go new file mode 100644 index 0000000000..dcbfe76490 --- /dev/null +++ b/pacer/pacer_ticker.go @@ -0,0 +1,101 @@ +package pacer + +import ( + "fmt" + "math" + + "github.com/Azure/azure-storage-azcopy/v10/common" +) + +func (i *impl) tick() { + /* + 1) Check hard limit and bandwidth + 2) Allocate new requests + 3) Feed all existing requests + */ + + limitRequested, hardLimit := i.HardLimit() + observedBps, _ := i.Bandwidth() + observedBps = max(observedBps, int64(pretendBps)) + + currentlyAllocated := len(i.live) * azureMinimumBandwidth + //targetBandwidth := int(float64(observedBps) * bandwidthSaturationTarget) + //if limitRequested && observedBps > hardLimit { + // targetBandwidth = int(float64(hardLimit) * bandwidthSaturationTarget) + //} + + for len(i.live) < 2 { + var newRequest requestQueueEntry + queueEmpty := false + select { + case newRequest = <-i.reliveQueue: // reanimated requests receive higher priority, as they may still be on the wire. this helps avoid starvation. + case newRequest = <-i.queue: + default: + queueEmpty = true + } + + if !queueEmpty { + go func() { + newRequest.readyCh <- nil + }() + + // there's no point in accepting a request needing nothing, this is just going to waste our time. 
+ if newRequest.req.RemainingAllocations() == 0 { + continue + } + + // keep track of our allocations + currentlyAllocated += azureMinimumBandwidth + + // record it as live + i.live[newRequest.req.ID()] = newRequest.req + + common.GetLifecycleMgr().Info(fmt.Sprintf("accepting request %s with %d bytes needed", newRequest.req.ID(), newRequest.req.RemainingAllocations())) + } else { + break + } + } + + if !limitRequested || len(i.live) == 0 { + // if there's no limit requested, everyone gets everything they want-- the requests will check and see that a hard limit isn't enforced. + // if there's nobody to give bandwidth to, this escapes a div by 0. + return + } + + requestsPopped := 0 + // If there's a limit requested, we need to distribute it. In phase 1 of distribution, we'll see if any requests are lower than our "average". + // We don't bother looping this, because our average is only likely to be lower if we did any distributions during this phase. + averageAllocationSize := hardLimit / int64(len(i.live)) + for k, v := range i.live { + if remNeeded := v.RemainingAllocations(); int64(remNeeded) <= averageAllocationSize { + common.GetLifecycleMgr().Info(fmt.Sprintf("%s receiving %d bytes (early clear)", v.ID(), remNeeded)) + + v.issueBytes(remNeeded) + hardLimit -= int64(remNeeded) + delete(i.live, k) + v.Discard() + requestsPopped++ + } + } + + if len(i.live) == 0 { + return + } + + // recalculate our average, then redistribute it to all live requests + averageAllocationSize = hardLimit / int64(len(i.live)) + for k, v := range i.live { + remainingBytes := v.issueBytes(int(min(averageAllocationSize, math.MaxInt))) + common.GetLifecycleMgr().Info(fmt.Sprintf("%s receiving %d bytes (%d remain, %d bytes to read left)", v.ID(), averageAllocationSize, remainingBytes, v.RemainingReads())) + + if remainingBytes == 0 { + delete(i.live, k) + v.Discard() + requestsPopped++ + } + } + + limitRequested, hardLimit = i.HardLimit() + observedBps, _ = i.Bandwidth() + 
common.GetLifecycleMgr().Info(fmt.Sprintf("average alloc: %d (%d requests live, %d before averaging, %d popped this cycle, %d bytes observed throughput, %d hard cap requested)", averageAllocationSize, len(i.live), hardLimit, requestsPopped, observedBps, hardLimit)) +} diff --git a/pacer/request_impl.go b/pacer/request_impl.go new file mode 100644 index 0000000000..0016535602 --- /dev/null +++ b/pacer/request_impl.go @@ -0,0 +1,231 @@ +package pacer + +import ( + "context" + "errors" + "fmt" + "io" + "math" + "sync" + "sync/atomic" + "time" + + "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/google/uuid" +) + +var pacerIncorrectBehaviorWarnOnce = &sync.Once{} + +type request struct { + parent Interface + + ctx context.Context + id uuid.UUID + + allocationsFinished *atomic.Bool + + /* + I (adreed, 2/18/26) have to do a little justifying to myself as to why this + (adding to requestedBudget when we need more budget) is unnecessary to be worrying about. + + The largest block in a block blob or append blob is 4 gigabytes a block (common.MaxBlockBlobBlockSize) + Dividing math.MaxInt64 by that yields the reality that it fits into an int64, + not just a handful of times, but, 11 whole digits times (21990232255)! + + Append blobs having a maximum block size of 40x less (common.MaxAppendBlobBlockSize), + Page blobs, and Azure Files being able to put only 4MB per chunk. (exactly 1000x less, if you ask our code!) + ADLS Gen 2 doesn't list a max size in its REST API docs, but we do fall back to similar logic for block blobs, so we can assume that's the max. + + Either way, if a retry adds a requirement for new budget, it very certainly won't do it (10^11)*2.2 times under current retry policies... + If it does, call me, so I can laugh at my own hubris, and panic over how to solve that problem. 
+ */ + + requestedBudget *atomic.Int64 + allocatedBudget *atomic.Int64 + usedBudget *atomic.Int64 + readHead *atomic.Int64 + + bodySize int64 +} + +func newRequest(parent Interface, bodySize int64, ctx context.Context) Request { + out := &request{ + parent: parent, + + ctx: ctx, + id: uuid.New(), + + allocationsFinished: &atomic.Bool{}, + + requestedBudget: &atomic.Int64{}, + allocatedBudget: &atomic.Int64{}, + usedBudget: &atomic.Int64{}, + readHead: &atomic.Int64{}, // we can trust our read head is at 0, since the SDK expects it too! + + bodySize: bodySize, + } + + out.allocationsFinished.Store(false) + out.requestedBudget.Store(bodySize) + out.allocatedBudget.Store(0) + out.usedBudget.Store(0) + out.readHead.Store(0) + + return out +} + +func (r *request) ID() uuid.UUID { + return r.id +} + +func (r *request) RemainingAllocations() int { + return int(min(r.bodySize-r.allocatedBudget.Load(), math.MaxInt)) +} + +func (r *request) RemainingReads() int { + return int(min(r.bodySize-r.readHead.Load(), math.MaxInt)) +} + +func (r *request) WrapRequestBody(reader io.ReadSeekCloser) io.ReadSeekCloser { + return &wrappedRSC{ + seeker: reader, + wrappedRC: wrappedRC{ + parentReq: r, + childReader: reader, + }, + } +} + +func (r *request) WrapResponseBody(reader io.ReadCloser) io.ReadCloser { + return &wrappedRC{ + parentReq: r, + childReader: reader, + } +} + +func (r *request) issueBytes(size int) (remaining int64) { + // allocate as many bytes as both size, and our current request will allow. + cRequest := r.requestedBudget.Load() + cAlloc := r.allocatedBudget.Load() + + maxAlloc := cRequest - cAlloc + sizei64 := min(int64(size), maxAlloc) + + return cRequest - r.allocatedBudget.Add(sizei64) +} + +func (r *request) informSeek(newLoc int64) { + // We'll never, ever remove request, only add here. Why? + // 1) Subtracting request bytes could cause issueBytes to over-issue if it occurs at the right time. 
+ // 2) Subtracting request bytes could incorrectly cause our allocator to de-allocate us, which would be extremely bad mid-request. + // So, we should calculate if we'll need new bytes with our new location + + cRequest := r.requestedBudget.Load() + cRead := r.usedBudget.Load() // we ask about what we've read, not what we've been allocated, as current allocations are "free" under this model. + toAllocate := cRequest - cRead + newRequirement := r.bodySize - newLoc + + // if we are about to be allocated less than we'll now need, we add what's missing. + if toAllocate < newRequirement { + fmt.Println("what", toAllocate, newRequirement, r.requestedBudget.Load(), r.usedBudget.Load()) + // per my rant in the request struct, this'll pretty much always be a safe operation. + r.requestedBudget.Add(newRequirement - toAllocate) + } + + // make sure we also update our read head for accurate read reporting (even though that's only really used for S2S as of 2/18/26!) + r.readHead.Store(newLoc) +} + +func (r *request) requestUse(size int) (allocated int, err error) { + // a request to use is not a confirmed use-- readers can always read less than you ask them to, so we request, then confirm. + // simultaneously, sometimes size might be larger than we expect! there could be less of the reader remaining than we intend. + + // if the allocator doesn't have a hard limit, we go as fast as we want. the allocator is trying to gauge how many requests is safe to send. + if hardLimitRequested, _ := r.parent.HardLimit(); !hardLimitRequested { + // if we're doing this, we're going to allocate for ourselves, that way if throughput limiting is suddenly enabled (i.e. via stgexp), the request gets paced normally again. + postAllocation := r.allocatedBudget.Add(int64(size)) + if r.requestedBudget.Load() <= postAllocation { // expand our budget if it's needed (probably not), and de-allocate ourselves, since we're likely finished. 
+ r.requestedBudget.Store(postAllocation) + r.Discard() + } + + return size, nil + } + + // first, let's return what's available, if anything. + available := r.allocatedBudget.Load() - r.usedBudget.Load() + + if available > 0 { + allocated = min(int(min(available, math.MaxInt)), size) + return + } + + // prepare a function to reanimate ourselves, if needed. + reanimate := func() { + if r.allocationsFinished.Load() { + common.GetLifecycleMgr().Info(fmt.Sprintf("reanimating %s with %d bytes needed", r.id.String(), size)) + + // ensure we are going to get our portion, in case something weird happened. + futureReadBudget := r.requestedBudget.Load() - r.usedBudget.Load() + if futureReadBudget < int64(size) { + r.requestedBudget.Add(int64(size) - futureReadBudget) + } + + // fire off the reanimate request and flip the allocation bit + <-r.parent.reinitiateRequest(r) + r.allocationsFinished.Store(false) + } + } + + // then, we wait for a new allocation. this won't be perfectly on pace with the traverser, but 1 second isn't a ton of time to lose for one request. + t := time.NewTicker(allocatorTickrate) + defer t.Stop() + for available == 0 { + select { + case <-t.C: + // no-op, check if we have an allocation (or if we need to reanimate ourselves) + available = r.allocatedBudget.Load() - r.usedBudget.Load() + + // try reanimating if we still need bytes but have nothing available + if available == 0 { + reanimate() + } + case <-r.ctx.Done(): + return 0, errors.New("context canceled while checking for allocation") + } + } + + allocated = min(int(min(available, math.MaxInt)), size) + + return +} + +func (r *request) confirmUse(size int, recordBandwidth bool) { + // while it's incorrect behavior to confirm more than we've ever been allocated, + // we'll let it slide and drop a warning in the logs that something *probably* isn't as we expect, + // because working is better than not working, even if working incorrectly. 
+ + afterUse := r.usedBudget.Add(int64(size)) + r.readHead.Add(int64(size)) + if r.allocatedBudget.Load() < afterUse { // handle the incorrect scenario + pacerIncorrectBehaviorWarnOnce.Do(func() { + common.AzcopyCurrentJobLogger.Log(common.LogWarning, "This won't cause issues with your job, but the request pacer has observed incorrect behavior, confirming more bytes than allocated. Please file a bug on the AzCopy github repo if you see this.") + }) + + r.allocatedBudget.Store(afterUse) + if r.requestedBudget.Load() < afterUse { + r.requestedBudget.Store(afterUse) + } + } + + if recordBandwidth { // observe the bytes. + r.parent.RecordBytes(size) + } +} + +func (r *request) Discard() { + // mark ourselves discarded, and use all allocations. + r.allocationsFinished.Store(true) + + r.parent.discardRequest(r) +} diff --git a/pacer/wrapped_reader.go b/pacer/wrapped_reader.go new file mode 100644 index 0000000000..40afa772b0 --- /dev/null +++ b/pacer/wrapped_reader.go @@ -0,0 +1,53 @@ +package pacer + +import ( + "fmt" + "io" +) + +type wrappedRSC struct { + seeker io.Seeker + + wrappedRC +} + +func (w *wrappedRSC) Seek(offset int64, whence int) (newLoc int64, err error) { + newLoc, err = w.seeker.Seek(offset, whence) + + if err != nil { + return + } + + w.parentReq.informSeek(newLoc) + return +} + +type wrappedRC struct { + parentReq Request + childReader io.ReadCloser +} + +func (w *wrappedRC) Read(p []byte) (n int, err error) { + if w.parentReq.RemainingReads() <= 0 { + return 0, nil + } + + var allocated int + allocated, err = w.parentReq.requestUse(len(p)) + + if err != nil { + return 0, fmt.Errorf("failed to get allocation to read: %w", err) + } + + p = p[:allocated] + + n, err = w.childReader.Read(p) + + w.parentReq.confirmUse(n, true) + return n, err +} + +func (w *wrappedRC) Close() error { + w.parentReq.Discard() + return w.childReader.Close() +} diff --git a/ste/downloader-azureFiles.go b/ste/downloader-azureFiles.go index 29ae989b24..3fd0f3b0f0 100644 --- 
a/ste/downloader-azureFiles.go +++ b/ste/downloader-azureFiles.go @@ -25,6 +25,7 @@ import ( "time" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) @@ -112,7 +113,7 @@ func (bd *azureFilesDownloader) Epilogue() { } // GenerateDownloadFunc returns a chunk-func for file downloads -func (bd *azureFilesDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destWriter common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer) chunkFunc { +func (bd *azureFilesDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destWriter common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer.Interface) chunkFunc { return createDownloadChunkFunc(jptm, id, func() { // step 1: Downloading the file from range startIndex till (startIndex + adjustedChunkSize) @@ -120,8 +121,15 @@ func (bd *azureFilesDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, d // wait until we get the headers back... but we have not yet read its whole body. // The Download method encapsulates any retries that may be necessary to get to the point of receiving response headers. jptm.LogChunkStatus(id, common.EWaitReason.HeaderResponse()) - // TODO : Why no enriched context here? 
enrichedContext := withRetryNotification(jptm.Context(), bd.filePacer) - get, err := bd.source.DownloadStream(jptm.Context(), &file.DownloadStreamOptions{Range: file.HTTPRange{Offset: id.OffsetInFile(), Count: length}}) + + // inject our pacer so our policy picks it up + pacerCtx, err := pacer.InjectPacer(length, jptm.FromTo(), jptm.Context()) + if err != nil { + jptm.FailActiveDownload("Injecting pacer into context", err) + return + } + + get, err := bd.source.DownloadStream(pacerCtx, &file.DownloadStreamOptions{Range: file.HTTPRange{Offset: id.OffsetInFile(), Count: length}}) if err != nil { jptm.FailActiveDownload("Downloading response body", err) // cancel entire transfer because this chunk has failed return @@ -142,7 +150,8 @@ func (bd *azureFilesDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, d OnFailedRead: common.NewFileReadLogFunc(jptm, jptm.Info().Source), }) defer retryReader.Close() - err = destWriter.EnqueueChunk(jptm.Context(), id, length, newPacedResponseBody(jptm.Context(), retryReader, pacer), true) + + err = destWriter.EnqueueChunk(jptm.Context(), id, length, retryReader, true) if err != nil { jptm.FailActiveDownload("Enqueuing chunk", err) return diff --git a/ste/downloader-blob.go b/ste/downloader-blob.go index 093505a181..cc49bf7554 100644 --- a/ste/downloader-blob.go +++ b/ste/downloader-blob.go @@ -27,6 +27,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) @@ -130,7 +131,7 @@ func (bd *blobDownloader) Epilogue() { } // Returns a chunk-func for blob downloads -func (bd *blobDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destWriter common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer) chunkFunc { +func (bd *blobDownloader) GenerateDownloadFunc(jptm 
IJobPartTransferMgr, destWriter common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer.Interface) chunkFunc { return createDownloadChunkFunc(jptm, id, func() { // If the range does not contain any data, write out empty data to disk without performing download @@ -174,7 +175,15 @@ func (bd *blobDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destWri // wait until we get the headers back... but we have not yet read its whole body. // The Download method encapsulates any retries that may be necessary to get to the point of receiving response headers. jptm.LogChunkStatus(id, common.EWaitReason.HeaderResponse()) - enrichedContext := withRetryNotification(jptm.Context(), bd.filePacer) + + // inject our pacer so our policy picks it up + pacerCtx, err := pacer.InjectPacer(length, jptm.FromTo(), jptm.Context()) + if err != nil { + jptm.FailActiveDownload("Injecting pacer into context", err) + return + } + + enrichedContext := withRetryNotification(pacerCtx, bd.filePacer) get, err := bd.source.DownloadStream(enrichedContext, &blob.DownloadStreamOptions{ Range: blob.HTTPRange{Offset: id.OffsetInFile(), Count: length}, AccessConditions: accessConditions, @@ -188,13 +197,21 @@ func (bd *blobDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destWri // Enqueue the response body to be written out to disk // The retryReader encapsulates any retries that may be necessary while downloading the body + blobReadLogFunc := common.NewBlobReadLogFunc(jptm, jptm.Info().Source) jptm.LogChunkStatus(id, common.EWaitReason.Body()) retryReader := get.NewRetryReader(enrichedContext, &blob.RetryReaderOptions{ - MaxRetries: int32(destWriter.MaxRetryPerDownloadBody()), - OnFailedRead: common.NewBlobReadLogFunc(jptm, jptm.Info().Source), + MaxRetries: int32(destWriter.MaxRetryPerDownloadBody()), + OnFailedRead: func(failureCount int32, lastError error, rnge blob.HTTPRange, willRetry bool) { + blobReadLogFunc(failureCount, lastError, rnge, willRetry) + + if 
willRetry { + bd.filePacer.RetryCallback() + } + }, }) + defer retryReader.Close() - err = destWriter.EnqueueChunk(jptm.Context(), id, length, newPacedResponseBody(jptm.Context(), retryReader, pacer), true) + err = destWriter.EnqueueChunk(pacerCtx, id, length, retryReader, true) if err != nil { jptm.FailActiveDownload("Enqueuing chunk", err) return diff --git a/ste/downloader-blobFS.go b/ste/downloader-blobFS.go index bb3e7227c3..cb351e9444 100644 --- a/ste/downloader-blobFS.go +++ b/ste/downloader-blobFS.go @@ -22,16 +22,18 @@ package ste import ( "errors" - "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file" "os" "time" + + "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file" "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) type blobFSDownloader struct { - jptm IJobPartTransferMgr - txInfo *TransferInfo - srcFileClient *file.Client + jptm IJobPartTransferMgr + txInfo *TransferInfo + srcFileClient *file.Client } func newBlobFSDownloader(jptm IJobPartTransferMgr) (downloader, error) { @@ -75,7 +77,7 @@ func (bd *blobFSDownloader) Epilogue() { // Returns a chunk-func for ADLS gen2 downloads -func (bd *blobFSDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destWriter common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer) chunkFunc { +func (bd *blobFSDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destWriter common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer.Interface) chunkFunc { return createDownloadChunkFunc(jptm, id, func() { srcFileClient := bd.srcFileClient @@ -84,7 +86,15 @@ func (bd *blobFSDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destW // wait until we get the headers back... but we have not yet read its whole body. // The Download method encapsulates any retries that may be necessary to get to the point of receiving response headers. 
jptm.LogChunkStatus(id, common.EWaitReason.HeaderResponse()) - get, err := srcFileClient.DownloadStream(jptm.Context(), &file.DownloadStreamOptions{Range: &file.HTTPRange{Offset: id.OffsetInFile(), Count: length}}) + + // inject our pacer so our policy picks it up + pacerCtx, err := pacer.InjectPacer(length, jptm.FromTo(), jptm.Context()) + if err != nil { + jptm.FailActiveDownload("Injecting pacer into context", err) + return + } + + get, err := srcFileClient.DownloadStream(pacerCtx, &file.DownloadStreamOptions{Range: &file.HTTPRange{Offset: id.OffsetInFile(), Count: length}}) if err != nil { jptm.FailActiveDownload("Downloading response body", err) // cancel entire transfer because this chunk has failed return @@ -100,12 +110,13 @@ func (bd *blobFSDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destW // step 2: Enqueue the response body to be written out to disk // The retryReader encapsulates any retries that may be necessary while downloading the body jptm.LogChunkStatus(id, common.EWaitReason.Body()) - retryReader := get.NewRetryReader(jptm.Context(), &file.RetryReaderOptions{ - MaxRetries: MaxRetryPerDownloadBody, + retryReader := get.NewRetryReader(pacerCtx, &file.RetryReaderOptions{ + MaxRetries: MaxRetryPerDownloadBody, OnFailedRead: common.NewDatalakeReadLogFunc(jptm, srcFileClient.DFSURL()), }) defer retryReader.Close() - err = destWriter.EnqueueChunk(jptm.Context(), id, length, newPacedResponseBody(jptm.Context(), retryReader, pacer), true) + + err = destWriter.EnqueueChunk(pacerCtx, id, length, retryReader, true) if err != nil { jptm.FailActiveDownload("Enqueuing chunk", err) return @@ -125,4 +136,4 @@ func (bd *blobFSDownloader) CreateSymlink(jptm IJobPartTransferMgr) error { err = os.Symlink(symlinkInfo, jptm.Info().Destination) return err -} \ No newline at end of file +} diff --git a/ste/downloader.go b/ste/downloader.go index 737b52045a..c2a5224b3c 100644 --- a/ste/downloader.go +++ b/ste/downloader.go @@ -24,6 +24,7 @@ import ( "io" 
"github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) // Abstraction of the methods needed to download files/blobs from a remote location @@ -34,7 +35,7 @@ type downloader interface { // GenerateDownloadFunc returns a func() that will download the specified portion of the remote file into dstFile // Instead of taking destination file as a parameter, it takes a helper that will write to the file. That keeps details of // file IO out out the download func, and lets that func concentrate only on the details of the remote endpoint - GenerateDownloadFunc(jptm IJobPartTransferMgr, writer common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer) chunkFunc + GenerateDownloadFunc(jptm IJobPartTransferMgr, writer common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer.Interface) chunkFunc // Epilogue does cleanup. MAY be the only method that gets called (in error cases). So must not fail simply because // Prologue has not yet been called diff --git a/ste/mgr-JobMgr.go b/ste/mgr-JobMgr.go index 69a773722c..197e53e4fe 100755 --- a/ste/mgr-JobMgr.go +++ b/ste/mgr-JobMgr.go @@ -31,6 +31,7 @@ import ( "time" "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) var _ IJobMgr = &jobMgr{} @@ -96,7 +97,7 @@ type IJobMgr interface { func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx context.Context, cpuMon common.CPUMonitor, level common.LogLevel, commandString string, tuner ConcurrencyTuner, - pacer PacerAdmin, slicePool common.ByteSlicePooler, cacheLimiter common.CacheLimiter, fileCountLimiter common.CacheLimiter, + pacer pacer.Interface, slicePool common.ByteSlicePooler, cacheLimiter common.CacheLimiter, fileCountLimiter common.CacheLimiter, jobLogger common.ILoggerResetable, daemonMode bool, jobErrorHandler common.JobErrorHandler) IJobMgr { const channelSize = 100000 // PartsChannelSize defines the number of JobParts which can 
be placed into the @@ -310,7 +311,7 @@ type jobMgr struct { poolSizingChannels poolSizingChannels concurrencyTuner ConcurrencyTuner cpuMon common.CPUMonitor - pacer PacerAdmin + pacer pacer.Interface slicePool common.ByteSlicePooler cacheLimiter common.CacheLimiter fileCountLimiter common.CacheLimiter diff --git a/ste/mgr-JobPartMgr.go b/ste/mgr-JobPartMgr.go index e0de469717..ca6a34f5dd 100644 --- a/ste/mgr-JobPartMgr.go +++ b/ste/mgr-JobPartMgr.go @@ -16,6 +16,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) @@ -103,8 +104,14 @@ func NewClientOptions(retry policy.RetryOptions, telemetry policy.TelemetryOptio // Pipeline will look like // [includeResponsePolicy, newAPIVersionPolicy (ignored), NewTelemetryPolicy, perCall, NewRetryPolicy, perRetry, NewLogPolicy, httpHeaderPolicy, bodyDownloadPolicy] perCallPolicies := []policy.Policy{azruntime.NewRequestIDPolicy(), NewRequestPriorityPolicy(), NewVersionPolicy(), newFileUploadRangeFromURLFixPolicy()} - // TODO : Default logging policy is not equivalent to old one. tracing HTTP request - perRetryPolicies := []policy.Policy{newRetryNotificationPolicy(), newLogPolicy(log), newStatsPolicy()} + perRetryPolicies := []policy.Policy{ + newRetryNotificationPolicy(), + // pacer inject policy is added here, as log policy and stats policy have actions that care about the lifetime of the request. + pacer.NewPacerInjectPolicy(), + // TODO : Default logging policy is not equivalent to old one. 
tracing HTTP request + newLogPolicy(log), + newStatsPolicy(), + } if dstCred != nil { perCallPolicies = append(perRetryPolicies, NewDestReauthPolicy(dstCred)) } @@ -187,7 +194,7 @@ type jobPartMgr struct { priority common.JobPriority - pacer pacer // Pacer is used to cap throughput + pacer pacer.Interface // Pacer is used to cap throughput slicePool common.ByteSlicePooler diff --git a/ste/pacedReadSeeker.go b/ste/pacedReadSeeker.go deleted file mode 100644 index 37d13ae65d..0000000000 --- a/ste/pacedReadSeeker.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright © 2017 Microsoft -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package ste - -import ( - "context" - "io" -) - -// pacedReadSeeker implements read/seek/close with pacing. 
(Formerly in file pacer-lite) -type pacedReadSeeker struct { - - // Although storing ctx in a struct is generally considered an anti-patten, this particular - // struct happens to be fairly-short lived (from its users point of view). Basically just for - // as long as a takes to read or send a request body. That's not terribly long, but it is - // long enough that we might need to cancel during it, hence the ctx. - ctx context.Context - - body io.Reader // Seeking is required to support retries - p pacer -} - -func newPacedRequestBody(ctx context.Context, requestBody io.ReadSeeker, p pacer) io.ReadSeekCloser { - if p == nil { - panic("p must not be nil") - } - return &pacedReadSeeker{ctx: ctx, body: requestBody, p: p} -} - -func newPacedResponseBody(ctx context.Context, responseBody io.ReadCloser, p pacer) io.ReadCloser { - if p == nil { - panic("p must not be nil") - } - return &pacedReadSeeker{ctx: ctx, body: responseBody, p: p} -} - -func (prs *pacedReadSeeker) Read(p []byte) (int, error) { - requestedCount := len(p) - - // blocks until we are allowed to process the bytes - err := prs.p.RequestTrafficAllocation(prs.ctx, int64(requestedCount)) - if err != nil { - return 0, err - } - - // process them - n, err := prs.body.Read(p) - - // "return" any unused tokens to the pacer (e.g. if we hit eof before the end of our buffer p) - excess := requestedCount - n - prs.p.UndoRequest(int64(excess)) - - return n, err -} - -// Seeking is required to support retries -func (prs *pacedReadSeeker) Seek(offset int64, whence int) (offsetFromStart int64, err error) { - return prs.body.(io.ReadSeeker).Seek(offset, whence) -} - -// pacedReadSeeker supports Close but the underlying stream may not; if it does, Close will close it. 
-func (prs *pacedReadSeeker) Close() error { - if c, ok := prs.body.(io.Closer); ok { - return c.Close() - } - return nil -} diff --git a/ste/pacer-autoPacer.go b/ste/pacer-autoPacer.go index d0c990963d..595a6f3842 100644 --- a/ste/pacer-autoPacer.go +++ b/ste/pacer-autoPacer.go @@ -21,6 +21,7 @@ package ste import ( + "context" "fmt" "strings" "sync" @@ -30,8 +31,25 @@ import ( "github.com/Azure/azure-storage-azcopy/v10/common" ) +// autopacer is a request pacer that automatically tunes based upon the frequency of retries. +// autopacer paces at the chunk-level, NOT at the read level, and NOT at the +// If you don't know exactly what you want and why it's this, take a look at pacer.Interface instead. +// autopacer remains in a state in which the pacer has been merged into the interface, as a way to ease transition to +// pacer.Interface by avoiding fixing what isn't broken. type autopacer interface { - pacer + // RequestTrafficAllocation blocks until the caller is allowed to process byteCount bytes. + RequestTrafficAllocation(ctx context.Context, byteCount int64) error + + UpdateTargetBytesPerSecond(newTarget int64) + + // UndoRequest reverses a previous request to process n bytes. Is used when + // the caller did not need all of the allocation they previously requested + // e.g. when they asked for enough for a big buffer, but never filled it, they would + // call this method to return the unused portion. + UndoRequest(byteCount int64) + + Close() error + retryNotificationReceiver } diff --git a/ste/pacer-tokenBucketPacer.go b/ste/pacer-tokenBucketPacer.go index 63b7c50507..3a72bf85de 100644 --- a/ste/pacer-tokenBucketPacer.go +++ b/ste/pacer-tokenBucketPacer.go @@ -29,29 +29,12 @@ import ( "github.com/Azure/azure-storage-azcopy/v10/common" ) -// pacer is used by callers whose activity must be controlled to a certain pace -type pacer interface { - - // RequestTrafficAllocation blocks until the caller is allowed to process byteCount bytes. 
- RequestTrafficAllocation(ctx context.Context, byteCount int64) error - - UpdateTargetBytesPerSecond(newTarget int64) - - // UndoRequest reverses a previous request to process n bytes. Is used when - // the caller did not need all of the allocation they previously requested - // e.g. when they asked for enough for a big buffer, but never filled it, they would - // call this method to return the unused portion. - UndoRequest(byteCount int64) - - Close() error -} - -type PacerAdmin interface { - pacer - - // GetTotalTraffic returns the cumulative count of all traffic that has been processed - GetTotalTraffic() int64 -} +//type PacerAdmin interface { +// pacer +// +// // GetTotalTraffic returns the cumulative count of all traffic that has been processed +// GetTotalTraffic() int64 +//} const ( // How long to sleep in the loop that puts tokens into the bucket diff --git a/ste/s2sCopier-URLToBlob.go b/ste/s2sCopier-URLToBlob.go index 1faeaa80b7..b06049d56b 100644 --- a/ste/s2sCopier-URLToBlob.go +++ b/ste/s2sCopier-URLToBlob.go @@ -22,7 +22,10 @@ package ste import ( "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/Azure/azure-storage-azcopy/v10/pacer" + "net/url" "strings" "sync" @@ -33,7 +36,7 @@ import ( var LogBlobConversionOnce = &sync.Once{} // Creates the right kind of URL to blob copier, based on the blob type of the source -func newURLToBlobCopier(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (sender, error) { +func newURLToBlobCopier(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, sip ISourceInfoProvider) (sender, error) { srcInfoProvider := sip.(IRemoteSourceInfoProvider) // "downcast" to the type we know it really has // If our destination is a dfs endpoint, make an attempt to cast it to the blob endpoint diff --git a/ste/sender-appendBlob.go b/ste/sender-appendBlob.go index 768063668f..3681622967 100644 --- a/ste/sender-appendBlob.go +++ b/ste/sender-appendBlob.go 
@@ -32,6 +32,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "golang.org/x/sync/semaphore" @@ -43,7 +44,7 @@ type appendBlobSenderBase struct { destAppendBlobClient *appendblob.Client chunkSize int64 numChunks uint32 - pacer pacer + pacer pacer.Interface // Headers and other info that we will apply to the destination // object. For S2S, these come from the source service. // When sending local data, they are computed based on @@ -59,7 +60,7 @@ type appendBlobSenderBase struct { type appendBlockFunc = func() -func newAppendBlobSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer, srcInfoProvider ISourceInfoProvider) (*appendBlobSenderBase, error) { +func newAppendBlobSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, srcInfoProvider ISourceInfoProvider) (*appendBlobSenderBase, error) { transferInfo := jptm.Info() // compute chunk count diff --git a/ste/sender-appendBlobFromLocal.go b/ste/sender-appendBlobFromLocal.go index 6e0cddd333..4fb4ebf188 100644 --- a/ste/sender-appendBlobFromLocal.go +++ b/ste/sender-appendBlobFromLocal.go @@ -23,6 +23,7 @@ package ste import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob" "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) type appendBlobUploader struct { @@ -50,7 +51,7 @@ func (u *appendBlobUploader) Prologue(ps common.PrologueState) (destinationModif return u.appendBlobSenderBase.Prologue(ps) } -func newAppendBlobUploader(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (sender, error) { +func newAppendBlobUploader(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, sip ISourceInfoProvider) (sender, error) { senderBase, err := 
newAppendBlobSenderBase(jptm, destination, pacer, sip) if err != nil { return nil, err @@ -66,11 +67,17 @@ func (u *appendBlobUploader) Md5Channel() chan<- []byte { func (u *appendBlobUploader) GenerateUploadFunc(id common.ChunkID, blockIndex int32, reader common.SingleChunkReader, chunkIsWholeFile bool) chunkFunc { appendBlockFromLocal := func() { u.jptm.LogChunkStatus(id, common.EWaitReason.Body()) - body := newPacedRequestBody(u.jptm.Context(), reader, u.pacer) offset := id.OffsetInFile() var timeoutFromCtx bool ctx := withTimeoutNotification(u.jptm.Context(), &timeoutFromCtx) - _, err := u.destAppendBlobClient.AppendBlock(ctx, body, + // inject our pacer so our policy picks it up + pacerCtx, err := u.pacer.InjectPacer(reader.Length(), u.jptm.FromTo(), ctx) + if err != nil { + u.jptm.FailActiveDownload("Injecting pacer into context", err) + return + } + + _, err = u.destAppendBlobClient.AppendBlock(pacerCtx, reader, &appendblob.AppendBlockOptions{ AppendPositionAccessConditions: &appendblob.AppendPositionAccessConditions{AppendPosition: &offset}, CPKInfo: u.jptm.CpkInfo(), diff --git a/ste/sender-appendBlobFromURL.go b/ste/sender-appendBlobFromURL.go index 3534d1cd31..b5d094339a 100644 --- a/ste/sender-appendBlobFromURL.go +++ b/ste/sender-appendBlobFromURL.go @@ -25,6 +25,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file" "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) type urlToAppendBlobCopier struct { @@ -34,7 +35,7 @@ type urlToAppendBlobCopier struct { addFileRequestIntent bool } -func newURLToAppendBlobCopier(jptm IJobPartTransferMgr, destination string, pacer pacer, srcInfoProvider IRemoteSourceInfoProvider) (s2sCopier, error) { +func newURLToAppendBlobCopier(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, srcInfoProvider IRemoteSourceInfoProvider) (s2sCopier, error) { senderBase, err := 
newAppendBlobSenderBase(jptm, destination, pacer, srcInfoProvider) if err != nil { return nil, err @@ -61,10 +62,8 @@ func newURLToAppendBlobCopier(jptm IJobPartTransferMgr, destination string, pace func (c *urlToAppendBlobCopier) GenerateCopyFunc(id common.ChunkID, blockIndex int32, adjustedChunkSize int64, chunkIsWholeFile bool) chunkFunc { appendBlockFromURL := func() { c.jptm.LogChunkStatus(id, common.EWaitReason.S2SCopyOnWire()) - - if err := c.pacer.RequestTrafficAllocation(c.jptm.Context(), adjustedChunkSize); err != nil { - c.jptm.FailActiveUpload("Pacing block", err) - } + + <-c.pacer.InitiateUnpaceable(adjustedChunkSize, c.jptm.Context()) offset := id.OffsetInFile() token, err := c.jptm.GetS2SSourceTokenCredential(c.jptm.Context()) if err != nil { diff --git a/ste/sender-azureFile.go b/ste/sender-azureFile.go index 672d6f8b49..7e0a3c6d31 100644 --- a/ste/sender-azureFile.go +++ b/ste/sender-azureFile.go @@ -35,6 +35,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror" filesas "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) @@ -57,7 +58,7 @@ type azureFileSenderBase struct { shareClient *share.Client chunkSize int64 numChunks uint32 - pacer pacer + pacer pacer.Interface ctx context.Context sip ISourceInfoProvider // Headers and other info that we will apply to the destination @@ -79,7 +80,7 @@ type NFSProperties struct { FileMode *string } -func newAzureFileSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (*azureFileSenderBase, error) { +func newAzureFileSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, sip ISourceInfoProvider) (*azureFileSenderBase, error) { info := jptm.Info() // compute chunk size (irrelevant but harmless for folders) diff --git a/ste/sender-azureFileFromLocal.go 
b/ste/sender-azureFileFromLocal.go index 7e071b379a..28cb2f16e0 100644 --- a/ste/sender-azureFileFromLocal.go +++ b/ste/sender-azureFileFromLocal.go @@ -28,6 +28,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share" "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) type azureFileUploader struct { @@ -35,7 +36,7 @@ type azureFileUploader struct { md5Channel chan []byte } -func newAzureFilesUploader(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (sender, error) { +func newAzureFilesUploader(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, sip ISourceInfoProvider) (sender, error) { senderBase, err := newAzureFileSenderBase(jptm, destination, pacer, sip) if err != nil { return nil, err @@ -70,8 +71,14 @@ func (u *azureFileUploader) GenerateUploadFunc(id common.ChunkID, blockIndex int // upload the byte range represented by this chunk jptm.LogChunkStatus(id, common.EWaitReason.Body()) - body := newPacedRequestBody(u.ctx, reader, u.pacer) - _, err := u.getFileClient().UploadRange(u.ctx, id.OffsetInFile(), body, nil) + // inject our pacer so our policy picks it up + pacerCtx, err := u.pacer.InjectPacer(reader.Length(), u.jptm.FromTo(), u.ctx) + if err != nil { + u.jptm.FailActiveDownload("Injecting pacer into context", err) + return + } + + _, err = u.getFileClient().UploadRange(pacerCtx, id.OffsetInFile(), reader, nil) if err != nil { jptm.FailActiveUpload("Uploading range", err) return @@ -161,7 +168,7 @@ func DoWithCreateSymlinkOnAzureFilesNFS( action func() error, client *file.Client, shareClient *share.Client, - pacer pacer, + pacer pacer.Interface, jptm IJobPartTransferMgr, ) error { // try the action diff --git a/ste/sender-azureFileFromURL.go b/ste/sender-azureFileFromURL.go index 602bd8f6b9..217da5073a 100644 --- a/ste/sender-azureFileFromURL.go +++ 
b/ste/sender-azureFileFromURL.go @@ -22,9 +22,12 @@ package ste import ( "context" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file" "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" + "net/http" ) @@ -33,7 +36,7 @@ type urlToAzureFileCopier struct { srcURL string } -func newURLToAzureFileCopier(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (sender, error) { +func newURLToAzureFileCopier(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, sip ISourceInfoProvider) (sender, error) { srcInfoProvider := sip.(IRemoteSourceInfoProvider) // "downcast" to the type we know it really has senderBase, err := newAzureFileSenderBase(jptm, destination, pacer, sip) @@ -62,9 +65,7 @@ func (u *urlToAzureFileCopier) GenerateCopyFunc(id common.ChunkID, blockIndex in // upload the range (including application of global pacing. We don't have a separate wait reason for global pacing // so just do it inside the S2SCopyOnWire state) u.jptm.LogChunkStatus(id, common.EWaitReason.S2SCopyOnWire()) - if err := u.pacer.RequestTrafficAllocation(u.jptm.Context(), adjustedChunkSize); err != nil { - u.jptm.FailActiveUpload("Pacing block (global level)", err) - } + <-u.pacer.InitiateUnpaceable(adjustedChunkSize, u.jptm.Context()) // destination auth is OAuth, so we need to use the special policy to add the x-ms-file-request-intent header since the SDK has not yet implemented it. 
token, err := u.jptm.GetS2SSourceTokenCredential(u.jptm.Context()) if err != nil { diff --git a/ste/sender-blobFS.go b/ste/sender-blobFS.go index 6e850393a0..630ea9b43c 100644 --- a/ste/sender-blobFS.go +++ b/ste/sender-blobFS.go @@ -27,6 +27,7 @@ import ( "time" datalakesas "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/sas" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" @@ -54,13 +55,13 @@ type blobFSSenderBase struct { parentDirClient *directory.Client chunkSize int64 numChunks uint32 - pacer pacer + pacer pacer.Interface creationTimeHeaders *file.HTTPHeaders flushThreshold int64 metadataToSet common.Metadata } -func newBlobFSSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (*blobFSSenderBase, error) { +func newBlobFSSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, sip ISourceInfoProvider) (*blobFSSenderBase, error) { info := jptm.Info() // compute chunk size and number of chunks diff --git a/ste/sender-blobFSFromLocal.go b/ste/sender-blobFSFromLocal.go index 3c6e936db0..5c42f01dfc 100644 --- a/ste/sender-blobFSFromLocal.go +++ b/ste/sender-blobFSFromLocal.go @@ -24,6 +24,8 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file" "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" + "math" ) @@ -32,7 +34,7 @@ type blobFSUploader struct { md5Channel chan []byte } -func newBlobFSUploader(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (sender, error) { +func newBlobFSUploader(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, sip ISourceInfoProvider) (sender, error) { senderBase, err := newBlobFSSenderBase(jptm, destination, pacer, sip) if err != nil { return nil, err @@ -58,8 +60,15 @@ func (u 
*blobFSUploader) GenerateUploadFunc(id common.ChunkID, blockIndex int32, // upload the byte range represented by this chunk jptm.LogChunkStatus(id, common.EWaitReason.Body()) - body := newPacedRequestBody(jptm.Context(), reader, u.pacer) - _, err := u.getFileClient().AppendData(jptm.Context(), id.OffsetInFile(), body, nil) // note: AppendData is really UpdatePath with "append" action + + // inject our pacer so our policy picks it up + pacerCtx, err := u.pacer.InjectPacer(reader.Length(), u.jptm.FromTo(), u.jptm.Context()) + if err != nil { + u.jptm.FailActiveDownload("Injecting pacer into context", err) + return + } + + _, err = u.getFileClient().AppendData(pacerCtx, id.OffsetInFile(), reader, nil) // note: AppendData is really UpdatePath with "append" action if err != nil { jptm.FailActiveUpload("Uploading range", err) return diff --git a/ste/sender-blockBlob.go b/ste/sender-blockBlob.go index c0f7a5a48c..3eea55c62c 100644 --- a/ste/sender-blockBlob.go +++ b/ste/sender-blockBlob.go @@ -25,7 +25,10 @@ import ( "encoding/base64" "errors" "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "github.com/Azure/azure-storage-azcopy/v10/pacer" + "strconv" "strings" "sync" @@ -48,7 +51,7 @@ type blockBlobSenderBase struct { destBlockBlobClient *blockblob.Client chunkSize int64 numChunks uint32 - pacer pacer + pacer pacer.Interface blockIDs []string destBlobTier *blob.AccessTier @@ -152,7 +155,7 @@ func getBlockNamePrefix(jobID common.JobID, partNum uint32, transferIndex uint32 return fmt.Sprintf("%s%s%05d%05d", placeHolderPrefix, jobIdStr, partNum, transferIndex) } -func newBlockBlobSenderBase(jptm IJobPartTransferMgr, pacer pacer, srcInfoProvider ISourceInfoProvider, inferredAccessTierType *blob.AccessTier) (*blockBlobSenderBase, error) { +func newBlockBlobSenderBase(jptm IJobPartTransferMgr, pacer pacer.Interface, srcInfoProvider ISourceInfoProvider, inferredAccessTierType *blob.AccessTier) (*blockBlobSenderBase, error) { // compute chunk count 
chunkSize, numChunks, err := getVerifiedChunkParams(jptm.Info(), jptm.CacheLimiter().Limit(), jptm.CacheLimiter().StrictLimit()) if err != nil { diff --git a/ste/sender-blockBlobFromLocal.go b/ste/sender-blockBlobFromLocal.go index dec0f6bdd6..c4c8f1e7ae 100644 --- a/ste/sender-blockBlobFromLocal.go +++ b/ste/sender-blockBlobFromLocal.go @@ -23,8 +23,11 @@ package ste import ( "bytes" "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" + "github.com/Azure/azure-storage-azcopy/v10/pacer" + "sync/atomic" "github.com/Azure/azure-storage-azcopy/v10/common" @@ -36,7 +39,7 @@ type blockBlobUploader struct { md5Channel chan []byte } -func newBlockBlobUploader(jptm IJobPartTransferMgr, pacer pacer, sip ISourceInfoProvider) (sender, error) { +func newBlockBlobUploader(jptm IJobPartTransferMgr, pacer pacer.Interface, sip ISourceInfoProvider) (sender, error) { senderBase, err := newBlockBlobSenderBase(jptm, pacer, sip, nil) if err != nil { return nil, err @@ -100,8 +103,15 @@ func (u *blockBlobUploader) generatePutBlock(id common.ChunkID, blockIndex int32 // step 3: put block to remote u.jptm.LogChunkStatus(id, common.EWaitReason.Body()) - body := newPacedRequestBody(u.jptm.Context(), reader, u.pacer) - _, err := u.destBlockBlobClient.StageBlock(u.jptm.Context(), encodedBlockID, body, + + // inject our pacer so our policy picks it up + pacerCtx, err := u.pacer.InjectPacer(reader.Length(), u.jptm.FromTo(), u.jptm.Context()) + if err != nil { + u.jptm.FailActiveDownload("Injecting pacer into context", err) + return + } + + _, err = u.destBlockBlobClient.StageBlock(pacerCtx, encodedBlockID, reader, &blockblob.StageBlockOptions{ CPKInfo: u.jptm.CpkInfo(), CPKScopeInfo: u.jptm.CpkScopeInfo(), @@ -163,9 +173,15 @@ func (u *blockBlobUploader) generatePutWholeBlob(id common.ChunkID, reader commo u.headersToApply.BlobContentMD5 = md5Hash } + // inject our pacer so our policy picks it up + pacerCtx, err := 
u.pacer.InjectPacer(reader.Length(), u.jptm.FromTo(), jptm.Context()) + if err != nil { + u.jptm.FailActiveDownload("Injecting pacer into context", err) + return + } + // Upload the file - body := newPacedRequestBody(jptm.Context(), reader, u.pacer) - _, err = u.destBlockBlobClient.Upload(jptm.Context(), body, + _, err = u.destBlockBlobClient.Upload(pacerCtx, reader, &blockblob.UploadOptions{ HTTPHeaders: &u.headersToApply, Metadata: u.metadataToApply, diff --git a/ste/sender-blockBlobFromURL.go b/ste/sender-blockBlobFromURL.go index f22014771d..d018a52225 100644 --- a/ste/sender-blockBlobFromURL.go +++ b/ste/sender-blockBlobFromURL.go @@ -22,9 +22,12 @@ package ste import ( "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file" + "github.com/Azure/azure-storage-azcopy/v10/pacer" + "sync/atomic" "github.com/Azure/azure-storage-azcopy/v10/common" @@ -38,7 +41,7 @@ type urlToBlockBlobCopier struct { addFileRequestIntent bool // Necessary for FileBlob Oauth copies } -func newURLToBlockBlobCopier(jptm IJobPartTransferMgr, pacer pacer, srcInfoProvider IRemoteSourceInfoProvider) (s2sCopier, error) { +func newURLToBlockBlobCopier(jptm IJobPartTransferMgr, pacer pacer.Interface, srcInfoProvider IRemoteSourceInfoProvider) (s2sCopier, error) { // Get blob tier, by default set none. var destBlobTier *blob.AccessTier // If the source is block blob, preserve source's blob tier. 
@@ -113,9 +116,8 @@ func (c *urlToBlockBlobCopier) generatePutBlockFromURL(id common.ChunkID, blockI // step 3: put block to remote c.jptm.LogChunkStatus(id, common.EWaitReason.S2SCopyOnWire()) - if err := c.pacer.RequestTrafficAllocation(c.jptm.Context(), adjustedChunkSize); err != nil { - c.jptm.FailActiveUpload("Pacing block", err) - } + <-c.pacer.InitiateUnpaceable(adjustedChunkSize, c.jptm.Context()) // await our bytes being allocated + token, err := c.jptm.GetS2SSourceTokenCredential(c.jptm.Context()) if err != nil { c.jptm.FailActiveS2SCopy("Getting source token credential", err) @@ -167,9 +169,8 @@ func (c *urlToBlockBlobCopier) generateStartPutBlobFromURL(id common.ChunkID, bl destBlobTier = nil } - if err := c.pacer.RequestTrafficAllocation(c.jptm.Context(), adjustedChunkSize); err != nil { - c.jptm.FailActiveUpload("Pacing block", err) - } + <-c.pacer.InitiateUnpaceable(adjustedChunkSize, c.jptm.Context()) // await our bytes being allocated + token, err := c.jptm.GetS2SSourceTokenCredential(c.jptm.Context()) if err != nil { c.jptm.FailActiveS2SCopy("Getting source token credential", err) diff --git a/ste/sender-pageBlob.go b/ste/sender-pageBlob.go index ed6f756c42..d435111681 100644 --- a/ste/sender-pageBlob.go +++ b/ste/sender-pageBlob.go @@ -32,6 +32,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) @@ -42,7 +43,7 @@ type pageBlobSenderBase struct { srcSize int64 chunkSize int64 numChunks uint32 - pacer pacer + pacer pacer.Interface // Headers and other info that we will apply to the destination // object. For S2S, these come from the source service. 
@@ -79,7 +80,7 @@ var ( md5NotSupportedInManagedDiskError = errors.New("the Content-MD5 hash is not supported for managed disk uploads") ) -func newPageBlobSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer, srcInfoProvider ISourceInfoProvider, inferredAccessTierType *blob.AccessTier) (*pageBlobSenderBase, error) { +func newPageBlobSenderBase(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, srcInfoProvider ISourceInfoProvider, inferredAccessTierType *blob.AccessTier) (*pageBlobSenderBase, error) { transferInfo := jptm.Info() // compute chunk count diff --git a/ste/sender-pageBlobFromLocal.go b/ste/sender-pageBlobFromLocal.go index fb08f0339a..da142d333f 100644 --- a/ste/sender-pageBlobFromLocal.go +++ b/ste/sender-pageBlobFromLocal.go @@ -22,9 +22,11 @@ package ste import ( "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) @@ -36,7 +38,7 @@ type pageBlobUploader struct { sip ISourceInfoProvider } -func newPageBlobUploader(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (sender, error) { +func newPageBlobUploader(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, sip ISourceInfoProvider) (sender, error) { senderBase, err := newPageBlobSenderBase(jptm, destination, pacer, sip, nil) if err != nil { return nil, err @@ -111,9 +113,16 @@ func (u *pageBlobUploader) GenerateUploadFunc(id common.ChunkID, blockIndex int3 // send it jptm.LogChunkStatus(id, common.EWaitReason.Body()) - body := newPacedRequestBody(jptm.Context(), reader, u.pacer) enrichedContext := withRetryNotification(jptm.Context(), u.filePacer) - _, err := u.destPageBlobClient.UploadPages(enrichedContext, body, blob.HTTPRange{Offset: id.OffsetInFile(), Count: reader.Length()}, + + // 
inject our pacer so our policy picks it up + pacerCtx, err := u.pacer.InjectPacer(reader.Length(), u.jptm.FromTo(), enrichedContext) + if err != nil { + u.jptm.FailActiveDownload("Injecting pacer into context", err) + return + } + + _, err = u.destPageBlobClient.UploadPages(pacerCtx, reader, blob.HTTPRange{Offset: id.OffsetInFile(), Count: reader.Length()}, &pageblob.UploadPagesOptions{ CPKInfo: u.jptm.CpkInfo(), CPKScopeInfo: u.jptm.CpkScopeInfo(), diff --git a/ste/sender-pageBlobFromURL.go b/ste/sender-pageBlobFromURL.go index 4f9ae0a84b..6e84998f6f 100644 --- a/ste/sender-pageBlobFromURL.go +++ b/ste/sender-pageBlobFromURL.go @@ -25,6 +25,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) @@ -37,7 +38,7 @@ type urlToPageBlobCopier struct { addFileRequestIntent bool } -func newURLToPageBlobCopier(jptm IJobPartTransferMgr, destination string, pacer pacer, srcInfoProvider IRemoteSourceInfoProvider) (s2sCopier, error) { +func newURLToPageBlobCopier(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, srcInfoProvider IRemoteSourceInfoProvider) (s2sCopier, error) { srcURL, err := srcInfoProvider.PreSignedSourceURL() if err != nil { return nil, err @@ -146,9 +147,7 @@ func (c *urlToPageBlobCopier) GenerateCopyFunc(id common.ChunkID, blockIndex int // upload the page (including application of global pacing. 
We don't have a separate wait reason for global pacing // so just do it inside the S2SCopyOnWire state) c.jptm.LogChunkStatus(id, common.EWaitReason.S2SCopyOnWire()) - if err := c.pacer.RequestTrafficAllocation(c.jptm.Context(), adjustedChunkSize); err != nil { - c.jptm.FailActiveUpload("Pacing block (global level)", err) - } + <-c.pacer.InitiateUnpaceable(adjustedChunkSize, c.jptm.Context()) token, err := c.jptm.GetS2SSourceTokenCredential(c.jptm.Context()) if err != nil { c.jptm.FailActiveS2SCopy("Getting source token credential", err) diff --git a/ste/sender.go b/ste/sender.go index e6563e7902..6e4e4ca08b 100644 --- a/ste/sender.go +++ b/ste/sender.go @@ -25,6 +25,7 @@ import ( "time" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) @@ -109,7 +110,7 @@ type symlinkSender interface { RemoteFileExists() (bool, time.Time, error) } -type senderFactory func(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (sender, error) +type senderFactory func(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, sip ISourceInfoProvider) (sender, error) ///////////////////////////////////////////////////////////////////////////////////////////////// // For copying folder properties, many of the ISender of the methods needed to copy one file from URL to a remote location @@ -213,7 +214,7 @@ func createChunkFunc(setDoneStatusOnExit bool, jptm IJobPartTransferMgr, id comm } // newBlobUploader detects blob type and creates a uploader manually -func newBlobUploader(jptm IJobPartTransferMgr, destination string, pacer pacer, sip ISourceInfoProvider) (sender, error) { +func newBlobUploader(jptm IJobPartTransferMgr, destination string, pacer pacer.Interface, sip ISourceInfoProvider) (sender, error) { override := jptm.BlobTypeOverride() intendedType := override.ToBlobType() diff --git a/ste/xfer-anyToRemote-file.go 
b/ste/xfer-anyToRemote-file.go index f701aa407c..dea2c756ff 100644 --- a/ste/xfer-anyToRemote-file.go +++ b/ste/xfer-anyToRemote-file.go @@ -34,6 +34,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) @@ -164,7 +165,7 @@ func ValidateTier(jptm IJobPartTransferMgr, blobTier *blob.AccessTier, client IB // xfer.go requires just a single xfer function for the whole job. // This routine serves that role for uploads and S2S copies, and redirects for each transfer to a file or folder implementation -func anyToRemote(jptm IJobPartTransferMgr, pacer pacer, senderFactory senderFactory, sipf sourceInfoProviderFactory) { +func anyToRemote(jptm IJobPartTransferMgr, pacer pacer.Interface, senderFactory senderFactory, sipf sourceInfoProviderFactory) { info := jptm.Info() fromTo := jptm.FromTo() @@ -210,7 +211,7 @@ func anyToRemote(jptm IJobPartTransferMgr, pacer pacer, senderFactory senderFact } // anyToRemote_file handles all kinds of sender operations for files - both uploads from local files, and S2S copies -func anyToRemote_file(jptm IJobPartTransferMgr, info *TransferInfo, pacer pacer, senderFactory senderFactory, sipf sourceInfoProviderFactory) { +func anyToRemote_file(jptm IJobPartTransferMgr, info *TransferInfo, pacer pacer.Interface, senderFactory senderFactory, sipf sourceInfoProviderFactory) { pseudoId := common.NewPseudoChunkIDForWholeFile(info.Source) jptm.LogChunkStatus(pseudoId, common.EWaitReason.XferStart()) diff --git a/ste/xfer-anyToRemote-fileProperties.go b/ste/xfer-anyToRemote-fileProperties.go index e12f0d8a73..9490101f94 100644 --- a/ste/xfer-anyToRemote-fileProperties.go +++ b/ste/xfer-anyToRemote-fileProperties.go @@ -22,10 +22,11 @@ package ste import ( "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) // anyToRemote_folder 
handles all kinds of sender operations for FOLDERS - both uploads from local files, and S2S copies -func anyToRemote_fileProperties(jptm IJobPartTransferMgr, info *TransferInfo, pacer pacer, senderFactory senderFactory, sipf sourceInfoProviderFactory) { +func anyToRemote_fileProperties(jptm IJobPartTransferMgr, info *TransferInfo, pacer pacer.Interface, senderFactory senderFactory, sipf sourceInfoProviderFactory) { // schedule the work as a chunk, so it will run on the main goroutine pool, instead of the // smaller "transfer initiation pool", where this code runs. id := common.NewChunkID(jptm.Info().Source, 0, 0) diff --git a/ste/xfer-anyToRemote-folder.go b/ste/xfer-anyToRemote-folder.go index afa20dcdc4..ab6dcb69dc 100644 --- a/ste/xfer-anyToRemote-folder.go +++ b/ste/xfer-anyToRemote-folder.go @@ -22,10 +22,11 @@ package ste import ( "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) // anyToRemote_folder handles all kinds of sender operations for FOLDERS - both uploads from local files, and S2S copies -func anyToRemote_folder(jptm IJobPartTransferMgr, info *TransferInfo, pacer pacer, senderFactory senderFactory, sipf sourceInfoProviderFactory) { +func anyToRemote_folder(jptm IJobPartTransferMgr, info *TransferInfo, pacer pacer.Interface, senderFactory senderFactory, sipf sourceInfoProviderFactory) { // step 1. 
perform initial checks if jptm.WasCanceled() { diff --git a/ste/xfer-anyToRemote-symlink.go b/ste/xfer-anyToRemote-symlink.go index fa254ad327..859dbb217b 100644 --- a/ste/xfer-anyToRemote-symlink.go +++ b/ste/xfer-anyToRemote-symlink.go @@ -23,9 +23,10 @@ import ( "net/url" "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) -func anyToRemote_symlink(jptm IJobPartTransferMgr, info *TransferInfo, pacer pacer, senderFactory senderFactory, sipf sourceInfoProviderFactory) { +func anyToRemote_symlink(jptm IJobPartTransferMgr, info *TransferInfo, pacer pacer.Interface, senderFactory senderFactory, sipf sourceInfoProviderFactory) { // Check if cancelled if jptm.WasCanceled() { /* This is earliest we detect that jptm has been cancelled before we reach destination */ diff --git a/ste/xfer-deleteBlob.go b/ste/xfer-deleteBlob.go index a5ca6a1b1d..8312dc64b5 100644 --- a/ste/xfer-deleteBlob.go +++ b/ste/xfer-deleteBlob.go @@ -10,13 +10,14 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) var explainedSkippedRemoveOnce sync.Once -func DeleteBlob(jptm IJobPartTransferMgr, pacer pacer) { +func DeleteBlob(jptm IJobPartTransferMgr, pacer pacer.Interface) { // If the transfer was cancelled, then reporting transfer as done and increasing the bytestransferred by the size of the source. 
if jptm.WasCanceled() { diff --git a/ste/xfer-deleteBlobFS.go b/ste/xfer-deleteBlobFS.go index 3d746c0eac..f86b740888 100644 --- a/ste/xfer-deleteBlobFS.go +++ b/ste/xfer-deleteBlobFS.go @@ -6,6 +6,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "net/http" "strings" @@ -18,7 +19,7 @@ var logBlobFSDeleteWarnOnce = &sync.Once{} const blobFSDeleteWarning = "Displayed file count will be either 1 or based upon list-of-files entries, and thus inaccurate, as deletes are performed recursively service-side." -func DeleteHNSResource(jptm IJobPartTransferMgr, pacer pacer) { +func DeleteHNSResource(jptm IJobPartTransferMgr, pacer pacer.Interface) { // If the transfer was cancelled, then report the transfer as done. if jptm.WasCanceled() { jptm.ReportTransferDone() diff --git a/ste/xfer-deleteFile.go b/ste/xfer-deleteFile.go index 9a4184992b..bbce9702f7 100644 --- a/ste/xfer-deleteFile.go +++ b/ste/xfer-deleteFile.go @@ -11,11 +11,12 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) -func DeleteFile(jptm IJobPartTransferMgr, _ pacer) { +func DeleteFile(jptm IJobPartTransferMgr, _ pacer.Interface) { // If the transfer was cancelled, then reporting transfer as done and increasing the bytestransferred by the size of the source. 
if jptm.WasCanceled() { diff --git a/ste/xfer-remoteToLocal-file.go b/ste/xfer-remoteToLocal-file.go index e9c9141503..29c75a75d9 100644 --- a/ste/xfer-remoteToLocal-file.go +++ b/ste/xfer-remoteToLocal-file.go @@ -31,13 +31,14 @@ import ( "strings" "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) const azcopyTempDownloadPrefix string = ".azDownload-%s-" // xfer.go requires just a single xfer function for the whole job. // This routine serves that role for downloads and redirects for each transfer to a file or folder implementation -func remoteToLocal(jptm IJobPartTransferMgr, pacer pacer, df downloaderFactory) { +func remoteToLocal(jptm IJobPartTransferMgr, pacer pacer.Interface, df downloaderFactory) { info := jptm.Info() if info.IsFolderPropertiesTransfer() { remoteToLocal_folder(jptm, pacer, df) @@ -49,7 +50,7 @@ func remoteToLocal(jptm IJobPartTransferMgr, pacer pacer, df downloaderFactory) } // general-purpose "any remote persistence location" to local, for files -func remoteToLocal_file(jptm IJobPartTransferMgr, pacer pacer, df downloaderFactory) { +func remoteToLocal_file(jptm IJobPartTransferMgr, pacer pacer.Interface, df downloaderFactory) { info := jptm.Info() diff --git a/ste/xfer-remoteToLocal-folder.go b/ste/xfer-remoteToLocal-folder.go index 3e80f59cec..5a56e14663 100644 --- a/ste/xfer-remoteToLocal-folder.go +++ b/ste/xfer-remoteToLocal-folder.go @@ -22,10 +22,11 @@ package ste import ( "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) // general-purpose "any remote persistence location" to local, for folders -func remoteToLocal_folder(jptm IJobPartTransferMgr, pacer pacer, df downloaderFactory) { +func remoteToLocal_folder(jptm IJobPartTransferMgr, pacer pacer.Interface, df downloaderFactory) { info := jptm.Info() diff --git a/ste/xfer-remoteToLocal-symlink.go b/ste/xfer-remoteToLocal-symlink.go index fa61b14129..4ad60a1bd1 100644 --- 
a/ste/xfer-remoteToLocal-symlink.go +++ b/ste/xfer-remoteToLocal-symlink.go @@ -2,10 +2,12 @@ package ste import ( "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" + "os" ) -func remoteToLocal_symlink(jptm IJobPartTransferMgr, pacer pacer, df downloaderFactory) { +func remoteToLocal_symlink(jptm IJobPartTransferMgr, pacer pacer.Interface, df downloaderFactory) { info := jptm.Info() // Perform initial checks diff --git a/ste/xfer-setProperties.go b/ste/xfer-setProperties.go index 9889b92fae..54635b03c9 100644 --- a/ste/xfer-setProperties.go +++ b/ste/xfer-setProperties.go @@ -11,9 +11,10 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file" "github.com/Azure/azure-storage-azcopy/v10/common" + "github.com/Azure/azure-storage-azcopy/v10/pacer" ) -func SetProperties(jptm IJobPartTransferMgr, _ pacer) { +func SetProperties(jptm IJobPartTransferMgr, _ pacer.Interface) { // If the transfer was cancelled, then reporting transfer as done and increasing the bytes transferred by the size of the source. 
if jptm.WasCanceled() { jptm.ReportTransferDone() diff --git a/ste/xfer.go b/ste/xfer.go index a3f86d47c2..998abbe369 100644 --- a/ste/xfer.go +++ b/ste/xfer.go @@ -27,6 +27,7 @@ import ( "time" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/Azure/azure-storage-azcopy/v10/pacer" "github.com/Azure/azure-storage-azcopy/v10/common" ) @@ -57,22 +58,22 @@ var cpkAccessFailureLogGLCM sync.Once ////////////////////////////////////////////////////////////////////////////////////////////////////////// // These types are define the STE Coordinator -type newJobXfer func(jptm IJobPartTransferMgr, pacer pacer) +type newJobXfer func(jptm IJobPartTransferMgr, pacer pacer.Interface) // same as newJobXfer, but with an extra parameter -type newJobXferWithDownloaderFactory = func(jptm IJobPartTransferMgr, pacer pacer, df downloaderFactory) -type newJobXferWithSenderFactory = func(jptm IJobPartTransferMgr, pacer pacer, sf senderFactory, sipf sourceInfoProviderFactory) +type newJobXferWithDownloaderFactory = func(jptm IJobPartTransferMgr, pacer pacer.Interface, df downloaderFactory) +type newJobXferWithSenderFactory = func(jptm IJobPartTransferMgr, pacer pacer.Interface, sf senderFactory, sipf sourceInfoProviderFactory) // Takes a multi-purpose download function, and makes it ready to user with a specific type of downloader func parameterizeDownload(targetFunction newJobXferWithDownloaderFactory, df downloaderFactory) newJobXfer { - return func(jptm IJobPartTransferMgr, pacer pacer) { + return func(jptm IJobPartTransferMgr, pacer pacer.Interface) { targetFunction(jptm, pacer, df) } } // Takes a multi-purpose send function, and makes it ready to use with a specific type of sender func parameterizeSend(targetFunction newJobXferWithSenderFactory, sf senderFactory, sipf sourceInfoProviderFactory) newJobXfer { - return func(jptm IJobPartTransferMgr, pacer pacer) { + return func(jptm IJobPartTransferMgr, pacer pacer.Interface) { targetFunction(jptm, pacer, sf, 
sipf) } }