Skip to content

Commit c153bc5

Browse files
committed
feat: create grpcZstd blob access
1 parent 7bb781b commit c153bc5

File tree

8 files changed

+1080
-100
lines changed

8 files changed

+1080
-100
lines changed

pkg/blobstore/configuration/cas_blob_access_creator.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,15 @@ func (bac *casBlobAccessCreator) NewCustomBlobAccess(terminationGroup program.Gr
9999
BlobAccess: grpcclients.NewCASBlobAccess(client, uuid.NewRandom, 65536),
100100
DigestKeyFormat: digest.KeyWithInstance,
101101
}, "grpc", nil
102+
case *pb.BlobAccessConfiguration_Compressed:
103+
client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Compressed.Grpc, terminationGroup)
104+
if err != nil {
105+
return BlobAccessInfo{}, "", err
106+
}
107+
return BlobAccessInfo{
108+
BlobAccess: grpcclients.NewCASWithZstdBlobAccess(client, uuid.NewRandom, 65536, backend.Compressed.CompressionThresholdBytes),
109+
DigestKeyFormat: digest.KeyWithInstance,
110+
}, "compressed", nil
102111
case *pb.BlobAccessConfiguration_ReferenceExpanding:
103112
// The backend used by ReferenceExpandingBlobAccess is
104113
// an Indirect Content Addressable Storage (ICAS). This

pkg/blobstore/grpcclients/BUILD.bazel

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_library(
55
srcs = [
66
"ac_blob_access.go",
77
"cas_blob_access.go",
8+
"cas_zstd_blob_access.go",
89
"fsac_blob_access.go",
910
"icas_blob_access.go",
1011
"iscc_blob_access.go",
@@ -22,6 +23,7 @@ go_library(
2223
"//pkg/util",
2324
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
2425
"@com_github_google_uuid//:uuid",
26+
"@com_github_klauspost_compress//zstd",
2527
"@org_golang_google_genproto_googleapis_bytestream//:bytestream",
2628
"@org_golang_google_grpc//:grpc",
2729
"@org_golang_google_grpc//codes",
@@ -32,7 +34,10 @@ go_library(
3234

3335
go_test(
3436
name = "grpcclients_test",
35-
srcs = ["cas_blob_access_test.go"],
37+
srcs = [
38+
"cas_blob_access_test.go",
39+
"cas_zstd_blob_access_test.go",
40+
],
3641
deps = [
3742
":grpcclients",
3843
"//internal/mock",
@@ -43,6 +48,7 @@ go_test(
4348
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
4449
"@bazel_remote_apis//build/bazel/semver:semver_go_proto",
4550
"@com_github_google_uuid//:uuid",
51+
"@com_github_klauspost_compress//zstd",
4652
"@com_github_stretchr_testify//require",
4753
"@org_golang_google_genproto_googleapis_bytestream//:bytestream",
4854
"@org_golang_google_grpc//:grpc",

pkg/blobstore/grpcclients/cas_blob_access.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ func (ba *casBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer
140140
}
141141

142142
func (ba *casBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
143+
return findMissingBlobsInternal(ctx, digests, ba.contentAddressableStorageClient)
144+
}
145+
146+
func findMissingBlobsInternal(ctx context.Context, digests digest.Set, cas remoteexecution.ContentAddressableStorageClient) (digest.Set, error) {
143147
// Partition all digests by digest function, as the
144148
// FindMissingBlobs() RPC can only process digests for a single
145149
// instance name and digest function.
@@ -157,7 +161,7 @@ func (ba *casBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (d
157161
BlobDigests: blobDigests,
158162
DigestFunction: digestFunction.GetEnumValue(),
159163
}
160-
response, err := ba.contentAddressableStorageClient.FindMissingBlobs(ctx, &request)
164+
response, err := cas.FindMissingBlobs(ctx, &request)
161165
if err != nil {
162166
return digest.EmptySet, err
163167
}

pkg/blobstore/grpcclients/cas_blob_access_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,107 @@ func TestCASBlobAccessPut(t *testing.T) {
163163
})
164164
}
165165

166+
func TestCASBlobAccessGet(t *testing.T) {
167+
ctrl, ctx := gomock.WithContext(context.Background(), t)
168+
169+
client := mock.NewMockClientConnInterface(ctrl)
170+
uuidGenerator := mock.NewMockUUIDGenerator(ctrl)
171+
blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10)
172+
173+
t.Run("Success", func(t *testing.T) {
174+
blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5)
175+
176+
clientStream := mock.NewMockClientStream(ctrl)
177+
client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read").
178+
Return(clientStream, nil)
179+
clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.ReadRequest{
180+
ResourceName: "hello/blobs/8b1a9953c4611296a827abf8c47804d7/5",
181+
ReadOffset: 0,
182+
ReadLimit: 0,
183+
})).Return(nil)
184+
clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error {
185+
resp := m.(*bytestream.ReadResponse)
186+
resp.Data = []byte("Hello")
187+
return nil
188+
})
189+
clientStream.EXPECT().RecvMsg(gomock.Any()).Return(io.EOF).AnyTimes()
190+
clientStream.EXPECT().CloseSend().Return(nil)
191+
192+
buffer := blobAccess.Get(ctx, blobDigest)
193+
data, err := buffer.ToByteSlice(1000)
194+
require.NoError(t, err)
195+
require.Equal(t, []byte("Hello"), data)
196+
})
197+
198+
t.Run("SuccessLargeBlob", func(t *testing.T) {
199+
// Create large blob data (1000 bytes)
200+
expectedData := make([]byte, 1000)
201+
for i := range expectedData {
202+
expectedData[i] = byte('A' + (i % 26)) // Repeating alphabet pattern
203+
}
204+
largeDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "1411ffd5854fa029dc4d231aa89311eb", 1000)
205+
206+
clientStream := mock.NewMockClientStream(ctrl)
207+
client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read").
208+
Return(clientStream, nil)
209+
clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.ReadRequest{
210+
ResourceName: "hello/blobs/1411ffd5854fa029dc4d231aa89311eb/1000",
211+
ReadOffset: 0,
212+
ReadLimit: 0,
213+
})).Return(nil)
214+
215+
// Send data in a single chunk (simpler for testing)
216+
clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error {
217+
resp := m.(*bytestream.ReadResponse)
218+
resp.Data = expectedData
219+
return nil
220+
})
221+
clientStream.EXPECT().RecvMsg(gomock.Any()).Return(io.EOF).AnyTimes()
222+
clientStream.EXPECT().CloseSend().Return(nil)
223+
224+
buffer := blobAccess.Get(ctx, largeDigest)
225+
data, err := buffer.ToByteSlice(1500)
226+
require.NoError(t, err)
227+
require.Equal(t, expectedData, data)
228+
})
229+
230+
t.Run("InitialFailure", func(t *testing.T) {
231+
blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5)
232+
233+
// Failure to create the outgoing connection.
234+
client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read").
235+
Return(nil, status.Error(codes.Internal, "Failed to create outgoing connection"))
236+
237+
buffer := blobAccess.Get(ctx, blobDigest)
238+
_, err := buffer.ToByteSlice(1000)
239+
testutil.RequireEqualStatus(t,
240+
status.Error(codes.Internal, "Failed to create outgoing connection"),
241+
err)
242+
})
243+
244+
t.Run("ReceiveFailure", func(t *testing.T) {
245+
blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5)
246+
247+
// Failure to receive a response.
248+
clientStream := mock.NewMockClientStream(ctrl)
249+
client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read").
250+
Return(clientStream, nil)
251+
clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.ReadRequest{
252+
ResourceName: "hello/blobs/8b1a9953c4611296a827abf8c47804d7/5",
253+
ReadOffset: 0,
254+
ReadLimit: 0,
255+
})).Return(nil)
256+
clientStream.EXPECT().RecvMsg(gomock.Any()).Return(status.Error(codes.Internal, "Lost connection to server")).AnyTimes()
257+
clientStream.EXPECT().CloseSend().Return(nil)
258+
259+
buffer := blobAccess.Get(ctx, blobDigest)
260+
_, err := buffer.ToByteSlice(1000)
261+
testutil.RequireEqualStatus(t,
262+
status.Error(codes.Internal, "Lost connection to server"),
263+
err)
264+
})
265+
}
266+
166267
func TestCASBlobAccessGetCapabilities(t *testing.T) {
167268
ctrl, ctx := gomock.WithContext(context.Background(), t)
168269

0 commit comments

Comments
 (0)