Skip to content

Commit f321d36

Browse files
authored
chore(spanner): support multiplexed session for Partitioned operations (googleapis#11583)
* chore(spanner): support multiplexed session for Partitioned operations * add unit tests and fallback to retry partitioned ops with regular session * add tests for batch read * fix tests * update error code matching * update unimplemented error check, and add logging
1 parent 2f4b04a commit f321d36

File tree

8 files changed

+351
-24
lines changed

8 files changed

+351
-24
lines changed

spanner/batch.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
152152
if metricErr := recordGFELatencyMetricsOT(ctx, md, "PartitionReadUsingIndexWithOptions", t.otConfig); metricErr != nil {
153153
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
154154
}
155+
if isUnimplementedErrorForMultiplexedPartitionReads(err) && t.sp.isMultiplexedSessionForPartitionedOpsEnabled() {
156+
t.sp.disableMultiplexedSessionForPartitionedOps()
157+
}
155158
// Prepare ReadRequest.
156159
req := &sppb.ReadRequest{
157160
Session: sid,
@@ -219,6 +222,9 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
219222
if metricErr := recordGFELatencyMetricsOT(ctx, md, "partitionQuery", t.otConfig); metricErr != nil {
220223
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
221224
}
225+
if isUnimplementedErrorForMultiplexedPartitionReads(err) && t.sp.isMultiplexedSessionForPartitionedOpsEnabled() {
226+
t.sp.disableMultiplexedSessionForPartitionedOps()
227+
}
222228

223229
// prepare ExecuteSqlRequest
224230
r := &sppb.ExecuteSqlRequest{
@@ -281,6 +287,10 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
281287
}
282288
t.sh = nil
283289
sid, client := sh.getID(), sh.getClient()
290+
// skip cleanup if session is multiplexed
291+
if sh.session.isMultiplexed {
292+
return
293+
}
284294

285295
var md metadata.MD
286296
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata(), true), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))
@@ -348,6 +358,9 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
348358
if metricErr := recordGFELatencyMetricsOT(ctx, md, "Execute", t.otConfig); metricErr != nil {
349359
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
350360
}
361+
if isUnimplementedErrorForMultiplexedPartitionReads(err) && t.sp.isMultiplexedSessionForPartitionedOpsEnabled() {
362+
t.sp.disableMultiplexedSessionForPartitionedOps()
363+
}
351364
return client, err
352365
}
353366
} else {
@@ -378,6 +391,9 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
378391
if metricErr := recordGFELatencyMetricsOT(ctx, md, "Execute", t.otConfig); metricErr != nil {
379392
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
380393
}
394+
if isUnimplementedErrorForMultiplexedPartitionReads(err) && t.sp.isMultiplexedSessionForPartitionedOpsEnabled() {
395+
t.sp.disableMultiplexedSessionForPartitionedOps()
396+
}
381397
return client, err
382398
}
383399
}

spanner/batch_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package spanner
1818

1919
import (
2020
"context"
21+
"strings"
2122
"sync"
2223
"testing"
2324
"time"
@@ -242,3 +243,132 @@ func TestPartitionQuery_Parallel(t *testing.T) {
242243
t.Errorf("Row count mismatch\nGot: %d\nWant: %d", g, w)
243244
}
244245
}
246+
247+
func TestPartitionQuery_Multiplexed(t *testing.T) {
248+
t.Parallel()
249+
ctx := context.Background()
250+
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
251+
SessionPoolConfig: SessionPoolConfig{
252+
enableMultiplexSession: true,
253+
enableMultiplexedSessionForPartitionedOps: true,
254+
},
255+
DisableNativeMetrics: true,
256+
})
257+
defer teardown()
258+
259+
txn, err := client.BatchReadOnlyTransaction(ctx, StrongRead())
260+
if err != nil {
261+
t.Fatal(err)
262+
}
263+
defer txn.Cleanup(ctx)
264+
265+
// Test PartitionQuery
266+
paritions, err := txn.PartitionQuery(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), PartitionOptions{0, 3})
267+
if err != nil {
268+
t.Fatal(err)
269+
}
270+
for _, p := range paritions {
271+
iter := txn.Execute(ctx, p)
272+
iter.Do(func(row *Row) error {
273+
return nil
274+
})
275+
break
276+
}
277+
uniqueReq := make(map[string]bool)
278+
handled := 0
279+
reqs := drainRequestsFromServer(server.TestSpanner)
280+
for _, s := range reqs {
281+
switch req := s.(type) {
282+
case *sppb.BeginTransactionRequest:
283+
if !strings.Contains(req.Session, "multiplexed") {
284+
t.Errorf("TestPartitionQuery_Multiplexed expected multiplexed session to be used, got: %v", req.Session)
285+
}
286+
if _, ok := uniqueReq["BeginTransactionRequest"]; !ok {
287+
handled++
288+
}
289+
case *sppb.ExecuteSqlRequest:
290+
if !strings.Contains(req.Session, "multiplexed") {
291+
t.Errorf("TestPartitionQuery_Multiplexed expected multiplexed session to be used with execute sql request, got: %v", req.Session)
292+
}
293+
if _, ok := uniqueReq["ExecuteSqlRequest"]; !ok {
294+
handled++
295+
}
296+
case *sppb.PartitionQueryRequest:
297+
// Validate the session is multiplexed
298+
if !strings.Contains(req.Session, "multiplexed") {
299+
t.Errorf("TestPartitionQuery_Multiplexed expected multiplexed session to be used with partition query request, got: %v", req.Session)
300+
}
301+
if _, ok := uniqueReq["PartitionQueryRequest"]; !ok {
302+
handled++
303+
}
304+
}
305+
}
306+
if handled != 3 {
307+
t.Errorf("TestPartitionQuery_Multiplexed: expected 3 requests to be handled, got: %d", handled)
308+
}
309+
}
310+
311+
func TestPartitionRead_Multiplexed(t *testing.T) {
312+
t.Parallel()
313+
ctx := context.Background()
314+
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
315+
SessionPoolConfig: SessionPoolConfig{
316+
enableMultiplexSession: true,
317+
enableMultiplexedSessionForPartitionedOps: true,
318+
},
319+
DisableNativeMetrics: true,
320+
})
321+
defer teardown()
322+
323+
txn, err := client.BatchReadOnlyTransaction(ctx, StrongRead())
324+
if err != nil {
325+
t.Fatal(err)
326+
}
327+
defer txn.Cleanup(ctx)
328+
329+
// Test PartitionRead
330+
paritions, err := txn.PartitionRead(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, PartitionOptions{0, 3})
331+
if err != nil {
332+
t.Fatal(err)
333+
}
334+
for _, p := range paritions {
335+
iter := txn.Execute(ctx, p)
336+
iter.Do(func(row *Row) error {
337+
return nil
338+
})
339+
break
340+
}
341+
reqs := drainRequestsFromServer(server.TestSpanner)
342+
uniqueReq := make(map[string]bool)
343+
handled := 0
344+
for _, s := range reqs {
345+
switch req := s.(type) {
346+
case *sppb.BeginTransactionRequest:
347+
if !strings.Contains(req.Session, "multiplexed") {
348+
t.Errorf("TestPartitionQuery_Multiplexed expected multiplexed session to be used, got: %v", req.Session)
349+
}
350+
if _, ok := uniqueReq["BeginTransactionRequest"]; !ok {
351+
handled++
352+
}
353+
case *sppb.ReadRequest:
354+
// Validate the session is multiplexed
355+
if !strings.Contains(req.Session, "multiplexed") {
356+
t.Errorf("TestPartitionRead_Multiplexed expected multiplexed session to be used with read request, got: %v", req.Session)
357+
}
358+
if _, ok := uniqueReq["ReadRequest"]; !ok {
359+
handled++
360+
}
361+
case *sppb.PartitionReadRequest:
362+
// Validate the session is multiplexed
363+
if !strings.Contains(req.Session, "multiplexed") {
364+
t.Errorf("TestPartitionRead_Multiplexed expected multiplexed session to be used with partition read request, got: %v", req.Session)
365+
}
366+
if _, ok := uniqueReq["PartitionReadRequest"]; !ok {
367+
handled++
368+
}
369+
}
370+
}
371+
if handled != 3 {
372+
t.Errorf("TestPartitionQuery_Multiplexed: expected 2 requests to be handled, got: %d", handled)
373+
}
374+
}

spanner/client.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,14 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
502502
// config.enableMultiplexedSessionForRW = config.enableMultiplexedSessionForRW && config.SessionPoolConfig.enableMultiplexSession
503503
//}
504504

505+
//if isMultiplexForPartitionOps := os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"); isMultiplexForPartitionOps != "" {
506+
// config.enableMultiplexedSessionForPartitionedOps, err = strconv.ParseBool(isMultiplexForPartitionOps)
507+
// if err != nil {
508+
// return nil, spannerErrorf(codes.InvalidArgument, "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS must be either true or false")
509+
// }
510+
// config.enableMultiplexedSessionForPartitionedOps = config.enableMultiplexedSessionForPartitionedOps && config.SessionPoolConfig.enableMultiplexSession
511+
//}
512+
505513
// Create a session client.
506514
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.BatchTimeout, config.Logger, config.CallOptions)
507515

@@ -867,13 +875,20 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
867875
err error
868876
)
869877

870-
// Create session.
871-
s, err = c.sc.createSession(ctx)
872-
if err != nil {
873-
return nil, err
878+
if c.idleSessions.isMultiplexedSessionForPartitionedOpsEnabled() {
879+
sh, err = c.idleSessions.takeMultiplexed(ctx)
880+
if err != nil {
881+
return nil, err
882+
}
883+
} else {
884+
// Create session.
885+
s, err = c.sc.createSession(ctx)
886+
if err != nil {
887+
return nil, err
888+
}
889+
sh = &sessionHandle{session: s}
890+
sh.updateLastUseTime()
874891
}
875-
sh = &sessionHandle{session: s}
876-
sh.updateLastUseTime()
877892

878893
// Begin transaction.
879894
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata(), true), &sppb.BeginTransactionRequest{
@@ -885,6 +900,9 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
885900
},
886901
})
887902
if err != nil {
903+
if isUnimplementedErrorForMultiplexedPartitionedDML(err) && c.idleSessions.isMultiplexedSessionForPartitionedOpsEnabled() {
904+
c.idleSessions.disableMultiplexedSessionForRW()
905+
}
888906
return nil, ToSpannerError(err)
889907
}
890908
tx = res.Id

spanner/internal/testutil/inmem_spanner_server.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,6 +1160,38 @@ func (s *inMemSpannerServer) PartitionQuery(ctx context.Context, req *spannerpb.
11601160
if _, err := s.simulateExecutionTime(MethodPartitionQuery, req); err != nil {
11611161
return nil, err
11621162
}
1163+
if req.Session == "" {
1164+
return nil, gstatus.Error(codes.InvalidArgument, "Missing session name")
1165+
}
1166+
session, err := s.findSession(req.Session)
1167+
if err != nil {
1168+
return nil, err
1169+
}
1170+
var id []byte
1171+
var tx *spannerpb.Transaction
1172+
s.updateSessionLastUseTime(session.Name)
1173+
if id = s.getTransactionID(session, req.Transaction); id != nil {
1174+
tx, err = s.getTransactionByID(session, id)
1175+
if err != nil {
1176+
return nil, err
1177+
}
1178+
}
1179+
var partitions []*spannerpb.Partition
1180+
for i := int64(0); i < req.PartitionOptions.MaxPartitions; i++ {
1181+
token := make([]byte, 10)
1182+
_, err := rand.Read(token)
1183+
if err != nil {
1184+
return nil, gstatus.Error(codes.Internal, "failed to generate random partition token")
1185+
}
1186+
partitions = append(partitions, &spannerpb.Partition{PartitionToken: token})
1187+
}
1188+
return &spannerpb.PartitionResponse{
1189+
Partitions: partitions,
1190+
Transaction: tx,
1191+
}, nil
1192+
}
1193+
1194+
func (s *inMemSpannerServer) PartitionRead(ctx context.Context, req *spannerpb.PartitionReadRequest) (*spannerpb.PartitionResponse, error) {
11631195
s.mu.Lock()
11641196
if s.stopped {
11651197
s.mu.Unlock()
@@ -1198,20 +1230,6 @@ func (s *inMemSpannerServer) PartitionQuery(ctx context.Context, req *spannerpb.
11981230
}, nil
11991231
}
12001232

1201-
func (s *inMemSpannerServer) PartitionRead(ctx context.Context, req *spannerpb.PartitionReadRequest) (*spannerpb.PartitionResponse, error) {
1202-
return s.PartitionQuery(ctx, &spannerpb.PartitionQueryRequest{
1203-
Session: req.Session,
1204-
Transaction: req.Transaction,
1205-
PartitionOptions: req.PartitionOptions,
1206-
// KeySet is currently ignored.
1207-
Sql: fmt.Sprintf(
1208-
"SELECT %s FROM %s",
1209-
strings.Join(req.Columns, ", "),
1210-
req.Table,
1211-
),
1212-
})
1213-
}
1214-
12151233
// EncodeResumeToken return mock resume token encoding for an uint64 integer.
12161234
func EncodeResumeToken(t uint64) []byte {
12171235
rt := make([]byte, 16)

spanner/kokoro/presubmit.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ case $JOB_TYPE in
4747
integration-with-multiplexed-session )
4848
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS=true
4949
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW=true
50+
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS=true
5051
echo "running presubmit with multiplexed sessions enabled: $GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
5152
;;
5253
esac

spanner/pdml.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,19 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
5151
if err := checkNestedTxn(ctx); err != nil {
5252
return 0, err
5353
}
54-
55-
sh, err := c.idleSessions.take(ctx)
54+
var sh *sessionHandle
55+
if c.idleSessions.isMultiplexedSessionForPartitionedOpsEnabled() {
56+
sh, err = c.idleSessions.takeMultiplexed(ctx)
57+
} else {
58+
sh, err = c.idleSessions.take(ctx)
59+
}
5660
if err != nil {
5761
return 0, ToSpannerError(err)
5862
}
59-
if sh != nil {
60-
defer sh.recycle()
63+
if sh == nil {
64+
return 0, spannerErrorf(codes.Internal, "no session available")
6165
}
66+
defer sh.recycle()
6267
// Mark isLongRunningTransaction to true, as the session in case of partitioned dml can be long-running
6368
sh.mu.Lock()
6469
sh.eligibleForLongRunning = true
@@ -88,6 +93,24 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
8893
if err == nil {
8994
return count, nil
9095
}
96+
if isUnimplementedErrorForMultiplexedPartitionedDML(err) && sh.session.pool.isMultiplexedSessionForPartitionedOpsEnabled() {
97+
logf(c.logger, "Warning: Multiplexed sessions are not supported for partitioned operations in this environment. Falling back to regular sessions.")
98+
sh.session.pool.disableMultiplexedSessionForPartitionedOps()
99+
sh, err = c.idleSessions.take(ctx)
100+
if err != nil {
101+
return 0, ToSpannerError(err)
102+
}
103+
if sh == nil {
104+
return 0, spannerErrorf(codes.Internal, "no session available")
105+
}
106+
defer sh.recycle()
107+
// Mark isLongRunningTransaction to true, as the session in case of partitioned dml can be long-running
108+
sh.mu.Lock()
109+
sh.eligibleForLongRunning = true
110+
sh.mu.Unlock()
111+
req.Session = sh.getID()
112+
continue
113+
}
91114
delay, shouldRetry := retryer.Retry(err)
92115
if !shouldRetry {
93116
return 0, err

0 commit comments

Comments
 (0)