Skip to content

Commit 8a154da

Browse files
authored
internal/ctlog: evict low priority entries from pool under load (#56)
If the pool is full, any lower-priority entries will be evicted and replaced by higher-priority submissions. Lower-priority entries are precertificates with NotBefore more than 48h in the past, or certificates with an SCT extension. Evicted entries are served a 503 immediately.
1 parent ef68bdf commit 8a154da

File tree

6 files changed

+190
-17
lines changed

6 files changed

+190
-17
lines changed

cmd/sunlight/sunlight.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,9 +261,11 @@ type LogConfig struct {
261261
Cache string
262262

263263
// PoolSize is the maximum number of chains pending in the sequencing pool.
264-
// Since the pool is sequenced every second, it works as a qps limit. If the
265-
// pool is full, add-chain requests will be rejected with a 503. Zero means
266-
// no limit.
264+
// Since the pool is sequenced every Period, it works as a qps limit. If the
265+
// pool is full, lower-priority entries will be evicted and replaced if
266+
// possible, and otherwise add-chain requests will be rejected with a 503.
267+
// Lower-priority entries are precertificates with NotBefore more than 48h
268+
// in the past, or certificates with an SCT extension. Zero means no limit.
267269
PoolSize int
268270

269271
// S3Region is the AWS region for the S3 bucket.

internal/ctlog/ctlog.go

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,7 @@ func computeCacheHash(Certificate []byte, IsPrecert bool, IssuerKeyHash [32]byte
586586
type pool struct {
587587
pendingLeaves []*PendingLogEntry
588588
byHash map[cacheHash]waitEntryFunc
589+
lowPriority map[int]func() // pendingLeaves idx => cancel func
589590

590591
// done is closed when the pool has been sequenced and
591592
// the results below are ready.
@@ -605,18 +606,23 @@ type waitEntryFunc func(ctx context.Context) (*sunlight.LogEntry, error)
605606

606607
func newPool() *pool {
607608
return &pool{
608-
done: make(chan struct{}),
609-
byHash: make(map[cacheHash]waitEntryFunc),
609+
done: make(chan struct{}),
610+
byHash: make(map[cacheHash]waitEntryFunc),
611+
lowPriority: make(map[int]func()),
610612
}
611613
}
612614

613615
var errPoolFull = fmtErrorf("rate limited")
616+
var errEvicted = fmtErrorf("evicted to make way for higher priority leaves")
614617

615618
// addLeafToPool adds leaf to the current pool, unless it is found in a
616619
// deduplication cache. It returns a function that will wait until the pool is
617620
// sequenced and return the sequenced leaf, as well as the source of the
618621
// sequenced leaf (pool or cache if deduplicated, sequencer otherwise).
619-
func (l *Log) addLeafToPool(ctx context.Context, leaf *PendingLogEntry) (f waitEntryFunc, source string) {
622+
//
623+
// Low priority entries might get evicted to make space for high priority ones,
624+
// in which case the waitEntryFunc of the evicted entry will immediately return.
625+
func (l *Log) addLeafToPool(ctx context.Context, leaf *PendingLogEntry, lowPriority bool) (f waitEntryFunc, source string) {
620626
// We could marginally more efficiently do uploadIssuer after checking the
621627
// caches, but it's simpler for the the block below to be under a single
622628
// poolMu lock, and uploadIssuer goes to the network so we don't want to
@@ -656,16 +662,43 @@ func (l *Log) addLeafToPool(ctx context.Context, leaf *PendingLogEntry) (f waitE
656662
}
657663
n := len(p.pendingLeaves)
658664
if l.c.PoolSize > 0 && n >= l.c.PoolSize {
659-
return func(ctx context.Context) (*sunlight.LogEntry, error) {
660-
return nil, errPoolFull
661-
}, "ratelimit"
665+
if lowPriority || len(p.lowPriority) == 0 {
666+
return func(ctx context.Context) (*sunlight.LogEntry, error) {
667+
return nil, errPoolFull
668+
}, "ratelimit"
669+
}
670+
for nn, cancel := range p.lowPriority {
671+
cancel()
672+
delete(p.lowPriority, nn)
673+
n = nn
674+
p.pendingLeaves[n] = leaf
675+
break
676+
}
677+
} else {
678+
p.pendingLeaves = append(p.pendingLeaves, leaf)
679+
}
680+
var cancelChan chan struct{}
681+
if lowPriority {
682+
cancelChan = make(chan struct{})
683+
p.lowPriority[n] = func() {
684+
close(cancelChan)
685+
}
662686
}
663-
p.pendingLeaves = append(p.pendingLeaves, leaf)
664687
f = func(ctx context.Context) (*sunlight.LogEntry, error) {
665688
select {
666689
case <-ctx.Done():
667690
return nil, fmtErrorf("context canceled while waiting for sequencing: %w", ctx.Err())
691+
case <-cancelChan:
692+
return nil, errEvicted
668693
case <-p.done:
694+
if err := ctx.Err(); err != nil {
695+
return nil, fmtErrorf("context canceled while waiting for sequencing: %w", err)
696+
}
697+
select {
698+
case <-cancelChan:
699+
return nil, errEvicted
700+
default:
701+
}
669702
if p.err != nil {
670703
return nil, p.err
671704
}

internal/ctlog/ctlog_test.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,117 @@ func TestSequenceUploadPaths(t *testing.T) {
229229
}
230230
}
231231

232+
func TestRatelimit(t *testing.T) {
233+
tl := NewEmptyTestLog(t)
234+
tl.Config.PoolSize = 10
235+
236+
var pendingEvictions []ctlog.WaitEntryFunc
237+
addCertificateExpectEvictionWithSeed := func(seed int64) string {
238+
r := mathrand.New(mathrand.NewSource(seed))
239+
e := &ctlog.PendingLogEntry{}
240+
e.Certificate = make([]byte, r.Intn(4)+8)
241+
r.Read(e.Certificate)
242+
e.Issuers = chains[r.Intn(len(chains))]
243+
f, source := tl.Log.AddLeafToPoolWithLowPriority(e)
244+
pendingEvictions = append(pendingEvictions, f)
245+
return source
246+
}
247+
addCertificateExpectEviction := func() {
248+
addCertificateExpectEvictionWithSeed(mathrand.Int63())
249+
}
250+
checkEvictions := func() {
251+
for _, f := range pendingEvictions {
252+
_, err := f(t.Context())
253+
if err != ctlog.ErrEvicted {
254+
t.Errorf("got error %v, expected ErrEvicted", err)
255+
}
256+
}
257+
pendingEvictions = nil
258+
}
259+
260+
addLowPriorityExpectRatelimit := func() {
261+
r := mathrand.New(mathrand.NewSource(mathrand.Int63()))
262+
e := &ctlog.PendingLogEntry{}
263+
e.Certificate = make([]byte, r.Intn(4)+8)
264+
r.Read(e.Certificate)
265+
e.Issuers = chains[r.Intn(len(chains))]
266+
f, source := tl.Log.AddLeafToPoolWithLowPriority(e)
267+
if source != "ratelimit" {
268+
t.Errorf("got source %q, expected \"ratelimit\"", source)
269+
}
270+
if _, err := f(t.Context()); err != ctlog.ErrPoolFull {
271+
t.Errorf("got error %v, expected ErrPoolFull", err)
272+
}
273+
}
274+
275+
// When the pool is full of high-priority entries, new entries are rejected.
276+
for range 10 {
277+
addCertificate(t, tl)
278+
}
279+
for range 10 {
280+
addCertificateExpectFailure(t, tl)
281+
}
282+
for range 10 {
283+
addLowPriorityExpectRatelimit()
284+
}
285+
fatalIfErr(t, tl.Log.Sequence())
286+
tl.CheckLog(10)
287+
288+
// When there are low-priority entries, high-priority entries cause them to
289+
// be evicted.
290+
for range 5 {
291+
addCertificate(t, tl)
292+
}
293+
for range 3 {
294+
addCertificateExpectEviction()
295+
}
296+
for range 2 {
297+
addCertificate(t, tl)
298+
}
299+
addLowPriorityExpectRatelimit()
300+
for range 3 {
301+
addCertificate(t, tl)
302+
}
303+
addCertificateExpectFailure(t, tl)
304+
// Evictions unblock before the sequencing.
305+
checkEvictions()
306+
fatalIfErr(t, tl.Log.Sequence())
307+
tl.CheckLog(20)
308+
309+
// If we were to wait to call the waitFunc until after sequencing, the
310+
// evictions would still know they were evicted.
311+
for range 5 {
312+
addCertificateExpectEviction()
313+
}
314+
for range 10 {
315+
addCertificate(t, tl)
316+
}
317+
fatalIfErr(t, tl.Log.Sequence())
318+
tl.CheckLog(30)
319+
checkEvictions()
320+
321+
// If a low-priority entry is deduplicated (resubmitted to the same pool)
322+
// and then evicted, both callers get ErrEvicted.
323+
for range 5 {
324+
addCertificate(t, tl)
325+
}
326+
seed := mathrand.Int63()
327+
if source := addCertificateExpectEvictionWithSeed(seed); source != "sequencer" {
328+
t.Errorf("got source %q, expected \"sequencer\"", source)
329+
}
330+
if source := addCertificateExpectEvictionWithSeed(seed); source != "pool" {
331+
t.Errorf("got source %q, expected \"pool\"", source)
332+
}
333+
for range 4 {
334+
addCertificate(t, tl)
335+
}
336+
// Evict the low-priority entry by filling the pool with high-priority ones.
337+
addCertificate(t, tl)
338+
checkEvictions()
339+
fatalIfErr(t, tl.Log.Sequence())
340+
tl.CheckLog(40)
341+
}
342+
232343
func TestDuplicates(t *testing.T) {
233344
t.Run("Certificates", func(t *testing.T) {
234345
testDuplicates(t, addCertificateWithSeed)

internal/ctlog/export_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,17 @@ import (
66
"filippo.io/sunlight"
77
)
88

9-
func (l *Log) AddLeafToPool(e *PendingLogEntry) (waitEntryFunc, string) {
10-
return l.addLeafToPool(context.Background(), e)
9+
var ErrEvicted = errEvicted
10+
var ErrPoolFull = errPoolFull
11+
12+
type WaitEntryFunc = waitEntryFunc
13+
14+
func (l *Log) AddLeafToPool(e *PendingLogEntry) (WaitEntryFunc, string) {
15+
return l.addLeafToPool(context.Background(), e, false)
16+
}
17+
18+
func (l *Log) AddLeafToPoolWithLowPriority(e *PendingLogEntry) (WaitEntryFunc, string) {
19+
return l.addLeafToPool(context.Background(), e, true)
1120
}
1221

1322
func (l *Log) Sequence() error {

internal/ctlog/http.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (l *Log) addPreChain(rw http.ResponseWriter, r *http.Request) {
118118

119119
func (l *Log) addChainOrPreChain(ctx context.Context, reqBody io.ReadCloser, checkType func(*PendingLogEntry) error) (response []byte, code int, err error) {
120120
labels := prometheus.Labels{"error": "", "issuer": "", "root": "", "reused": "",
121-
"precert": "", "preissuer": "", "chain_len": "", "source": ""}
121+
"precert": "", "preissuer": "", "chain_len": "", "low_priority": "", "source": ""}
122122
defer func() {
123123
if err != nil {
124124
labels["error"] = errorCategory(err)
@@ -147,9 +147,11 @@ func (l *Log) addChainOrPreChain(ctx context.Context, reqBody io.ReadCloser, che
147147
if err != nil {
148148
return nil, http.StatusBadRequest, fmtErrorf("invalid chain: %w", err)
149149
}
150+
lowPriority := lowPriority(chain[0])
150151
labels["chain_len"] = fmt.Sprintf("%d", len(chain))
151152
labels["root"] = x509util.NameToString(chain[len(chain)-1].Subject)
152153
labels["issuer"] = x509util.NameToString(chain[0].Issuer)
154+
labels["low_priority"] = fmt.Sprintf("%v", lowPriority)
153155

154156
e := &PendingLogEntry{Certificate: chain[0].Raw}
155157
for _, issuer := range chain[1:] {
@@ -195,14 +197,17 @@ func (l *Log) addChainOrPreChain(ctx context.Context, reqBody io.ReadCloser, che
195197
return nil, http.StatusBadRequest, err
196198
}
197199

198-
waitLeaf, source := l.addLeafToPool(ctx, e)
200+
waitLeaf, source := l.addLeafToPool(ctx, e, lowPriority)
199201
labels["source"] = source
200202
waitTimer := prometheus.NewTimer(l.m.AddChainWait)
201203
seq, err := waitLeaf(ctx)
202-
if source == "sequencer" {
204+
if source == "sequencer" && err != errEvicted {
203205
waitTimer.ObserveDuration()
204206
}
205-
if err == errPoolFull {
207+
if err == errEvicted {
208+
labels["source"] = "evicted"
209+
}
210+
if err == errPoolFull || err == errEvicted {
206211
return nil, http.StatusServiceUnavailable, err
207212
} else if errors.As(err, new(SunsetLogError)) {
208213
return nil, http.StatusGone, err
@@ -236,6 +241,19 @@ func (l *Log) addChainOrPreChain(ctx context.Context, reqBody io.ReadCloser, che
236241
return rsp, http.StatusOK, nil
237242
}
238243

244+
func lowPriority(c *x509.Certificate) bool {
245+
if isPrecert, _ := ctfe.IsPrecertificate(c); isPrecert {
246+
// The BRs allow at most 48 hours of backdating. A precertificate older
247+
// than that can't turn into a valid certificate anymore, so it must be
248+
// cross-posted.
249+
return time.Since(c.NotBefore) >= 48*time.Hour
250+
}
251+
// If a certificate has SCTs, it's already been logged. It'd be better to
252+
// verify the signatures, but this check is meant for when we are under load
253+
// and need to prioritize.
254+
return len(c.SCTList.SCTList) > 0
255+
}
256+
239257
func (l *Log) getRoots(rw http.ResponseWriter, r *http.Request) {
240258
roots := l.rootPool().RawCertificates()
241259
var res struct {

internal/ctlog/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func initMetrics() metrics {
192192
Name: "addchain_requests_total",
193193
Help: "Number of add-[pre-]chain requests, by chain characteristics and errors if any.",
194194
},
195-
[]string{"error", "issuer", "root", "precert", "preissuer", "chain_len", "source", "reused"},
195+
[]string{"error", "issuer", "root", "precert", "preissuer", "chain_len", "low_priority", "source", "reused"},
196196
),
197197
AddChainWait: prometheus.NewSummary(
198198
prometheus.SummaryOpts{

0 commit comments

Comments
 (0)