pool wire write buffer #2799
Merged
Changes from all 24 commits:
bafac99  rough cut buf (max-hoffman)
6f2abf1  edits (max-hoffman)
ba35075  edits (max-hoffman)
2236f90  Merge branch 'main' into max/session-write-buffer (max-hoffman)
f5fd9b4  correctness for byte copying (max-hoffman)
68d431c  Merge branch 'main' into max/session-write-buffer (max-hoffman)
5fd4a67  fix bugs (max-hoffman)
6e3c3fe  simplify (max-hoffman)
281efd8  correct simplification (max-hoffman)
ee7dc63  page sized spool buffer (max-hoffman)
fcae966  fix build (max-hoffman)
ba830b2  comments (max-hoffman)
e5e4a5a  bump timeout (max-hoffman)
b362f57  bump timeout (max-hoffman)
b2e0e80  fix race (max-hoffman)
d17b1c0  try separate sleep error (max-hoffman)
49a0a99  vitess bump (max-hoffman)
5982731  see if sleep error masks a different error (max-hoffman)
112556b  add sleeps back (max-hoffman)
fc476f6  more error check where it won't hide other errors (max-hoffman)
3ce2865  remove handler test race (max-hoffman)
3a7e950  revert back to racey with sleeps (max-hoffman)
210c5c7  zach comments (max-hoffman)
4e37c3b  [ga-format-pr] Run ./format_repo.sh to fix formatting (max-hoffman)
New file (+75 lines):

// Copyright 2024 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sql

import (
    "sync"
)

const defaultByteBuffCap = 4096

var ByteBufPool = sync.Pool{
    New: func() any {
        return NewByteBuffer(defaultByteBuffCap)
    },
}

type ByteBuffer struct {
    i   int
    buf []byte
}

func NewByteBuffer(initCap int) *ByteBuffer {
    buf := make([]byte, initCap)
    return &ByteBuffer{buf: buf}
}

// Grow records the latest used byte position. Callers
// are responsible for accurately reporting which bytes
// they expect to be protected.
func (b *ByteBuffer) Grow(n int) {
    newI := b.i
    if b.i+n <= len(b.buf) {
        // Increment |b.i| if no alloc
        newI += n
    }
    if b.i+n >= len(b.buf) {
        // No more space, double.
        // An external allocation doubled the cap using the size of
        // the override object, which if used could lead to overall
        // shrinking behavior.
        b.Double()
    }
    b.i = newI
}

// Double expands the backing array by 2x. We do this
// here because the runtime only doubles based on slice
// length.
func (b *ByteBuffer) Double() {
    buf := make([]byte, len(b.buf)*2)
    copy(buf, b.buf)
    b.buf = buf
}

// Get returns a zero length slice beginning at a safe
// write position.
func (b *ByteBuffer) Get() []byte {
    return b.buf[b.i:b.i]
}

func (b *ByteBuffer) Reset() {
    b.i = 0
}
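For orientation, here is a minimal usage sketch of the intended protocol (assumed from the API above, not part of the diff): take a zero-length slice with Get, append into it, then report the bytes consumed via Grow so the next Get starts past them and the backing array doubles before it runs out. The example values are hypothetical.

package sql

import "fmt"

// ExampleByteBuffer is a hypothetical sketch, not code from this PR:
// serialize several values into one pooled buffer, then return it to the pool.
func ExampleByteBuffer() {
    b := ByteBufPool.Get().(*ByteBuffer)
    defer func() {
        b.Reset()
        ByteBufPool.Put(b)
    }()

    values := [][]byte{[]byte("a"), []byte("bb"), []byte("ccc")}
    for _, v := range values {
        out := append(b.Get(), v...) // append at the current write position
        b.Grow(len(out))             // advance the position; doubles the buffer when needed
        fmt.Println(len(out))
    }
    // Output:
    // 1
    // 2
    // 3
}

Because Grow is what advances the write position, skipping it (or under-reporting n) lets the next Get hand out bytes that are still referenced, which is the stomping hazard the author notes in the discussion below.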
New test file (+72 lines):

// Copyright 2024 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sql

import (
    "testing"

    "github.com/stretchr/testify/require"
)

func TestGrowByteBuffer(t *testing.T) {
    b := NewByteBuffer(10)

    // grow less than boundary
    src1 := []byte{1, 1, 1}
    obj1 := append(b.Get(), src1...)
    b.Grow(len(src1))

    require.Equal(t, 10, len(b.buf))
    require.Equal(t, 3, b.i)
    require.Equal(t, 10, cap(obj1))

    // grow to boundary
    src2 := []byte{0, 0, 0, 0, 0, 0, 0}
    obj2 := append(b.Get(), src2...)
    b.Grow(len(src2))

    require.Equal(t, 20, len(b.buf))
    require.Equal(t, 10, b.i)
    require.Equal(t, 7, cap(obj2))

    src3 := []byte{2, 2, 2, 2, 2}
    obj3 := append(b.Get(), src3...)
    b.Grow(len(src3))

    require.Equal(t, 20, len(b.buf))
    require.Equal(t, 15, b.i)
    require.Equal(t, 10, cap(obj3))

    // grow exceeds boundary
    src4 := []byte{3, 3, 3, 3, 3, 3, 3, 3}
    obj4 := append(b.Get(), src4...)
    b.Grow(len(src4))

    require.Equal(t, 40, len(b.buf))
    require.Equal(t, 15, b.i)
    require.Equal(t, 16, cap(obj4))

    // objects are all valid after doubling
    require.Equal(t, src1, obj1)
    require.Equal(t, src2, obj2)
    require.Equal(t, src3, obj3)
    require.Equal(t, src4, obj4)

    // reset
    b.Reset()
    require.Equal(t, 40, len(b.buf))
    require.Equal(t, 0, b.i)
}
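A note on the asserted caps (explanatory, not part of the diff): obj1 appends into buf[0:0] of the 10-byte array, so its cap stays 10; obj2 appends into buf[3:3], whose remaining tail is exactly 7 bytes, so the 7-byte append fits and cap is 7; obj3 starts at index 10 of the doubled 20-byte array, leaving a 10-byte tail; obj4 starts at index 15 with only a 5-byte tail, so the 8-byte append forces the runtime to allocate a detached array, and Go's doubling plus size-class rounding yields cap 16. That last value depends on runtime internals, so it is the assertion most likely to shift across Go versions.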
Review comment:

This idea has merit and is surely better than allocating a buffer for each request, but the way you're managing the memory is suboptimal. It is also good to use the same backing array for multiple values in a request.

In the main use of this object in the handler, you're getting a zero-length slice (which has some larger backing array) and then calling append on it byte by byte. This will grow the backing array in some cases, but it's not being done under your deliberate control. Rather, you're then calling Double if the backing array is low on space after the appends have already happened.

Basically: in these methods, you are referring to the len of the byte slice, when your concern is usually the cap. It's fine to let the appends happen byte by byte as long as they aren't doubling the backing array too often; that's the expensive bit.

I think this would probably work slightly better if you scrapped the explicit capacity management altogether and just let the Go runtime manage it automatically for you. Either that, or always manage it explicitly yourself, i.e. before you serialize a value with all those append calls. But it's not clear to me that manual management is actually any better if you use the same strategy as the Go runtime does (double once we're full).
Reply:

I agree with all of this, but there are two caveats that limit our ability to let the runtime handle this for us. (1) The runtime chooses doubling based on the cap of the slice, not the full backing array, so for the current setup the doubled array is usually actually smaller than the original backing array. (2) Doubled arrays are not reference-swapped; we need a handle to the new buffer to know when to grow.

I'm not aware of a way to override the default runtime growth behavior to ignore the slice cap and instead double based on the backing array cap. So ByteBuffer still does the doubling, and exposes a Grow(n int) interface to track when this should happen. We pay for 2 mallocs on doubling, because the first one is never big enough. Not calling Grow after allocating, or growing by too small a length compared to the allocations used, will stomp previously written memory.
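To make caveat (1) concrete, here is a small standalone sketch (illustrative only, not code from this PR; the 4096/4090 numbers are made up): append sizes its reallocation from the cap of the slice it is handed, which for the buf[i:i] slice returned by Get is just the remaining tail, and the grown slice no longer aliases the pooled array.

package main

import "fmt"

func main() {
    backing := make([]byte, 4096) // the pooled backing array
    i := 4090                     // write position near the end of the buffer

    dst := backing[i:i]   // what ByteBuffer.Get hands out
    fmt.Println(cap(dst)) // 6: append only sees the tail, not the 4096-byte array

    dst = append(dst, make([]byte, 10)...) // exceeds cap(dst); the runtime allocates a new array
    fmt.Println(cap(dst))                  // small, grown from 6 rather than 4096; exact value depends on size classes
    fmt.Println(&dst[0] == &backing[i])    // false: the grown slice no longer aliases the pooled array
}

Hence Double copies into a fresh array sized off len(b.buf), and Grow is the only way the buffer learns how far the write position moved; the runtime's own growth would both under-size the replacement and silently detach it from the pool.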