Skip to content

Commit 1fb78ed

Browse files
authored
safe responsewriter usage in TryExec (#1490)
Inside TryExec we were writing directly to the response writer from within a goroutine, but TryExec can time out and then be called again against a different runner, or the front end may even be writing headers while TryExec is writing headers. One way to make this safe is to give TryExec a new response writer to write the response into; only after the goroutine handling the response has returned do we copy the response back up from the TryExec goroutine, since the caller will not call TryExec again until it has returned (this is seemingly part of the placer contract). Unfortunately, we're already buffering the response writer in the front end, too - it's possible we can get rid of that, but it may need further testing. This also adds an optimization when copying the request body from the LB to a runner: since we're using request.GetBody(), we get back a reader we are familiar with that happens to just wrap a buffer's bytes (we just need multiple readers on it; the data doesn't change). This whole interaction is unfortunate but kind of necessary due to needing to maneuver into a protobuf; it seems like a worthwhile optimization that is reasonably acceptable abstraction-wise. Additionally, this gets rid of passing the client response headers down into the agent for detached functions. We don't need these, since detached functions do not respond to the client with the function's response, only a 202. This was leading to races around writing the headers in retries too, but now it is just for posterity/correctness. Updated the makefile/system test script so that I could run these faster to repro; pretty handy, should add to other stuff too... closes #1484
1 parent 547d59e commit 1fb78ed

File tree

5 files changed

+93
-9
lines changed

5 files changed

+93
-9
lines changed

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ test: checkfmt pull-images test-basic test-middleware test-extensions test-syste
7070

7171
.PHONY: test-system
7272
test-system:
73-
./system_test.sh sqlite3
74-
./system_test.sh mysql
75-
./system_test.sh postgres
73+
./system_test.sh sqlite3 $(run)
74+
./system_test.sh mysql $(run)
75+
./system_test.sh postgres $(run)
7676

7777
.PHONY: img-busybox
7878
img-busybox:

api/agent/lb_agent.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,18 @@ type lbAgent struct {
2828
callOpts []CallOpt
2929
}
3030

31+
// DetachedResponseWriter implements http.ResponseWriter without allowing
32+
// writes to the body or writing the headers from a call to Write or
33+
// WriteHeader; it is only intended to allow storing the status code and
34+
// fetching it later from Status().
3135
type DetachedResponseWriter struct {
32-
Headers http.Header
36+
headers http.Header
3337
status int
3438
acked chan struct{}
3539
}
3640

3741
func (w *DetachedResponseWriter) Header() http.Header {
38-
return w.Headers
42+
return w.headers
3943
}
4044

4145
func (w *DetachedResponseWriter) Write(data []byte) (int, error) {
@@ -51,9 +55,9 @@ func (w *DetachedResponseWriter) Status() int {
5155
return w.status
5256
}
5357

54-
func NewDetachedResponseWriter(h http.Header, statusCode int) *DetachedResponseWriter {
58+
func NewDetachedResponseWriter(statusCode int) *DetachedResponseWriter {
5559
return &DetachedResponseWriter{
56-
Headers: h,
60+
headers: make(http.Header),
5761
status: statusCode,
5862
acked: make(chan struct{}, 1),
5963
}

api/agent/runner_client.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package agent
22

33
import (
4+
"bytes"
45
"context"
56
"crypto/tls"
67
"encoding/hex"
@@ -161,6 +162,76 @@ func (r *gRPCRunner) Status(ctx context.Context) (*pool.RunnerStatus, error) {
161162

162163
// implements Runner
163164
func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
165+
166+
// we need to get a response writer that is safe for us to use. the only reason we need this,
167+
// ostensibly, is because io operations can't be timed out so we need to do receiveFromRunner
168+
// in a goroutine, and we may return from here from a timeout while recv is running, leading
169+
// to a race when a placer calls TryExec with the same ResponseWriter we have here. blech.
170+
respBuffer := bufPool.Get().(*bytes.Buffer)
171+
respBuffer.Reset()
172+
defer func() {
173+
if ctx.Err() == nil { // this is only safe if we don't time out (receiveFromRunner returned/not called)
174+
bufPool.Put(respBuffer)
175+
}
176+
}()
177+
178+
writer := syncResponseWriter{
179+
headers: make(http.Header),
180+
Buffer: respBuffer,
181+
}
182+
183+
safeCall := callWithResponseWriter(call, &writer)
184+
placed, err := r.tryExec(ctx, safeCall)
185+
if err != nil || !placed {
186+
return placed, err
187+
}
188+
189+
// now we can write to the actual response writer from this thread (so long
190+
// as the caller waits, we're playing jenga ofc)
191+
rw := call.ResponseWriter()
192+
copyHeaders(rw.Header(), writer.Header())
193+
rw.WriteHeader(writer.status)
194+
io.Copy(rw, writer) // TODO(reed): this is also a buffer->buffer operation :( but it means no errors
195+
return true, nil
196+
}
197+
198+
func callWithResponseWriter(call pool.RunnerCall, rw http.ResponseWriter) pool.RunnerCall {
199+
return &wrapCall{call, rw}
200+
}
201+
202+
// wrapCall implements pool.RunnerCall but bypasses the embedded RunnerCall's ResponseWriter() method
203+
// TODO this is the worst thing we've tried, except for all the other things we've tried.
204+
type wrapCall struct {
205+
pool.RunnerCall
206+
rw http.ResponseWriter
207+
}
208+
209+
func (c *wrapCall) ResponseWriter() http.ResponseWriter {
210+
return c.rw
211+
}
212+
213+
func copyHeaders(dst, src http.Header) {
214+
for k, vs := range src {
215+
for _, v := range vs {
216+
dst.Add(k, v)
217+
}
218+
}
219+
}
220+
221+
// TODO(reed): this is copied. and not only does it make sense in both places for different
222+
// reasons, it makes sense in another place too. need to reconsider ifaces
223+
type syncResponseWriter struct {
224+
headers http.Header
225+
status int
226+
*bytes.Buffer
227+
}
228+
229+
var _ http.ResponseWriter = new(syncResponseWriter) // nice compiler errors
230+
231+
func (s *syncResponseWriter) Header() http.Header { return s.headers }
232+
func (s *syncResponseWriter) WriteHeader(code int) { s.status = code }
233+
234+
func (r *gRPCRunner) tryExec(ctx context.Context, call pool.RunnerCall) (bool, error) {
164235
log := common.Logger(ctx).WithField("runner_addr", r.address)
165236

166237
log.Debug("Attempting to place call")
@@ -342,6 +413,7 @@ func recordFinishStats(ctx context.Context, msg *pb.CallFinished, c pool.RunnerC
342413
statsLBAgentRunnerSchedLatency(ctx, runnerSchedLatency)
343414
statsLBAgentRunnerExecLatency(ctx, runnerExecLatency)
344415

416+
// TODO: this is not safe to call from within receiveFromRunner, since it may get called on each retry (a data race, but also incorrect)
345417
c.AddUserExecutionTime(runnerExecLatency)
346418
}
347419
}

api/server/runner_fninvoke.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (s *Server) fnInvoke(resp http.ResponseWriter, req *http.Request, app *mode
9696

9797
isDetached := req.Header.Get("Fn-Invoke-Type") == models.TypeDetached
9898
if isDetached {
99-
writer = agent.NewDetachedResponseWriter(resp.Header(), 202)
99+
writer = agent.NewDetachedResponseWriter(202)
100100
} else {
101101
writer = &syncResponseWriter{
102102
headers: resp.Header(),

system_test.sh

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ source ./helpers.sh
66
remove_containers ${CONTEXT}
77

88
DB_NAME=$1
9+
shift # later usage
910
export FN_DB_URL=$(spawn_${DB_NAME} ${CONTEXT})
1011

1112
# avoid port conflicts with api_test.sh which are run in parallel
@@ -23,8 +24,15 @@ export FN_LOG_LEVEL=debug
2324
#
2425
export SYSTEM_TEST_PROMETHEUS_FILE=./prometheus.${DB_NAME}.txt
2526

27+
run="$@"
28+
29+
if [ ! -z "$run" ]
30+
then
31+
run="-run $run"
32+
fi
33+
2634
cd test/fn-system-tests
27-
go test -v ./...
35+
go test $run -v ./...
2836
cd ../../
2937

3038
remove_containers ${CONTEXT}

0 commit comments

Comments
 (0)