Skip to content

Commit 13835e9

Browse files
committed
kv: extract KVBatch service from Internal service and remove api_drpc_hacky
This change removes the old manually-created `api_drpc_hacky.go` file and uses the properly auto-generated `api_drpc.pb.go` now that the DRPC compiler setup is working. We also extract the `KVBatch` service from the `Internal` service, which has largely been an ad-hoc collection of internal RPCs. This enables: - Easier Maintenance: Smaller services are simpler to manage and update. - Smoother DRPC Rollout: We can move to DRPC step-by-step with these smaller services, rather than doing `Internal` all at once. - Subsystems get client interfaces focused only on what they need. We also refactor `batchStreamImpl` to directly call `RecvMsg` on the provided stream, which is enabled by the change in codegen to include a gRPC-compatible `RecvMsg`. --- Some context on why we use `RecvMsg`: Commit 34e3abf optimized the `BatchStream` implementation by changing how the `BatchRequest` and its initial `Requests` slice capacity are allocated. `RecvMsg` allows the caller to provide a pre-allocated `BatchRequest` instance. This pattern, when used with a pre-allocated instance of a `BatchRequest` header and first request together, helps reduce per-message allocations compared to a `Recv()` method that might allocate a new message object on each call
1 parent 3853e29 commit 13835e9

File tree

9 files changed

+28
-240
lines changed

9 files changed

+28
-240
lines changed

pkg/kv/kvpb/BUILD.bazel

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,17 @@ load(":gen.bzl", "batch_gen")
88
go_library(
99
name = "kvpb",
1010
srcs = [
11-
":gen-batch-generated", # keep
12-
":gen-errordetailtype-stringer", # keep
13-
":gen-method-stringer", # keep
1411
"ambiguous_result_error.go",
1512
"api.go",
16-
# DRPC protobuf file (api_drpc.pb.go) is currently generated manually.
17-
# TODO (chandrat): Remove this once DRPC protobuf generation is
18-
# integrated into the build process.
19-
"api_drpc_hacky.go",
2013
"batch.go",
2114
"data.go",
2215
"errors.go",
2316
"method.go",
2417
"node_decommissioned_error.go",
2518
"replica_unavailable_error.go",
19+
":gen-batch-generated", # keep
20+
":gen-errordetailtype-stringer", # keep
21+
":gen-method-stringer", # keep
2622
],
2723
embed = [":kvpb_go_proto"],
2824
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvpb",
@@ -52,8 +48,6 @@ go_library(
5248
"@com_github_gogo_protobuf//types",
5349
"@com_github_gogo_status//:status",
5450
"@com_github_golang_mock//gomock", # keep
55-
"@io_storj_drpc//:drpc",
56-
"@io_storj_drpc//drpcerr",
5751
"@org_golang_google_grpc//codes",
5852
"@org_golang_google_grpc//metadata", # keep
5953
],

pkg/kv/kvpb/api.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3618,6 +3618,19 @@ message JoinNodeResponse {
36183618
roachpb.Version active_version = 4;
36193619
}
36203620

3621+
// KVBatch defines Batch RPCs of Internal service (see below) for DRPC use only.
3622+
//
3623+
// During the gRPC to DRPC migration:
3624+
// - gRPC clients will continue to use the full 'Internal' service client.
3625+
// - DRPC clients targeting Batch* RPCs will use clients generated from this.
3626+
//
3627+
// TODO(server): Remove these methods from the 'Internal' service once the
3628+
// migration to DRPC complete, making this service the sole definition.
3629+
service KVBatch {
3630+
rpc Batch(BatchRequest) returns (BatchResponse) {}
3631+
rpc BatchStream(stream BatchRequest) returns (stream BatchResponse) {}
3632+
}
3633+
36213634
// Batch and RangeFeed service implemented by nodes for KV API requests.
36223635
service Internal {
36233636
rpc Batch (BatchRequest) returns (BatchResponse) {}

pkg/kv/kvpb/api_drpc_hacky.go

Lines changed: 0 additions & 207 deletions
This file was deleted.

pkg/rpc/nodedialer/nodedialer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (n *Dialer) DialInternalClient(
167167
client = &unaryDRPCBatchServiceToInternalAdapter{
168168
useStreamPoolClient: useStreamPoolClient,
169169
RestrictedInternalClient: client, // for RangeFeed only
170-
drpcClient: kvpb.NewDRPCBatchClient(dconn),
170+
drpcClient: kvpb.NewDRPCKVBatchClient(dconn),
171171
drpcStreamPool: drpcBatchStreamPool,
172172
}
173173
return client, nil

pkg/rpc/nodedialer/nodedialer_drpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
type unaryDRPCBatchServiceToInternalAdapter struct {
1818
useStreamPoolClient bool
1919
rpc.RestrictedInternalClient
20-
drpcClient kvpb.DRPCBatchClient
20+
drpcClient kvpb.DRPCKVBatchClient
2121
drpcStreamPool *rpc.DRPCBatchStreamPool
2222
}
2323

pkg/rpc/stream_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,5 +334,5 @@ type DRPCBatchStreamClient = streamClient[*kvpb.BatchRequest, *kvpb.BatchRespons
334334

335335
// newDRPCBatchStream constructs a BatchStreamClient from a drpc.Conn.
336336
func newDRPCBatchStream(ctx context.Context, dc drpc.Conn) (DRPCBatchStreamClient, error) {
337-
return kvpb.NewDRPCBatchClient(dc).BatchStream(ctx)
337+
return kvpb.NewDRPCKVBatchClient(dc).BatchStream(ctx)
338338
}

pkg/server/drpc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestDRPCBatchServer(t *testing.T) {
6767

6868
desc := c.LookupRangeOrFatal(t, c.ScratchRange(t))
6969

70-
client := kvpb.NewDRPCBatchClient(conn)
70+
client := kvpb.NewDRPCKVBatchClient(conn)
7171
ba := &kvpb.BatchRequest{}
7272
ba.RangeID = desc.RangeID
7373
var ok bool
@@ -121,7 +121,7 @@ func TestStreamContextCancel(t *testing.T) {
121121
}()
122122

123123
desc := c.LookupRangeOrFatal(t, c.ScratchRange(t))
124-
client := kvpb.NewDRPCBatchClient(conn)
124+
client := kvpb.NewDRPCKVBatchClient(conn)
125125

126126
singleRequest := func() {
127127
streamCtx, streamCtxCancel := context.WithCancel(ctx)

pkg/server/node.go

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1867,18 +1867,10 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR
18671867

18681868
// BatchStream implements the kvpb.InternalServer interface.
18691869
func (n *Node) BatchStream(stream kvpb.Internal_BatchStreamServer) error {
1870-
return n.batchStreamImpl(stream, func(ba *kvpb.BatchRequest) error {
1871-
return stream.RecvMsg(ba)
1872-
})
1870+
return n.batchStreamImpl(stream)
18731871
}
18741872

1875-
func (n *Node) batchStreamImpl(
1876-
stream interface {
1877-
Context() context.Context
1878-
Send(response *kvpb.BatchResponse) error
1879-
},
1880-
recvMsg func(*kvpb.BatchRequest) error,
1881-
) error {
1873+
func (n *Node) batchStreamImpl(stream kvpb.RPCKVBatch_BatchStreamStream) error {
18821874
ctx := stream.Context()
18831875
for {
18841876
argsAlloc := new(struct {
@@ -1888,7 +1880,7 @@ func (n *Node) batchStreamImpl(
18881880
args := &argsAlloc.args
18891881
args.Requests = argsAlloc.reqs[:0]
18901882

1891-
err := recvMsg(args)
1883+
err := stream.RecvMsg(args)
18921884
if err != nil {
18931885
if errors.Is(err, io.EOF) {
18941886
return nil
@@ -1907,7 +1899,7 @@ func (n *Node) batchStreamImpl(
19071899
}
19081900
}
19091901

1910-
func (n *Node) AsDRPCBatchServer() kvpb.DRPCBatchServer {
1902+
func (n *Node) AsDRPCKVBatchServer() kvpb.DRPCKVBatchServer {
19111903
return (*drpcNode)(n)
19121904
}
19131905

@@ -1919,12 +1911,8 @@ func (n *drpcNode) Batch(
19191911
return (*Node)(n).Batch(ctx, request)
19201912
}
19211913

1922-
func (n *drpcNode) BatchStream(stream kvpb.DRPCBatch_BatchStreamStream) error {
1923-
return (*Node)(n).batchStreamImpl(stream, func(ba *kvpb.BatchRequest) error {
1924-
return stream.(interface {
1925-
RecvMsg(request *kvpb.BatchRequest) error
1926-
}).RecvMsg(ba)
1927-
})
1914+
func (n *drpcNode) BatchStream(stream kvpb.DRPCKVBatch_BatchStreamStream) error {
1915+
return (*Node)(n).batchStreamImpl(stream)
19281916
}
19291917

19301918
// spanForRequest is the retval of setupSpanForIncomingRPC. It groups together a

pkg/server/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -997,7 +997,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
997997
cfg.LicenseEnforcer,
998998
)
999999
kvpb.RegisterInternalServer(grpcServer.Server, node)
1000-
if err := kvpb.DRPCRegisterBatch(drpcServer.mux, node.AsDRPCBatchServer()); err != nil {
1000+
if err := kvpb.DRPCRegisterKVBatch(drpcServer.mux, node.AsDRPCKVBatchServer()); err != nil {
10011001
return nil, err
10021002
}
10031003
kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)

0 commit comments

Comments
 (0)