Skip to content

Commit 0d80729

Browse files
craig[bot]shubhamdhama
andcommitted
Merge #148267
148267: distsql: register `DistSQL` service with DRPC server r=yuzefovich,cthumuluru-crdb a=shubhamdhama Enable the `DistSQL` service on the DRPC server in addition to gRPC. This is controlled by `rpc.experimental_drpc.enabled` (off by default). This change is part of a series and is similar to: #146926 Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Epic: CRDB-48925 Release note: None Co-authored-by: Shubham Dhama <[email protected]>
2 parents a49654d + 3dfaf1e commit 0d80729

File tree

6 files changed

+46
-17
lines changed

6 files changed

+46
-17
lines changed

pkg/sql/colflow/vectorized_flow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1274,7 +1274,7 @@ var _ flowinfra.InboundStreamHandler = vectorizedInboundStreamHandler{}
12741274
// Run is part of the flowinfra.InboundStreamHandler interface.
12751275
func (s vectorizedInboundStreamHandler) Run(
12761276
ctx context.Context,
1277-
stream execinfrapb.DistSQL_FlowStreamServer,
1277+
stream execinfrapb.RPCDistSQL_FlowStreamStream,
12781278
_ *execinfrapb.ProducerMessage,
12791279
_ *flowinfra.FlowBase,
12801280
) error {

pkg/sql/distsql/server.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ func NewServer(
9797
return ds
9898
}
9999

100+
type drpcServerImpl ServerImpl
101+
102+
func (ds *ServerImpl) AsDRPCServer() execinfrapb.DRPCDistSQLServer {
103+
return (*drpcServerImpl)(ds)
104+
}
105+
100106
// Start launches workers for the server.
101107
//
102108
// Note that the initialization of the server required for performing the
@@ -614,6 +620,13 @@ func (ds *ServerImpl) setupSpanForIncomingRPC(
614620
tracing.WithServerSpanKind)
615621
}
616622

623+
// SetupFlow is part of the execinfrapb.DRPCDistSQLServer interface.
624+
func (ds *drpcServerImpl) SetupFlow(
625+
ctx context.Context, req *execinfrapb.SetupFlowRequest,
626+
) (*execinfrapb.SimpleResponse, error) {
627+
return (*ServerImpl)(ds).SetupFlow(ctx, req)
628+
}
629+
617630
// SetupFlow is part of the execinfrapb.DistSQLServer interface.
618631
func (ds *ServerImpl) SetupFlow(
619632
ctx context.Context, req *execinfrapb.SetupFlowRequest,
@@ -676,6 +689,13 @@ func (ds *ServerImpl) SetupFlow(
676689
return &execinfrapb.SimpleResponse{}, nil
677690
}
678691

692+
// CancelDeadFlows is part of the execinfrapb.DRPCDistSQLServer interface.
693+
func (ds *drpcServerImpl) CancelDeadFlows(
694+
ctx context.Context, req *execinfrapb.CancelDeadFlowsRequest,
695+
) (*execinfrapb.SimpleResponse, error) {
696+
return (*ServerImpl)(ds).CancelDeadFlows(ctx, req)
697+
}
698+
679699
// CancelDeadFlows is part of the execinfrapb.DistSQLServer interface.
680700
func (ds *ServerImpl) CancelDeadFlows(
681701
ctx context.Context, req *execinfrapb.CancelDeadFlowsRequest,
@@ -686,7 +706,7 @@ func (ds *ServerImpl) CancelDeadFlows(
686706
}
687707

688708
func (ds *ServerImpl) flowStreamInt(
689-
ctx context.Context, stream execinfrapb.DistSQL_FlowStreamServer,
709+
ctx context.Context, stream execinfrapb.RPCDistSQL_FlowStreamStream,
690710
) error {
691711
// Receive the first message.
692712
msg, err := stream.Recv()
@@ -720,8 +740,17 @@ func (ds *ServerImpl) flowStreamInt(
720740
return streamStrategy.Run(ctx, stream, msg, f)
721741
}
722742

743+
// FlowStream is part of the execinfrapb.DRPCDistSQLServer interface.
744+
func (ds *drpcServerImpl) FlowStream(stream execinfrapb.DRPCDistSQL_FlowStreamStream) error {
745+
return (*ServerImpl)(ds).flowStream(stream)
746+
}
747+
723748
// FlowStream is part of the execinfrapb.DistSQLServer interface.
724749
func (ds *ServerImpl) FlowStream(stream execinfrapb.DistSQL_FlowStreamServer) error {
750+
return ds.flowStream(stream)
751+
}
752+
753+
func (ds *ServerImpl) flowStream(stream execinfrapb.RPCDistSQL_FlowStreamStream) error {
725754
ctx := ds.AnnotateCtx(stream.Context())
726755
err := ds.flowStreamInt(ctx, stream)
727756
if err != nil && log.V(2) {

pkg/sql/flowinfra/flow_registry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ func (fr *FlowRegistry) ConnectInboundStream(
541541
ctx context.Context,
542542
flowID execinfrapb.FlowID,
543543
streamID execinfrapb.StreamID,
544-
stream execinfrapb.DistSQL_FlowStreamServer,
544+
stream execinfrapb.RPCDistSQL_FlowStreamStream,
545545
timeout time.Duration,
546546
) (_ *FlowBase, _ InboundStreamHandler, cleanup func(), retErr error) {
547547
fr.Lock()

pkg/sql/flowinfra/flow_registry_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ func TestFlowRegistryDrain(t *testing.T) {
520520
close(delayCh)
521521
injectedErr := errors.New("injected error")
522522
serverStream = &delayedErrorServerStream{
523-
DistSQL_FlowStreamServer: serverStream,
523+
RPCDistSQL_FlowStreamStream: serverStream,
524524
// Make rpcCalledCh a buffered channel so that the RPC is not
525525
// blocked.
526526
rpcCalledCh: make(chan struct{}, 1),
@@ -705,7 +705,7 @@ func TestFlowCancelPartiallyBlocked(t *testing.T) {
705705
// allows to block (on delayCh) Send() calls which always result in the
706706
// provided error.
707707
type delayedErrorServerStream struct {
708-
execinfrapb.DistSQL_FlowStreamServer
708+
execinfrapb.RPCDistSQL_FlowStreamStream
709709
// rpcCalledCh is sent on in the very beginning of every Send() call.
710710
rpcCalledCh chan<- struct{}
711711
delayCh <-chan struct{}
@@ -761,10 +761,10 @@ func TestErrorOnSlowHandshake(t *testing.T) {
761761

762762
rpcCalledCh, delayCh := make(chan struct{}), make(chan struct{})
763763
serverStream = &delayedErrorServerStream{
764-
DistSQL_FlowStreamServer: serverStream,
765-
rpcCalledCh: rpcCalledCh,
766-
delayCh: delayCh,
767-
err: errors.New("dummy error"),
764+
RPCDistSQL_FlowStreamStream: serverStream,
765+
rpcCalledCh: rpcCalledCh,
766+
delayCh: delayCh,
767+
err: errors.New("dummy error"),
768768
}
769769

770770
receiver := &distsqlutils.RowBuffer{}

pkg/sql/flowinfra/inbound.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type InboundStreamHandler interface {
2525
// Run is called once a FlowStream RPC is handled and a stream is obtained to
2626
// make this stream accessible to the rest of the flow.
2727
Run(
28-
ctx context.Context, stream execinfrapb.DistSQL_FlowStreamServer, firstMsg *execinfrapb.ProducerMessage, f *FlowBase,
28+
ctx context.Context, stream execinfrapb.RPCDistSQL_FlowStreamStream, firstMsg *execinfrapb.ProducerMessage, f *FlowBase,
2929
) error
3030
// Timeout is called with an error, which results in the teardown of the
3131
// stream strategy with the given error.
@@ -45,7 +45,7 @@ var _ InboundStreamHandler = RowInboundStreamHandler{}
4545
// Run is part of the InboundStreamHandler interface.
4646
func (s RowInboundStreamHandler) Run(
4747
ctx context.Context,
48-
stream execinfrapb.DistSQL_FlowStreamServer,
48+
stream execinfrapb.RPCDistSQL_FlowStreamStream,
4949
firstMsg *execinfrapb.ProducerMessage,
5050
f *FlowBase,
5151
) error {
@@ -61,13 +61,13 @@ func (s RowInboundStreamHandler) Timeout(err error) {
6161
s.ProducerDone()
6262
}
6363

64-
// processInboundStream receives rows from a DistSQL_FlowStreamServer and sends
64+
// processInboundStream receives rows from a RPCDistSQL_FlowStreamStream and sends
6565
// them to a RowReceiver. Optionally processes an initial StreamMessage that was
6666
// already received (because the first message contains the flow and stream IDs,
6767
// it needs to be received before we can get here).
6868
func processInboundStream(
6969
ctx context.Context,
70-
stream execinfrapb.DistSQL_FlowStreamServer,
70+
stream execinfrapb.RPCDistSQL_FlowStreamStream,
7171
firstMsg *execinfrapb.ProducerMessage,
7272
dst execinfra.RowReceiver,
7373
f *FlowBase,
@@ -90,7 +90,7 @@ func processInboundStream(
9090

9191
func processInboundStreamHelper(
9292
ctx context.Context,
93-
stream execinfrapb.DistSQL_FlowStreamServer,
93+
stream execinfrapb.RPCDistSQL_FlowStreamStream,
9494
firstMsg *execinfrapb.ProducerMessage,
9595
dst execinfra.RowReceiver,
9696
f *FlowBase,
@@ -173,7 +173,7 @@ func processInboundStreamHelper(
173173
// producer that it doesn't need any more rows and the producer should drain. A
174174
// signal is sent on stream to the producer to ask it to send metadata.
175175
func sendDrainSignalToStreamProducer(
176-
ctx context.Context, stream execinfrapb.DistSQL_FlowStreamServer,
176+
ctx context.Context, stream execinfrapb.RPCDistSQL_FlowStreamStream,
177177
) error {
178178
log.VEvent(ctx, 1, "sending drain signal to producer")
179179
sig := execinfrapb.ConsumerSignal{DrainRequest: &execinfrapb.DrainRequest{}}
@@ -187,7 +187,7 @@ func sendDrainSignalToStreamProducer(
187187
func processProducerMessage(
188188
ctx context.Context,
189189
flowBase *FlowBase,
190-
stream execinfrapb.DistSQL_FlowStreamServer,
190+
stream execinfrapb.RPCDistSQL_FlowStreamStream,
191191
dst execinfra.RowReceiver,
192192
sd *StreamDecoder,
193193
draining *bool,

pkg/sql/flowinfra/utils_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232
func createDummyStream(
3333
t *testing.T,
3434
) (
35-
serverStream execinfrapb.DistSQL_FlowStreamServer,
35+
serverStream execinfrapb.RPCDistSQL_FlowStreamStream,
3636
clientStream execinfrapb.RPCDistSQL_FlowStreamClient,
3737
cleanup func(),
3838
) {

0 commit comments

Comments
 (0)