Skip to content

Commit 1dfec2c

Browse files
committed
Improve config reload consistency, add tests
1 parent 813964d commit 1dfec2c

File tree

15 files changed

+510
-82
lines changed

15 files changed

+510
-82
lines changed

framework/dns/debugflags.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,12 @@ func init() {
3232
Usage: "replace the DNS resolver address",
3333
Value: "system-default",
3434
Destination: &overrideServ,
35+
Action: func(context *cli.Context, s string) error {
36+
if s != "" && s != "system-default" {
37+
override(s)
38+
}
39+
overrideServ = s
40+
return nil
41+
},
3542
})
3643
}

framework/dns/resolver.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,5 @@ func LookupAddr(ctx context.Context, r Resolver, ip net.IP) (string, error) {
5353
}
5454

5555
func DefaultResolver() Resolver {
56-
if overrideServ != "" && overrideServ != "system-default" {
57-
override(overrideServ)
58-
}
59-
6056
return net.DefaultResolver
6157
}

framework/module/lifetime.go

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,27 @@ type ReloadModule interface {
3737
Reload() error
3838
}
3939

40+
// EarlyStopModule is a LifetimeModule that needs to do some bookkeeping
41+
// before new server instance starts during reload.
42+
type EarlyStopModule interface {
43+
LifetimeModule
44+
EarlyStop() error
45+
}
46+
4047
type LifetimeTracker struct {
4148
logger *log.Logger
4249
instances []*struct {
43-
mod LifetimeModule
44-
started bool
50+
mod LifetimeModule
51+
started bool
52+
earlyStopped bool
4553
}
4654
}
4755

4856
func (lt *LifetimeTracker) Add(mod LifetimeModule) {
4957
lt.instances = append(lt.instances, &struct {
50-
mod LifetimeModule
51-
started bool
58+
mod LifetimeModule
59+
started bool
60+
earlyStopped bool
5261
}{mod: mod, started: false})
5362
}
5463

@@ -59,6 +68,9 @@ func (lt *LifetimeTracker) StartAll() error {
5968
continue
6069
}
6170

71+
lt.logger.DebugMsg("starting module",
72+
"mod_name", entry.mod.Name(), "inst_name", entry.mod.InstanceName())
73+
6274
if err := entry.mod.Start(); err != nil {
6375
if err := lt.StopAll(); err != nil {
6476
lt.logger.Error("StopAll failed after Start fail", err)
@@ -96,6 +108,32 @@ func (lt *LifetimeTracker) ReloadAll() error {
96108
return nil
97109
}
98110

111+
func (lt *LifetimeTracker) EarlyStopAll() error {
112+
for i := len(lt.instances) - 1; i >= 0; i-- {
113+
entry := lt.instances[i]
114+
115+
if !entry.started {
116+
continue
117+
}
118+
119+
rsm, ok := entry.mod.(EarlyStopModule)
120+
if !ok {
121+
continue
122+
}
123+
124+
if err := rsm.EarlyStop(); err != nil {
125+
lt.logger.Error("module early stop failed", err,
126+
"mod_name", entry.mod.Name(), "inst_name", entry.mod.InstanceName())
127+
continue
128+
}
129+
lt.logger.DebugMsg("module early stopped",
130+
"mod_name", entry.mod.Name(), "inst_name", entry.mod.InstanceName())
131+
132+
entry.earlyStopped = true
133+
}
134+
return nil
135+
}
136+
99137
// StopAll calls Stop for all registered LifetimeModule instances.
100138
func (lt *LifetimeTracker) StopAll() error {
101139
for i := len(lt.instances) - 1; i >= 0; i-- {

internal/sqlite/modernc_sqlite3.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//go:build !nosqlite3 && !cgo
1+
//go:build (!nosqlite3 && !cgo) || modernc
22

33
/*
44
Maddy Mail Server - Composable all-in-one email server.

internal/sqlite/sqlite3.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//go:build !nosqlite3 && cgo
1+
//go:build !nosqlite3 && cgo && !modernc
22

33
/*
44
Maddy Mail Server - Composable all-in-one email server.

internal/target/queue/queue.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ type Queue struct {
122122
location string
123123
hostname string
124124
autogenMsgDomain string
125-
wheel *TimeWheel
125+
wheel *TimeWheel[queueSlot]
126126

127127
dsnPipeline module.DeliveryTarget
128128

@@ -211,6 +211,9 @@ func (q *Queue) Configure(inlineArgs []string, cfg *config.Map) error {
211211
cfg.Bool("debug", true, false, &q.Log.Debug)
212212
cfg.Int("max_tries", false, false, 20, &q.maxTries)
213213
cfg.Int("max_parallelism", false, false, 16, &q.maxParallelism)
214+
cfg.Duration("post_init_delay", false, false, q.postInitDelay, &q.postInitDelay)
215+
cfg.Duration("initial_retry_time", false, false, q.initialRetryTime, &q.initialRetryTime)
216+
cfg.Float("retry_time_scale", false, false, q.retryTimeScale, &q.retryTimeScale)
214217
cfg.String("location", false, false, q.location, &q.location)
215218
cfg.Custom("target", false, true, nil, modconfig.DeliveryDirective, &q.Target)
216219
cfg.String("hostname", true, true, "", &q.hostname)
@@ -249,7 +252,7 @@ func (q *Queue) Start() error {
249252
}
250253

251254
func (q *Queue) start(maxParallelism int) error {
252-
q.wheel = NewTimeWheel(q.dispatch)
255+
q.wheel = NewTimeWheel[queueSlot](q.dispatch)
253256
q.deliverySemaphore = make(chan struct{}, maxParallelism)
254257

255258
if err := q.readDiskQueue(); err != nil {
@@ -261,13 +264,18 @@ func (q *Queue) start(maxParallelism int) error {
261264
return nil
262265
}
263266

264-
func (q *Queue) Stop() error {
267+
func (q *Queue) EarlyStop() error {
268+
// We must ensure queue state is consistent on disk before we proceed
269+
// with configuration reload.
265270
q.wheel.Close()
266271
q.deliveryWg.Wait()
267-
268272
return nil
269273
}
270274

275+
func (q *Queue) Stop() error {
276+
return q.EarlyStop()
277+
}
278+
271279
// discardBroken changes the name of metadata file to have .meta_broken
272280
// extension.
273281
//
@@ -283,8 +291,8 @@ func (q *Queue) discardBroken(id string) {
283291
}
284292
}
285293

286-
func (q *Queue) dispatch(value TimeSlot) {
287-
slot := value.Value.(queueSlot)
294+
func (q *Queue) dispatch(ctx context.Context, value TimeSlot[queueSlot]) {
295+
slot := value.Value
288296

289297
q.Log.Debugln("starting delivery for", slot.ID)
290298

@@ -329,7 +337,7 @@ func (q *Queue) dispatch(value TimeSlot) {
329337
body = slot.Body
330338
}
331339

332-
q.tryDelivery(meta, hdr, body)
340+
q.tryDelivery(ctx, meta, hdr, body)
333341
}()
334342
}
335343

@@ -373,10 +381,10 @@ func toSMTPErr(err error) *smtp.SMTPError {
373381
return res
374382
}
375383

376-
func (q *Queue) tryDelivery(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) {
384+
func (q *Queue) tryDelivery(ctx context.Context, meta *QueueMetadata, header textproto.Header, body buffer.Buffer) {
377385
dl := target.DeliveryLogger(q.Log, meta.MsgMeta)
378386

379-
partialErr := q.deliver(meta, header, body)
387+
partialErr := q.deliver(ctx, meta, header, body)
380388
dl.Debugf("errors: %v", partialErr.Errs)
381389

382390
// While iterating the list of recipients we also pick the smallest tries count
@@ -460,7 +468,7 @@ func (q *Queue) tryDelivery(meta *QueueMetadata, header textproto.Header, body b
460468
})
461469
}
462470

463-
func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffer.Buffer) partialError {
471+
func (q *Queue) deliver(ctx context.Context, meta *QueueMetadata, header textproto.Header, body buffer.Buffer) partialError {
464472
dl := target.DeliveryLogger(q.Log, meta.MsgMeta)
465473
perr := partialError{
466474
Errs: map[string]error{},
@@ -471,7 +479,7 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe
471479
msgMeta.ID = msgMeta.ID + "-" + strconv.FormatInt(time.Now().Unix(), 16)
472480
dl.Debugf("using message ID = %s", msgMeta.ID)
473481

474-
msgCtx, msgTask := trace.NewTask(context.Background(), "Queue delivery")
482+
msgCtx, msgTask := trace.NewTask(ctx, "Queue delivery")
475483
defer msgTask.End()
476484

477485
mailCtx, mailTask := trace.NewTask(msgCtx, "MAIL FROM")
@@ -486,6 +494,15 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe
486494
}
487495
dl.Debugf("target.StartDelivery OK")
488496

497+
// Check in case delivery implementation is actually
498+
// context-unaware.
499+
if err := mailCtx.Err(); err != nil {
500+
for _, rcpt := range meta.To {
501+
perr.Errs[rcpt] = err
502+
}
503+
return perr
504+
}
505+
489506
var acceptedRcpts []string
490507
for _, rcpt := range meta.To {
491508
rcptCtx, rcptTask := trace.NewTask(msgCtx, "RCPT TO")
@@ -497,6 +514,15 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe
497514
acceptedRcpts = append(acceptedRcpts, rcpt)
498515
}
499516
rcptTask.End()
517+
518+
// Check in case delivery implementation is actually
519+
// context-unaware.
520+
if err := mailCtx.Err(); err != nil {
521+
for _, rcpt := range meta.To {
522+
perr.Errs[rcpt] = err
523+
}
524+
return perr
525+
}
500526
}
501527

502528
if len(acceptedRcpts) == 0 {
@@ -513,6 +539,10 @@ func (q *Queue) deliver(meta *QueueMetadata, header textproto.Header, body buffe
513539
}
514540
}
515541

542+
// At this point, it is too late to abort delivery. We should complete
543+
// it or fail it consistently.
544+
msgCtx = context.WithoutCancel(msgCtx)
545+
516546
bodyCtx, bodyTask := trace.NewTask(msgCtx, "DATA")
517547
defer bodyTask.End()
518548

internal/target/queue/timewheel.go

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,64 +20,69 @@ package queue
2020

2121
import (
2222
"container/list"
23+
"context"
2324
"sync"
2425
"sync/atomic"
2526
"time"
2627
)
2728

28-
type TimeSlot struct {
29+
type TimeSlot[Value any] struct {
2930
Time time.Time
30-
Value interface{}
31+
Value Value
3132
}
3233

33-
type TimeWheel struct {
34+
type TimeWheel[Value any] struct {
3435
stopped uint32
3536

3637
slots *list.List
3738
slotsLock sync.Mutex
3839

3940
updateNotify chan time.Time
4041
stopNotify chan struct{}
42+
tickerCtx context.Context
43+
tickerCancel context.CancelFunc
4144

42-
dispatch func(TimeSlot)
45+
dispatch func(context.Context, TimeSlot[Value])
4346
}
4447

45-
func NewTimeWheel(dispatch func(TimeSlot)) *TimeWheel {
46-
tw := &TimeWheel{
48+
func NewTimeWheel[Value any](dispatch func(context.Context, TimeSlot[Value])) *TimeWheel[Value] {
49+
ctx, cancel := context.WithCancel(context.Background())
50+
51+
tw := &TimeWheel[Value]{
4752
slots: list.New(),
4853
stopNotify: make(chan struct{}),
54+
tickerCtx: ctx,
55+
tickerCancel: cancel,
4956
updateNotify: make(chan time.Time),
5057
dispatch: dispatch,
5158
}
52-
go tw.tick()
59+
go tw.tick(context.Background())
5360
return tw
5461
}
5562

56-
func (tw *TimeWheel) Add(target time.Time, value interface{}) {
63+
func (tw *TimeWheel[Value]) Add(target time.Time, value Value) {
5764
if atomic.LoadUint32(&tw.stopped) == 1 {
5865
// Already stopped, ignore.
5966
return
6067
}
6168

62-
if value == nil {
63-
panic("can't insert nil objects into TimeWheel queue")
64-
}
65-
6669
tw.slotsLock.Lock()
67-
tw.slots.PushBack(TimeSlot{Time: target, Value: value})
70+
tw.slots.PushBack(TimeSlot[Value]{Time: target, Value: value})
6871
tw.slotsLock.Unlock()
6972

7073
tw.updateNotify <- target
7174
}
7275

73-
func (tw *TimeWheel) Close() {
76+
func (tw *TimeWheel[Value]) Close() {
7477
atomic.StoreUint32(&tw.stopped, 1)
7578

7679
// Idempotent Close is convenient sometimes.
7780
if tw.stopNotify == nil {
7881
return
7982
}
8083

84+
tw.tickerCancel()
85+
8186
tw.stopNotify <- struct{}{}
8287
<-tw.stopNotify
8388

@@ -86,16 +91,16 @@ func (tw *TimeWheel) Close() {
8691
close(tw.updateNotify)
8792
}
8893

89-
func (tw *TimeWheel) tick() {
94+
func (tw *TimeWheel[Value]) tick(ctx context.Context) {
9095
for {
9196
now := time.Now()
9297
// Look for list element closest to now.
9398
tw.slotsLock.Lock()
94-
var closestSlot TimeSlot
99+
var closestSlot TimeSlot[Value]
95100
var closestEl *list.Element
96101
for e := tw.slots.Front(); e != nil; e = e.Next() {
97-
slot := e.Value.(TimeSlot)
98-
if slot.Time.Sub(now) < closestSlot.Time.Sub(now) || closestSlot.Value == nil {
102+
slot := e.Value.(TimeSlot[Value])
103+
if slot.Time.Sub(now) < closestSlot.Time.Sub(now) || closestEl == nil {
99104
closestSlot = slot
100105
closestEl = e
101106
}
@@ -124,7 +129,7 @@ func (tw *TimeWheel) tick() {
124129
tw.slots.Remove(closestEl)
125130
tw.slotsLock.Unlock()
126131

127-
tw.dispatch(closestSlot)
132+
tw.dispatch(ctx, closestSlot)
128133

129134
break selectloop
130135
case newTarget := <-tw.updateNotify:

0 commit comments

Comments
 (0)