Skip to content

Commit dcb853d

Browse files
authored
Unified Search: Remove resource-server-specific methods from distributor (#107607)
* Remove resource-server-specific methods from distributor * Remove BlobStoreServer interface implementation from distributor
1 parent 4d93380 commit dcb853d

File tree

1 file changed

+0
-95
lines changed

1 file changed

+0
-95
lines changed

pkg/storage/unified/resource/search_server_distributor.go

Lines changed: 0 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,9 @@ func ProvideSearchDistributorServer(cfg *setting.Cfg, features featuremgmt.Featu
4141

4242
grpcServer := grpcHandler.GetServer()
4343

44-
resourcepb.RegisterResourceStoreServer(grpcServer, distributorServer)
4544
// resourcepb.RegisterBulkStoreServer(grpcServer, distributorServer)
4645
resourcepb.RegisterResourceIndexServer(grpcServer, distributorServer)
4746
resourcepb.RegisterManagedObjectIndexServer(grpcServer, distributorServer)
48-
resourcepb.RegisterBlobStoreServer(grpcServer, distributorServer)
4947
grpc_health_v1.RegisterHealthServer(grpcServer, healthService)
5048
_, err = grpcserver.ProvideReflectionService(cfg, grpcHandler)
5149
if err != nil {
@@ -106,81 +104,6 @@ func (ds *distributorServer) GetStats(ctx context.Context, r *resourcepb.Resourc
106104
return client.GetStats(ctx, r)
107105
}
108106

109-
func (ds *distributorServer) Read(ctx context.Context, r *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
110-
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace, "Read")
111-
if err != nil {
112-
return nil, err
113-
}
114-
115-
return client.Read(ctx, r)
116-
}
117-
118-
func (ds *distributorServer) Create(ctx context.Context, r *resourcepb.CreateRequest) (*resourcepb.CreateResponse, error) {
119-
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace, "Create")
120-
if err != nil {
121-
return nil, err
122-
}
123-
124-
return client.Create(ctx, r)
125-
}
126-
127-
func (ds *distributorServer) Update(ctx context.Context, r *resourcepb.UpdateRequest) (*resourcepb.UpdateResponse, error) {
128-
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace, "Update")
129-
if err != nil {
130-
return nil, err
131-
}
132-
133-
return client.Update(ctx, r)
134-
}
135-
136-
func (ds *distributorServer) Delete(ctx context.Context, r *resourcepb.DeleteRequest) (*resourcepb.DeleteResponse, error) {
137-
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Key.Namespace, "Delete")
138-
if err != nil {
139-
return nil, err
140-
}
141-
142-
return client.Delete(ctx, r)
143-
}
144-
145-
func (ds *distributorServer) List(ctx context.Context, r *resourcepb.ListRequest) (*resourcepb.ListResponse, error) {
146-
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "List")
147-
if err != nil {
148-
return nil, err
149-
}
150-
151-
return client.List(ctx, r)
152-
}
153-
154-
func (ds *distributorServer) Watch(r *resourcepb.WatchRequest, srv resourcepb.ResourceStore_WatchServer) error {
155-
// r -> consumer watch request
156-
// srv -> stream connection with consumer
157-
ctx := srv.Context()
158-
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Options.Key.Namespace, "Watch")
159-
if err != nil {
160-
return err
161-
}
162-
163-
// watchClient -> stream connection with storage-api pod
164-
watchClient, err := client.Watch(ctx, r)
165-
if err != nil {
166-
return err
167-
}
168-
169-
// WARNING
170-
// in Watch, all messages flow from the resource server (watchClient) to the consumer (srv)
171-
// but since this is a streaming connection, in theory the consumer could also send a message to the server
172-
// however for the sake of simplicity we are not handling it here
173-
// but if we decide to handle bi-directional message passing in this method, we will need to update this
174-
// we also never handle EOF err, as the server never closes the connection willingly
175-
for {
176-
msg, err := watchClient.Recv()
177-
if err != nil {
178-
return err
179-
}
180-
_ = srv.Send(msg)
181-
}
182-
}
183-
184107
// TODO implement this if we want to support it in cloud
185108
// func (ds *DistributorServer) BulkProcess(srv BulkStore_BulkProcessServer) error {
186109
// return nil
@@ -204,24 +127,6 @@ func (ds *distributorServer) ListManagedObjects(ctx context.Context, r *resource
204127
return client.ListManagedObjects(ctx, r)
205128
}
206129

207-
func (ds *distributorServer) PutBlob(ctx context.Context, r *resourcepb.PutBlobRequest) (*resourcepb.PutBlobResponse, error) {
208-
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Resource.Namespace, "PutBlob")
209-
if err != nil {
210-
return nil, err
211-
}
212-
213-
return client.PutBlob(ctx, r)
214-
}
215-
216-
func (ds *distributorServer) GetBlob(ctx context.Context, r *resourcepb.GetBlobRequest) (*resourcepb.GetBlobResponse, error) {
217-
ctx, client, err := ds.getClientToDistributeRequest(ctx, r.Resource.Namespace, "GetBlob")
218-
if err != nil {
219-
return nil, err
220-
}
221-
222-
return client.GetBlob(ctx, r)
223-
}
224-
225130
func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, namespace string, methodName string) (context.Context, ResourceClient, error) {
226131
ringHasher := fnv.New32a()
227132
_, err := ringHasher.Write([]byte(namespace))

0 commit comments

Comments
 (0)