Skip to content

Commit 64dc647

Browse files
craig[bot]shubhamdhama
andcommitted
Merge #148500
148500: blobs: register `Blob` service with DRPC server r=cthumuluru-crdb a=shubhamdhama Enable the `Blob` service on the DRPC server in addition to gRPC. This is controlled by `rpc.experimental_drpc.enabled` (off by default). This change is part of a series and is similar to: #146926 Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Epic: CRDB-48925 Release note: None Co-authored-by: Shubham Dhama <[email protected]>
2 parents 0777857 + 22ff635 commit 64dc647

File tree

3 files changed

+61
-6
lines changed

3 files changed

+61
-6
lines changed

pkg/blobs/service.go

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,37 @@ type Service struct {
3838
localStorage *LocalStorage
3939
}
4040

41+
// drpcService is a DRPC wrapper around the Service.
42+
type drpcService Service
43+
4144
var _ blobspb.BlobServer = &Service{}
45+
var _ blobspb.DRPCBlobServer = (*drpcService)(nil)
4246

4347
// NewBlobService instantiates a blob service server.
4448
func NewBlobService(externalIODir string) (*Service, error) {
4549
localStorage, err := NewLocalStorage(externalIODir)
4650
return &Service{localStorage: localStorage}, err
4751
}
4852

49-
// GetStream implements the gRPC service.
53+
// AsDRPCServer returns the DRPC server implementation for the Blob service.
54+
func (s *Service) AsDRPCServer() blobspb.DRPCBlobServer {
55+
return (*drpcService)(s)
56+
}
57+
58+
// GetStream implements the DRPC service
59+
func (s *drpcService) GetStream(
60+
req *blobspb.GetRequest, stream blobspb.DRPCBlob_GetStreamStream,
61+
) error {
62+
return (*Service)(s).getStream(req, stream)
63+
}
64+
65+
// GetStream implements the gRPC service
5066
func (s *Service) GetStream(req *blobspb.GetRequest, stream blobspb.Blob_GetStreamServer) error {
67+
return s.getStream(req, stream)
68+
}
69+
70+
// getStream is the shared implementation for GetStream for both gRPC and DRPC.
71+
func (s *Service) getStream(req *blobspb.GetRequest, stream blobspb.RPCBlob_GetStreamStream) error {
5172
content, _, err := s.localStorage.ReadFile(req.Filename, req.Offset)
5273
if err != nil {
5374
return err
@@ -56,8 +77,18 @@ func (s *Service) GetStream(req *blobspb.GetRequest, stream blobspb.Blob_GetStre
5677
return streamContent(stream.Context(), stream, content)
5778
}
5879

59-
// PutStream implements the gRPC service.
80+
// PutStream implements the DRPC service
81+
func (s *drpcService) PutStream(stream blobspb.DRPCBlob_PutStreamStream) error {
82+
return (*Service)(s).putStream(stream)
83+
}
84+
85+
// PutStream implements the gRPC service
6086
func (s *Service) PutStream(stream blobspb.Blob_PutStreamServer) error {
87+
return s.putStream(stream)
88+
}
89+
90+
// putStream is the shared implementation for PutStream for both gRPC and DRPC.
91+
func (s *Service) putStream(stream blobspb.RPCBlob_PutStreamStream) error {
6192
filename, ok := grpcutil.FastFirstValueFromIncomingContext(stream.Context(), "filename")
6293
if !ok {
6394
return errors.New("could not fetch metadata or no filename in metadata")
@@ -86,22 +117,43 @@ func (s *Service) PutStream(stream blobspb.Blob_PutStreamServer) error {
86117
return err
87118
}
88119

89-
// List implements the gRPC service.
120+
// List implements the DRPC service
121+
func (s *drpcService) List(
122+
ctx context.Context, req *blobspb.GlobRequest,
123+
) (*blobspb.GlobResponse, error) {
124+
return (*Service)(s).List(ctx, req)
125+
}
126+
127+
// List implements the gRPC service
90128
func (s *Service) List(
91129
ctx context.Context, req *blobspb.GlobRequest,
92130
) (*blobspb.GlobResponse, error) {
93131
matches, err := s.localStorage.List(req.Pattern)
94132
return &blobspb.GlobResponse{Files: matches}, err
95133
}
96134

97-
// Delete implements the gRPC service.
135+
// Delete implements the DRPC service
136+
func (s *drpcService) Delete(
137+
ctx context.Context, req *blobspb.DeleteRequest,
138+
) (*blobspb.DeleteResponse, error) {
139+
return (*Service)(s).Delete(ctx, req)
140+
}
141+
142+
// Delete implements the gRPC service
98143
func (s *Service) Delete(
99144
ctx context.Context, req *blobspb.DeleteRequest,
100145
) (*blobspb.DeleteResponse, error) {
101146
return &blobspb.DeleteResponse{}, s.localStorage.Delete(req.Filename)
102147
}
103148

104-
// Stat implements the gRPC service.
149+
// Stat implements the DRPC service
150+
func (s *drpcService) Stat(
151+
ctx context.Context, req *blobspb.StatRequest,
152+
) (*blobspb.BlobStat, error) {
153+
return (*Service)(s).Stat(ctx, req)
154+
}
155+
156+
// Stat implements the gRPC service
105157
func (s *Service) Stat(ctx context.Context, req *blobspb.StatRequest) (*blobspb.BlobStat, error) {
106158
resp, err := s.localStorage.Stat(req.Filename)
107159
if oserror.IsNotExist(err) {

pkg/blobs/stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func newGetStreamReader(client blobspb.RPCBlob_GetStreamClient) ioctx.ReadCloser
6060

6161
// newPutStreamReader creates an io.ReadCloser that uses gRPC's streaming API
6262
// to read chunks of data.
63-
func newPutStreamReader(client blobspb.Blob_PutStreamServer) ioctx.ReadCloserCtx {
63+
func newPutStreamReader(client blobspb.RPCBlob_PutStreamStream) ioctx.ReadCloserCtx {
6464
return &blobStreamReader{stream: client}
6565
}
6666

pkg/server/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
10171017
return nil, errors.Wrap(err, "creating blob service")
10181018
}
10191019
blobspb.RegisterBlobServer(grpcServer.Server, blobService)
1020+
if err := blobspb.DRPCRegisterBlob(drpcServer, blobService.AsDRPCServer()); err != nil {
1021+
return nil, err
1022+
}
10201023

10211024
replicationReporter := reports.NewReporter(
10221025
db, node.stores, storePool, st, nodeLiveness, internalExecutor, systemConfigWatcher,

0 commit comments

Comments
 (0)