Skip to content

Commit 9e757d2

Browse files
authored
refactor: decouple CAS client code from CLI (#40)
--------- Signed-off-by: Miguel Martinez Trivino <[email protected]>
1 parent e770fae commit 9e757d2

File tree

9 files changed

+96
-133
lines changed

9 files changed

+96
-133
lines changed

app/artifact-cas/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Its structure contains the following top to down layers.
2020

2121
## System Dependencies
2222

23-
The CAS proxy **has only one running dependency**. A secret storage backend to retrieve the OCI repository credentials. Currently we support both [Hashicorp Vault](https://www.vaultproject.io/) and [AWS Secret Manager](https://aws.amazon.com/secrets-manager/).
23+
The CAS proxy **has only one running dependency**. A secret storage backend to retrieve the OCI repository credentials. Currently, we support both [Hashicorp Vault](https://www.vaultproject.io/) and [AWS Secret Manager](https://aws.amazon.com/secrets-manager/).
2424

2525
This secret backend is used to download OCI repository credentials (repository path + key pair) during upload/downloads. This makes the Artifact CAS multi-tenant by default since the destination OCI backend gets selected at runtime.
2626

@@ -34,6 +34,10 @@ The token gets signed by the control plane with a private key and verified by th
3434

3535
Note: there are plans to support [JWKS endpoints](https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-key-sets) to enable easy rotation of credentials.
3636

37+
## Client
38+
39+
The client code can be found [here](/internal/casclient/)
40+
3741
## Runbook
3842

3943
We use `make` for most development tasks. Run `make -C app/artifact-cas` to see a list of the available tasks.

app/cli/cmd/attestation_add.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func newAttestationAddCmd() *cobra.Command {
5757
},
5858
RunE: func(cmd *cobra.Command, args []string) error {
5959
a := action.NewAttestationAdd(
60-
&action.AttestationAddOpts{ActionsOpts: actionOpts, ArtifacsCASConn: artifactCASConn},
60+
&action.AttestationAddOpts{ActionsOpts: actionOpts, ArtifactsCASConn: artifactCASConn},
6161
)
6262

6363
err := a.Run(name, value)

app/cli/internal/action/artifact_download.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ import (
2323
"io"
2424
"os"
2525
"path"
26+
"time"
2627

27-
casclient "github.com/chainloop-dev/chainloop/app/cli/internal/casclient/grpc"
28+
"github.com/chainloop-dev/chainloop/internal/casclient"
29+
"github.com/jedib0t/go-pretty/v6/progress"
2830
"google.golang.org/grpc"
2931

3032
crv1 "github.com/google/go-containerregistry/pkg/v1"
@@ -50,7 +52,7 @@ func (a *ArtifactDownload) Run(downloadPath, digest string) error {
5052
return fmt.Errorf("invalid digest: %w", err)
5153
}
5254

53-
client := casclient.NewDownloader(a.artifactsCASConn, casclient.WithLogger(a.Logger), casclient.WithProgressRender(true))
55+
client := casclient.NewDownloader(a.artifactsCASConn)
5456
ctx := context.Background()
5557
info, err := client.Describe(ctx, h.Hex)
5658
if err != nil {
@@ -77,6 +79,11 @@ func (a *ArtifactDownload) Run(downloadPath, digest string) error {
7779
w := io.MultiWriter(f, hash)
7880

7981
a.Logger.Info().Str("name", info.Filename).Str("to", downloadPath).Msg("downloading file")
82+
83+
// render progress bar
84+
go renderOperationStatus(ctx, client.ProgressStatus, a.Logger)
85+
defer close(client.ProgressStatus)
86+
8087
err = client.Download(ctx, w, h.Hex, info.Size)
8188
if err != nil {
8289
a.Logger.Debug().Err(err).Msg("problem downloading file")
@@ -87,7 +94,46 @@ func (a *ArtifactDownload) Run(downloadPath, digest string) error {
8794
return fmt.Errorf("checksums mismatch: got: %s, expected: %s", got, want)
8895
}
8996

97+
// Give some time for the progress renderer to finish
98+
// TODO: Implement with proper subroutine messaging
99+
time.Sleep(progress.DefaultUpdateFrequency)
100+
90101
a.Logger.Info().Str("path", downloadPath).Msg("file downloaded!")
91102

92103
return nil
93104
}
105+
106+
func renderOperationStatus(ctx context.Context, progressChan casclient.ProgressStatusChan, output io.Writer) {
107+
pw := progress.NewWriter()
108+
pw.Style().Visibility.ETA = true
109+
pw.Style().Visibility.Speed = true
110+
pw.SetUpdateFrequency(progress.DefaultUpdateFrequency)
111+
112+
var tracker *progress.Tracker
113+
go pw.Render()
114+
defer pw.Stop()
115+
116+
for {
117+
select {
118+
case <-ctx.Done():
119+
return
120+
case status, ok := <-progressChan:
121+
if !ok {
122+
return
123+
}
124+
125+
// Initialize tracker
126+
if tracker == nil {
127+
// Hack: Add 1 to the total to make sure the tracker is not marked as done before the upload is finished
128+
// this way the current value will never reach the total
129+
// but instead the tracker will be marked as done by the defer statement
130+
total := status.TotalSizeBytes + 1
131+
tracker = &progress.Tracker{Total: total, Units: progress.UnitsBytes}
132+
defer tracker.MarkAsDone()
133+
pw.AppendTracker(tracker)
134+
}
135+
136+
tracker.SetValue(status.ProcessedBytes)
137+
}
138+
}
139+
}

app/cli/internal/action/artifact_upload.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ package action
1717

1818
import (
1919
"context"
20+
"time"
2021

21-
casclient "github.com/chainloop-dev/chainloop/app/cli/internal/casclient/grpc"
22+
"github.com/chainloop-dev/chainloop/internal/casclient"
23+
"github.com/jedib0t/go-pretty/v6/progress"
2224
"google.golang.org/grpc"
2325
)
2426

@@ -41,11 +43,19 @@ func NewArtifactUpload(opts *ArtifactUploadOpts) *ArtifactUpload {
4143
}
4244

4345
func (a *ArtifactUpload) Run(filePath string) (*CASArtifact, error) {
44-
client := casclient.NewUploader(a.artifactsCASConn, casclient.WithLogger(a.Logger), casclient.WithProgressRender(true))
46+
client := casclient.NewUploader(a.artifactsCASConn, casclient.WithLogger(a.Logger))
47+
// render progress bar
48+
go renderOperationStatus(context.Background(), client.ProgressStatus, a.Logger)
49+
defer close(client.ProgressStatus)
50+
4551
res, err := client.Upload(context.Background(), filePath)
4652
if err != nil {
4753
return nil, err
4854
}
4955

56+
// Give some time for the progress renderer to finish
57+
// TODO: Implement with proper subroutine messaging
58+
time.Sleep(progress.DefaultUpdateFrequency)
59+
5060
return &CASArtifact{Digest: res.Digest, fileName: res.Filename}, nil
5161
}

app/cli/internal/action/attestation_add.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ package action
1818
import (
1919
"errors"
2020

21-
casclient "github.com/chainloop-dev/chainloop/app/cli/internal/casclient/grpc"
2221
"github.com/chainloop-dev/chainloop/internal/attestation/crafter"
22+
"github.com/chainloop-dev/chainloop/internal/casclient"
2323
"google.golang.org/grpc"
2424
)
2525

2626
type AttestationAddOpts struct {
2727
*ActionsOpts
28-
ArtifacsCASConn *grpc.ClientConn
28+
ArtifactsCASConn *grpc.ClientConn
2929
}
3030

3131
type AttestationAdd struct {
@@ -39,9 +39,9 @@ func NewAttestationAdd(cfg *AttestationAddOpts) *AttestationAdd {
3939
ActionsOpts: cfg.ActionsOpts,
4040
c: crafter.NewCrafter(
4141
crafter.WithLogger(&cfg.Logger),
42-
crafter.WithUploader(casclient.NewUploader(cfg.ArtifacsCASConn, casclient.WithLogger(cfg.Logger), casclient.WithProgressRender(true))),
42+
crafter.WithUploader(casclient.NewUploader(cfg.ArtifactsCASConn, casclient.WithLogger(cfg.Logger))),
4343
),
44-
artifactsCASConn: cfg.ArtifacsCASConn,
44+
artifactsCASConn: cfg.ArtifactsCASConn,
4545
}
4646
}
4747

internal/casclient/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Artifact Content Addressable Storage (CAS) Client code
2+
3+
Client code used to talk to the [Artifact Storage Proxy](/app/artifact-cas/).
4+
5+
It's a [bytestream gRPC client](https://pkg.go.dev/google.golang.org/api/transport/bytestream) that currently supports download by content digest (sha256) and upload methods.

app/cli/internal/casclient/grpc/downloader.go renamed to internal/casclient/downloader.go

Lines changed: 9 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,16 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
package grpc
16+
package casclient
1717

1818
import (
1919
"context"
2020
"errors"
2121
"fmt"
2222
"io"
23-
"os"
24-
"time"
2523

2624
v1 "github.com/chainloop-dev/chainloop/app/artifact-cas/api/cas/v1"
2725
"github.com/chainloop-dev/chainloop/internal/attestation/crafter/materials"
28-
"github.com/jedib0t/go-pretty/v6/progress"
2926
"github.com/rs/zerolog"
3027
"google.golang.org/genproto/googleapis/bytestream"
3128
"google.golang.org/grpc"
@@ -39,8 +36,8 @@ func NewDownloader(conn *grpc.ClientConn, opts ...ClientOpts) *DownloaderClient
3936
client := &DownloaderClient{
4037
casClient: &casClient{
4138
conn: conn,
42-
logger: zerolog.New(os.Stderr),
43-
progressStatus: make(chan *materials.UpDownStatus, 2),
39+
ProgressStatus: make(chan *materials.UpDownStatus, 2),
40+
logger: zerolog.Nop(),
4441
},
4542
}
4643

@@ -61,11 +58,6 @@ func (c *DownloaderClient) Download(ctx context.Context, w io.Writer, digest str
6158
ctx, cancel := context.WithCancel(ctx)
6259
defer cancel()
6360

64-
if c.renderProgress {
65-
go c.renderDownloadStatus(ctx, c.logger)
66-
defer close(c.progressStatus)
67-
}
68-
6961
// Open the stream to start reading chunks
7062
reader, err := bytestream.NewByteStreamClient(c.conn).Read(ctx, &bytestream.ReadRequest{ResourceName: digest})
7163
if err != nil {
@@ -96,23 +88,19 @@ func (c *DownloaderClient) Download(ctx context.Context, w io.Writer, digest str
9688
TotalSizeBytes: totalBytes, ProcessedBytes: totalDownloaded,
9789
}
9890

99-
if c.renderProgress {
100-
c.progressStatus <- latestStatus
91+
select {
92+
case c.ProgressStatus <- latestStatus:
93+
// message sent
94+
default:
95+
c.logger.Debug().Msg("nobody listening to progress updates, dropping message")
10196
}
10297
}
10398

104-
// Give some time for the progress renderer to finish
105-
// TODO: Implement with proper subroutine messaging
106-
if c.renderProgress {
107-
time.Sleep(renderUpdateFrequency)
108-
// Block until the buffer has been filled or the upload process has been canceled
109-
}
110-
11199
return nil
112100
}
113101

114102
// Describe returns the metadata of a resource by its digest
115-
// We use this to get the filename and the total size of the artifacct
103+
// We use this to get the filename and the total size of the artifact
116104
func (c *DownloaderClient) Describe(ctx context.Context, digest string) (*materials.ResourceInfo, error) {
117105
client := v1.NewResourceServiceClient(c.conn)
118106
resp, err := client.Describe(ctx, &v1.ResourceServiceDescribeRequest{Digest: digest})
@@ -124,38 +112,3 @@ func (c *DownloaderClient) Describe(ctx context.Context, digest string) (*materi
124112
Digest: resp.GetResult().GetDigest(), Filename: resp.Result.GetFileName(), Size: resp.Result.GetSize(),
125113
}, nil
126114
}
127-
128-
func (c *DownloaderClient) renderDownloadStatus(ctx context.Context, output io.Writer) {
129-
pw := progress.NewWriter()
130-
pw.Style().Visibility.ETA = true
131-
pw.Style().Visibility.Speed = true
132-
pw.SetUpdateFrequency(renderUpdateFrequency)
133-
134-
var tracker *progress.Tracker
135-
go pw.Render()
136-
defer pw.Stop()
137-
138-
for {
139-
select {
140-
case <-ctx.Done():
141-
return
142-
case s, ok := <-c.progressStatus:
143-
if !ok {
144-
return
145-
}
146-
147-
// Initialize tracker
148-
if tracker == nil {
149-
total := s.TotalSizeBytes
150-
tracker = &progress.Tracker{
151-
Total: total,
152-
Units: progress.UnitsBytes,
153-
}
154-
defer tracker.MarkAsDone()
155-
pw.AppendTracker(tracker)
156-
}
157-
158-
tracker.SetValue(s.ProcessedBytes)
159-
}
160-
}
161-
}

0 commit comments

Comments
 (0)