Skip to content

Commit 3696809

Browse files
flavianmissiopenshift-cherrypick-robot
authored andcommitted
cmd/move-blobs: add cmd to move blobs from one root dir to another
* Dockerfile: add move-blobs binary to image * cmd/move-blobs: prioritise account key for auth when present
1 parent d27b918 commit 3696809

File tree

3 files changed

+582
-0
lines changed

3 files changed

+582
-0
lines changed

Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ FROM registry.ci.openshift.org/ocp/builder:rhel-8-golang-1.20-openshift-4.14 AS
22
WORKDIR /go/src/github.com/openshift/cluster-image-registry-operator
33
COPY . .
44
RUN make build
5+
WORKDIR /go/src/github.com/openshift/cluster-image-registry-operator/cmd/move-blobs
6+
RUN go build -o ../../tmp/_output/bin/move-blobs
57

68
FROM registry.ci.openshift.org/ocp/4.14:base
79
COPY images/bin/entrypoint.sh /usr/bin/
810
COPY manifests/image-references manifests/0* /manifests/
911
COPY vendor/github.com/openshift/api/imageregistry/v1/*.crd.yaml /manifests/
1012
COPY --from=builder /go/src/github.com/openshift/cluster-image-registry-operator/tmp/_output/bin/cluster-image-registry-operator /usr/bin/
13+
COPY --from=builder /go/src/github.com/openshift/cluster-image-registry-operator/tmp/_output/bin/move-blobs /usr/bin/
1114
RUN chmod -R g+w /etc/pki/ca-trust/extracted/pem/
1215

1316
ENTRYPOINT ["/usr/bin/entrypoint.sh"]

cmd/move-blobs/main.go

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/url"
7+
"os"
8+
"strings"
9+
"time"
10+
11+
"k8s.io/klog/v2"
12+
13+
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
14+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
15+
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
16+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
17+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
18+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
19+
"github.com/Azure/go-autorest/autorest/azure"
20+
)
21+
22+
func main() {
23+
opts := getConfigOpts()
24+
if err := validate(opts); err != nil {
25+
panic(err)
26+
}
27+
28+
// if the environment specific configs are not given, assume
29+
// AzurePublicCloud as it's probably the most common choice anyway.
30+
if len(opts.environment) == 0 {
31+
opts.environment = "AZUREPUBLICCLOUD"
32+
}
33+
34+
cloudConfig, err := getCloudConfig(opts.environment)
35+
if err != nil {
36+
panic(err)
37+
}
38+
39+
client, err := getClient(cloudConfig, opts)
40+
if err != nil {
41+
panic(err)
42+
}
43+
ctx := context.Background()
44+
_, err = moveBlobs(ctx, client, &moveBlobOpts{
45+
source: "docker",
46+
dest: "/docker",
47+
})
48+
if err != nil {
49+
panic(err)
50+
}
51+
}
52+
53+
type moveBlobOpts struct {
54+
source string
55+
dest string
56+
}
57+
58+
// moveBlobs moves blobs from o.source to o.dest.
59+
//
60+
// moveBlobs will first copy blobs from o.source to o.dest, then delete the
61+
// successfully copied blobs from o.source.
62+
// If o.source has a lot of blobs, this function could take a while to finish.
63+
func moveBlobs(
64+
ctx context.Context,
65+
containerClient *container.Client,
66+
o *moveBlobOpts,
67+
) ([]string, error) {
68+
sourceBlobs, err := listBlobs(ctx, containerClient, o.source)
69+
if err != nil {
70+
return []string{}, err
71+
}
72+
klog.Infof("found %d blobs to move", len(sourceBlobs))
73+
74+
// we gather errors so that when they happen we still have a shot
75+
// of copying some blobs into the destination, which allows for
76+
// incremental retries on error.
77+
errors := []error{}
78+
copiesToWaitFor := map[string]blob.CopyStatusType{}
79+
movedBlobs := []string{}
80+
81+
for _, sourceBlobName := range sourceBlobs {
82+
// rename the source blob to match the destination.
83+
// we're dealing with virtual paths(dirs) here, so the path
84+
// is part of the blob name.
85+
destBlobName := strings.Replace(sourceBlobName, o.source, o.dest, 1)
86+
87+
klog.V(3).Infof("transforced source blob name from %q into %q", sourceBlobName, destBlobName)
88+
89+
// the blob client represents the destination blob, so we use
90+
// blob renamed to match the destination.
91+
blobClient := containerClient.NewBlobClient(destBlobName)
92+
93+
// the source blob has to be on the same container as the
94+
// destination blob for this to work.
95+
// it's name MUST be escaped.
96+
// we also ensure there's a "/" separating the URL from the
97+
// source blob name so the container name doesn't get mixed up
98+
// with the source blob name.
99+
sourceBlobURL := strings.TrimRight(containerClient.URL(), "/") + "/" + url.QueryEscape(sourceBlobName)
100+
101+
// counter-intuitively, this copy uses the blob which this client
102+
// is created for as the destination, and the source is given in
103+
// the call to StartCopyFromURL.
104+
klog.Infof("starting copy of %q", sourceBlobName)
105+
resp, err := blobClient.StartCopyFromURL(ctx, sourceBlobURL, nil)
106+
if err != nil {
107+
errors = append(errors, fmt.Errorf("failed to start copy: %v", err))
108+
continue
109+
}
110+
111+
switch *resp.CopyStatus {
112+
case blob.CopyStatusTypeSuccess:
113+
klog.Infof("copy finished instantly for blob %q", sourceBlobName)
114+
movedBlobs = append(movedBlobs, sourceBlobName)
115+
case blob.CopyStatusTypeAborted, blob.CopyStatusTypeFailed:
116+
klog.Warningf("copy failed failed for blob %q, moving on", sourceBlobName)
117+
errors = append(errors,
118+
fmt.Errorf("copy failed with status %q for blob %q", *resp.CopyStatus, sourceBlobName))
119+
// leave retry up to the client. in the image-registry case, the k8s job
120+
// will handle retrying after failures.
121+
case blob.CopyStatusTypePending:
122+
klog.Infof("copy is pending for blob %q, adding to list of copies to wait for", sourceBlobName)
123+
copiesToWaitFor[destBlobName] = *resp.CopyStatus
124+
}
125+
}
126+
127+
// this code is very difficult to exercise. none of my attempts to
128+
// force an asynchronous copy worked, no matter how big the source file
129+
// was. I was forced to manipulate the code in a way that exercised
130+
// loop a few times to ensure it worked.
131+
for blobName, copyStatus := range copiesToWaitFor {
132+
blobClient := containerClient.NewBlobClient(blobName)
133+
for copyStatus == blob.CopyStatusTypePending {
134+
props, err := blobClient.GetProperties(ctx, nil)
135+
if err != nil {
136+
errors = append(errors, err)
137+
continue
138+
}
139+
copyStatus = *props.CopyStatus
140+
if copyStatus == blob.CopyStatusTypeAborted || copyStatus == blob.CopyStatusTypeFailed {
141+
err := fmt.Errorf("copy failed, status: %q, blob: %q", *props.CopyStatus, blobName)
142+
if props.CopyStatusDescription != nil {
143+
err = fmt.Errorf(
144+
"copy failed, status: %q, desc: %q, blob: %q",
145+
copyStatus,
146+
*props.CopyStatusDescription,
147+
blobName,
148+
)
149+
}
150+
errors = append(errors, err)
151+
continue
152+
}
153+
if copyStatus == blob.CopyStatusTypePending {
154+
// copy still pending - wait an arbitraty amount of time before trying again
155+
klog.Infof("waiting 100ms before re-checking copy status for blob %q", blobName)
156+
time.Sleep(100 * time.Millisecond)
157+
}
158+
}
159+
sourceBlobName := strings.Replace(blobName, o.dest, o.source, 1)
160+
klog.V(3).Infof("adding blob to moved blobs list: %q", sourceBlobName)
161+
movedBlobs = append(movedBlobs, sourceBlobName)
162+
}
163+
164+
// only delete source blobs we know have been moved
165+
for _, blobName := range movedBlobs {
166+
blobClient := containerClient.NewBlobClient(blobName)
167+
_, err := blobClient.Delete(ctx, nil)
168+
if err != nil && !bloberror.HasCode(err, bloberror.BlobNotFound) {
169+
errors = append(errors, fmt.Errorf("failed deleting copied blob: %v", err))
170+
}
171+
klog.Infof("deleted copied blob from source %q", blobName)
172+
}
173+
174+
klog.Infof("moved %d blobs", len(movedBlobs))
175+
if len(errors) > 0 {
176+
return movedBlobs, fmt.Errorf("encountered errors when moving blobs: %v", errors)
177+
}
178+
return movedBlobs, nil
179+
}
180+
181+
func listBlobs(
182+
ctx context.Context,
183+
containerClient *container.Client,
184+
prefix string,
185+
) ([]string, error) {
186+
blobs := []string{}
187+
pager := containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
188+
Prefix: &prefix,
189+
})
190+
for pager.More() {
191+
resp, err := pager.NextPage(ctx)
192+
if err != nil {
193+
return []string{}, err
194+
}
195+
if resp.Segment == nil {
196+
return []string{}, fmt.Errorf("response has no segments")
197+
}
198+
for _, blob := range resp.Segment.BlobItems {
199+
if blob.Name == nil {
200+
return []string{}, fmt.Errorf(
201+
"required blob property Name is missing while listing blobs under: %s",
202+
prefix,
203+
)
204+
}
205+
blobs = append(blobs, *blob.Name)
206+
207+
}
208+
}
209+
return blobs, nil
210+
}
211+
212+
type configOpts struct {
213+
storageAccountName string
214+
containerName string
215+
clientID string
216+
tenantID string
217+
clientSecret string
218+
federatedTokenFile string
219+
accountKey string
220+
environment string
221+
}
222+
223+
func getCloudConfig(environment string) (cloud.Configuration, error) {
224+
env, err := azure.EnvironmentFromName(environment)
225+
if err != nil {
226+
return cloud.Configuration{}, err
227+
}
228+
return cloud.Configuration{
229+
ActiveDirectoryAuthorityHost: env.ActiveDirectoryEndpoint,
230+
Services: map[cloud.ServiceName]cloud.ServiceConfiguration{
231+
cloud.ResourceManager: {
232+
Audience: env.TokenAudience,
233+
Endpoint: env.ResourceManagerEndpoint,
234+
},
235+
},
236+
}, nil
237+
}
238+
239+
func getConfigOpts() *configOpts {
240+
return &configOpts{
241+
storageAccountName: strings.TrimSpace(os.Getenv("AZURE_STORAGE_ACCOUNT_NAME")),
242+
containerName: strings.TrimSpace(os.Getenv("AZURE_CONTAINER_NAME")),
243+
clientID: strings.TrimSpace(os.Getenv("AZURE_CLIENT_ID")),
244+
tenantID: strings.TrimSpace(os.Getenv("AZURE_TENANT_ID")),
245+
clientSecret: strings.TrimSpace(os.Getenv("AZURE_CLIENT_SECRET")),
246+
federatedTokenFile: strings.TrimSpace(os.Getenv("AZURE_FEDERATED_TOKEN_FILE")),
247+
accountKey: strings.TrimSpace(os.Getenv("AZURE_ACCOUNTKEY")),
248+
environment: strings.TrimSpace(os.Getenv("AZURE_ENVIRONMENT")),
249+
}
250+
}
251+
252+
// getCreds build credentials from the given parameters.
253+
//
254+
// this function is basically copy of what the operator itself does,
255+
// as a way to ensure that it will work in the same way as the operator.
256+
func getClient(cloudConfig cloud.Configuration, opts *configOpts) (*container.Client, error) {
257+
containerURL := fmt.Sprintf(
258+
"https://%s.blob.core.windows.net/%s",
259+
opts.storageAccountName,
260+
opts.containerName,
261+
)
262+
var client *container.Client
263+
if len(opts.accountKey) > 0 {
264+
cred, err := container.NewSharedKeyCredential(opts.storageAccountName, opts.accountKey)
265+
if err != nil {
266+
return nil, err
267+
}
268+
client, err = container.NewClientWithSharedKeyCredential(containerURL, cred, nil)
269+
if err != nil {
270+
return nil, err
271+
}
272+
} else if len(opts.clientSecret) > 0 {
273+
options := azidentity.ClientSecretCredentialOptions{
274+
ClientOptions: azcore.ClientOptions{
275+
Cloud: cloudConfig,
276+
},
277+
}
278+
cred, err := azidentity.NewClientSecretCredential(opts.tenantID, opts.clientID, opts.clientSecret, &options)
279+
if err != nil {
280+
return nil, err
281+
}
282+
client, err = container.NewClient(containerURL, cred, nil)
283+
if err != nil {
284+
return nil, err
285+
}
286+
} else if len(opts.federatedTokenFile) > 0 {
287+
options := azidentity.WorkloadIdentityCredentialOptions{
288+
ClientOptions: azcore.ClientOptions{
289+
Cloud: cloudConfig,
290+
},
291+
ClientID: opts.clientID,
292+
TenantID: opts.tenantID,
293+
TokenFilePath: opts.federatedTokenFile,
294+
}
295+
cred, err := azidentity.NewWorkloadIdentityCredential(&options)
296+
if err != nil {
297+
return nil, err
298+
}
299+
client, err = container.NewClient(containerURL, cred, nil)
300+
if err != nil {
301+
return nil, err
302+
}
303+
} else {
304+
cred, err := azidentity.NewDefaultAzureCredential(nil)
305+
if err != nil {
306+
return nil, err
307+
}
308+
client, err = container.NewClient(containerURL, cred, nil)
309+
if err != nil {
310+
return nil, err
311+
}
312+
}
313+
return client, nil
314+
}
315+
316+
// validate returns an error when the required options are missing.
317+
func validate(opts *configOpts) error {
318+
if len(opts.clientSecret) == 0 && len(opts.federatedTokenFile) == 0 && len(opts.accountKey) == 0 {
319+
return fmt.Errorf("One of AZURE_CLIENT_SECRET or AZURE_FEDERATED_TOKEN_FILE or AZURE_ACCOUNTKEY is required for authentication")
320+
}
321+
if len(opts.clientID) == 0 {
322+
return fmt.Errorf("AZURE_CLIENT_ID is required for authentication")
323+
}
324+
if len(opts.tenantID) == 0 {
325+
return fmt.Errorf("AZURE_TENANT_ID is required for authentication")
326+
}
327+
if len(opts.storageAccountName) == 0 {
328+
return fmt.Errorf("AZURE_STORAGE_ACCOUNT_NAME is required")
329+
}
330+
if len(opts.containerName) == 0 {
331+
return fmt.Errorf("AZURE_CONTAINER_NAME is required")
332+
}
333+
return nil
334+
}

0 commit comments

Comments
 (0)