@@ -38,16 +38,37 @@ type Service struct {
38
38
localStorage * LocalStorage
39
39
}
40
40
41
+ // drpcService is a DRPC wrapper around the Service.
42
+ type drpcService Service
43
+
41
44
var _ blobspb.BlobServer = & Service {}
45
+ var _ blobspb.DRPCBlobServer = (* drpcService )(nil )
42
46
43
47
// NewBlobService instantiates a blob service server.
44
48
func NewBlobService (externalIODir string ) (* Service , error ) {
45
49
localStorage , err := NewLocalStorage (externalIODir )
46
50
return & Service {localStorage : localStorage }, err
47
51
}
48
52
49
- // GetStream implements the gRPC service.
53
+ // AsDRPCServer returns the DRPC server implementation for the Blob service.
54
+ func (s * Service ) AsDRPCServer () blobspb.DRPCBlobServer {
55
+ return (* drpcService )(s )
56
+ }
57
+
58
+ // GetStream implements the DRPC service
59
+ func (s * drpcService ) GetStream (
60
+ req * blobspb.GetRequest , stream blobspb.DRPCBlob_GetStreamStream ,
61
+ ) error {
62
+ return (* Service )(s ).getStream (req , stream )
63
+ }
64
+
65
+ // GetStream implements the gRPC service
50
66
func (s * Service ) GetStream (req * blobspb.GetRequest , stream blobspb.Blob_GetStreamServer ) error {
67
+ return s .getStream (req , stream )
68
+ }
69
+
70
+ // getStream is the shared implementation for GetStream for both gRPC and DRPC.
71
+ func (s * Service ) getStream (req * blobspb.GetRequest , stream blobspb.RPCBlob_GetStreamStream ) error {
51
72
content , _ , err := s .localStorage .ReadFile (req .Filename , req .Offset )
52
73
if err != nil {
53
74
return err
@@ -56,8 +77,18 @@ func (s *Service) GetStream(req *blobspb.GetRequest, stream blobspb.Blob_GetStre
56
77
return streamContent (stream .Context (), stream , content )
57
78
}
58
79
59
- // PutStream implements the gRPC service.
80
+ // PutStream implements the DRPC service
81
+ func (s * drpcService ) PutStream (stream blobspb.DRPCBlob_PutStreamStream ) error {
82
+ return (* Service )(s ).putStream (stream )
83
+ }
84
+
85
+ // PutStream implements the gRPC service
60
86
func (s * Service ) PutStream (stream blobspb.Blob_PutStreamServer ) error {
87
+ return s .putStream (stream )
88
+ }
89
+
90
+ // putStream is the shared implementation for PutStream for both gRPC and DRPC.
91
+ func (s * Service ) putStream (stream blobspb.RPCBlob_PutStreamStream ) error {
61
92
filename , ok := grpcutil .FastFirstValueFromIncomingContext (stream .Context (), "filename" )
62
93
if ! ok {
63
94
return errors .New ("could not fetch metadata or no filename in metadata" )
@@ -86,22 +117,43 @@ func (s *Service) PutStream(stream blobspb.Blob_PutStreamServer) error {
86
117
return err
87
118
}
88
119
89
- // List implements the gRPC service.
120
+ // List implements the DRPC service
121
+ func (s * drpcService ) List (
122
+ ctx context.Context , req * blobspb.GlobRequest ,
123
+ ) (* blobspb.GlobResponse , error ) {
124
+ return (* Service )(s ).List (ctx , req )
125
+ }
126
+
127
+ // List implements the gRPC service
90
128
func (s * Service ) List (
91
129
ctx context.Context , req * blobspb.GlobRequest ,
92
130
) (* blobspb.GlobResponse , error ) {
93
131
matches , err := s .localStorage .List (req .Pattern )
94
132
return & blobspb.GlobResponse {Files : matches }, err
95
133
}
96
134
97
- // Delete implements the gRPC service.
135
+ // Delete implements the DRPC service
136
+ func (s * drpcService ) Delete (
137
+ ctx context.Context , req * blobspb.DeleteRequest ,
138
+ ) (* blobspb.DeleteResponse , error ) {
139
+ return (* Service )(s ).Delete (ctx , req )
140
+ }
141
+
142
+ // Delete implements the gRPC service
98
143
func (s * Service ) Delete (
99
144
ctx context.Context , req * blobspb.DeleteRequest ,
100
145
) (* blobspb.DeleteResponse , error ) {
101
146
return & blobspb.DeleteResponse {}, s .localStorage .Delete (req .Filename )
102
147
}
103
148
104
- // Stat implements the gRPC service.
149
+ // Stat implements the DRPC service
150
+ func (s * drpcService ) Stat (
151
+ ctx context.Context , req * blobspb.StatRequest ,
152
+ ) (* blobspb.BlobStat , error ) {
153
+ return (* Service )(s ).Stat (ctx , req )
154
+ }
155
+
156
+ // Stat implements the gRPC service
105
157
func (s * Service ) Stat (ctx context.Context , req * blobspb.StatRequest ) (* blobspb.BlobStat , error ) {
106
158
resp , err := s .localStorage .Stat (req .Filename )
107
159
if oserror .IsNotExist (err ) {
0 commit comments