Skip to content

Conversation

adreed-msft
Copy link
Member

@adreed-msft adreed-msft commented Jul 16, 2025

This one is gonna be a bit lengthy, and I'm still sorta in the depths of it.

At this point, most of the meat and potatoes are here, but this still isn't quite ready to go. It won't build yet. I'm reworking the page blob auto pacer around the new model. At that point, things need to be plugged in.

It's important to note, before review, what's majorly different:

  1. The pacer is implemented as a pipeline policy now.
  2. The pacer now operates at a fine-grained level, allowing requests based upon the ability to meet minimum throughput requirements, with a "red line" before it starts sharing remaining bandwidth evenly. THIS WILL MOST LIKELY REQUIRE TUNING.
  3. This doesn't entirely apply for S2S, since we can't possibly pace that. Thus, it just allocates the blocks gradually similarly to the old way.
  4. To apply the pipeline policy to page blobs, we need an injector to smuggle the policy in for us. That is one of the new policies.

I need to write tests for no bandwidth limit as well, because I don't trust that quite yet.

a.logger.Log(common.LogInfo, fmt.Sprintf("%s: Target Mbps %d", a.logPrefix, (a.targetBytesPerSecond()*8)/(1000*1000)))
}
//
//var (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just delete this file?

package common

// NonBlockingSafeSend implements a channel send that is guaranteed to not panic, and may be either instantaneous or waiting.
// Instantaneous returns false if a panic occurred, or if
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is incomplete.

newAlloc uint64
} // when a request seeks, it should fire itself down this channel to manage its allocation

shutdownCh chan bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this being initialized in the constructor.

@@ -1,85 +1,85 @@
// Copyright © 2017 Microsoft <[email protected]>
// // Copyright © 2017 Microsoft <[email protected]>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that the entire file is commented out. If it's no longer needed, should we consider removing it?

@@ -0,0 +1,22 @@
package ste
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a new file, we should add the Microsoft licensing header.

@@ -0,0 +1,31 @@
package ste
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above comment

@@ -0,0 +1,373 @@
package ste
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

isLive: &atomic.Pointer[bool]{},

liveBodies: make(map[uuid.UUID]*policyPacerBody),
requestInitChannel: make(chan *policyPacerBody, 300),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can declare this as constant defaultBufferSize=300

common.AtomicSubtract(r.allocatedBytesPerSecond, PacerBodyMinimumBytesPerSecond)
req.bodyStatus.Store(to.Ptr(BodyStatusDeallocated))
default:
escape = true // break doesn't work in here
Copy link
Member

@dphulkar-msft dphulkar-msft Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can do something like this.

for {
	select {
	case req := <-r.pacerExitChannel:
		delete(r.liveBodies, req.id)
		common.AtomicSubtract(r.allocatedBytesPerSecond, PacerBodyMinimumBytesPerSecond)
		req.bodyStatus.Store(to.Ptr(BodyStatusDeallocated))
	default:
		return
	}
}

// Inform the body that it is allocated
newResponse.bodyStatus.Store(to.Ptr(BodyStatusAllocated))
default:
escape = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

// Inform the body that it is allocated
newRequest.bodyStatus.Store(to.Ptr(BodyStatusCompleted))
default: // there's nothing new, so let's use what we have.
escape = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

// Inform the body that it is allocated
newRequest.bodyStatus.Store(to.Ptr(BodyStatusAllocated))
default: // there's nothing new, so let's use what we have.
escape = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dittto


if resp != nil {
n.parent.ProcessBytes(uint64(max(resp.ContentLength, 0)))
} else if err == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Can both resp and err be nil? Normally, if err is nil, resp is expected to be non-nil.
However, it's good defensive coding to handle the rare case where a misbehaving policy or transport layer might return both as nil.
In such a case, we safely return the previously allocated bytes.


maxPacerGbps = 100
maxPacerBytesPerSecond = maxPacerGbps * 1000 * 1000 * 1000 / 8
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've noticed that there are many constants defined across different files in the ste folder. Would it make sense to create a dedicated constants.go file under ste to centralize and manage all constants for better maintainability?

pacer := NewRequestPolicyPacer(targetBytesPerSecond)

return &RequestPolicyAutoPacer{
RequestPolicyPacer: pacer,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit pick: RequestPolicyPacer: NewRequestPolicyPacer(targetBytesPerSecond),

corePolicy := r.RequestPolicyPacer.GetPolicy()

return &autoPacerPolicy{
p: corePolicy,
Copy link
Member

@dphulkar-msft dphulkar-msft Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider using a more meaningful variable name instead of p? Something like innerPolicy, wrappedPolicy, or corePolicy

@@ -0,0 +1,302 @@
package ste
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

msft licensing header required

BodyStatusNew BodyStatus = 0
BodyStatusAllocated BodyStatus = 1
BodyStatusCompleted BodyStatus = 2
BodyStatusDeallocated BodyStatus = -1
Copy link
Member

@dphulkar-msft dphulkar-msft Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be moved to a separate constants file

if pol, ok := newPolicy.(policy.Policy); ok {
return pol.Do(req)
} else if pol != nil {
return nil, fmt.Errorf("supplied policy for key %v was not a policy.Policy but a %v", p.policyKey, newPolicy)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are checking the type of the policy, should we use %T here to clearly indicate the actual type received?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants