forked from googleapis/google-cloud-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsessionclient.go
More file actions
342 lines (303 loc) · 11.7 KB
/
sessionclient.go
File metadata and controls
342 lines (303 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
/*
Copyright 2019 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package spanner
import (
"context"
"fmt"
"log"
"reflect"
"sync"
"sync/atomic"
"time"
"cloud.google.com/go/internal/trace"
vkit "cloud.google.com/go/spanner/apiv1"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
"cloud.google.com/go/spanner/internal"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
"google.golang.org/api/option"
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
)
// cidGen is the package-wide generator used to number session clients.
var cidGen = newClientIDGenerator()

// clientIDGenerator hands out sequential client identifiers, counted
// separately for every database name.
type clientIDGenerator struct {
	mu  sync.Mutex
	ids map[string]int
}

// newClientIDGenerator returns a generator with no identifiers issued yet.
func newClientIDGenerator() *clientIDGenerator {
	return &clientIDGenerator{ids: map[string]int{}}
}

// nextClientIDAndOrdinal reserves the next identifier for database. It returns
// the identifier formatted as "client-<n>" together with the ordinal n itself.
// The first call for a database yields n == 1.
func (cg *clientIDGenerator) nextClientIDAndOrdinal(database string) (clientID string, nthClient int) {
	cg.mu.Lock()
	defer cg.mu.Unlock()

	// A database that has not been seen yet reads as 0, so its first ordinal is 1.
	next := cg.ids[database] + 1
	cg.ids[database] = next
	return fmt.Sprintf("client-%d", next), next
}

// nextID reserves the next identifier for database and returns only its
// string form.
func (cg *clientIDGenerator) nextID(database string) string {
	strID, _ := cg.nextClientIDAndOrdinal(database)
	return strID
}
// sessionConsumer is passed to the session creation methods and will receive
// the sessions that are created as they become available. A sessionConsumer
// implementation must be safe for concurrent use.
//
// The interface is implemented by sessionManager and is used for testing the
// sessionClient.
//
// In this file, executeCreateMultiplexedSession is the caller: it invokes
// sessionReady after a successful CreateSession RPC, and
// sessionCreationFailed when the context is already done or the RPC fails.
type sessionConsumer interface {
// sessionReady is called when a session has been created and is ready for
// use. ctx is the context of the creation call and s is the new session.
sessionReady(ctx context.Context, s *session)
// sessionCreationFailed is called when the creation of a session failed.
// err describes why the session could not be created.
sessionCreationFailed(ctx context.Context, err error)
}
// sessionClient creates sessions for a database. Each session will be
// affiliated with a gRPC channel. The session client now only supports
// creating multiplexed sessions.
//
// NOTE(review): createSession in this file still sends a CreateSession request
// without the Multiplexed flag; confirm whether the statement above is
// still accurate.
type sessionClient struct {
// waitWorkers is waited on by close before it returns.
waitWorkers sync.WaitGroup
// mu guards closed. sessionWithID also holds it while calling nextClient,
// which reads and updates channelIDMap.
mu sync.Mutex
// closed is set by close; session creation checks it before issuing an RPC.
closed bool
// disableRouteToLeader is forwarded to contextWithOutgoingMetadata for
// CreateSession calls.
disableRouteToLeader bool
// connPool supplies the gRPC connections for new clients and is closed by
// close.
connPool gtransport.ConnPool
// database is sent as the Database of every CreateSessionRequest.
database string
// id is the client identifier obtained from cidGen; it is used as the
// client-ID tag when recording GFE latency.
id string
userAgent string
// sessionLabels are attached to sessions created by createSession.
sessionLabels map[string]string
// databaseRole is sent as the CreatorRole of created sessions.
databaseRole string
// md is the outgoing metadata for CreateSession calls and is stored on the
// sessions built by createSession and sessionWithID.
md metadata.MD
batchTimeout time.Duration
// logger is handed to every session this client constructs.
logger *log.Logger
callOptions *vkit.CallOptions
// otConfig provides the trace start options and is passed to
// recordGFELatencyMetricsOT. It is not set by newSessionClient.
otConfig *openTelemetryConfig
metricsTracerFactory *builtinMetricsTracerFactory
// channelIDMap assigns a 1-based channel ID to each connection handed out
// by nextClient. It is created lazily on first use.
channelIDMap map[*grpc.ClientConn]uint64
// baseClientOpts holds the client options used for creating endpoint-specific
// gRPC connections in location-aware routing.
baseClientOpts []option.ClientOption
// These fields are for request-id propagation.
// nthClient is the ordinal returned by cidGen for this client's database.
nthClient int
// nthRequest shall always be incremented on every fresh request.
nthRequest *atomic.Uint32
}
// newSessionClient builds the sessionClient for database on top of connPool.
// The client draws its identifier and ordinal from the package-level cidGen
// and starts with a fresh request counter. All other arguments are stored
// unchanged on the returned client.
func newSessionClient(connPool gtransport.ConnPool, database, userAgent string, sessionLabels map[string]string, databaseRole string, disableRouteToLeader bool, md metadata.MD, batchTimeout time.Duration, logger *log.Logger, callOptions *vkit.CallOptions) *sessionClient {
	sc := new(sessionClient)

	// Identity used for tagging and request-id propagation.
	sc.id, sc.nthClient = cidGen.nextClientIDAndOrdinal(database)
	sc.nthRequest = new(atomic.Uint32)

	// Connection and target.
	sc.connPool = connPool
	sc.database = database
	sc.userAgent = userAgent

	// Session and call settings.
	sc.sessionLabels = sessionLabels
	sc.databaseRole = databaseRole
	sc.disableRouteToLeader = disableRouteToLeader
	sc.md = md
	sc.batchTimeout = batchTimeout
	sc.logger = logger
	sc.callOptions = callOptions

	return sc
}
// close marks the session client as closed and closes its connection pool,
// returning the error from the pool's Close. Before returning it waits for
// waitWorkers; the wait happens after the mutex has been released.
func (sc *sessionClient) close() error {
	// Deferred calls run last-in-first-out: the mutex is released first and
	// only then does the wait on waitWorkers start.
	defer sc.waitWorkers.Wait()

	sc.mu.Lock()
	defer sc.mu.Unlock()

	sc.closed = true
	return sc.connPool.Close()
}
// createSession creates one session for the database of the sessionClient. The
// session is created using one synchronous RPC.
//
// It returns a FailedPrecondition error when the sessionClient has already
// been closed, the error from nextClient when no gRPC client could be
// obtained, and otherwise the CreateSession RPC error converted with
// ToSpannerError. GFE latency is recorded on a best-effort basis; only a
// failure to parse the database name aborts the call.
func (sc *sessionClient) createSession(ctx context.Context) (*session, error) {
	// nextClient reads and lazily populates sc.channelIDMap, so it must run
	// while sc.mu is held (sessionWithID does the same). Doing the closed check
	// and the client selection in one critical section avoids a data race on
	// the map when sessions are created concurrently.
	client, err := func() (spannerClient, error) {
		sc.mu.Lock()
		defer sc.mu.Unlock()
		if sc.closed {
			return nil, spannerErrorf(codes.FailedPrecondition, "SessionClient is closed")
		}
		return sc.nextClient()
	}()
	if err != nil {
		return nil, err
	}

	// md receives the response headers of the RPC; it stays nil when no
	// headers were returned.
	var md metadata.MD
	sid, err := client.CreateSession(contextWithOutgoingMetadata(ctx, sc.md, sc.disableRouteToLeader), &sppb.CreateSessionRequest{
		Database: sc.database,
		Session:  &sppb.Session{Labels: sc.sessionLabels, CreatorRole: sc.databaseRole},
	}, gax.WithGRPCOptions(grpc.Header(&md)))

	if getGFELatencyMetricsFlag() && md != nil {
		// Distinct error names keep the RPC error in err untouched until it is
		// checked below.
		_, instance, database, parseErr := parseDatabaseName(sc.database)
		if parseErr != nil {
			return nil, ToSpannerError(parseErr)
		}
		ctxGFE, tagErr := tag.New(ctx,
			tag.Upsert(tagKeyClientID, sc.id),
			tag.Upsert(tagKeyDatabase, database),
			tag.Upsert(tagKeyInstance, instance),
			tag.Upsert(tagKeyLibVersion, internal.Version),
		)
		if tagErr != nil {
			trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", ToSpannerError(tagErr))
		}
		if statsErr := captureGFELatencyStats(ctxGFE, md, "createSession"); statsErr != nil {
			trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", ToSpannerError(statsErr))
		}
	}
	if metricErr := recordGFELatencyMetricsOT(ctx, md, "createSession", sc.otConfig); metricErr != nil {
		trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
	}

	// The RPC error is checked only after the latency metrics were recorded so
	// that failed calls are measured as well.
	if err != nil {
		return nil, ToSpannerError(err)
	}
	return &session{client: client, id: sid.Name, createTime: time.Now(), md: sc.md, logger: sc.logger}, nil
}
// executeCreateMultiplexedSession issues a single CreateSession RPC for a
// multiplexed session on client and reports the outcome to consumer:
// sessionReady on success, sessionCreationFailed when ctx is already done or
// the RPC returns an error. The session handed to the consumer carries md as
// its metadata.
//
// If the session client has been closed, the function only writes a trace
// message and returns; the consumer is not called in that case.
func (sc *sessionClient) executeCreateMultiplexedSession(ctx context.Context, client spannerClient, md metadata.MD, consumer sessionConsumer) {
	ctx, _ = startSpan(ctx, "CreateSession", sc.otConfig.commonTraceStartOptions...)
	// ctx is not reassigned below, so it can be bound at defer time.
	defer endSpan(ctx, nil)
	trace.TracePrintf(ctx, nil, "Creating a multiplexed session")

	sc.mu.Lock()
	isClosed := sc.closed
	sc.mu.Unlock()
	if isClosed {
		closedErr := spannerErrorf(codes.Canceled, "Session client closed")
		trace.TracePrintf(ctx, nil, "Session client closed while creating a multiplexed session: %v", closedErr)
		return
	}

	if ctxErr := ctx.Err(); ctxErr != nil {
		trace.TracePrintf(ctx, nil, "Context error while creating a multiplexed session: %v", ctxErr)
		consumer.sessionCreationFailed(ctx, ToSpannerError(ctxErr))
		return
	}

	// Multiplexed sessions do not support labels.
	request := &sppb.CreateSessionRequest{
		Database: sc.database,
		Session:  &sppb.Session{CreatorRole: sc.databaseRole, Multiplexed: true},
	}
	// headers receives the response headers of the RPC for latency reporting.
	var headers metadata.MD
	response, rpcErr := client.CreateSession(
		contextWithOutgoingMetadata(ctx, sc.md, sc.disableRouteToLeader),
		request,
		gax.WithGRPCOptions(grpc.Header(&headers)),
	)

	// Errors in the latency bookkeeping are traced only; they should not
	// prevent initializing the session pool.
	if getGFELatencyMetricsFlag() && headers != nil {
		_, instance, database, parseErr := parseDatabaseName(sc.database)
		if parseErr != nil {
			trace.TracePrintf(ctx, nil, "Error getting instance and database name: %v", parseErr)
		}
		ctxGFE, tagErr := tag.New(ctx,
			tag.Upsert(tagKeyClientID, sc.id),
			tag.Upsert(tagKeyDatabase, database),
			tag.Upsert(tagKeyInstance, instance),
			tag.Upsert(tagKeyLibVersion, internal.Version),
		)
		if tagErr != nil {
			trace.TracePrintf(ctx, nil, "Error in adding tags in CreateSession for GFE Latency: %v", tagErr)
		}
		if statsErr := captureGFELatencyStats(ctxGFE, headers, "executeCreateSession"); statsErr != nil {
			trace.TracePrintf(ctx, nil, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", statsErr)
		}
	}
	if metricErr := recordGFELatencyMetricsOT(ctx, headers, "executeCreateSession", sc.otConfig); metricErr != nil {
		trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
	}

	if rpcErr != nil {
		trace.TracePrintf(ctx, nil, "Error creating a multiplexed sessions: %v", rpcErr)
		consumer.sessionCreationFailed(ctx, ToSpannerError(rpcErr))
		return
	}

	created := &session{client: client, id: response.Name, createTime: time.Now(), md: md, logger: sc.logger}
	consumer.sessionReady(ctx, created)
	trace.TracePrintf(ctx, nil, "Finished creating multiplexed sessions")
}
// sessionWithID builds a local session object for an already known session id.
// No RPC is issued: the session is bound to the next gRPC client, stamped with
// the current time and given the metadata and logger of the session client.
// The error from nextClient is returned unchanged.
func (sc *sessionClient) sessionWithID(id string) (*session, error) {
	// nextClient must be called with sc.mu held.
	sc.mu.Lock()
	defer sc.mu.Unlock()

	grpcClient, err := sc.nextClient()
	if err != nil {
		return nil, err
	}
	s := &session{
		client:     grpcClient,
		id:         id,
		createTime: time.Now(),
		md:         sc.md,
		logger:     sc.logger,
	}
	return s, nil
}
// nextClient returns the next gRPC client to use for session creation. The
// client is set on the session, and used by all subsequent gRPC calls on the
// session. Using the same channel for all gRPC calls for a session ensures the
// optimal usage of server side caches.
//
// The method reads and updates sc.channelIDMap and therefore expects the
// caller to hold sc.mu.
func (sc *sessionClient) nextClient() (spannerClient, error) {
	var (
		clientOpt option.ClientOption
		channelID uint64
	)

	switch sc.connPool.(type) {
	case *gmeWrapper, *fallbackWrapper:
		// GCPMultiEndpoint and fallback wrappers are handed over as a whole
		// pool; channelID stays 0 for them.
		clientOpt = gtransport.WithConnPool(sc.connPool)
	default:
		// Pick a grpc.ClientConn from a regular pool and look up its channel
		// ID, assigning the next 1-based ID the first time a connection is
		// seen.
		conn := sc.connPool.Conn()
		id, known := sc.channelIDMap[conn]
		if !known {
			if sc.channelIDMap == nil {
				sc.channelIDMap = make(map[*grpc.ClientConn]uint64)
			}
			id = uint64(len(sc.channelIDMap)) + 1
			sc.channelIDMap[conn] = id
		}
		channelID = id
		clientOpt = option.WithGRPCConn(conn)
	}

	grpcClient, err := newGRPCSpannerClient(context.Background(), sc, channelID, clientOpt)
	if err != nil {
		return nil, err
	}
	return grpcClient, nil
}
// createEndpointClient creates a new spannerClient for a specific server
// endpoint address. This is used by the location-aware routing feature to
// create direct connections to Spanner servers.
//
// The client options are a copy of sc.baseClientOpts followed by the endpoint
// option, so the base options themselves are never modified. The client is
// created with channel ID 0.
func (sc *sessionClient) createEndpointClient(ctx context.Context, address string) (spannerClient, error) {
	// Room for the base options plus the (at most) two options added below.
	endpointOpts := make([]option.ClientOption, 0, len(sc.baseClientOpts)+2)
	endpointOpts = append(endpointOpts, sc.baseClientOpts...)
	endpointOpts = append(endpointOpts, option.WithEndpoint(address))

	// Endpoint-specific clients should keep a single connection per endpoint
	// when the parent client uses GCPMultiEndpoint.
	if _, usesGME := sc.connPool.(*gmeWrapper); usesGME {
		endpointOpts = append(endpointOpts, option.WithGRPCConnectionPool(1))
	}

	return newGRPCSpannerClient(ctx, sc, 0, endpointOpts...)
}
// mergeCallOptions merges two CallOptions into one and the first argument has
// a lower order of precedence than the second one.
//
// For every field of vkit.CallOptions the result holds the options of a
// followed by the options of b. The result never shares a backing array with
// either input, so later appends to the merged slices cannot modify a or b.
// A nil a or b is treated as an empty set of options. A field for which
// neither input carries options is left nil in the result.
func mergeCallOptions(a *vkit.CallOptions, b *vkit.CallOptions) *vkit.CallOptions {
	res := &vkit.CallOptions{}
	resVal := reflect.ValueOf(res).Elem()

	// fieldOpts returns the i-th field of src as a slice of call options, or
	// nil when src is nil or the field does not have that type.
	fieldOpts := func(src *vkit.CallOptions, i int) []gax.CallOption {
		if src == nil {
			return nil
		}
		opts, _ := reflect.ValueOf(src).Elem().Field(i).Interface().([]gax.CallOption)
		return opts
	}

	for i := 0; i < resVal.NumField(); i++ {
		low, high := fieldOpts(a, i), fieldOpts(b, i)
		if len(low)+len(high) == 0 {
			continue
		}
		// Copy into a fresh slice: appending to low directly could write into
		// spare capacity of a's backing array and leak into other merges.
		merged := make([]gax.CallOption, 0, len(low)+len(high))
		merged = append(merged, low...)
		merged = append(merged, high...)
		resVal.Field(i).Set(reflect.ValueOf(merged))
	}
	return res
}