Skip to content

Commit 7658355

Browse files
authored
Simplify crl-updater's simultaneous gRPC streams (#6419)
Previously, we would stream CRL Entries directly from the SA's response stream into the CA's request stream, and similarly directly stream bytes from the CA's response stream into the Storer's request stream. Since we're seeing odd errors and inconsistencies in our gRPC streaming metrics, simplify these to only conduct one stream at a time. This will make our streaming and error semantics much simpler, at the cost of memory usage in the updater.
1 parent 868225d commit 7658355

File tree

2 files changed

+61
-45
lines changed

2 files changed

+61
-45
lines changed

crl/updater/updater.go

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prometheus/client_golang/prometheus"
1414

1515
capb "github.com/letsencrypt/boulder/ca/proto"
16+
"github.com/letsencrypt/boulder/core/proto"
1617
"github.com/letsencrypt/boulder/crl"
1718
cspb "github.com/letsencrypt/boulder/crl/storer/proto"
1819
"github.com/letsencrypt/boulder/issuance"
@@ -284,6 +285,8 @@ func (cu *crlUpdater) tickShard(ctx context.Context, atTime time.Time, issuerNam
284285
ctx, cancel := context.WithCancel(ctx)
285286
defer cancel()
286287

288+
crlID := crl.Id(issuerNameID, crl.Number(atTime), shardIdx)
289+
287290
start := cu.clk.Now()
288291
defer func() {
289292
// This func closes over the named return value `err`, so can reference it.
@@ -294,10 +297,13 @@ func (cu *crlUpdater) tickShard(ctx context.Context, atTime time.Time, issuerNam
294297
cu.tickHistogram.WithLabelValues(cu.issuers[issuerNameID].Subject.CommonName, result).Observe(cu.clk.Since(start).Seconds())
295298
cu.updatedCounter.WithLabelValues(cu.issuers[issuerNameID].Subject.CommonName, result).Inc()
296299
}()
297-
cu.log.Debugf("Ticking shard %d of issuer %d at time %s", shardIdx, issuerNameID, atTime)
298300

299301
expiresAfter, expiresBefore := cu.getShardBoundaries(atTime, shardIdx)
302+
cu.log.Infof(
303+
"Generating CRL shard: id=[%s] expiresAfter=[%s] expiresBefore=[%s]",
304+
crlID, expiresAfter, expiresBefore)
300305

306+
// Get the full list of CRL Entries for this shard from the SA.
301307
saStream, err := cu.sa.GetRevokedCerts(ctx, &sapb.GetRevokedCertsRequest{
302308
IssuerNameID: int64(issuerNameID),
303309
ExpiresAfter: expiresAfter.UnixNano(),
@@ -308,6 +314,21 @@ func (cu *crlUpdater) tickShard(ctx context.Context, atTime time.Time, issuerNam
308314
return fmt.Errorf("connecting to SA: %w", err)
309315
}
310316

317+
var crlEntries []*proto.CRLEntry
318+
for {
319+
entry, err := saStream.Recv()
320+
if err != nil {
321+
if err == io.EOF {
322+
break
323+
}
324+
return fmt.Errorf("retrieving entry from SA: %w", err)
325+
}
326+
crlEntries = append(crlEntries, entry)
327+
}
328+
329+
cu.log.Infof("Queried SA for CRL shard: id=[%s] numEntries=[%s]")
330+
331+
// Send the full list of CRL Entries to the CA.
311332
caStream, err := cu.ca.GenerateCRL(ctx)
312333
if err != nil {
313334
return fmt.Errorf("connecting to CA: %w", err)
@@ -326,15 +347,7 @@ func (cu *crlUpdater) tickShard(ctx context.Context, atTime time.Time, issuerNam
326347
return fmt.Errorf("sending CA metadata: %w", err)
327348
}
328349

329-
for {
330-
entry, err := saStream.Recv()
331-
if err != nil {
332-
if err == io.EOF {
333-
break
334-
}
335-
return fmt.Errorf("retrieving entry from SA: %w", err)
336-
}
337-
350+
for _, entry := range crlEntries {
338351
err = caStream.Send(&capb.GenerateCRLRequest{
339352
Payload: &capb.GenerateCRLRequest_Entry{
340353
Entry: entry,
@@ -345,14 +358,30 @@ func (cu *crlUpdater) tickShard(ctx context.Context, atTime time.Time, issuerNam
345358
}
346359
}
347360

348-
// It's okay to close the CA send stream before we start reading from the
349-
// receive stream, because we know that the CA has to hold the entire tbsCRL
350-
// in memory before it can sign it and start returning the real CRL.
351361
err = caStream.CloseSend()
352362
if err != nil {
353363
return fmt.Errorf("closing CA request stream: %w", err)
354364
}
355365

366+
// Receive the full bytes of the signed CRL from the CA.
367+
crlLen := 0
368+
crlHash := sha256.New()
369+
var crlChunks [][]byte
370+
for {
371+
out, err := caStream.Recv()
372+
if err != nil {
373+
if err == io.EOF {
374+
break
375+
}
376+
return fmt.Errorf("receiving CRL bytes: %w", err)
377+
}
378+
379+
crlLen += len(out.Chunk)
380+
crlHash.Write(out.Chunk)
381+
crlChunks = append(crlChunks, out.Chunk)
382+
}
383+
384+
// Send the full bytes of the signed CRL to the Storer.
356385
csStream, err := cu.cs.UploadCRL(ctx)
357386
if err != nil {
358387
return fmt.Errorf("connecting to CRLStorer: %w", err)
@@ -371,28 +400,15 @@ func (cu *crlUpdater) tickShard(ctx context.Context, atTime time.Time, issuerNam
371400
return fmt.Errorf("sending CRLStorer metadata: %w", err)
372401
}
373402

374-
crlLen := 0
375-
crlHash := sha256.New()
376-
for {
377-
out, err := caStream.Recv()
378-
if err != nil {
379-
if err == io.EOF {
380-
break
381-
}
382-
return fmt.Errorf("receiving CRL bytes: %w", err)
383-
}
384-
403+
for _, chunk := range crlChunks {
385404
err = csStream.Send(&cspb.UploadCRLRequest{
386405
Payload: &cspb.UploadCRLRequest_CrlChunk{
387-
CrlChunk: out.Chunk,
406+
CrlChunk: chunk,
388407
},
389408
})
390409
if err != nil {
391410
return fmt.Errorf("uploading CRL bytes: %w", err)
392411
}
393-
394-
crlLen += len(out.Chunk)
395-
crlHash.Write(out.Chunk)
396412
}
397413

398414
_, err = csStream.CloseAndRecv()
@@ -402,7 +418,7 @@ func (cu *crlUpdater) tickShard(ctx context.Context, atTime time.Time, issuerNam
402418

403419
cu.log.Infof(
404420
"Generated CRL shard: id=[%s] size=[%d] hash=[%x]",
405-
crl.Id(issuerNameID, crl.Number(atTime), shardIdx), crlLen, crlHash.Sum(nil))
421+
crlID, crlLen, crlHash.Sum(nil))
406422
return nil
407423
}
408424

crl/updater/updater_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -162,17 +162,6 @@ func TestTickShard(t *testing.T) {
162162
}, 1)
163163
cu.updatedCounter.Reset()
164164

165-
// Errors reading from the CA should bubble up sooner.
166-
cu.ca = &fakeCGC{gcc: fakeGCC{recvErr: sentinelErr}}
167-
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0)
168-
test.AssertError(t, err, "CA error")
169-
test.AssertContains(t, err.Error(), "receiving CRL bytes")
170-
test.AssertErrorIs(t, err, sentinelErr)
171-
test.AssertMetricWithLabelsEquals(t, cu.updatedCounter, prometheus.Labels{
172-
"issuer": "(TEST) Elegant Elephant E1", "result": "failed",
173-
}, 1)
174-
cu.updatedCounter.Reset()
175-
176165
// Errors sending to the Storer should bubble up sooner.
177166
cu.cs = &fakeCSC{ucc: fakeUCC{sendErr: sentinelErr}}
178167
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0)
@@ -184,18 +173,18 @@ func TestTickShard(t *testing.T) {
184173
}, 1)
185174
cu.updatedCounter.Reset()
186175

187-
// Errors reading from the SA should bubble up sooner.
188-
cu.sa = &fakeSAC{grcc: fakeGRCC{err: sentinelErr}}
176+
// Errors reading from the CA should bubble up sooner.
177+
cu.ca = &fakeCGC{gcc: fakeGCC{recvErr: sentinelErr}}
189178
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0)
190-
test.AssertError(t, err, "database error")
191-
test.AssertContains(t, err.Error(), "retrieving entry from SA")
179+
test.AssertError(t, err, "CA error")
180+
test.AssertContains(t, err.Error(), "receiving CRL bytes")
192181
test.AssertErrorIs(t, err, sentinelErr)
193182
test.AssertMetricWithLabelsEquals(t, cu.updatedCounter, prometheus.Labels{
194183
"issuer": "(TEST) Elegant Elephant E1", "result": "failed",
195184
}, 1)
196185
cu.updatedCounter.Reset()
197186

198-
// Errors sending to the CA should bubble up soonest.
187+
// Errors sending to the CA should bubble up sooner.
199188
cu.ca = &fakeCGC{gcc: fakeGCC{sendErr: sentinelErr}}
200189
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0)
201190
test.AssertError(t, err, "CA error")
@@ -205,6 +194,17 @@ func TestTickShard(t *testing.T) {
205194
"issuer": "(TEST) Elegant Elephant E1", "result": "failed",
206195
}, 1)
207196
cu.updatedCounter.Reset()
197+
198+
// Errors reading from the SA should bubble up soonest.
199+
cu.sa = &fakeSAC{grcc: fakeGRCC{err: sentinelErr}}
200+
err = cu.tickShard(context.Background(), cu.clk.Now(), e1.NameID(), 0)
201+
test.AssertError(t, err, "database error")
202+
test.AssertContains(t, err.Error(), "retrieving entry from SA")
203+
test.AssertErrorIs(t, err, sentinelErr)
204+
test.AssertMetricWithLabelsEquals(t, cu.updatedCounter, prometheus.Labels{
205+
"issuer": "(TEST) Elegant Elephant E1", "result": "failed",
206+
}, 1)
207+
cu.updatedCounter.Reset()
208208
}
209209

210210
func TestTickIssuer(t *testing.T) {

0 commit comments

Comments
 (0)