Skip to content

Commit d0805c3

Browse files
authored
Fix Elan's List endpoint to return true size for compressed blobs. (#315)
* Return uncompressed size in List endpoint on Elan. * Bump version to 11.10.6.
1 parent eb86959 commit d0805c3

File tree

4 files changed

+42
-6
lines changed

4 files changed

+42
-6
lines changed

ChangeLog

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
Version 11.10.6
2+
---------------
3+
* Fix Elan's List endpoint to return true size for compressed blobs.
4+
15
Version 11.10.5
26
---------------
37
* Populate partial ExecutedActionMetadata messages during execution

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
11.10.5
1+
11.10.6

elan/rpc/gc.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"strings"
99
"time"
1010

11+
pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
1112
"github.com/hashicorp/go-multierror"
13+
"github.com/klauspost/compress/zstd"
1214
"gocloud.dev/blob"
1315
"google.golang.org/grpc/codes"
1416
"google.golang.org/grpc/status"
@@ -23,20 +25,27 @@ func (s *server) List(ctx context.Context, req *ppb.ListRequest) (*ppb.ListRespo
2325
var g multierror.Group
2426
resp := &ppb.ListResponse{}
2527
g.Go(func() error {
26-
ar, err := s.list(ctx, "ac", req.Prefix)
28+
ar, err := s.list(ctx, ACPrefix, req.Prefix)
2729
resp.ActionResults = ar
2830
return err
2931
})
3032
g.Go(func() error {
31-
ar, err := s.list(ctx, "cas", req.Prefix)
33+
ar, err := s.list(ctx, CASPrefix, req.Prefix)
3234
for _, a := range ar {
3335
resp.Blobs = append(resp.Blobs, &ppb.Blob{Hash: a.Hash, SizeBytes: a.SizeBytes, Replicas: 1, CachePrefix: a.CachePrefix})
3436
}
3537
return err
3638
})
3739
g.Go(func() error {
38-
ar, err := s.list(ctx, "zstd_cas", req.Prefix)
40+
ar, err := s.list(ctx, CompressedCASPrefix, req.Prefix)
3941
for _, a := range ar {
42+
// here we need to get the uncompressed size of the blob, otherwise REX SDK will complain about it
43+
if size, err := s.getBlobUncompressedSize(ctx, a.Hash); err != nil {
44+
// this is not an issue for GC, but would be for replication
45+
log.Warningf("failed getting uncompressed size for blob %s (defaulting to compressed size): %v", a.Hash, err)
46+
} else {
47+
a.SizeBytes = int64(size)
48+
}
4049
resp.Blobs = append(resp.Blobs, &ppb.Blob{Hash: a.Hash, SizeBytes: a.SizeBytes, Replicas: 1, CachePrefix: a.CachePrefix})
4150
}
4251
return err
@@ -61,7 +70,7 @@ func (s *server) list(ctx context.Context, prefix1, prefix2 string) ([]*ppb.Acti
6170
} else if hash := path.Base(obj.Key); !strings.HasPrefix(hash, "tmp") {
6271
ret = append(ret, &ppb.ActionResult{
6372
Hash: hash,
64-
SizeBytes: obj.Size, // Note that this might not be accurate for compressed blobs. For GC it is unlikely to matter deeply.
73+
SizeBytes: obj.Size,
6574
LastAccessed: obj.ModTime.Unix(),
6675
Replicas: 1,
6776
CachePrefix: prefix,
@@ -71,6 +80,25 @@ func (s *server) list(ctx context.Context, prefix1, prefix2 string) ([]*ppb.Acti
7180
return ret, nil
7281
}
7382

83+
func (s *server) getBlobUncompressedSize(ctx context.Context, hash string) (uint64, error) {
84+
r, err := s.bucket.NewRangeReader(ctx, s.key(CompressedCASPrefix, &pb.Digest{Hash: hash}), 0, zstd.HeaderMaxSize, nil)
85+
if err != nil {
86+
return 0, fmt.Errorf("failed opening blob to get uncompressed size: %v", err)
87+
}
88+
defer r.Close()
89+
90+
hdr := make([]byte, zstd.HeaderMaxSize)
91+
if _, err = r.Read(hdr); err != nil {
92+
return 0, fmt.Errorf("failed reading zstd header: %v", err)
93+
}
94+
95+
var zhdr zstd.Header
96+
if err = zhdr.Decode(hdr); err != nil {
97+
return 0, fmt.Errorf("failed decoding zstd header: %v", err)
98+
}
99+
return zhdr.FrameContentSize, nil
100+
}
101+
74102
func (s *server) Delete(ctx context.Context, req *ppb.DeleteRequest) (*ppb.DeleteResponse, error) {
75103
log.Notice("Delete request for %d action results, %d blobs", len(req.ActionResults), len(req.Blobs))
76104
var g multierror.Group

elan/rpc/rpc.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@ import (
5454

5555
const timeout = 2 * time.Minute
5656

57-
const CASPrefix = "cas"
57+
const (
58+
ACPrefix = "ac"
59+
CASPrefix = "cas"
60+
CompressedCASPrefix = "zstd_cas"
61+
)
5862

5963
var log = logging.MustGetLogger()
6064

0 commit comments

Comments
 (0)