Skip to content

Commit 746c937

Browse files
cstocktonChris Stockton
andauthored
feat: background template reloading p1 - baseline decomposition (#2148)
**Summary** Currently every transactional email fetches its template inside transactions. This adds request latency, blocks database transactions, and causes user visible failures when the template service is slow or unavailable. This pr will address these issues in 3 phases: 1. **Baseline decomposition (this PR)** * Untangle the mailer into layered, single-purpose packages (`mailmeclient`, `noopclient`, `validateclient`, `taskclient`, `templatemailer`). * Simplify API construction with a single `newMailer()` factory inside the api package to composes the pipeline. * No behavioral changes, this is strictly restructuring to make follow ups easier. 2. **Template cache + interface refactor (upcoming)** * Fix the race conditions in the template caching system. * Add methods for the background template reloader to refresh the cache. * Redefine the low-level mail client interface to only accept `(to, subject, body, headers)` while `templatemailer` owns all templating logic. * Introduce some configuration parameters for tuning. 3. **Introduce background template reloading** * Add a worker at startup similar to the previous PR for background workers. * Periodically update the cache at a configurable interval. * Notify the template reloader to update when config reloads occur. * Remove blocking calls to the template cache unless a cache miss occurs. * Never drop a template from cache before having an updated entry. --------- Co-authored-by: Chris Stockton <[email protected]>
1 parent a89a0b0 commit 746c937

File tree

19 files changed

+1234
-690
lines changed

19 files changed

+1234
-690
lines changed

cmd/serve_cmd.go

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import (
1414
"github.com/sirupsen/logrus"
1515
"github.com/spf13/cobra"
1616
"github.com/supabase/auth/internal/api"
17+
"github.com/supabase/auth/internal/api/apiworker"
1718
"github.com/supabase/auth/internal/conf"
19+
"github.com/supabase/auth/internal/mailer/templatemailer"
1820
"github.com/supabase/auth/internal/reloader"
1921
"github.com/supabase/auth/internal/storage"
2022
"github.com/supabase/auth/internal/utilities"
@@ -48,22 +50,24 @@ func serve(ctx context.Context) {
4850
}
4951
defer db.Close()
5052

51-
addr := net.JoinHostPort(config.API.Host, config.API.Port)
52-
53-
opts := []api.Option{
54-
api.NewLimiterOptions(config),
55-
}
56-
5753
baseCtx, baseCancel := context.WithCancel(context.Background())
5854
defer baseCancel()
5955

6056
var wg sync.WaitGroup
6157
defer wg.Wait() // Do not return to caller until this goroutine is done.
6258

63-
a := api.NewAPIWithVersion(config, db, utilities.Version, opts...)
64-
ah := reloader.NewAtomicHandler(a)
65-
logrus.WithField("version", a.Version()).Infof("GoTrue API started on: %s", addr)
59+
mrCache := templatemailer.NewCache()
60+
limiterOpts := api.NewLimiterOptions(config)
61+
initialAPI := api.NewAPIWithVersion(
62+
config, db, utilities.Version,
63+
limiterOpts,
64+
api.WithMailer(templatemailer.FromConfig(config, mrCache)),
65+
)
6666

67+
addr := net.JoinHostPort(config.API.Host, config.API.Port)
68+
logrus.WithField("version", initialAPI.Version()).Infof("GoTrue API started on: %s", addr)
69+
70+
ah := reloader.NewAtomicHandler(initialAPI)
6771
httpSrv := &http.Server{
6872
Addr: addr,
6973
Handler: ah,
@@ -74,15 +78,53 @@ func serve(ctx context.Context) {
7478
}
7579
log := logrus.WithField("component", "api")
7680

81+
wrkLog := logrus.WithField("component", "apiworker")
82+
wrk := apiworker.New(config, mrCache, wrkLog)
83+
wg.Add(1)
84+
go func() {
85+
defer wg.Done()
86+
87+
var err error
88+
defer func() {
89+
logFn := wrkLog.Info
90+
if err != nil {
91+
logFn = wrkLog.WithError(err).Error
92+
}
93+
logFn("background apiworker is exiting")
94+
}()
95+
96+
// Work exits when ctx is done as in-flight requests do not depend
97+
// on it. If they do in the future this should be baseCtx instead.
98+
err = wrk.Work(ctx)
99+
}()
100+
77101
if watchDir != "" {
78102
wg.Add(1)
79103
go func() {
80104
defer wg.Done()
81105

82106
fn := func(latestCfg *conf.GlobalConfiguration) {
83107
log.Info("reloading api with new configuration")
108+
109+
// When config is updated we notify the apiworker.
110+
wrk.ReloadConfig(latestCfg)
111+
112+
// Create a new API version with the updated config.
84113
latestAPI := api.NewAPIWithVersion(
85-
latestCfg, db, utilities.Version, opts...)
114+
config, db, utilities.Version,
115+
116+
// Create a new mailer with existing template cache.
117+
api.WithMailer(
118+
templatemailer.FromConfig(config, mrCache),
119+
),
120+
121+
// Persist existing rate limiters.
122+
//
123+
// TODO(cstockton): we should consider updating these, if we
124+
// rely on hot config reloads 100% then rate limiter changes
125+
// won't be picked up.
126+
limiterOpts,
127+
)
86128
ah.Store(latestAPI)
87129
}
88130

internal/api/api.go

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/supabase/auth/internal/hooks/hookspgfunc"
1717
"github.com/supabase/auth/internal/hooks/v0hooks"
1818
"github.com/supabase/auth/internal/mailer"
19+
"github.com/supabase/auth/internal/mailer/templatemailer"
1920
"github.com/supabase/auth/internal/models"
2021
"github.com/supabase/auth/internal/observability"
2122
"github.com/supabase/auth/internal/storage"
@@ -38,11 +39,11 @@ type API struct {
3839
config *conf.GlobalConfiguration
3940
version string
4041

41-
hooksMgr *v0hooks.Manager
42-
hibpClient *hibp.PwnedClient
43-
oauthServer *oauthserver.Server
44-
mailerClientFunc func() mailer.MailClient
45-
tokenService *tokens.Service
42+
hooksMgr *v0hooks.Manager
43+
hibpClient *hibp.PwnedClient
44+
oauthServer *oauthserver.Server
45+
tokenService *tokens.Service
46+
mailer mailer.Mailer
4647

4748
// overrideTime can be used to override the clock used by handlers. Should only be used in tests!
4849
overrideTime func() time.Time
@@ -53,6 +54,7 @@ type API struct {
5354
func (a *API) GetConfig() *conf.GlobalConfiguration { return a.config }
5455
func (a *API) GetDB() *storage.Connection { return a.db }
5556
func (a *API) GetTokenService() *tokens.Service { return a.tokenService }
57+
func (a *API) Mailer() mailer.Mailer { return a.mailer }
5658

5759
func (a *API) Version() string {
5860
return a.version
@@ -99,11 +101,6 @@ func NewAPIWithVersion(globalConfig *conf.GlobalConfiguration, db *storage.Conne
99101
if api.limiterOpts == nil {
100102
api.limiterOpts = NewLimiterOptions(globalConfig)
101103
}
102-
if api.mailerClientFunc == nil {
103-
api.mailerClientFunc = func() mailer.MailClient {
104-
return mailer.NewMailClient(globalConfig)
105-
}
106-
}
107104
if api.hooksMgr == nil {
108105
httpDr := hookshttp.New()
109106
pgfuncDr := hookspgfunc.New(db)
@@ -114,6 +111,10 @@ func NewAPIWithVersion(globalConfig *conf.GlobalConfiguration, db *storage.Conne
114111
if api.tokenService == nil {
115112
api.tokenService = tokens.NewService(globalConfig, api.hooksMgr)
116113
}
114+
if api.mailer == nil {
115+
tc := templatemailer.NewCache()
116+
api.mailer = templatemailer.FromConfig(globalConfig, tc)
117+
}
117118

118119
// Connect token service to API's time function (supports test overrides)
119120
api.tokenService.SetTimeFunc(api.Now)
@@ -397,12 +398,6 @@ func (a *API) HealthCheck(w http.ResponseWriter, r *http.Request) error {
397398
})
398399
}
399400

400-
// Mailer returns NewMailer with the current tenant config
401-
func (a *API) Mailer() mailer.Mailer {
402-
config := a.config
403-
return mailer.NewMailerWithClient(config, a.mailerClientFunc())
404-
}
405-
406401
// ServeHTTP implements the http.Handler interface by passing the request along
407402
// to its underlying Handler.
408403
func (a *API) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package apiworker
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
"time"
8+
9+
"github.com/sirupsen/logrus"
10+
"github.com/supabase/auth/internal/conf"
11+
"github.com/supabase/auth/internal/mailer/templatemailer"
12+
)
13+
14+
// Worker is a simple background worker for async tasks.
15+
type Worker struct {
16+
le *logrus.Entry
17+
tc *templatemailer.Cache
18+
19+
// Notifies worker the cfg has been updated.
20+
cfgCh chan struct{}
21+
22+
// workMu must be held for calls to Work
23+
workMu sync.Mutex
24+
25+
// mu must be held for field access below here
26+
mu sync.Mutex
27+
cfg *conf.GlobalConfiguration
28+
}
29+
30+
// New will return a new *Worker instance.
31+
func New(
32+
cfg *conf.GlobalConfiguration,
33+
tc *templatemailer.Cache,
34+
le *logrus.Entry,
35+
) *Worker {
36+
return &Worker{
37+
le: le,
38+
cfg: cfg,
39+
tc: tc,
40+
cfgCh: make(chan struct{}, 1),
41+
}
42+
}
43+
44+
func (o *Worker) putConfig(cfg *conf.GlobalConfiguration) {
45+
o.mu.Lock()
46+
defer o.mu.Unlock()
47+
o.cfg = cfg
48+
}
49+
50+
func (o *Worker) getConfig() *conf.GlobalConfiguration {
51+
o.mu.Lock()
52+
defer o.mu.Unlock()
53+
return o.cfg
54+
}
55+
56+
// ReloadConfig notifies the worker a new configuration is available.
57+
func (o *Worker) ReloadConfig(cfg *conf.GlobalConfiguration) {
58+
o.putConfig(cfg)
59+
60+
select {
61+
case o.cfgCh <- struct{}{}:
62+
default:
63+
}
64+
}
65+
66+
// Work will periodically reload the templates in the background as long as the
67+
// system remains active.
68+
func (o *Worker) Work(ctx context.Context) error {
69+
if ok := o.workMu.TryLock(); !ok {
70+
return errors.New("apiworker: concurrent calls to Work are invalid")
71+
}
72+
defer o.workMu.Unlock()
73+
74+
le := o.le.WithFields(logrus.Fields{
75+
"worker_type": "apiworker_template_cache",
76+
})
77+
le.Info("apiworker: template cache worker started")
78+
defer le.Info("apiworker: template cache worker exited")
79+
80+
// Reload templates right away on Work.
81+
o.maybeReloadTemplates(ctx, o.getConfig())
82+
83+
ival := func() time.Duration {
84+
return max(time.Second, o.getConfig().Mailer.TemplateRetryInterval/4)
85+
}
86+
87+
tr := time.NewTicker(ival())
88+
defer tr.Stop()
89+
90+
for {
91+
select {
92+
case <-ctx.Done():
93+
return ctx.Err()
94+
case <-o.cfgCh:
95+
tr.Reset(ival())
96+
case <-tr.C:
97+
}
98+
99+
// Either ticker fired or we got a config update.
100+
o.maybeReloadTemplates(ctx, o.getConfig())
101+
}
102+
}
103+
104+
func (o *Worker) maybeReloadTemplates(
105+
ctx context.Context,
106+
cfg *conf.GlobalConfiguration,
107+
) {
108+
if cfg.Mailer.TemplateReloadingEnabled {
109+
o.tc.Reload(ctx, cfg)
110+
}
111+
}

internal/api/mail.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/supabase/auth/internal/hooks/v0hooks"
1010
mail "github.com/supabase/auth/internal/mailer"
11+
"github.com/supabase/auth/internal/mailer/validateclient"
1112
"go.opentelemetry.io/otel/attribute"
1213
"go.opentelemetry.io/otel/metric"
1314

@@ -705,9 +706,9 @@ func (a *API) sendEmail(r *http.Request, tx *storage.Connection, u *models.User,
705706
}
706707

707708
switch {
708-
case errors.Is(err, mail.ErrInvalidEmailAddress),
709-
errors.Is(err, mail.ErrInvalidEmailFormat),
710-
errors.Is(err, mail.ErrInvalidEmailDNS):
709+
case errors.Is(err, validateclient.ErrInvalidEmailAddress),
710+
errors.Is(err, validateclient.ErrInvalidEmailFormat),
711+
errors.Is(err, validateclient.ErrInvalidEmailDNS):
711712
return apierrors.NewBadRequestError(
712713
apierrors.ErrorCodeEmailAddressInvalid,
713714
"Email address %q is invalid",

internal/api/options.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,21 @@ type Option interface {
1515
apply(*API)
1616
}
1717

18-
type MailerOptions struct {
19-
MailerClientFunc func() mailer.MailClient
18+
type optionFunc func(*API)
19+
20+
func (f optionFunc) apply(a *API) { f(a) }
21+
22+
func WithMailer(m mailer.Mailer) Option {
23+
return optionFunc(func(a *API) {
24+
a.mailer = m
25+
})
2026
}
2127

22-
func (mo *MailerOptions) apply(a *API) { a.mailerClientFunc = mo.MailerClientFunc }
28+
func WithTokenService(service *tokens.Service) Option {
29+
return optionFunc(func(a *API) {
30+
a.tokenService = service
31+
})
32+
}
2333

2434
type LimiterOptions struct {
2535
Email ratelimit.Limiter
@@ -44,19 +54,6 @@ type LimiterOptions struct {
4454

4555
func (lo *LimiterOptions) apply(a *API) { a.limiterOpts = lo }
4656

47-
// TokenServiceOption allows injecting a custom token service
48-
type TokenServiceOption struct {
49-
service *tokens.Service
50-
}
51-
52-
func WithTokenService(service *tokens.Service) *TokenServiceOption {
53-
return &TokenServiceOption{service: service}
54-
}
55-
56-
func (tso *TokenServiceOption) apply(a *API) {
57-
a.tokenService = tso.service
58-
}
59-
6057
func NewLimiterOptions(gc *conf.GlobalConfiguration) *LimiterOptions {
6158
o := &LimiterOptions{}
6259

internal/api/options_test.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@ import (
44
"testing"
55

66
"github.com/stretchr/testify/assert"
7-
"github.com/stretchr/testify/require"
87
"github.com/supabase/auth/internal/conf"
9-
"github.com/supabase/auth/internal/e2e"
10-
"github.com/supabase/auth/internal/mailer"
118
)
129

1310
func TestNewLimiterOptions(t *testing.T) {
@@ -31,17 +28,3 @@ func TestNewLimiterOptions(t *testing.T) {
3128
assert.NotNil(t, rl.SSO)
3229
assert.NotNil(t, rl.SAMLAssertion)
3330
}
34-
35-
func TestMailerOptions(t *testing.T) {
36-
globalCfg := e2e.Must(e2e.Config())
37-
conn := e2e.Must(e2e.Conn(globalCfg))
38-
39-
sentinelMailer := mailer.NewMailClient(globalCfg)
40-
mailerOpts := &MailerOptions{MailerClientFunc: func() mailer.MailClient {
41-
return sentinelMailer
42-
}}
43-
a := NewAPIWithVersion(globalCfg, conn, apiTestVersion, mailerOpts)
44-
45-
got := a.mailerClientFunc()
46-
require.Equal(t, sentinelMailer, got)
47-
}

0 commit comments

Comments
 (0)