Skip to content

Commit 692a8a3

Browse files
Merge pull request #393 from wking/parallel-http-signature-stores
Bug 1840343: pkg/verify: Parallelize HTTP(S) signature retrieval
2 parents 62ff8e3 + a068ee3 commit 692a8a3

File tree

11 files changed

+697
-287
lines changed

11 files changed

+697
-287
lines changed

pkg/cvo/cvo.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ import (
4646
"github.com/openshift/cluster-version-operator/pkg/payload/precondition"
4747
preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion"
4848
"github.com/openshift/cluster-version-operator/pkg/verify"
49-
"github.com/openshift/cluster-version-operator/pkg/verify/verifyconfigmap"
49+
"github.com/openshift/cluster-version-operator/pkg/verify/store"
50+
"github.com/openshift/cluster-version-operator/pkg/verify/store/configmap"
51+
"github.com/openshift/cluster-version-operator/pkg/verify/store/serial"
5052
)
5153

5254
const (
@@ -312,8 +314,8 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder v
312314

313315
// allow the verifier to consult the cluster for signature data, and also configure
314316
// a process that writes signatures back to that store
315-
signatureStore := verifyconfigmap.NewStore(configMapClient, nil)
316-
verifier = verifier.WithStores(signatureStore)
317+
signatureStore := configmap.NewStore(configMapClient, nil)
318+
verifier.Store = &serial.Store{Stores: []store.Store{signatureStore, verifier.Store}}
317319
persister := verify.NewSignatureStorePersister(signatureStore, verifier)
318320
return verifier, persister, nil
319321
}

pkg/verify/configmap.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
"github.com/pkg/errors"
1010
"golang.org/x/crypto/openpgp"
1111
"k8s.io/klog"
12+
13+
"github.com/openshift/cluster-version-operator/pkg/verify/store"
14+
"github.com/openshift/cluster-version-operator/pkg/verify/store/parallel"
1215
)
1316

1417
// ReleaseAnnotationConfigMapVerifier is an annotation set on a config map in the
@@ -48,7 +51,7 @@ const ReleaseAnnotationConfigMapVerifier = "release.openshift.io/verification-co
4851
// store and the lookup order is internally defined.
4952
func NewFromConfigMapData(src string, data map[string]string, clientBuilder ClientBuilder) (*ReleaseVerifier, error) {
5053
verifiers := make(map[string]openpgp.EntityList)
51-
var stores []*url.URL
54+
var stores []store.Store
5255
for k, v := range data {
5356
switch {
5457
case strings.HasPrefix(k, "verifier-public-key-"):
@@ -63,7 +66,16 @@ func NewFromConfigMapData(src string, data map[string]string, clientBuilder Clie
6366
if err != nil || (u.Scheme != "http" && u.Scheme != "https" && u.Scheme != "file") {
6467
return nil, fmt.Errorf("%s has an invalid key %q: must be a valid URL with scheme file://, http://, or https://", src, k)
6568
}
66-
stores = append(stores, u)
69+
if u.Scheme == "file" {
70+
stores = append(stores, &fileStore{
71+
directory: u.Path,
72+
})
73+
} else {
74+
stores = append(stores, &httpStore{
75+
uri: u,
76+
httpClient: clientBuilder.HTTPClient,
77+
})
78+
}
6779
default:
6880
klog.Warningf("An unexpected key was found in %s and will be ignored (expected store-* or verifier-public-key-*): %s", src, k)
6981
}
@@ -75,7 +87,7 @@ func NewFromConfigMapData(src string, data map[string]string, clientBuilder Clie
7587
return nil, fmt.Errorf("%s did not provide any GPG public keys to verify signatures from and cannot be used", src)
7688
}
7789

78-
return NewReleaseVerifier(verifiers, stores, clientBuilder), nil
90+
return NewReleaseVerifier(verifiers, &parallel.Store{Stores: stores}), nil
7991
}
8092

8193
func loadArmoredOrUnarmoredGPGKeyRing(data []byte) (openpgp.EntityList, error) {

pkg/verify/verifyconfigmap/store.go renamed to pkg/verify/store/configmap/configmap.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package verifyconfigmap
1+
package configmap
22

33
import (
44
"context"
@@ -16,6 +16,8 @@ import (
1616
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
1717
"k8s.io/client-go/util/retry"
1818
"k8s.io/klog"
19+
20+
"github.com/openshift/cluster-version-operator/pkg/verify/store"
1921
)
2022

2123
// ReleaseLabelConfigMap is a label applied to a configmap inside the
@@ -87,10 +89,10 @@ func digestToKeyPrefix(digest string) (string, error) {
8789
return fmt.Sprintf("%s-%s", algo, hash), nil
8890
}
8991

90-
// DigestSignatures returns a list of signatures that match the request
92+
// Signatures returns a list of signatures that match the request
9193
// digest out of config maps labelled with ReleaseLabelConfigMap in the
9294
// openshift-config-managed namespace.
93-
func (s *Store) DigestSignatures(ctx context.Context, digest string) ([][]byte, error) {
95+
func (s *Store) Signatures(ctx context.Context, name string, digest string, fn store.Callback) error {
9496
// avoid repeatedly reloading config maps
9597
items := s.mostRecentConfigMaps()
9698
r := s.limiter.Reserve()
@@ -100,31 +102,36 @@ func (s *Store) DigestSignatures(ctx context.Context, digest string) ([][]byte,
100102
})
101103
if err != nil {
102104
s.rememberMostRecentConfigMaps([]corev1.ConfigMap{})
103-
return nil, err
105+
return err
104106
}
105107
items = configMaps.Items
106108
s.rememberMostRecentConfigMaps(configMaps.Items)
107109
}
108110

109111
prefix, err := digestToKeyPrefix(digest)
110112
if err != nil {
111-
return nil, err
113+
return err
112114
}
113115

114-
var signatures [][]byte
115116
for _, cm := range items {
116117
klog.V(4).Infof("searching for %s in signature config map %s", prefix, cm.ObjectMeta.Name)
117118
for k, v := range cm.BinaryData {
118119
if strings.HasPrefix(k, prefix) {
119120
klog.V(4).Infof("key %s from signature config map %s matches %s", k, cm.ObjectMeta.Name, digest)
120-
signatures = append(signatures, v)
121+
done, err := fn(ctx, v, nil)
122+
if err != nil || done {
123+
return err
124+
}
125+
if err := ctx.Err(); err != nil {
126+
return err
127+
}
121128
}
122129
}
123130
}
124-
return signatures, nil
131+
return nil
125132
}
126133

127-
// Store attempts to persist the provided signatures into a form DigestSignatures will
134+
// Store attempts to persist the provided signatures into a form Signatures will
128135
// retrieve.
129136
func (s *Store) Store(ctx context.Context, signaturesByDigest map[string][][]byte) error {
130137
cm := &corev1.ConfigMap{

pkg/verify/store/memory/memory.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Package memory implements an in-memory signature store. This is
2+
// mostly useful for testing.
3+
package memory
4+
5+
import (
6+
"context"
7+
8+
"github.com/openshift/cluster-version-operator/pkg/verify/store"
9+
)
10+
11+
// Store provides access to signatures stored in memory.
12+
type Store struct {
13+
// Data maps digests to slices of signatures.
14+
Data map[string][][]byte
15+
}
16+
17+
// Signatures fetches signatures for the provided digest.
18+
func (s *Store) Signatures(ctx context.Context, name string, digest string, fn store.Callback) error {
19+
for _, signature := range s.Data[digest] {
20+
done, err := fn(ctx, signature, nil)
21+
if err != nil || done {
22+
return err
23+
}
24+
if err := ctx.Err(); err != nil {
25+
return err
26+
}
27+
}
28+
29+
return nil
30+
}
31+
32+
// String returns a description of where this store finds
33+
// signatures.
34+
func (s *Store) String() string {
35+
return "in-memory signature store"
36+
}

pkg/verify/store/parallel/parallel.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Package parallel combines several signature stores in a single store.
2+
// Signatures are searched in each substore simultaneously until a
3+
// match is found.
4+
package parallel
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"strings"
10+
11+
"github.com/openshift/cluster-version-operator/pkg/verify/store"
12+
)
13+
14+
type signatureResponse struct {
15+
signature []byte
16+
errIn error
17+
}
18+
19+
// Store provides access to signatures stored in sub-stores.
20+
type Store struct {
21+
Stores []store.Store
22+
}
23+
24+
// Signatures fetches signatures for the provided digest.
25+
func (s *Store) Signatures(ctx context.Context, name string, digest string, fn store.Callback) error {
26+
nestedCtx, cancel := context.WithCancel(ctx)
27+
defer cancel()
28+
responses := make(chan signatureResponse, len(s.Stores))
29+
errorChannelCount := 0
30+
errorChannel := make(chan error, 1)
31+
32+
for i := range s.Stores {
33+
errorChannelCount++
34+
go func(ctx context.Context, wrappedStore store.Store, name string, digest string, responses chan signatureResponse, errorChannel chan error) {
35+
errorChannel <- wrappedStore.Signatures(ctx, name, digest, func(ctx context.Context, signature []byte, errIn error) (done bool, err error) {
36+
select {
37+
case <-ctx.Done():
38+
return true, nil
39+
case responses <- signatureResponse{signature: signature, errIn: errIn}:
40+
}
41+
return false, nil
42+
})
43+
}(nestedCtx, s.Stores[i], name, digest, responses, errorChannel)
44+
}
45+
46+
allDone := false
47+
var loopError error
48+
for errorChannelCount > 0 {
49+
if allDone {
50+
err := <-errorChannel
51+
errorChannelCount--
52+
if loopError == nil && err != nil && err != context.Canceled && err != context.DeadlineExceeded {
53+
loopError = err
54+
}
55+
} else {
56+
select {
57+
case response := <-responses:
58+
done, err := fn(ctx, response.signature, response.errIn)
59+
if done || err != nil {
60+
allDone = true
61+
loopError = err
62+
cancel()
63+
}
64+
case err := <-errorChannel:
65+
errorChannelCount--
66+
if loopError == nil && err != nil && err != context.Canceled && err != context.DeadlineExceeded {
67+
loopError = err
68+
}
69+
}
70+
}
71+
}
72+
close(responses)
73+
close(errorChannel)
74+
if loopError != nil {
75+
return loopError
76+
}
77+
return ctx.Err() // because we discard context errors from the wrapped stores
78+
}
79+
80+
// String returns a description of where this store finds
81+
// signatures.
82+
func (s *Store) String() string {
83+
wrapped := "no stores"
84+
if len(s.Stores) > 0 {
85+
names := make([]string, 0, len(s.Stores))
86+
for _, store := range s.Stores {
87+
names = append(names, store.String())
88+
}
89+
wrapped = strings.Join(names, ", ")
90+
}
91+
return fmt.Sprintf("parallel signature store wrapping %s", wrapped)
92+
}

0 commit comments

Comments
 (0)