Skip to content

Commit 1811b4e

Browse files
author
Reed Allman
committed
make fn logger more reasonable
something still feels off with this, but i tinkered with it for a day-ish and didn't come up with anything a whole lot better. doing a lot of the maneuvering in the caller seemed better but it was just bloating up GetCall so went back to having it basically like it was, but returning the limited underlying buffer to read from so we can ship to the db. some small changes to the LogStore interface, swapped it to take an io.Reader instead of a string for more flexibility in the future while essentially maintaining the same level of performance that we have now. i'm guessing in the not so distant future we'll ship these to some s3 like service and it would be better to stream them in than carry around a giant string anyway. also, carrying around up to 1MB buffers in memory isn't great, we may want to switch to file backed logs for calls, too. using io.Reader for logs should make #279 more reasonable if/once we move to some s3-like thing, we can stream from the log storage service direct to clients. this fixes the span being out of whack and allows the 'right' context to be used to upload logs (next to inserting the call). deletes the dbWriter we had, and we just do this in call.End now (which makes sense to me at least). removes the dupe code for making an stderr for hot / cold and simplifies the way to get a func logger (no more 7 param methods yay). closes #298
1 parent 0280df0 commit 1811b4e

File tree

9 files changed

+126
-111
lines changed

9 files changed

+126
-111
lines changed

api/agent/agent.go

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,6 @@ type slot interface {
428428
type coldSlot struct {
429429
cookie drivers.Cookie
430430
tok Token
431-
stderr io.Closer
432431
}
433432

434433
func (s *coldSlot) exec(ctx context.Context, call *call) error {
@@ -459,7 +458,6 @@ func (s *coldSlot) Close() error {
459458
s.cookie.Close(context.Background()) // ensure container removal, separate ctx
460459
}
461460
s.tok.Close()
462-
s.stderr.Close()
463461
return nil
464462
}
465463

@@ -477,20 +475,11 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
477475
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_hot_exec")
478476
defer span.Finish()
479477

480-
stderr := NewFuncLogger(ctx, call.AppName, call.Path, call.Image, call.ID, call.ds)
481-
if call.w == nil {
482-
// send STDOUT to logs if no writer given (async...)
483-
// TODO fuck func logger, change it to not need a context and make calls
484-
// require providing their own stderr and writer instead of this crap. punting atm.
485-
call.w = stderr
486-
}
487-
488478
// link the container id and id in the logs [for us!]
489479
common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call")
490480

491-
// swap in the new id and the new stderr logger
492-
s.container.swap(stderr)
493-
defer stderr.Close() // TODO shove in Close / elsewhere (to upload logs after exec exits)
481+
// swap in the new stderr logger
482+
s.container.swap(call.stderr)
494483

495484
errApp := make(chan error, 1)
496485
go func() {
@@ -541,15 +530,6 @@ func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok T
541530
}
542531

543532
func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok Token) error {
544-
// TODO dupe stderr code, reduce me
545-
stderr := NewFuncLogger(ctx, call.AppName, call.Path, call.Image, call.ID, call.ds)
546-
if call.w == nil {
547-
// send STDOUT to logs if no writer given (async...)
548-
// TODO fuck func logger, change it to not need a context and make calls
549-
// require providing their own stderr and writer instead of this crap. punting atm.
550-
call.w = stderr
551-
}
552-
553533
container := &container{
554534
id: id.New().String(), // XXX we could just let docker generate ids...
555535
image: call.Image,
@@ -558,7 +538,7 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok
558538
timeout: time.Duration(call.Timeout) * time.Second, // this is unnecessary, but in case removal fails...
559539
stdin: call.req.Body,
560540
stdout: call.w,
561-
stderr: stderr,
541+
stderr: call.stderr,
562542
}
563543

564544
// pull & create container before we return a slot, so as to be friendly
@@ -569,7 +549,7 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok
569549
return err
570550
}
571551

572-
slot := &coldSlot{cookie, tok, stderr}
552+
slot := &coldSlot{cookie, tok}
573553
select {
574554
case slots <- slot: // TODO need to make sure receiver will be ready (go routine race)
575555
default:
@@ -600,19 +580,22 @@ func (a *agent) runHot(slots chan<- slot, call *call, tok Token) error {
600580
ctx, shutdownContainer := context.WithCancel(context.Background())
601581
defer shutdownContainer() // close this if our waiter returns
602582

583+
cid := id.New().String()
584+
603585
// set up the stderr for the first one to capture any logs before the slot is
604-
// executed.
605-
// TODO need to figure out stderr logging for hot functions at a high level
606-
stderr := &ghostWriter{inner: newLineWriter(&logWriter{ctx: ctx, appName: call.AppName, path: call.Path, image: call.Image, reqID: call.ID})}
586+
// executed and between hot functions TODO this is still a little tobias funke
587+
stderr := newLineWriter(&logWriter{
588+
logrus.WithFields(logrus.Fields{"between_log": true, "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": cid}),
589+
})
607590

608591
container := &container{
609-
id: id.New().String(), // XXX we could just let docker generate ids...
592+
id: cid, // XXX we could just let docker generate ids...
610593
image: call.Image,
611594
env: call.BaseEnv, // only base env
612595
memory: call.Memory,
613596
stdin: stdinRead,
614597
stdout: stdoutWrite,
615-
stderr: stderr,
598+
stderr: &ghostWriter{inner: stderr},
616599
}
617600

618601
logger := logrus.WithFields(logrus.Fields{"id": container.id, "app": call.AppName, "route": call.Path, "image": call.Image, "memory": call.Memory, "format": call.Format, "idle_timeout": call.IdleTimeout})
@@ -663,6 +646,7 @@ func (a *agent) runHot(slots chan<- slot, call *call, tok Token) error {
663646
// wait for this call to finish
664647
// NOTE do NOT select with shutdown / other channels. slot handles this.
665648
<-done
649+
container.swap(stderr) // log between tasks
666650
}
667651
}()
668652

api/agent/agent_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package agent
22

33
import (
44
"bytes"
5+
"fmt"
56
"io"
67
"net/http"
78
"net/http/httptest"
@@ -13,6 +14,7 @@ import (
1314
"github.com/fnproject/fn/api/datastore"
1415
"github.com/fnproject/fn/api/models"
1516
"github.com/fnproject/fn/api/mqs"
17+
"github.com/sirupsen/logrus"
1618
)
1719

1820
func TestCallConfigurationRequest(t *testing.T) {
@@ -273,3 +275,28 @@ func TestCallConfigurationModel(t *testing.T) {
273275
t.Fatal("expected payload to match, but it was a lie")
274276
}
275277
}
278+
279+
func TestLoggerIsStringerAndWorks(t *testing.T) {
280+
// TODO test limit writer, logrus writer, etc etc
281+
282+
loggyloo := logrus.WithFields(logrus.Fields{"yodawg": true})
283+
logger := setupLogger(loggyloo)
284+
285+
if _, ok := logger.(fmt.Stringer); !ok {
286+
// NOTE: if you are reading, maybe what you've done is ok, but be aware we were relying on this for optimization...
287+
t.Fatal("you turned the logger into something inefficient and possibly better all at the same time, how dare ye!")
288+
}
289+
290+
str := "0 line\n1 line\n2 line\n\n4 line"
291+
logger.Write([]byte(str))
292+
293+
strGot := logger.(fmt.Stringer).String()
294+
295+
if strGot != str {
296+
t.Fatal("logs did not match expectations, like being an adult", strGot, str)
297+
}
298+
299+
logger.Close() // idk maybe this would panic might as well call this
300+
301+
// TODO we could check for the toilet to flush here to logrus
302+
}

api/agent/call.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,6 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
246246
return nil, errors.New("no model or request provided for call")
247247
}
248248

249-
// TODO move func logger here
250249
// TODO add log store interface (yagni?)
251250
c.ds = a.ds
252251
c.mq = a.mq
@@ -255,6 +254,15 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
255254
logrus.Fields{"id": c.ID, "app": c.AppName, "route": c.Path})
256255
c.req = c.req.WithContext(ctx)
257256

257+
// setup stderr logger separate (don't inherit ctx vars)
258+
logger := logrus.WithFields(logrus.Fields{"user_log": true, "app_name": c.AppName, "path": c.Path, "image": c.Image, "call_id": c.ID})
259+
c.stderr = setupLogger(logger)
260+
if c.w == nil {
261+
// send STDOUT to logs if no writer given (async...)
262+
// TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?)
263+
c.w = c.stderr
264+
}
265+
258266
return &c, nil
259267
}
260268

@@ -265,7 +273,7 @@ type call struct {
265273
mq models.MessageQueue
266274
w io.Writer
267275
req *http.Request
268-
stderr io.WriteCloser
276+
stderr io.ReadWriteCloser
269277
}
270278

271279
func (c *call) Model() *models.Call { return c.Call }
@@ -335,8 +343,15 @@ func (c *call) End(ctx context.Context, err error) {
335343
// call that ran successfully [by a user's perspective]
336344
// TODO: this should be update, really
337345
if err := c.ds.InsertCall(ctx, c.Call); err != nil {
338-
logrus.WithError(err).Error("error inserting call into datastore")
346+
common.Logger(ctx).WithError(err).Error("error inserting call into datastore")
339347
}
348+
349+
if err := c.ds.InsertLog(ctx, c.AppName, c.ID, c.stderr); err != nil {
350+
common.Logger(ctx).WithError(err).Error("error uploading log")
351+
}
352+
353+
// NOTE call this after InsertLog or the buffer will get reset
354+
c.stderr.Close()
340355
}
341356

342357
func fakeHandler(http.ResponseWriter, *http.Request, Params) {}

api/agent/func_logger.go

Lines changed: 33 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,11 @@ package agent
22

33
import (
44
"bytes"
5-
"context"
65
"errors"
76
"fmt"
87
"io"
98
"sync"
109

11-
"github.com/fnproject/fn/api/common"
12-
"github.com/fnproject/fn/api/models"
13-
"github.com/opentracing/opentracing-go"
1410
"github.com/sirupsen/logrus"
1511
)
1612

@@ -19,13 +15,10 @@ var (
1915
logPool = &sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
2016
)
2117

22-
// TODO we can have different types of these func loggers later
23-
// TODO move this to a different package
24-
25-
// DefaultFuncLogger returns a WriteCloser that writes STDERR output from a
26-
// container and outputs it in a parsed structured log format to attached
27-
// STDERR as well as writing the log to the db when Close is called.
28-
func NewFuncLogger(ctx context.Context, appName, path, image, reqID string, logDB models.LogStore) io.WriteCloser {
18+
// setupLogger returns an io.ReadWriteCloser which may write to multiple io.Writer's,
19+
// and may be read from the returned io.Reader (singular). After Close is called,
20+
// the Reader is not safe to read from, nor the Writer to write to.
21+
func setupLogger(logger logrus.FieldLogger) io.ReadWriteCloser {
2922
lbuf := bufPool.Get().(*bytes.Buffer)
3023
dbuf := logPool.Get().(*bytes.Buffer)
3124

@@ -39,41 +32,46 @@ func NewFuncLogger(ctx context.Context, appName, path, image, reqID string, logD
3932
}
4033

4134
// we don't need to limit the log writer, but we do need it to dispense lines
42-
linew := newLineWriterWithBuffer(lbuf, &logWriter{
43-
ctx: ctx,
44-
appName: appName,
45-
path: path,
46-
image: image,
47-
reqID: reqID,
48-
})
35+
linew := newLineWriterWithBuffer(lbuf, &logWriter{logger})
4936

5037
const MB = 1 * 1024 * 1024 // pick a number any number.. TODO configurable ?
5138

5239
// we don't need to log per line to db, but we do need to limit it
53-
limitw := newLimitWriter(MB, &dbWriter{
54-
Buffer: dbuf,
55-
db: logDB,
56-
ctx: ctx,
57-
reqID: reqID,
58-
appName: appName,
59-
})
40+
limitw := &nopCloser{newLimitWriter(MB, dbuf)}
6041

6142
// TODO / NOTE: we want linew to be first because limitw may error if limit
6243
// is reached but we still want to log. we should probably ignore hitting the
6344
// limit error since we really just want to not write too much to db and
6445
// that's handled as is. put buffers back last to avoid misuse, if there's
6546
// an error they won't get put back and that's really okay too.
66-
return multiWriteCloser{linew, limitw, &fCloser{close}}
47+
mw := multiWriteCloser{linew, limitw, &fCloser{close}}
48+
return &rwc{mw, dbuf}
49+
}
50+
51+
// implements io.ReadWriteCloser, keeps the buffer for all its handy methods
52+
type rwc struct {
53+
io.WriteCloser
54+
*bytes.Buffer
6755
}
6856

69-
// implements passthrough Write & arbitrary func close to have a seat at the cool kids lunch table
57+
// these are explicit to override the *bytes.Buffer's methods
58+
func (r *rwc) Write(b []byte) (int, error) { return r.WriteCloser.Write(b) }
59+
func (r *rwc) Close() error { return r.WriteCloser.Close() }
60+
61+
// implements passthrough Write & closure call in Close
7062
type fCloser struct {
7163
close func() error
7264
}
7365

7466
func (f *fCloser) Write(b []byte) (int, error) { return len(b), nil }
7567
func (f *fCloser) Close() error { return f.close() }
7668

69+
type nopCloser struct {
70+
io.Writer
71+
}
72+
73+
func (n *nopCloser) Close() error { return nil }
74+
7775
// multiWriteCloser returns the first write or close that returns a non-nil
7876
// err, if no non-nil err is returned, then the returned bytes written will be
7977
// from the last call to write.
@@ -102,17 +100,11 @@ func (m multiWriteCloser) Close() (err error) {
102100
// logWriter will log (to real stderr) every call to Write as a line. it should
103101
// be wrapped with a lineWriter so that the output makes sense.
104102
type logWriter struct {
105-
ctx context.Context
106-
appName string
107-
path string
108-
image string
109-
reqID string
103+
logrus.FieldLogger
110104
}
111105

112106
func (l *logWriter) Write(b []byte) (int, error) {
113-
log := common.Logger(l.ctx)
114-
log = log.WithFields(logrus.Fields{"user_log": true, "app_name": l.appName, "path": l.path, "image": l.image, "call_id": l.reqID})
115-
log.Debug(string(b))
107+
l.Debug(string(b))
116108
return len(b), nil
117109
}
118110

@@ -171,37 +163,14 @@ func (li *lineWriter) Close() error {
171163
return err
172164
}
173165

174-
// dbWriter accumulates all calls to Write into an in memory buffer
175-
// and writes them to the database when Close is called, returning
176-
// any error from Close. it should be wrapped in a limitWriter to
177-
// prevent blowing out the buffer and bloating the db.
178-
type dbWriter struct {
179-
*bytes.Buffer
180-
181-
db models.LogStore
182-
ctx context.Context
183-
reqID string
184-
appName string
185-
}
186-
187-
func (w *dbWriter) Close() error {
188-
span, ctx := opentracing.StartSpanFromContext(context.Background(), "agent_log_write")
189-
defer span.Finish()
190-
return w.db.InsertLog(ctx, w.appName, w.reqID, w.String())
191-
}
192-
193-
func (w *dbWriter) Write(b []byte) (int, error) {
194-
return w.Buffer.Write(b)
195-
}
196-
197-
// overrides Write, keeps Close
166+
// io.Writer that allows limiting bytes written to w
198167
type limitWriter struct {
199168
n, max int
200-
io.WriteCloser
169+
io.Writer
201170
}
202171

203-
func newLimitWriter(max int, w io.WriteCloser) io.WriteCloser {
204-
return &limitWriter{max: max, WriteCloser: w}
172+
func newLimitWriter(max int, w io.Writer) io.Writer {
173+
return &limitWriter{max: max, Writer: w}
205174
}
206175

207176
func (l *limitWriter) Write(b []byte) (int, error) {
@@ -212,11 +181,11 @@ func (l *limitWriter) Write(b []byte) (int, error) {
212181
// cut off to prevent gigantic line attack
213182
b = b[:l.max-l.n]
214183
}
215-
n, err := l.WriteCloser.Write(b)
184+
n, err := l.Writer.Write(b)
216185
l.n += n
217186
if l.n >= l.max {
218187
// write in truncation message to log once
219-
l.WriteCloser.Write([]byte(fmt.Sprintf("\n-----max log size %d bytes exceeded, truncating log-----\n")))
188+
l.Writer.Write([]byte(fmt.Sprintf("\n-----max log size %d bytes exceeded, truncating log-----\n")))
220189
}
221190
return n, err
222191
}

api/datastore/internal/datastoreutil/metrics.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package datastoreutil
22

33
import (
44
"context"
5+
"io"
56

67
"github.com/fnproject/fn/api/models"
78
"github.com/jmoiron/sqlx"
@@ -100,7 +101,7 @@ func (m *metricds) GetCalls(ctx context.Context, filter *models.CallFilter) ([]*
100101
return m.ds.GetCalls(ctx, filter)
101102
}
102103

103-
func (m *metricds) InsertLog(ctx context.Context, appName, callID, callLog string) error {
104+
func (m *metricds) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error {
104105
span, ctx := opentracing.StartSpanFromContext(ctx, "ds_insert_log")
105106
defer span.Finish()
106107
return m.ds.InsertLog(ctx, appName, callID, callLog)

0 commit comments

Comments
 (0)