Skip to content

Commit 28e1b22

Browse files
craig[bot]cthumuluru-crdb
andcommitted
Merge #147950
147950: *: replace grpc clients with rpc adapters r=cthumuluru-crdb a=cthumuluru-crdb gRPC and DRPC clients have different signatures. To unify them, we created a common interface for both types of clients, and replaced the gRPC and DRPC clients with adapters. Epic: CRDB-48923 Informs: #148353 Release note: none Co-authored-by: Chandra Thumuluru <[email protected]>
2 parents ae09dc0 + 4930b55 commit 28e1b22

File tree

75 files changed

+258
-258
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+258
-258
lines changed

pkg/acceptance/localcluster/cluster.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ type Node struct {
413413
cmd *exec.Cmd
414414
rpcPort, pgURL string // legacy: remove once 1.0.x is no longer tested
415415
db *gosql.DB
416-
statusClient serverpb.StatusClient
416+
statusClient serverpb.RPCStatusClient
417417
}
418418

419419
// RPCPort returns the RPC + Postgres port.
@@ -469,7 +469,7 @@ func (n *Node) Alive() bool {
469469
}
470470

471471
// StatusClient returns a StatusClient set up to talk to this node.
472-
func (n *Node) StatusClient(ctx context.Context) serverpb.StatusClient {
472+
func (n *Node) StatusClient(ctx context.Context) serverpb.RPCStatusClient {
473473
n.Lock()
474474
existingClient := n.statusClient
475475
n.Unlock()
@@ -483,7 +483,7 @@ func (n *Node) StatusClient(ctx context.Context) serverpb.StatusClient {
483483
if err != nil {
484484
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
485485
}
486-
return serverpb.NewStatusClient(conn)
486+
return serverpb.NewGRPCStatusClientAdapter(conn)
487487
}
488488
return nil // This should never happen
489489
}

pkg/blobs/blobspb/rpc_clients.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@ import (
1414

1515
// DialBlobClient establishes a DRPC connection if enabled; otherwise,
1616
// it falls back to gRPC. The established connection is used to create a
17-
// BlobClient.
17+
// RPCBlobClient.
1818
func DialBlobClient(
1919
nd rpcbase.NodeDialer, ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
20-
) (BlobClient, error) {
20+
) (RPCBlobClient, error) {
2121
if !rpcbase.TODODRPC {
2222
conn, err := nd.Dial(ctx, nodeID, class)
2323
if err != nil {
2424
return nil, err
2525
}
26-
return NewBlobClient(conn), nil
26+
return NewGRPCBlobClientAdapter(conn), nil
2727
}
2828
return nil, nil
2929
}

pkg/blobs/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ var _ BlobClient = &remoteClient{}
4747
// remoteClient uses the node dialer and blob service clients
4848
// to Read or Write bulk files from/to other nodes.
4949
type remoteClient struct {
50-
blobClient blobspb.BlobClient
50+
blobClient blobspb.RPCBlobClient
5151
}
5252

5353
// newRemoteClient instantiates a remote blob service client.
54-
func newRemoteClient(blobClient blobspb.BlobClient) BlobClient {
54+
func newRemoteClient(blobClient blobspb.RPCBlobClient) BlobClient {
5555
return &remoteClient{blobClient: blobClient}
5656
}
5757

@@ -71,7 +71,7 @@ func (c *remoteClient) ReadFile(
7171
}
7272

7373
type streamWriter struct {
74-
s blobspb.Blob_PutStreamClient
74+
s blobspb.RPCBlob_PutStreamClient
7575
buf blobspb.StreamChunk
7676
}
7777

pkg/blobs/stream.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type streamReceiver interface {
4343
// This is needed as Blob_GetStreamClient does not have a Close() function, whereas
4444
// the other sender, Blob_PutStreamServer, does.
4545
type nopSendAndClose struct {
46-
blobspb.Blob_GetStreamClient
46+
blobspb.RPCBlob_GetStreamClient
4747
}
4848

4949
func (*nopSendAndClose) SendAndClose(*blobspb.StreamResponse) error {
@@ -52,7 +52,7 @@ func (*nopSendAndClose) SendAndClose(*blobspb.StreamResponse) error {
5252

5353
// newGetStreamReader creates an io.ReadCloser that uses gRPC's streaming API
5454
// to read chunks of data.
55-
func newGetStreamReader(client blobspb.Blob_GetStreamClient) ioctx.ReadCloserCtx {
55+
func newGetStreamReader(client blobspb.RPCBlob_GetStreamClient) ioctx.ReadCloserCtx {
5656
return &blobStreamReader{
5757
stream: &nopSendAndClose{client},
5858
}

pkg/ccl/serverccl/statusccl/tenant_grpc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func TestTenantGRPCServices(t *testing.T) {
135135
conn, err := rpcCtx.GRPCDialNode(grpcAddr, nodeID, roachpb.Locality{}, rpcbase.DefaultClass).Connect(ctx)
136136
require.NoError(t, err)
137137

138-
client := serverpb.NewStatusClient(conn)
138+
client := serverpb.NewGRPCStatusClientAdapter(conn)
139139

140140
resp, err := client.Statements(ctx, &serverpb.StatementsRequest{NodeID: "local"})
141141
require.NoError(t, err)
@@ -149,7 +149,7 @@ func TestTenantGRPCServices(t *testing.T) {
149149
conn, err := rpcCtx.GRPCDialNode(grpcAddr, server.NodeID(), roachpb.Locality{}, rpcbase.DefaultClass).Connect(ctx)
150150
require.NoError(t, err)
151151

152-
client := serverpb.NewStatusClient(conn)
152+
client := serverpb.NewGRPCStatusClientAdapter(conn)
153153

154154
_, err = client.Statements(ctx, &serverpb.StatementsRequest{NodeID: "local"})
155155
require.Errorf(t, err, "statements endpoint should not be accessed on KV node by tenant")

pkg/ccl/sqlproxyccl/proxy_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ func newProxyHandler(
279279
dirOpts = append(dirOpts, handler.testingKnobs.dirOpts...)
280280
}
281281

282-
client := tenant.NewDirectoryClient(conn)
282+
client := tenant.NewGRPCDirectoryClientAdapter(conn)
283283
handler.directoryCache, err = tenant.NewDirectoryCache(ctx, stopper, client, dirOpts...)
284284
if err != nil {
285285
return nil, err

pkg/ccl/sqlproxyccl/tenant/directory_cache.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func TenantWatcher(tenantWatcher chan *WatchTenantsResponse) func(opts *dirOptio
114114
type directoryCache struct {
115115
// client is the directory client instance used to make directory server
116116
// calls.
117-
client DirectoryClient
117+
client RPCDirectoryClient
118118

119119
// stopper is used for graceful shutdown of the pod watcher.
120120
stopper *stop.Stopper
@@ -144,7 +144,7 @@ var _ DirectoryCache = &directoryCache{}
144144
// NOTE: stopper.Stop must be called on the directory when it is no longer
145145
// needed.
146146
func NewDirectoryCache(
147-
ctx context.Context, stopper *stop.Stopper, client DirectoryClient, opts ...DirOption,
147+
ctx context.Context, stopper *stop.Stopper, client RPCDirectoryClient, opts ...DirOption,
148148
) (DirectoryCache, error) {
149149
dir := &directoryCache{client: client, stopper: stopper}
150150

@@ -380,7 +380,7 @@ func (d *directoryCache) watchPods(ctx context.Context, stopper *stop.Stopper) e
380380
waitInit.Add(1)
381381

382382
err := stopper.RunAsyncTask(ctx, "watch-pods-client", func(ctx context.Context) {
383-
var client Directory_WatchPodsClient
383+
var client RPCDirectory_WatchPodsClient
384384
var err error
385385
firstRun := true
386386
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
@@ -492,7 +492,7 @@ func (d *directoryCache) updateTenantPodEntry(ctx context.Context, pod *Pod) {
492492
// notification and update the directory to reflect that change.
493493
func (d *directoryCache) watchTenants(ctx context.Context, stopper *stop.Stopper) error {
494494
return stopper.RunAsyncTask(ctx, "watch-tenants-client", func(ctx context.Context) {
495-
var client Directory_WatchTenantsClient
495+
var client RPCDirectory_WatchTenantsClient
496496
var err error
497497
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
498498
defer cancel()

pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,7 @@ func newTestDirectoryCache(
697697
require.NoError(t, err)
698698
// nolint:grpcconnclose
699699
clusterStopper.AddCloser(stop.CloserFn(func() { require.NoError(t, conn.Close() /* nolint:grpcconnclose */) }))
700-
client := tenant.NewDirectoryClient(conn)
700+
client := tenant.NewGRPCDirectoryClientAdapter(conn)
701701
directoryCache, err = tenant.NewDirectoryCache(context.Background(), clusterStopper, client, opts...)
702702
require.NoError(t, err)
703703
return

pkg/ccl/sqlproxyccl/tenant/entry.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type tenantEntry struct {
7272
// Initialize fetches metadata about a tenant, such as its cluster name, and
7373
// stores that in the entry. If the tenant's metadata is stale, they will be
7474
// refreshed.
75-
func (e *tenantEntry) Initialize(ctx context.Context, client DirectoryClient) error {
75+
func (e *tenantEntry) Initialize(ctx context.Context, client RPCDirectoryClient) error {
7676
e.calls.Lock()
7777
defer e.calls.Unlock()
7878

@@ -112,7 +112,7 @@ func (e *tenantEntry) Initialize(ctx context.Context, client DirectoryClient) er
112112

113113
// RefreshPods makes a synchronous directory server call to fetch the latest
114114
// information about the tenant's available pods, such as their IP addresses.
115-
func (e *tenantEntry) RefreshPods(ctx context.Context, client DirectoryClient) error {
115+
func (e *tenantEntry) RefreshPods(ctx context.Context, client RPCDirectoryClient) error {
116116
// Lock so that only one thread at a time will refresh, since there's no
117117
// point in multiple threads doing it within a short span of time - it's
118118
// likely nothing has changed.
@@ -181,7 +181,7 @@ func (e *tenantEntry) GetPods() []*Pod {
181181
// errorIfNoPods is true, then EnsureTenantPod returns an error if there are no
182182
// pods available rather than blocking.
183183
func (e *tenantEntry) EnsureTenantPod(
184-
ctx context.Context, client DirectoryClient, errorIfNoPods bool,
184+
ctx context.Context, client RPCDirectoryClient, errorIfNoPods bool,
185185
) (pods []*Pod, err error) {
186186
const retryDelay = 100 * time.Millisecond
187187

@@ -282,7 +282,7 @@ func (e *tenantEntry) ToProto() *Tenant {
282282
//
283283
// NOTE: Caller must lock the "calls" mutex before calling fetchPodsLocked.
284284
func (e *tenantEntry) fetchPodsLocked(
285-
ctx context.Context, client DirectoryClient,
285+
ctx context.Context, client RPCDirectoryClient,
286286
) (tenantPods []*Pod, err error) {
287287
// List the pods for the given tenant.
288288
//

pkg/ccl/sqlproxyccl/tenantdirsvr/testutils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func SetupTestDirectory(
4444
stopper.AddCloser(stop.CloserFn(func() {
4545
_ = conn.Close() // nolint:grpcconnclose
4646
}))
47-
client := tenant.NewDirectoryClient(conn)
47+
client := tenant.NewGRPCDirectoryClientAdapter(conn)
4848
directoryCache, err := tenant.NewDirectoryCache(ctx, stopper, client, opts...)
4949
require.NoError(t, err)
5050

0 commit comments

Comments
 (0)