Skip to content

Commit a068ee3

Browse files
committed
pkg/verify: Parallelize HTTP(S) signature retrieval
Fixing [1,2]: $ grep 'unable to load signature' cvo.log I0526 13:13:36.123153 1 verify.go:404] unable to load signature: Get https://storage.googleapis.com/openshift-release/official/signatures/openshift/release/sha256=baa687f29b0ac155d8f4c6914056d36d68f343feb9c1e82b46eef95819d00be5/signature-1: dial tcp 172.217.4.48:443: connect: connection timed out I0526 13:15:47.195128 1 verify.go:404] unable to load signature: Get https://storage.googleapis.com/openshift-release/official/signatures/openshift/release/sha256=baa687f29b0ac155d8f4c6914056d36d68f343feb9c1e82b46eef95819d00be5/signature-2: dial tcp 172.217.9.80:443: connect: connection timed out I0526 13:17:10.718027 1 verify.go:404] unable to load signature: Get https://storage.googleapis.com/openshift-release/official/signatures/openshift/release/sha256=baa687f29b0ac155d8f4c6914056d36d68f343feb9c1e82b46eef95819d00be5/signature-3: context deadline exceeded I0526 13:19:44.764143 1 verify.go:404] unable to load signature: Get https://storage.googleapis.com/openshift-release/official/signatures/openshift/release/sha256=baa687f29b0ac155d8f4c6914056d36d68f343feb9c1e82b46eef95819d00be5/signature-1: dial tcp 172.217.4.48:443: connect: connection timed out I0526 13:21:55.835063 1 verify.go:404] unable to load signature: Get https://storage.googleapis.com/openshift-release/official/signatures/openshift/release/sha256=baa687f29b0ac155d8f4c6914056d36d68f343feb9c1e82b46eef95819d00be5/signature-2: dial tcp 172.217.4.48:443: connect: connection timed out I0526 13:23:18.233801 1 verify.go:404] unable to load signature: Get https://storage.googleapis.com/openshift-release/official/signatures/openshift/release/sha256=baa687f29b0ac155d8f4c6914056d36d68f343feb9c1e82b46eef95819d00be5/signature-3: context deadline exceeded In this case, that was because a restricted network blocked access to storage.googleapis.com, but you'd get similar behavior if storage.googleapis.com itself was slow. We want to hit other signature sources (like our mirrors [3]) before giving up on signatures entirely, and with this commit one HTTP(S) store no longer blocks another. The local ConfigMap store still comes first with a serial store, because we don't want to involve external stores before we've exhausted that local-cluster store. [1]: https://bugzilla.redhat.com/show_bug.cgi?id=1840343 [2]: https://bugzilla.redhat.com/show_bug.cgi?id=1838497#c10 [3]: https://github.com/openshift/cluster-update-keys/blob/cca4ce696383e70ae669e770bd63265a9540b721/manifests.rhel/0000_90_cluster-update-keys_configmap.yaml#L5
1 parent 6044c9c commit a068ee3

File tree

3 files changed

+262
-2
lines changed

3 files changed

+262
-2
lines changed

pkg/verify/configmap.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"k8s.io/klog"
1212

1313
"github.com/openshift/cluster-version-operator/pkg/verify/store"
14-
"github.com/openshift/cluster-version-operator/pkg/verify/store/serial"
14+
"github.com/openshift/cluster-version-operator/pkg/verify/store/parallel"
1515
)
1616

1717
// ReleaseAnnotationConfigMapVerifier is an annotation set on a config map in the
@@ -87,7 +87,7 @@ func NewFromConfigMapData(src string, data map[string]string, clientBuilder Clie
8787
return nil, fmt.Errorf("%s did not provide any GPG public keys to verify signatures from and cannot be used", src)
8888
}
8989

90-
return NewReleaseVerifier(verifiers, &serial.Store{Stores: stores}), nil
90+
return NewReleaseVerifier(verifiers, &parallel.Store{Stores: stores}), nil
9191
}
9292

9393
func loadArmoredOrUnarmoredGPGKeyRing(data []byte) (openpgp.EntityList, error) {

pkg/verify/store/parallel/parallel.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Package parallel combines several signature stores in a single store.
2+
// Signatures are searched in each substore simultaneously until a
3+
// match is found.
4+
package parallel
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"strings"
10+
11+
"github.com/openshift/cluster-version-operator/pkg/verify/store"
12+
)
13+
14+
type signatureResponse struct {
15+
signature []byte
16+
errIn error
17+
}
18+
19+
// Store provides access to signatures stored in sub-stores.
20+
type Store struct {
21+
Stores []store.Store
22+
}
23+
24+
// Signatures fetches signatures for the provided digest.
25+
func (s *Store) Signatures(ctx context.Context, name string, digest string, fn store.Callback) error {
26+
nestedCtx, cancel := context.WithCancel(ctx)
27+
defer cancel()
28+
responses := make(chan signatureResponse, len(s.Stores))
29+
errorChannelCount := 0
30+
errorChannel := make(chan error, 1)
31+
32+
for i := range s.Stores {
33+
errorChannelCount++
34+
go func(ctx context.Context, wrappedStore store.Store, name string, digest string, responses chan signatureResponse, errorChannel chan error) {
35+
errorChannel <- wrappedStore.Signatures(ctx, name, digest, func(ctx context.Context, signature []byte, errIn error) (done bool, err error) {
36+
select {
37+
case <-ctx.Done():
38+
return true, nil
39+
case responses <- signatureResponse{signature: signature, errIn: errIn}:
40+
}
41+
return false, nil
42+
})
43+
}(nestedCtx, s.Stores[i], name, digest, responses, errorChannel)
44+
}
45+
46+
allDone := false
47+
var loopError error
48+
for errorChannelCount > 0 {
49+
if allDone {
50+
err := <-errorChannel
51+
errorChannelCount--
52+
if loopError == nil && err != nil && err != context.Canceled && err != context.DeadlineExceeded {
53+
loopError = err
54+
}
55+
} else {
56+
select {
57+
case response := <-responses:
58+
done, err := fn(ctx, response.signature, response.errIn)
59+
if done || err != nil {
60+
allDone = true
61+
loopError = err
62+
cancel()
63+
}
64+
case err := <-errorChannel:
65+
errorChannelCount--
66+
if loopError == nil && err != nil && err != context.Canceled && err != context.DeadlineExceeded {
67+
loopError = err
68+
}
69+
}
70+
}
71+
}
72+
close(responses)
73+
close(errorChannel)
74+
if loopError != nil {
75+
return loopError
76+
}
77+
return ctx.Err() // because we discard context errors from the wrapped stores
78+
}
79+
80+
// String returns a description of where this store finds
81+
// signatures.
82+
func (s *Store) String() string {
83+
wrapped := "no stores"
84+
if len(s.Stores) > 0 {
85+
names := make([]string, 0, len(s.Stores))
86+
for _, store := range s.Stores {
87+
names = append(names, store.String())
88+
}
89+
wrapped = strings.Join(names, ", ")
90+
}
91+
return fmt.Sprintf("parallel signature store wrapping %s", wrapped)
92+
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package parallel
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"reflect"
9+
"regexp"
10+
"testing"
11+
"time"
12+
13+
"github.com/openshift/cluster-version-operator/pkg/verify/store"
14+
"github.com/openshift/cluster-version-operator/pkg/verify/store/memory"
15+
)
16+
17+
// delay wraps a store and introduces a delay before each callback.
18+
type delay struct {
19+
store store.Store
20+
delay time.Duration
21+
}
22+
23+
// Signatures fetches signatures for the provided digest.
24+
func (s *delay) Signatures(ctx context.Context, name string, digest string, fn store.Callback) error {
25+
nestedCtx, cancel := context.WithCancel(ctx)
26+
defer cancel()
27+
responses := make(chan *signatureResponse, 1)
28+
go func(ctx context.Context, name string, digest string, responses chan *signatureResponse) {
29+
err := s.store.Signatures(ctx, name, digest, func(ctx context.Context, signature []byte, errIn error) (done bool, err error) {
30+
select {
31+
case <-ctx.Done():
32+
return true, nil
33+
case responses <- &signatureResponse{signature: signature, errIn: errIn}:
34+
log.Printf("queued response: %s", string(signature))
35+
}
36+
return false, nil
37+
})
38+
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
39+
log.Fatal(err)
40+
}
41+
select {
42+
case <-ctx.Done():
43+
case responses <- nil:
44+
}
45+
close(responses)
46+
return
47+
}(nestedCtx, name, digest, responses)
48+
49+
for {
50+
time.Sleep(s.delay)
51+
select {
52+
case <-ctx.Done():
53+
return ctx.Err()
54+
case response := <-responses:
55+
if response == nil {
56+
return nil
57+
}
58+
log.Printf("sent response: %s", string(response.signature))
59+
done, err := fn(ctx, response.signature, response.errIn)
60+
if done || err != nil {
61+
return err
62+
}
63+
}
64+
}
65+
66+
return nil
67+
}
68+
69+
// String returns a description of where this store finds
70+
// signatures.
71+
func (s *delay) String() string {
72+
return fmt.Sprintf("delay signature store wrapping %s with a delay of %s", s.store, s.delay)
73+
}
74+
75+
func TestStore(t *testing.T) {
76+
ctx := context.Background()
77+
parallel := &Store{
78+
Stores: []store.Store{
79+
&delay{
80+
store: &memory.Store{
81+
Data: map[string][][]byte{
82+
"sha256:123": {
83+
[]byte("store-1-sig-1"),
84+
[]byte("store-1-sig-2"),
85+
},
86+
},
87+
},
88+
delay: 200 * time.Millisecond,
89+
},
90+
&delay{
91+
store: &memory.Store{
92+
Data: map[string][][]byte{
93+
"sha256:123": {
94+
[]byte("store-2-sig-1"),
95+
[]byte("store-2-sig-2"),
96+
},
97+
},
98+
},
99+
delay: 300 * time.Millisecond,
100+
},
101+
},
102+
}
103+
104+
for _, testCase := range []struct {
105+
name string
106+
doneSignature string
107+
doneError error
108+
expectedSignatures []string
109+
expectedError *regexp.Regexp
110+
}{
111+
{
112+
name: "all",
113+
expectedSignatures: []string{
114+
"store-1-sig-1", // 200 ms
115+
"store-2-sig-1", // 300 ms
116+
"store-1-sig-2", // 400 ms
117+
"store-2-sig-2", // 600 ms
118+
},
119+
},
120+
{
121+
name: "done early",
122+
doneSignature: "store-1-sig-2",
123+
expectedSignatures: []string{
124+
"store-1-sig-1",
125+
"store-2-sig-1",
126+
"store-1-sig-2",
127+
},
128+
},
129+
{
130+
name: "error early",
131+
doneSignature: "store-1-sig-2",
132+
doneError: errors.New("test error"),
133+
expectedSignatures: []string{
134+
"store-1-sig-1",
135+
"store-2-sig-1",
136+
"store-1-sig-2",
137+
},
138+
expectedError: regexp.MustCompile("^test error$"),
139+
},
140+
} {
141+
t.Run(testCase.name, func(t *testing.T) {
142+
signatures := []string{}
143+
err := parallel.Signatures(ctx, "name", "sha256:123", func(ctx context.Context, signature []byte, errIn error) (done bool, err error) {
144+
if errIn != nil {
145+
return false, errIn
146+
}
147+
signatures = append(signatures, string(signature))
148+
if string(signature) == testCase.doneSignature {
149+
return true, testCase.doneError
150+
}
151+
return false, nil
152+
})
153+
if err == nil {
154+
if testCase.expectedError != nil {
155+
t.Fatalf("signatures succeeded when we expected %s", testCase.expectedError)
156+
}
157+
} else if testCase.expectedError == nil {
158+
t.Fatalf("signatures failed when we expected success: %v", err)
159+
} else if !testCase.expectedError.MatchString(err.Error()) {
160+
t.Fatalf("signatures failed with %v (expected %s)", err, testCase.expectedError)
161+
}
162+
163+
if !reflect.DeepEqual(signatures, testCase.expectedSignatures) {
164+
t.Fatalf("signatures gathered %v when we expected %v", signatures, testCase.expectedSignatures)
165+
}
166+
})
167+
}
168+
}

0 commit comments

Comments
 (0)