Skip to content

Commit 3adf09c

Browse files
authored
lb read cache everything under it (#1471)
* add everything else to read cache
* finagle front end configuration
* add metrics to read data access thing

Also fixes metrics: we need to use the req context instead of the gin context in order to get spans. More thumbs down for ginny. This wraps the read data access on the whole, since the singleflight thing can show up as latency around accessing the datastore, but they are not necessarily the same thing (e.g. if 2 reqs come in, 1 waits for the other but won't have a latency for actually hitting the datastore; it's just waiting for the cache to fill). Instead of showing a void, show this to make it clear where the request is.
1 parent 73a7056 commit 3adf09c

File tree

4 files changed

+116
-32
lines changed

4 files changed

+116
-32
lines changed

api/agent/data_access.go

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/fnproject/fn/api/models"
88
"github.com/golang/groupcache/singleflight"
99
"github.com/patrickmn/go-cache"
10+
"go.opencensus.io/trace"
1011
)
1112

1213
// XXX(reed): this is only used by the front end now, this should be in the server/ package
@@ -25,6 +26,39 @@ type DataAccess interface {
2526
ReadDataAccess
2627
}
2728

29+
// NewMetricReadDataAccess wraps a ReadDataAccess so that each call runs inside its own trace span
30+
func NewMetricReadDataAccess(rda ReadDataAccess) ReadDataAccess {
31+
return &metricda{rda}
32+
}
33+
34+
type metricda struct {
35+
rda ReadDataAccess
36+
}
37+
38+
func (m *metricda) GetTriggerBySource(ctx context.Context, appID string, triggerType, source string) (*models.Trigger, error) {
39+
ctx, span := trace.StartSpan(ctx, "rda_get_trigger_by_source")
40+
defer span.End()
41+
return m.rda.GetTriggerBySource(ctx, appID, triggerType, source)
42+
}
43+
44+
func (m *metricda) GetAppID(ctx context.Context, appName string) (string, error) {
45+
ctx, span := trace.StartSpan(ctx, "rda_get_app_id")
46+
defer span.End()
47+
return m.rda.GetAppID(ctx, appName)
48+
}
49+
50+
func (m *metricda) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
51+
ctx, span := trace.StartSpan(ctx, "rda_get_app_by_id")
52+
defer span.End()
53+
return m.rda.GetAppByID(ctx, appID)
54+
}
55+
56+
func (m *metricda) GetFnByID(ctx context.Context, fnID string) (*models.Fn, error) {
57+
ctx, span := trace.StartSpan(ctx, "rda_get_fn_by_id")
58+
defer span.End()
59+
return m.rda.GetFnByID(ctx, fnID)
60+
}
61+
2862
// cachedDataAccess wraps a ReadDataAccess and caches the results of GetAppID, GetAppByID, GetTriggerBySource and GetFnByID.
2963
type cachedDataAccess struct {
3064
ReadDataAccess
@@ -42,12 +76,31 @@ func NewCachedDataAccess(da ReadDataAccess) ReadDataAccess {
4276
return cda
4377
}
4478

45-
func appIDCacheKey(appID string) string {
46-
return "a:" + appID
79+
func appIDCacheKey(appID string) string { return "a:" + appID }
80+
func appNameCacheKey(appName string) string { return "n:" + appName }
81+
func fnCacheKey(fnID string) string { return "f:" + fnID }
82+
func trigSourceCacheKey(app, typ, source string) string {
83+
return "t:" + app + string('\x00') + typ + string('\x00') + source
4784
}
4885

4986
func (da *cachedDataAccess) GetAppID(ctx context.Context, appName string) (string, error) {
50-
return da.ReadDataAccess.GetAppID(ctx, appName)
87+
key := appNameCacheKey(appName)
88+
app, ok := da.cache.Get(key)
89+
if ok {
90+
return app.(string), nil
91+
}
92+
93+
resp, err := da.singleflight.Do(key,
94+
func() (interface{}, error) {
95+
return da.ReadDataAccess.GetAppID(ctx, appName)
96+
})
97+
98+
if err != nil {
99+
return "", err
100+
}
101+
app = resp.(string)
102+
da.cache.Set(key, app, cache.DefaultExpiration)
103+
return app.(string), nil
51104
}
52105

53106
func (da *cachedDataAccess) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
@@ -69,3 +122,43 @@ func (da *cachedDataAccess) GetAppByID(ctx context.Context, appID string) (*mode
69122
da.cache.Set(key, app, cache.DefaultExpiration)
70123
return app.(*models.App), nil
71124
}
125+
126+
func (da *cachedDataAccess) GetTriggerBySource(ctx context.Context, appID string, triggerType, source string) (*models.Trigger, error) {
127+
key := trigSourceCacheKey(appID, triggerType, source)
128+
trigger, ok := da.cache.Get(key)
129+
if ok {
130+
return trigger.(*models.Trigger), nil
131+
}
132+
133+
resp, err := da.singleflight.Do(key,
134+
func() (interface{}, error) {
135+
return da.ReadDataAccess.GetTriggerBySource(ctx, appID, triggerType, source)
136+
})
137+
138+
if err != nil {
139+
return nil, err
140+
}
141+
trigger = resp.(*models.Trigger)
142+
da.cache.Set(key, trigger, cache.DefaultExpiration)
143+
return trigger.(*models.Trigger), nil
144+
}
145+
146+
func (da *cachedDataAccess) GetFnByID(ctx context.Context, fnID string) (*models.Fn, error) {
147+
key := fnCacheKey(fnID)
148+
fn, ok := da.cache.Get(key)
149+
if ok {
150+
return fn.(*models.Fn), nil
151+
}
152+
153+
resp, err := da.singleflight.Do(key,
154+
func() (interface{}, error) {
155+
return da.ReadDataAccess.GetFnByID(ctx, fnID)
156+
})
157+
158+
if err != nil {
159+
return nil, err
160+
}
161+
fn = resp.(*models.Fn)
162+
da.cache.Set(key, fn, cache.DefaultExpiration)
163+
return fn.(*models.Fn), nil
164+
}

api/datastore/internal/datastoreutil/metrics.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@ package datastoreutil
33
import (
44
"context"
55

6-
"go.opencensus.io/trace"
7-
86
"github.com/fnproject/fn/api/models"
7+
"go.opencensus.io/trace"
98
)
109

1110
func MetricDS(ds models.Datastore) models.Datastore {

api/server/runner_fninvoke.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,13 @@ func (s *Server) handleFnInvokeCall(c *gin.Context) {
5757
// handleFnInvokeCall2 executes the function and returns an error
5858
// Requires the following in the context:
5959
func (s *Server) handleFnInvokeCall2(c *gin.Context) error {
60-
fn, err := s.lbReadAccess.GetFnByID(c, c.Param(api.FnID))
60+
ctx := c.Request.Context()
61+
fn, err := s.lbReadAccess.GetFnByID(ctx, c.Param(api.FnID))
6162
if err != nil {
6263
return err
6364
}
6465

65-
app, err := s.lbReadAccess.GetAppByID(c, fn.AppID)
66+
app, err := s.lbReadAccess.GetAppByID(ctx, fn.AppID)
6667
if err != nil {
6768
return err
6869
}
@@ -71,7 +72,7 @@ func (s *Server) handleFnInvokeCall2(c *gin.Context) error {
7172
if models.IsFuncError(err) || err == nil {
7273
// report all user-directed errors and function responses from here, after submit has run.
7374
// this is our never ending attempt to distinguish user and platform errors.
74-
ctx, err := tag.New(c.Request.Context(),
75+
ctx, err := tag.New(ctx,
7576
tag.Insert(whodunitKey, "user"),
7677
)
7778
if err != nil {

api/server/server.go

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ type Server struct {
201201
// TODO: extend this to cover gRPC options.
202202
svcConfigs map[string]*http.Server
203203

204-
// Agent enqueue and read stores
205204
lbReadAccess agent.ReadDataAccess
206205
noHTTTPTriggerEndpoint bool
207206
noFnInvokeEndpoint bool
@@ -335,8 +334,7 @@ func WithDBURL(dbURL string) Option {
335334
if err != nil {
336335
return err
337336
}
338-
s.datastore = ds
339-
s.lbReadAccess = agent.NewCachedDataAccess(s.datastore)
337+
return WithDatastore(ds)(ctx, s)
340338
}
341339
return nil
342340
}
@@ -361,7 +359,7 @@ func WithTLS(service string, tlsCfg *tls.Config) Option {
361359
// WithReadDataAccess overrides the LB read DataAccess for a server; the given DataAccess is wrapped with agent.NewMetricReadDataAccess
362360
func WithReadDataAccess(ds agent.ReadDataAccess) Option {
363361
return func(ctx context.Context, s *Server) error {
364-
s.lbReadAccess = ds
362+
s.lbReadAccess = agent.NewMetricReadDataAccess(ds)
365363
return nil
366364
}
367365
}
@@ -370,8 +368,10 @@ func WithReadDataAccess(ds agent.ReadDataAccess) Option {
370368
func WithDatastore(ds models.Datastore) Option {
371369
return func(ctx context.Context, s *Server) error {
372370
s.datastore = ds
371+
s.datastore = datastore.Wrap(s.datastore)
372+
s.datastore = fnext.NewDatastore(s.datastore, s.appListeners, s.fnListeners, s.triggerListeners)
373373
if s.lbReadAccess == nil {
374-
s.lbReadAccess = agent.NewCachedDataAccess(ds)
374+
return WithReadDataAccess(agent.NewCachedDataAccess(s.datastore))(ctx, s)
375375
}
376376
return nil
377377
}
@@ -397,10 +397,6 @@ func (s *Server) defaultRunnerPool() (pool.RunnerPool, error) {
397397
func WithFullAgent() Option {
398398
return func(ctx context.Context, s *Server) error {
399399
s.nodeType = ServerTypeFull
400-
401-
if s.datastore == nil {
402-
return errors.New("full nodes must configure FN_DB_URL")
403-
}
404400
s.agent = agent.New()
405401
return nil
406402
}
@@ -414,9 +410,6 @@ func WithAgentFromEnv() Option {
414410
case ServerTypeAPI:
415411
return errors.New("should not initialize an agent for an Fn API node")
416412
case ServerTypePureRunner:
417-
if s.datastore != nil {
418-
return errors.New("pure runner nodes must not be configured with a datastore (FN_DB_URL)")
419-
}
420413
cancelCtx, cancel := context.WithCancel(ctx)
421414
prAgent, err := agent.DefaultPureRunner(cancel, s.svcConfigs[GRPCServer].Addr, s.svcConfigs[GRPCServer].TLSConfig)
422415
if err != nil {
@@ -430,9 +423,6 @@ func WithAgentFromEnv() Option {
430423
if runnerURL == "" {
431424
return errors.New("no FN_RUNNER_API_URL provided for an Fn NuLB node")
432425
}
433-
if s.datastore != nil {
434-
return errors.New("lb nodes must not be configured with a datastore (FN_DB_URL)")
435-
}
436426

437427
cl, err := hybrid.NewClient(runnerURL)
438428
if err != nil {
@@ -454,7 +444,10 @@ func WithAgentFromEnv() Option {
454444
placer = pool.NewNaivePlacer(&placerCfg)
455445
}
456446

457-
s.lbReadAccess = agent.NewCachedDataAccess(cl)
447+
err = WithReadDataAccess(agent.NewCachedDataAccess(cl))(ctx, s)
448+
if err != nil {
449+
return errors.New("LBAgent creation failed")
450+
}
458451
s.agent, err = agent.NewLBAgent(runnerPool, placer)
459452
if err != nil {
460453
return errors.New("LBAgent creation failed")
@@ -499,6 +492,7 @@ func WithAdminServer(port int) Option {
499492
}
500493
}
501494

495+
// WithHTTPConfig allows configuring specific http servers
502496
func WithHTTPConfig(service string, cfg *http.Server) Option {
503497
return func(ctx context.Context, s *Server) error {
504498
s.svcConfigs[service] = cfg
@@ -528,6 +522,11 @@ func New(ctx context.Context, opts ...Option) *Server {
528522
AdminServer: &http.Server{},
529523
GRPCServer: &http.Server{},
530524
},
525+
// MUST initialize these before opts: WithDatastore passes them to fnext.NewDatastore
526+
appListeners: new(appListeners),
527+
fnListeners: new(fnListeners),
528+
triggerListeners: new(triggerListeners),
529+
531530
// Almost everything else is configured through opts (see NewFromEnv for ex.) or below
532531
}
533532

@@ -596,14 +595,6 @@ func New(ctx context.Context, opts ...Option) *Server {
596595
s.AdminRouter.Use(panicWrap)
597596
s.bindHandlers(ctx)
598597

599-
s.appListeners = new(appListeners)
600-
s.fnListeners = new(fnListeners)
601-
s.triggerListeners = new(triggerListeners)
602-
603-
// TODO it's not clear that this is always correct as the read store won't get wrapping
604-
s.datastore = datastore.Wrap(s.datastore)
605-
s.datastore = fnext.NewDatastore(s.datastore, s.appListeners, s.fnListeners, s.triggerListeners)
606-
607598
return s
608599
}
609600

0 commit comments

Comments
 (0)