Skip to content

Commit 446058d

Browse files
committed
server: register Timeseries service with DRPC server
Enable these service on the DRPC server in addition to gRPC. This is controlled by `rpc.experimental_drpc.enabled` (off by default). This change is part of a series and is similar to: #146926 Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Epic: CRDB-48925 Release note: None
1 parent 3d7ab2a commit 446058d

File tree

4 files changed

+67
-3
lines changed

4 files changed

+67
-3
lines changed

pkg/server/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,6 +1256,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
12561256
if err := sAuth.RegisterDRPCService(drpcServer); err != nil {
12571257
return nil, err
12581258
}
1259+
if err := sTS.RegisterDRPCService(drpcServer); err != nil {
1260+
return nil, err
1261+
}
12591262

12601263
// Tell the node event logger (join, restart) how to populate SQL entries
12611264
// into system.eventlog.

pkg/server/tenant.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,9 @@ func newTenantServer(
483483
if err := sAuth.RegisterDRPCService(args.drpc); err != nil {
484484
return nil, err
485485
}
486+
if err := args.tenantTimeSeriesServer.RegisterDRPCService(args.drpc); err != nil {
487+
return nil, err
488+
}
486489

487490
// Tell the status/admin servers how to access SQL structures.
488491
sStatus.setStmtDiagnosticsRequester(sqlServer.execCfg.StmtDiagnosticsRecorder)

pkg/ts/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ go_library(
4242
"@com_github_cockroachdb_errors//:errors",
4343
"@com_github_cockroachdb_redact//:redact",
4444
"@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library",
45+
"@io_storj_drpc//:drpc",
4546
"@org_golang_google_grpc//:grpc",
4647
"@org_golang_google_grpc//codes",
4748
"@org_golang_google_grpc//status",

pkg/ts/server.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"google.golang.org/grpc"
2727
"google.golang.org/grpc/codes"
2828
"google.golang.org/grpc/status"
29+
"storj.io/drpc"
2930
)
3031

3132
const (
@@ -133,6 +134,15 @@ func (s *TenantServer) RegisterService(g *grpc.Server) {
133134
tspb.RegisterTimeSeriesServer(g, s)
134135
}
135136

137+
type drpcTenantServer struct {
138+
*TenantServer
139+
}
140+
141+
// RegisterService registers the DRPC service.
142+
func (s *TenantServer) RegisterDRPCService(d drpc.Mux) error {
143+
return tspb.DRPCRegisterTimeSeries(d, &drpcTenantServer{TenantServer: s})
144+
}
145+
136146
// RegisterGateway starts the gateway (i.e. reverse proxy) that proxies HTTP requests
137147
// to the appropriate gRPC endpoints.
138148
func (s *TenantServer) RegisterGateway(
@@ -210,6 +220,15 @@ func (s *Server) RegisterService(g *grpc.Server) {
210220
tspb.RegisterTimeSeriesServer(g, s)
211221
}
212222

223+
type drpcServer struct {
224+
*Server
225+
}
226+
227+
// RegisterService registers the DRPC service.
228+
func (s *Server) RegisterDRPCService(d drpc.Mux) error {
229+
return tspb.DRPCRegisterTimeSeries(d, &drpcServer{Server: s})
230+
}
231+
213232
// RegisterGateway starts the gateway (i.e. reverse proxy) that proxies HTTP requests
214233
// to the appropriate gRPC endpoints.
215234
func (s *Server) RegisterGateway(
@@ -377,25 +396,63 @@ func (s *Server) Query(
377396
// set up a KV store and write some keys into it (`MakeDataKey`) to do so without
378397
// setting up a `*Server`.
379398
func (s *Server) Dump(req *tspb.DumpRequest, stream tspb.TimeSeries_DumpServer) error {
399+
return s.dump(req, stream)
400+
}
401+
402+
// Dump returns a stream of raw timeseries data that has been stored on the
403+
// server.
404+
func (s *drpcServer) Dump(req *tspb.DumpRequest, stream tspb.DRPCTimeSeries_DumpStream) error {
405+
return s.dump(req, stream)
406+
}
407+
408+
func (s *Server) dump(req *tspb.DumpRequest, stream tspb.RPCTimeSeries_DumpStream) error {
380409
d := DefaultDumper{stream.Send}.Dump
381410
return dumpImpl(stream.Context(), s.db.db, req, d)
382411

383412
}
384413

385414
// DumpRaw is like Dump, but it returns a stream of raw KV pairs.
386415
func (s *Server) DumpRaw(req *tspb.DumpRequest, stream tspb.TimeSeries_DumpRawServer) error {
416+
return s.dumpRaw(req, stream)
417+
}
418+
419+
// DumpRaw is like Dump, but it returns a stream of raw KV pairs.
420+
func (s *drpcServer) DumpRaw(
421+
req *tspb.DumpRequest, stream tspb.DRPCTimeSeries_DumpRawStream,
422+
) error {
423+
return s.dumpRaw(req, stream)
424+
}
425+
426+
func (s *Server) dumpRaw(req *tspb.DumpRequest, stream tspb.RPCTimeSeries_DumpRawStream) error {
387427
d := rawDumper{stream}.Dump
388428
return dumpImpl(stream.Context(), s.db.db, req, d)
389429
}
390430

391-
func (s *TenantServer) DumpRaw(_ *tspb.DumpRequest, _ tspb.TimeSeries_DumpRawServer) error {
431+
// DumpRaw is like Dump, but it returns a stream of raw KV pairs.
432+
func (s *drpcTenantServer) DumpRaw(_ *tspb.DumpRequest, _ tspb.DRPCTimeSeries_DumpRawStream) error {
433+
return s.dumpRaw()
434+
}
435+
436+
func (t *TenantServer) DumpRaw(_ *tspb.DumpRequest, _ tspb.TimeSeries_DumpRawServer) error {
437+
return t.dumpRaw()
438+
}
439+
440+
func (t *TenantServer) dumpRaw() error {
392441
return status.Errorf(codes.Unimplemented, "DumpRaw is not implemented for virtual clusters. "+
393442
"If you are attempting to take a tsdump, please connect to the system virtual cluster, "+
394443
"not an application virtual cluster. System virtual clusters will dump all persisted "+
395444
"metrics from all virtual clusters.")
396445
}
397446

398-
func (s *TenantServer) Dump(_ *tspb.DumpRequest, _ tspb.TimeSeries_DumpServer) error {
447+
func (s *drpcTenantServer) Dump(_ *tspb.DumpRequest, _ tspb.DRPCTimeSeries_DumpStream) error {
448+
return s.dump()
449+
}
450+
451+
func (t *TenantServer) Dump(_ *tspb.DumpRequest, _ tspb.TimeSeries_DumpServer) error {
452+
return t.dump()
453+
}
454+
455+
func (t *TenantServer) dump() error {
399456
return status.Errorf(codes.Unimplemented, "Dump is not implemented for virtual clusters. "+
400457
"If you are attempting to take a tsdump, please connect to the system virtual cluster, "+
401458
"not an application virtual cluster. System virtual clusters will dump all persisted "+
@@ -466,7 +523,7 @@ func (dd DefaultDumper) Dump(kv *roachpb.KeyValue) error {
466523
}
467524

468525
type rawDumper struct {
469-
stream tspb.TimeSeries_DumpRawServer
526+
stream tspb.RPCTimeSeries_DumpRawStream
470527
}
471528

472529
func (rd rawDumper) Dump(kv *roachpb.KeyValue) error {

0 commit comments

Comments
 (0)