Skip to content

Commit aa8e75c

Browse files
committed
Explicitly flush headers when proxying
1 parent 5716127 commit aa8e75c

File tree

2 files changed

+68
-1
lines changed

2 files changed

+68
-1
lines changed

staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,34 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
230230
proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host})
231231
proxy.Transport = h.Transport
232232
proxy.FlushInterval = h.FlushInterval
233-
proxy.ServeHTTP(w, newReq)
233+
proxy.ServeHTTP(maybeWrapFlushHeadersWriter(w), newReq)
234+
}
235+
236+
// maybeWrapFlushHeadersWriter wraps the given writer to force flushing headers prior to writing the response body.
237+
// if the given writer does not support http.Flusher, http.Hijacker, and http.CloseNotifier, the original writer is returned.
238+
// TODO(liggitt): drop this once https://github.com/golang/go/issues/31125 is fixed
239+
func maybeWrapFlushHeadersWriter(w http.ResponseWriter) http.ResponseWriter {
240+
flusher, isFlusher := w.(http.Flusher)
241+
hijacker, isHijacker := w.(http.Hijacker)
242+
closeNotifier, isCloseNotifier := w.(http.CloseNotifier)
243+
// flusher, hijacker, and closeNotifier are all used by the ReverseProxy implementation.
244+
// if the given writer can't support all three, return the original writer.
245+
if !isFlusher || !isHijacker || !isCloseNotifier {
246+
return w
247+
}
248+
return &flushHeadersWriter{w, flusher, hijacker, closeNotifier}
249+
}
250+
251+
type flushHeadersWriter struct {
252+
http.ResponseWriter
253+
http.Flusher
254+
http.Hijacker
255+
http.CloseNotifier
256+
}
257+
258+
func (w *flushHeadersWriter) WriteHeader(code int) {
259+
w.ResponseWriter.WriteHeader(code)
260+
w.Flusher.Flush()
234261
}
235262

236263
// tryUpgrade returns true if the request was handled.

staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,46 @@ func TestProxyRequestContentLengthAndTransferEncoding(t *testing.T) {
865865
}
866866
}
867867

868+
func TestFlushIntervalHeaders(t *testing.T) {
869+
const expected = "hi"
870+
stopCh := make(chan struct{})
871+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
872+
w.Header().Add("MyHeader", expected)
873+
w.WriteHeader(200)
874+
w.(http.Flusher).Flush()
875+
<-stopCh
876+
}))
877+
defer backend.Close()
878+
defer close(stopCh)
879+
880+
backendURL, err := url.Parse(backend.URL)
881+
if err != nil {
882+
t.Fatal(err)
883+
}
884+
885+
proxyHandler := NewUpgradeAwareHandler(backendURL, nil, false, false, nil)
886+
887+
frontend := httptest.NewServer(proxyHandler)
888+
defer frontend.Close()
889+
890+
req, _ := http.NewRequest("GET", frontend.URL, nil)
891+
req.Close = true
892+
893+
ctx, cancel := context.WithTimeout(req.Context(), 10*time.Second)
894+
defer cancel()
895+
req = req.WithContext(ctx)
896+
897+
res, err := frontend.Client().Do(req)
898+
if err != nil {
899+
t.Fatalf("Get: %v", err)
900+
}
901+
defer res.Body.Close()
902+
903+
if res.Header.Get("MyHeader") != expected {
904+
t.Errorf("got header %q; expected %q", res.Header.Get("MyHeader"), expected)
905+
}
906+
}
907+
868908
// exampleCert was generated from crypto/tls/generate_cert.go with the following command:
869909
// go run generate_cert.go --rsa-bits 512 --host example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
870910
var exampleCert = []byte(`-----BEGIN CERTIFICATE-----

0 commit comments

Comments
 (0)