Skip to content

Commit ada991b

Browse files
committed
move log config up to getcall
we have WithLogger already, we just need to use it. this moves the config of the logger itself up to be a call option which makes it less brittle in GetCall to hit the path where it might get accidentally turned on. removes the old buffer and io.Reader on stderr so that we could upload logs, we only need the logrus logger for now and it was a bit of a tangled mess. TODO need to add config bit to set the default level to 'info' so that we get logs out of these guys in the agent config (pure runner is left alone to off), with ability to change to debug or turn off altogether. NOTE: this fixes #1328 by putting a write guard after close and not using the line writer for different writers
1 parent 3adf09c commit ada991b

File tree

4 files changed

+72
-140
lines changed

4 files changed

+72
-140
lines changed

api/agent/agent.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -641,8 +641,8 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) error {
641641
defer span.End()
642642

643643
// TODO it's possible we can get rid of this (after getting rid of logs API) - may need for call id/debug mode still
644-
// TODO there's a timeout race for swapping this back if the container doesn't get killed for timing out, and don't you forget it
645644
swapBack := s.container.swap(call.stderr, &call.Stats)
645+
defer call.stderr.Close()
646646
defer swapBack()
647647

648648
req := createUDSRequest(ctx, call)
@@ -1187,9 +1187,11 @@ func newHotContainer(ctx context.Context, evictor Evictor, call *call, cfg *Conf
11871187
if _, ok := stderr.(common.NoopReadWriteCloser); !ok {
11881188
gw := common.NewGhostWriter()
11891189
buf1 := bufPool.Get().(*bytes.Buffer)
1190-
sec := &nopCloser{&logWriter{
1190+
// TODO(reed): this logger may have garbage in it between calls, easy fix
1191+
// is to make a new one each swap, it's cheap enough to be doable.
1192+
sec := &nopCloser{newLogWriter(
11911193
logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "container_id": id}),
1192-
}}
1194+
)}
11931195
gw.Swap(newLineWriterWithBuffer(buf1, sec))
11941196
stderr = gw
11951197
bufs = append(bufs, buf1)
@@ -1320,6 +1322,7 @@ func (c *container) GetCallId() string {
13201322
func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
13211323
for key, value := range stat.Metrics {
13221324
if m, ok := dockerMeasures[key]; ok {
1325+
common.Logger(ctx).WithField(key, value).Info("container stats")
13231326
stats.Record(ctx, m.M(int64(value)))
13241327
}
13251328
}

api/agent/call.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"io"
8+
"io/ioutil"
89
"net/http"
910
"path/filepath"
1011
"strings"
@@ -212,6 +213,14 @@ func WithDockerAuth(auth docker.Auther) CallOpt {
212213
}
213214
}
214215

216+
func (a *agent) WithStderrLogger() CallOpt {
217+
return func(c *call) error {
218+
// XXX(reed): allow configuring the level / turning off
219+
c.stderr = setupLogger(c.Call)
220+
return nil
221+
}
222+
}
223+
215224
// GetCall builds a Call that can be used to submit jobs to the agent.
216225
func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
217226
var c call
@@ -253,20 +262,17 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
253262
}
254263
c.Call.Config["FN_LISTENER"] = "unix:" + filepath.Join(iofsDockerMountDest, udsFilename)
255264
c.Call.Config["FN_FORMAT"] = "http-stream" // TODO: remove this after fdk's forget what it means
256-
// TODO we could set type here too, for now, or anything else not based in fn/app/trigger config
257265

258266
setupCtx(&c)
259267

260268
c.ct = a
261269
if c.stderr == nil {
262-
// TODO(reed): is line writer is vulnerable to attack?
263-
// XXX(reed): forcing this as default is not great / configuring it isn't great either. reconsider.
264-
c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, !a.cfg.DisableDebugUserLogs, c.Call)
270+
// this disables logs in driver (at container level)
271+
c.stderr = common.NoopReadWriteCloser{}
265272
}
266273
if c.respWriter == nil {
267-
// send function output to logs if no writer given (TODO no longer need w/o async?)
268-
// TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?)
269-
c.respWriter = c.stderr
274+
// TODO: we could make this an error, up to us
275+
c.respWriter = ioutil.Discard
270276
}
271277

272278
return &c, nil
@@ -283,7 +289,7 @@ type call struct {
283289

284290
respWriter io.Writer
285291
req *http.Request
286-
stderr io.ReadWriteCloser
292+
stderr io.WriteCloser
287293
ct callTrigger
288294
slots *slotQueue
289295
requestState RequestState
@@ -373,9 +379,6 @@ func (c *call) End(ctx context.Context, errIn error) error {
373379
// ensure stats histogram is reasonably bounded
374380
c.Call.Stats = drivers.Decimate(240, c.Call.Stats)
375381

376-
// NOTE call this after InsertLog or the buffer will get reset
377-
c.stderr.Close()
378-
379382
if err := c.ct.fireAfterCall(ctx, c.Model()); err != nil {
380383
return err
381384
}

api/agent/func_logger.go

Lines changed: 41 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -2,129 +2,90 @@ package agent
22

33
import (
44
"bytes"
5-
"context"
6-
"fmt"
75
"io"
86
"sync"
7+
"sync/atomic"
98

10-
"github.com/fnproject/fn/api/common"
119
"github.com/fnproject/fn/api/models"
1210
"github.com/sirupsen/logrus"
1311
)
1412

1513
var (
1614
bufPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
17-
logPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
1815
)
1916

20-
// setupLogger returns a ReadWriteCloser that may have:
21-
// * [always] writes bytes to a size limited buffer, that can be read from using io.Reader
2217
// * [always] writes bytes per line to stderr as DEBUG
23-
//
24-
// To prevent write failures from failing the call or any other writes,
25-
// multiWriteCloser ignores errors. Close will flush the line writers
26-
// appropriately. The returned io.ReadWriteCloser is not safe for use after
27-
// calling Close.
28-
func setupLogger(ctx context.Context, maxSize uint64, debug bool, c *models.Call) io.ReadWriteCloser {
18+
func setupLogger(c *models.Call) io.WriteCloser {
2919
lbuf := bufPool.Get().(*bytes.Buffer)
30-
dbuf := logPool.Get().(*bytes.Buffer)
3120

32-
close := func() error {
21+
close := func() {
3322
// TODO we may want to toss out buffers that grow to grotesque size but meh they will prob get GC'd
3423
lbuf.Reset()
35-
dbuf.Reset()
3624
bufPool.Put(lbuf)
37-
logPool.Put(dbuf)
38-
return nil
3925
}
4026

41-
// we don't need to log per line to db, but we do need to limit it
42-
limitw := &nopCloser{newLimitWriter(int(maxSize), dbuf)}
43-
44-
// order matters, in that closer should be last and limit should be next to last
45-
mw := make(multiWriteCloser, 0, 3)
46-
47-
if debug {
48-
// accumulate all line writers, wrap in same line writer (to re-use buffer)
49-
stderrLogger := common.Logger(ctx).WithFields(logrus.Fields{"user_log": true, "app_id": c.AppID, "fn_id": c.FnID, "image": c.Image, "call_id": c.ID})
50-
loggo := &nopCloser{&logWriter{stderrLogger}}
51-
52-
// we don't need to limit the log writer(s), but we do need it to dispense lines
53-
linew := newLineWriterWithBuffer(lbuf, loggo)
54-
mw = append(mw, linew)
27+
stderrLogger := logrus.WithFields(logrus.Fields{"user_log": true, "app_id": c.AppID, "fn_id": c.FnID, "image": c.Image, "call_id": c.ID})
28+
loggo := newLogWriter(stderrLogger)
29+
linew := newLineWriterWithBuffer(lbuf, loggo)
30+
linew = &fCloser{
31+
close: func() error {
32+
err := linew.Close()
33+
close()
34+
return err
35+
},
5536
}
56-
57-
mw = append(mw, limitw, &fCloser{close})
58-
return &rwc{mw, dbuf}
37+
return linew
5938
}
6039

61-
// implements io.ReadWriteCloser, fmt.Stringer and Bytes()
62-
// TODO WriteString and ReadFrom would be handy to implement,
63-
// ReadFrom is a little involved.
64-
type rwc struct {
65-
io.WriteCloser
66-
67-
// buffer is not embedded since it would bypass calls to WriteCloser.Write
68-
// in cases such as WriteString and ReadFrom
69-
b *bytes.Buffer
70-
}
71-
72-
func (r *rwc) Read(b []byte) (int, error) { return r.b.Read(b) }
73-
func (r *rwc) String() string { return r.b.String() }
74-
func (r *rwc) Bytes() []byte { return r.b.Bytes() }
75-
76-
// implements passthrough Write & closure call in Close
40+
// implements passthrough WriteCloser with overwritable Close
7741
type fCloser struct {
42+
io.Writer
7843
close func() error
7944
}
8045

81-
func (f *fCloser) Write(b []byte) (int, error) { return len(b), nil }
82-
func (f *fCloser) Close() error { return f.close() }
46+
func (f *fCloser) Close() error { return f.close() }
8347

8448
type nopCloser struct {
8549
io.Writer
8650
}
8751

8852
func (n *nopCloser) Close() error { return nil }
8953

90-
// multiWriteCloser ignores all errors from inner writers. you say, oh, this is a bad idea?
91-
// yes, well, we were going to silence them all individually anyway, so let's not be shy about it.
92-
// the main thing we need to ensure is that every close is called, even if another errors.
93-
// XXX(reed): maybe we should log it (for syslog, it may help debug, maybe we just log that one)
94-
type multiWriteCloser []io.WriteCloser
95-
96-
func (m multiWriteCloser) Write(b []byte) (n int, err error) {
97-
for _, mw := range m {
98-
mw.Write(b)
99-
}
100-
return len(b), nil
101-
}
102-
103-
func (m multiWriteCloser) Close() (err error) {
104-
for _, mw := range m {
105-
mw.Close()
106-
}
107-
return nil
108-
}
109-
11054
// logWriter will log (to real stderr) every call to Write as a line. it should
11155
// be wrapped with a lineWriter so that the output makes sense.
11256
type logWriter struct {
57+
// level string // XXX(reed):
11358
logrus.FieldLogger
59+
closed uint32
60+
}
61+
62+
func newLogWriter(logger logrus.FieldLogger) io.WriteCloser {
63+
return &logWriter{FieldLogger: logger}
11464
}
11565

11666
func (l *logWriter) Write(b []byte) (int, error) {
67+
if atomic.LoadUint32(&l.closed) == 1 {
68+
// we don't want to return 0/error or the container will get shut down
69+
return len(b), nil
70+
}
11771
l.Debug(string(b))
11872
return len(b), nil
11973
}
12074

75+
func (l *logWriter) Close() error {
76+
atomic.StoreUint32(&l.closed, 1)
77+
return nil
78+
}
79+
12180
// lineWriter buffers all calls to Write and will call Write
12281
// on the underlying writer once per new line. Close must
12382
// be called to ensure that the buffer is flushed, and a newline
12483
// will be appended in Close if none is present.
84+
// TODO(reed): is line writer is vulnerable to attack?
12585
type lineWriter struct {
126-
b *bytes.Buffer
127-
w io.WriteCloser
86+
b *bytes.Buffer
87+
w io.WriteCloser
88+
closed uint32
12889
}
12990

13091
func newLineWriter(w io.WriteCloser) io.WriteCloser {
@@ -136,6 +97,10 @@ func newLineWriterWithBuffer(b *bytes.Buffer, w io.WriteCloser) io.WriteCloser {
13697
}
13798

13899
func (li *lineWriter) Write(ogb []byte) (int, error) {
100+
if atomic.LoadUint32(&li.closed) == 1 {
101+
// we don't want to return 0/error or the container will shut down
102+
return len(ogb), nil
103+
}
139104
li.b.Write(ogb) // bytes.Buffer is guaranteed, read it!
140105

141106
for {
@@ -159,6 +124,8 @@ func (li *lineWriter) Write(ogb []byte) (int, error) {
159124
}
160125

161126
func (li *lineWriter) Close() error {
127+
atomic.StoreUint32(&li.closed, 1)
128+
162129
defer li.w.Close() // MUST close this (after writing last line)
163130

164131
// flush the remaining bytes in the buffer to underlying writer, adding a
@@ -174,41 +141,3 @@ func (li *lineWriter) Close() error {
174141
_, err := li.w.Write(b)
175142
return err
176143
}
177-
178-
// io.Writer that allows limiting bytes written to w
179-
// TODO change to use clamp writer, this is dupe code
180-
type limitDiscardWriter struct {
181-
n, max int
182-
io.Writer
183-
}
184-
185-
func newLimitWriter(max int, w io.Writer) io.Writer {
186-
return &limitDiscardWriter{max: max, Writer: w}
187-
}
188-
189-
func (l *limitDiscardWriter) Write(b []byte) (int, error) {
190-
inpLen := len(b)
191-
if l.n >= l.max {
192-
return inpLen, nil
193-
}
194-
195-
if l.n+inpLen >= l.max {
196-
// cut off to prevent gigantic line attack
197-
b = b[:l.max-l.n]
198-
}
199-
200-
n, err := l.Writer.Write(b)
201-
l.n += n
202-
203-
if l.n >= l.max {
204-
// write in truncation message to log once
205-
l.Writer.Write([]byte(fmt.Sprintf("\n-----max log size %d bytes exceeded, truncating log-----\n", l.max)))
206-
} else if n != len(b) {
207-
// Is this truly a partial write? We'll be honest if that's the case.
208-
return n, err
209-
}
210-
211-
// yes, we lie... this is to prevent callers to blow up, we always pretend
212-
// that we were able to write the entire buffer.
213-
return inpLen, err
214-
}

api/server/runner_fninvoke.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,19 +92,28 @@ func (s *Server) fnInvoke(resp http.ResponseWriter, req *http.Request, app *mode
9292
// buffer the response before writing it out to client to prevent partials from trying to stream
9393
buf := bufPool.Get().(*bytes.Buffer)
9494
buf.Reset()
95-
var writer ResponseBuffer
9695

96+
var opts []agent.CallOpt
97+
opts = append(opts, agent.FromHTTPFnRequest(app, fn, req))
98+
99+
var writer ResponseBuffer
97100
isDetached := req.Header.Get("Fn-Invoke-Type") == models.TypeDetached
98101
if isDetached {
99102
writer = agent.NewDetachedResponseWriter(resp.Header(), 202)
103+
opts = append(opts, agent.InvokeDetached())
100104
} else {
101105
writer = &syncResponseWriter{
102106
headers: resp.Header(),
103107
status: 200,
104108
Buffer: buf,
105109
}
106110
}
107-
opts := getCallOptions(req, app, fn, trig, writer)
111+
112+
opts = append(opts, agent.WithWriter(rw))
113+
opts = append(opts, agent.WithStderrLogger())
114+
if trig != nil {
115+
opts = append(opts, agent.WithTrigger(trig))
116+
}
108117

109118
call, err := s.agent.GetCall(opts...)
110119
if err != nil {
@@ -137,16 +146,4 @@ func (s *Server) fnInvoke(resp http.ResponseWriter, req *http.Request, app *mode
137146
}
138147

139148
func getCallOptions(req *http.Request, app *models.App, fn *models.Fn, trig *models.Trigger, rw http.ResponseWriter) []agent.CallOpt {
140-
var opts []agent.CallOpt
141-
opts = append(opts, agent.WithWriter(rw)) // XXX (reed): order matters [for now]
142-
opts = append(opts, agent.FromHTTPFnRequest(app, fn, req))
143-
144-
if req.Header.Get("Fn-Invoke-Type") == models.TypeDetached {
145-
opts = append(opts, agent.InvokeDetached())
146-
}
147-
148-
if trig != nil {
149-
opts = append(opts, agent.WithTrigger(trig))
150-
}
151-
return opts
152149
}

0 commit comments

Comments
 (0)