Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
https://example.com {
reverse_proxy https://localhost:54321 {
stream_buffer_size 8KB
}
}

----------
{
"apps": {
"http": {
"servers": {
"srv0": {
"listen": [
":443"
],
"routes": [
{
"match": [
{
"host": [
"example.com"
]
}
],
"handle": [
{
"handler": "subroute",
"routes": [
{
"handle": [
{
"handler": "reverse_proxy",
"stream_buffer_size": 8000,
"transport": {
"protocol": "http",
"tls": {}
},
"upstreams": [
{
"dial": "localhost:54321"
}
]
}
]
}
]
}
],
"terminal": true
}
]
}
}
}
}
}
5 changes: 4 additions & 1 deletion modules/caddyhttp/reverseproxy/caddyfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func parseCaddyfile(h httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, error)
// flush_interval <duration>
// request_buffers <size>
// response_buffers <size>
// stream_buffer_size <size>
// stream_timeout <duration>
// stream_close_delay <duration>
// verbose_logs
Expand Down Expand Up @@ -646,7 +647,7 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
h.FlushInterval = caddy.Duration(dur)
}

case "request_buffers", "response_buffers":
case "request_buffers", "response_buffers", "stream_buffer_size":
subdir := d.Val()
if !d.NextArg() {
return d.ArgErr()
Expand All @@ -670,6 +671,8 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
h.RequestBuffers = size
case "response_buffers":
h.ResponseBuffers = size
case "stream_buffer_size":
h.StreamBufferSize = int(size)
}

case "stream_timeout":
Expand Down
6 changes: 6 additions & 0 deletions modules/caddyhttp/reverseproxy/reverseproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ type Handler struct {
// forcibly closed at the end of the timeout. Default: no timeout.
StreamTimeout caddy.Duration `json:"stream_timeout,omitempty"`

// The size of the buffer used for each direction of streaming
// requests such as WebSockets. If zero, the default size is 32 KiB.
// This only affects upgraded bidirectional streams, not normal
// request or response buffering.
StreamBufferSize int `json:"stream_buffer_size,omitempty"`

// If nonzero, streaming requests such as WebSockets will not be
// closed when the proxy config is unloaded, and instead the stream
// will remain open until the delay is complete. In other words,
Expand Down
20 changes: 17 additions & 3 deletions modules/caddyhttp/reverseproxy/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,12 @@ func (h *Handler) handleUpgradeResponse(logger *zap.Logger, wg *sync.WaitGroup,
defer deleteFrontConn()
defer deleteBackConn()

spc := switchProtocolCopier{user: conn, backend: backConn, wg: wg}
spc := switchProtocolCopier{
user: conn,
backend: backConn,
wg: wg,
bufferSize: h.StreamBufferSize,
}

// setup the timeout if requested
var timeoutc <-chan time.Time
Expand Down Expand Up @@ -636,20 +641,29 @@ func (m *maxLatencyWriter) stop() {
type switchProtocolCopier struct {
user, backend io.ReadWriteCloser
wg *sync.WaitGroup
bufferSize int
}

func (c switchProtocolCopier) copyFromBackend(errc chan<- error) {
_, err := io.Copy(c.user, c.backend)
_, err := io.CopyBuffer(c.user, c.backend, c.buffer())
errc <- err
c.wg.Done()
}

func (c switchProtocolCopier) copyToBackend(errc chan<- error) {
_, err := io.Copy(c.backend, c.user)
_, err := io.CopyBuffer(c.backend, c.user, c.buffer())
errc <- err
c.wg.Done()
}

func (c switchProtocolCopier) buffer() []byte {
size := c.bufferSize
if size <= 0 {
size = defaultBufferSize
}
return make([]byte, size)
}

var streamingBufPool = sync.Pool{
New: func() any {
// The Pool's New function should generally only return pointer
Expand Down
46 changes: 46 additions & 0 deletions modules/caddyhttp/reverseproxy/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package reverseproxy

import (
"bytes"
"io"
"net/http/httptest"
"strings"
"sync"
"testing"

"github.com/caddyserver/caddy/v2"
Expand Down Expand Up @@ -34,3 +36,47 @@ func TestHandlerCopyResponse(t *testing.T) {
}
}
}

func TestSwitchProtocolCopierBufferSize(t *testing.T) {
var wg sync.WaitGroup
var errc = make(chan error, 1)
var dst bytes.Buffer

copier := switchProtocolCopier{
user: nopReadWriteCloser{Reader: strings.NewReader("hello")},
backend: nopReadWriteCloser{Writer: &dst},
wg: &wg,
bufferSize: 7,
}

buf := copier.buffer()
if got := len(buf); got != 7 {
t.Fatalf("buffer len = %d, want 7", got)
}

wg.Add(1)
go copier.copyToBackend(errc)
wg.Wait()

if err := <-errc; err != nil {
t.Fatalf("copyToBackend() error = %v", err)
}
if got := dst.String(); got != "hello" {
t.Fatalf("copied data = %q, want %q", got, "hello")
}
}

func TestSwitchProtocolCopierDefaultBufferSize(t *testing.T) {
copier := switchProtocolCopier{}
buf := copier.buffer()
if got := len(buf); got != defaultBufferSize {
t.Fatalf("buffer len = %d, want %d", got, defaultBufferSize)
}
}

type nopReadWriteCloser struct {
io.Reader
io.Writer
}

func (nopReadWriteCloser) Close() error { return nil }
Loading