Skip to content

Commit 4e4161e

Browse files
authored
feat(cas): Azure Blob Storage support (#360)
Signed-off-by: Miguel Martinez Trivino <[email protected]>
1 parent 892f68c commit 4e4161e

29 files changed

+892
-69
lines changed

app/artifact-cas/cmd/main.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/chainloop-dev/chainloop/app/artifact-cas/internal/conf"
2626
"github.com/chainloop-dev/chainloop/app/artifact-cas/internal/server"
2727
backend "github.com/chainloop-dev/chainloop/internal/blobmanager"
28+
"github.com/chainloop-dev/chainloop/internal/blobmanager/azureblob"
2829
"github.com/chainloop-dev/chainloop/internal/blobmanager/oci"
2930
"github.com/chainloop-dev/chainloop/internal/credentials"
3031
"github.com/chainloop-dev/chainloop/internal/credentials/manager"
@@ -64,11 +65,12 @@ type app struct {
6465
}
6566

6667
func loadCASBackendProviders(creader credentials.Reader) backend.Providers {
67-
// Currently only OCI is supported
68-
// Here we will load the rest of providers, S3, GCS, etc
69-
p := oci.NewBackendProvider(creader)
68+
// Initialize CAS backend providers
69+
ociProvider := oci.NewBackendProvider(creader)
70+
azureBlobProvider := azureblob.NewBackendProvider(creader)
7071
return backend.Providers{
71-
p.ID(): p,
72+
ociProvider.ID(): ociProvider,
73+
azureBlobProvider.ID(): azureBlobProvider,
7274
}
7375
}
7476

app/artifact-cas/configs/config.devel.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
server:
66
http:
77
addr: 0.0.0.0:8001
8-
timeout: 1s
8+
timeout: 5s
99
grpc:
1010
addr: 0.0.0.0:9001
11-
timeout: 1s
11+
# Some cas backends are slow, so we need to increase the timeout
12+
# for example, Azure Blob Storage describe takes more than 1 second to respond sometimes
13+
timeout: 5s
1214
http_metrics:
1315
addr: 0.0.0.0:5001
1416

app/artifact-cas/internal/service/bytestream.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ package service
1818
import (
1919
"bytes"
2020
"context"
21+
"crypto/sha256"
2122
"encoding/base64"
2223
"encoding/gob"
24+
"fmt"
25+
"hash"
2326
"io"
2427

2528
"errors"
@@ -113,7 +116,7 @@ func (s *ByteStreamService) Write(stream bytestream.ByteStream_WriteServer) erro
113116
return sl.LogAndMaskErr(err, s.log)
114117
}
115118

116-
s.log.Infow("msg", "artifact received, uploading now to OCI backend", "name", req.resource.FileName, "digest", req.resource.Digest, "size", buffer.size)
119+
s.log.Infow("msg", "artifact received, uploading now to backend", "name", req.resource.FileName, "digest", req.resource.Digest, "size", buffer.size)
117120
if err := backend.Upload(ctx, buffer, req.resource); err != nil {
118121
return sl.LogAndMaskErr(err, s.log)
119122
}
@@ -151,7 +154,7 @@ func (s *ByteStreamService) Read(req *bytestream.ReadRequest, stream bytestream.
151154
}
152155

153156
// streamwriter will stream chunks of data to the client
154-
sw := &streamWriter{stream, s.log, req.ResourceName}
157+
sw := &streamWriter{stream, s.log, req.ResourceName, sha256.New()}
155158
if err := backend.Download(ctx, sw, req.ResourceName); err != nil {
156159
if errors.Is(err, context.Canceled) {
157160
s.log.Infow("msg", "download canceled", "digest", req.ResourceName)
@@ -161,6 +164,11 @@ func (s *ByteStreamService) Read(req *bytestream.ReadRequest, stream bytestream.
161164
return sl.LogAndMaskErr(err, s.log)
162165
}
163166

167+
// check if the file has been tampered with and notify the client
168+
if sw.GetChecksum() != req.ResourceName {
169+
return kerrors.Unauthorized("checksum", fmt.Sprintf("checksum mismatch: got=%s, want=%s", sw.GetChecksum(), req.ResourceName))
170+
}
171+
164172
s.log.Infow("msg", "download finished", "digest", req.ResourceName)
165173

166174
return nil
@@ -269,11 +277,24 @@ func decodeResource(b64encoded string) (*v1.CASResource, error) {
269277
type streamWriter struct {
270278
stream bytestream.ByteStream_ReadServer
271279
log *log.Helper
272-
digest string
280+
// wantChecksum is the expected checksum of the data being sent
281+
wantChecksum string
282+
// gotChecksum accumulates the checksum of the data actually sent
283+
gotChecksum hash.Hash
273284
}
274285

275286
// Send the chunk of data through the bytestream
276287
func (sw *streamWriter) Write(data []byte) (int, error) {
277-
sw.log.Debugw("msg", "sending download chunk", "digest", sw.digest, "chunkSize", len(data))
288+
sw.log.Debugw("msg", "sending download chunk", "digest", sw.wantChecksum, "chunkSize", len(data))
289+
290+
// Update the checksum of the data being sent
291+
if _, err := sw.gotChecksum.Write(data); err != nil {
292+
return 0, err
293+
}
278294
return len(data), sw.stream.Send(&bytestream.ReadResponse{Data: data})
279295
}
296+
297+
// GetChecksum retrieves the sha256 checksum of the read contents
298+
func (sw *streamWriter) GetChecksum() string {
299+
return fmt.Sprintf("%x", sw.gotChecksum.Sum(nil))
300+
}

app/artifact-cas/internal/service/bytestream_test.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,27 @@ func (s *bytestreamSuite) TestReadErrorDownloading() {
201201
}
202202

203203
func (s *bytestreamSuite) TestDownloadOk() {
204+
s.ociBackend.On("Download", mock.Anything, mock.Anything, "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9").
205+
Return(nil).Run(func(args mock.Arguments) {
206+
buf := bytes.NewBuffer([]byte("hello world"))
207+
_, err := io.Copy(args.Get(1).(io.Writer), buf)
208+
s.NoError(err)
209+
})
210+
211+
reader, err := s.client.Read(s.downCtx, &bytestream.ReadRequest{ResourceName: "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"})
212+
s.NoError(err)
213+
214+
// receive the data; it should arrive in a single message since the server-side buffer is 1MB
215+
got, err := reader.Recv()
216+
s.NoError(err)
217+
s.Equal("hello world", string(got.Data))
218+
// EOF
219+
got, err = reader.Recv()
220+
s.ErrorIs(err, io.EOF)
221+
s.Nil(got)
222+
}
223+
224+
func (s *bytestreamSuite) TestDownloadFoundMistmathedDigest() {
204225
s.ociBackend.On("Download", mock.Anything, mock.Anything, "deadbeef").
205226
Return(nil).Run(func(args mock.Arguments) {
206227
buf := bytes.NewBuffer([]byte("hello world"))
@@ -215,9 +236,9 @@ func (s *bytestreamSuite) TestDownloadOk() {
215236
got, err := reader.Recv()
216237
s.NoError(err)
217238
s.Equal("hello world", string(got.Data))
218-
// EOF
239+
// Return a mismatched digest
219240
got, err = reader.Recv()
220-
s.ErrorIs(err, io.EOF)
241+
s.ErrorContains(err, "checksum mismatch:")
221242
s.Nil(got)
222243
}
223244

app/artifact-cas/internal/service/download.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
package service
1717

1818
import (
19+
"bytes"
1920
"context"
21+
"crypto/sha256"
2022
"errors"
2123
"fmt"
24+
"io"
2225
"net/http"
2326
"strconv"
2427

@@ -58,7 +61,7 @@ func (s *DownloadService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
5861
return
5962
}
6063

61-
hash, err := cr_v1.NewHash(digest)
64+
wantChecksum, err := cr_v1.NewHash(digest)
6265
if err != nil {
6366
http.Error(w, "invalid digest", http.StatusBadRequest)
6467
return
@@ -79,7 +82,7 @@ func (s *DownloadService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
7982
return
8083
}
8184

82-
info, err := b.Describe(ctx, hash.Hex)
85+
info, err := b.Describe(ctx, wantChecksum.Hex)
8386
if err != nil && backend.IsNotFound(err) {
8487
http.Error(w, "artifact not found", http.StatusNotFound)
8588
return
@@ -88,20 +91,44 @@ func (s *DownloadService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
8891
return
8992
}
9093

91-
// Set headers
92-
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", info.FileName))
93-
w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10))
94+
s.log.Infow("msg", "download initialized", "digest", wantChecksum, "size", bytefmt.ByteSize(uint64(info.Size)))
9495

95-
s.log.Infow("msg", "download initialized", "digest", hash, "size", bytefmt.ByteSize(uint64(info.Size)))
96+
gotChecksum := sha256.New()
97+
// temporary buffer that holds the downloaded content until its checksum has been verified
98+
buf := bytes.NewBuffer(nil)
9699

97-
if err := b.Download(ctx, w, hash.Hex); err != nil {
100+
// NOTE: we don't send the file directly to the writer because we need to calculate the checksum first,
101+
// and we want to send the file (even partially) only if the checksum matches.
102+
// This has a performance impact, but it's the only way to ensure that the file is not corrupted
103+
// without requiring client-side verification
104+
mw := io.MultiWriter(buf, gotChecksum)
105+
if err := b.Download(ctx, mw, wantChecksum.Hex); err != nil {
98106
if errors.Is(err, context.Canceled) {
99-
s.log.Infow("msg", "download canceled", "digest", hash)
107+
s.log.Infow("msg", "download canceled", "digest", wantChecksum)
100108
return
101109
}
102110

103111
http.Error(w, sl.LogAndMaskErr(err, s.log).Error(), http.StatusInternalServerError)
112+
return
113+
}
114+
115+
// Verify the checksum
116+
if got, want := fmt.Sprintf("%x", gotChecksum.Sum(nil)), wantChecksum.Hex; got != want {
117+
msg := fmt.Sprintf("checksums mismatch: got: %s, want: %s", got, want)
118+
s.log.Info(msg)
119+
http.Error(w, msg, http.StatusUnauthorized)
120+
return
121+
}
122+
123+
// if the buffer contains the actual data we expect we proceed with sending it to the browser
124+
// Set headers
125+
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", info.FileName))
126+
w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10))
127+
128+
if _, err := io.Copy(w, buf); err != nil {
129+
http.Error(w, sl.LogAndMaskErr(err, s.log).Error(), http.StatusInternalServerError)
130+
return
104131
}
105132

106-
s.log.Infow("msg", "download finished", "digest", hash, "size", bytefmt.ByteSize(uint64(info.Size)))
133+
s.log.Infow("msg", "download finished", "digest", wantChecksum, "size", bytefmt.ByteSize(uint64(info.Size)))
107134
}

app/artifact-cas/internal/service/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (s *commonService) loadBackend(ctx context.Context, providerType, secretID
4040
// get the backend provider from the map
4141
p, ok := s.backends[providerType]
4242
if !ok || p == nil {
43-
return nil, kerrors.NotFound("backend provider", providerType)
43+
return nil, kerrors.NotFound("backend provider", fmt.Sprintf("backend %q not found", providerType))
4444
}
4545

4646
s.log.Infow("msg", "selected provider", "provider", providerType)

app/cli/cmd/casbackend.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func newCASBackendAddCmd() *cobra.Command {
4141
cmd.PersistentFlags().Bool("default", false, "set the backend as default in your organization")
4242
cmd.PersistentFlags().String("description", "", "descriptive information for this registration")
4343

44-
cmd.AddCommand(newCASBackendAddOCICmd())
44+
cmd.AddCommand(newCASBackendAddOCICmd(), newCASBackendAddAzureBlobStorageCmd())
4545
return cmd
4646
}
4747

@@ -54,7 +54,7 @@ func newCASBackendUpdateCmd() *cobra.Command {
5454
cmd.PersistentFlags().Bool("default", false, "set the backend as default in your organization")
5555
cmd.PersistentFlags().String("description", "", "descriptive information for this registration")
5656

57-
cmd.AddCommand(newCASBackendUpdateOCICmd(), newCASBackendUpdateInlineCmd())
57+
cmd.AddCommand(newCASBackendUpdateOCICmd(), newCASBackendUpdateInlineCmd(), newCASBackendUpdateAzureBlobCmd())
5858
return cmd
5959
}
6060

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
//
2+
// Copyright 2023 The Chainloop Authors.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package cmd
17+
18+
import (
19+
"fmt"
20+
21+
"github.com/chainloop-dev/chainloop/app/cli/internal/action"
22+
"github.com/chainloop-dev/chainloop/internal/blobmanager/azureblob"
23+
"github.com/go-kratos/kratos/v2/log"
24+
"github.com/spf13/cobra"
25+
)
26+
27+
func newCASBackendAddAzureBlobStorageCmd() *cobra.Command {
28+
var storageAccountName, tenantID, clientID, clientSecret, container string
29+
cmd := &cobra.Command{
30+
Use: "azure-blob",
31+
Short: "Register a Azure Blob Storage CAS Backend",
32+
RunE: func(cmd *cobra.Command, args []string) error {
33+
// If we are setting the default, we list existing CAS backends
34+
// and ask the user to confirm the rewrite
35+
isDefault, err := cmd.Flags().GetBool("default")
36+
cobra.CheckErr(err)
37+
38+
description, err := cmd.Flags().GetString("description")
39+
cobra.CheckErr(err)
40+
41+
if isDefault {
42+
if confirmed, err := confirmDefaultCASBackendOverride(actionOpts, ""); err != nil {
43+
return err
44+
} else if !confirmed {
45+
log.Info("Aborting...")
46+
return nil
47+
}
48+
}
49+
50+
opts := &action.NewCASBackendAddOpts{
51+
Location: fmt.Sprintf("%s/%s", storageAccountName, container),
52+
Provider: azureblob.ProviderID,
53+
Description: description,
54+
Credentials: map[string]any{
55+
"tenantID": tenantID,
56+
"clientID": clientID,
57+
"clientSecret": clientSecret,
58+
},
59+
Default: isDefault,
60+
}
61+
62+
res, err := action.NewCASBackendAdd(actionOpts).Run(opts)
63+
if err != nil {
64+
return err
65+
} else if res == nil {
66+
return nil
67+
}
68+
69+
return encodeOutput([]*action.CASBackendItem{res}, casBackendListTableOutput)
70+
},
71+
}
72+
73+
cmd.Flags().StringVar(&storageAccountName, "storage-account", "", "Storage Account Name")
74+
err := cmd.MarkFlagRequired("storage-account")
75+
cobra.CheckErr(err)
76+
77+
cmd.Flags().StringVar(&tenantID, "tenant", "", "Active Directory Tenant ID")
78+
err = cmd.MarkFlagRequired("tenant")
79+
cobra.CheckErr(err)
80+
81+
cmd.Flags().StringVar(&clientID, "client-id", "", "Service Principal Client ID")
82+
err = cmd.MarkFlagRequired("client-id")
83+
cobra.CheckErr(err)
84+
85+
cmd.Flags().StringVar(&clientSecret, "client-secret", "", "Service Principal Client Secret")
86+
err = cmd.MarkFlagRequired("client-secret")
87+
cobra.CheckErr(err)
88+
89+
cmd.Flags().StringVar(&container, "container", "chainloop", "Storage Container Name")
90+
return cmd
91+
}

0 commit comments

Comments
 (0)