Skip to content

Commit 08c3ba6

Browse files
authored
Merge pull request #1306 from ydb-platform/query-with-pool
refactored query client + added env YDB_GO_SDK_QUERY_SERVICE_USE_SESS…
2 parents 39d0d61 + a3aa767 commit 08c3ba6

File tree

9 files changed

+193
-169
lines changed

9 files changed

+193
-169
lines changed

.github/workflows/slo.yml

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,29 +49,35 @@ jobs:
4949
workload_build_context0: ../..
5050
workload_build_options0: -f Dockerfile --build-arg SRC_PATH=native/table --build-arg JOB_NAME=workload-native-table
5151

52-
language_id1: 'native-query'
52+
language_id1: 'database-sql'
5353
workload_path1: 'tests/slo'
54-
language1: 'Native ydb-go-sdk/v3 over query-service'
54+
language1: 'Go SDK database/sql'
5555
workload_build_context1: ../..
56-
workload_build_options1: -f Dockerfile --build-arg SRC_PATH=native/query --build-arg JOB_NAME=workload-native-query
56+
workload_build_options1: -f Dockerfile --build-arg SRC_PATH=database/sql --build-arg JOB_NAME=workload-database-sql
5757

58-
language_id2: 'database-sql'
58+
language_id2: 'gorm'
5959
workload_path2: 'tests/slo'
60-
language2: 'Go SDK database/sql'
60+
language2: 'Go SDK gorm'
6161
workload_build_context2: ../..
62-
workload_build_options2: -f Dockerfile --build-arg SRC_PATH=database/sql --build-arg JOB_NAME=workload-database-sql
62+
workload_build_options2: -f Dockerfile --build-arg SRC_PATH=gorm --build-arg JOB_NAME=workload-gorm
6363

64-
language_id3: 'gorm'
64+
language_id3: 'xorm'
6565
workload_path3: 'tests/slo'
66-
language3: 'Go SDK gorm'
66+
language3: 'Go SDK xorm'
6767
workload_build_context3: ../..
68-
workload_build_options3: -f Dockerfile --build-arg SRC_PATH=gorm --build-arg JOB_NAME=workload-gorm
68+
workload_build_options3: -f Dockerfile --build-arg SRC_PATH=xorm --build-arg JOB_NAME=workload-xorm
6969

70-
language_id4: 'xorm'
70+
language_id4: 'native-query'
7171
workload_path4: 'tests/slo'
72-
language4: 'Go SDK xorm'
72+
language4: 'Native ydb-go-sdk/v3 over query-service'
7373
workload_build_context4: ../..
74-
workload_build_options4: -f Dockerfile --build-arg SRC_PATH=xorm --build-arg JOB_NAME=workload-xorm
74+
workload_build_options4: -f Dockerfile --build-arg SRC_PATH=native/query --build-arg JOB_NAME=workload-native-query
75+
76+
language_id5: 'native-query-with-pool'
77+
workload_path5: 'tests/slo'
78+
language5: 'Native ydb-go-sdk/v3 over query-service with session pool'
79+
workload_build_context5: ../..
80+
workload_build_options5: -f Dockerfile --build-arg SRC_PATH=native/query/with/pool --build-arg JOB_NAME=workload-native-query-with-pool
7581

7682
- uses: actions/upload-artifact@v4
7783
if: always()

internal/query/client.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,6 @@ func do(
119119

120120
err := op(ctx, s)
121121
if err != nil {
122-
if !xerrors.IsRetryObjectValid(err) {
123-
s.setStatus(statusError)
124-
}
125-
126122
return xerrors.WithStackTrace(err)
127123
}
128124

internal/query/config/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package config
22

33
import (
4+
"os"
45
"time"
56

67
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
@@ -43,7 +44,7 @@ func defaults() *Config {
4344
sessionCreateTimeout: DefaultSessionCreateTimeout,
4445
sessionDeleteTimeout: DefaultSessionDeleteTimeout,
4546
trace: &trace.Query{},
46-
useSessionPool: false,
47+
useSessionPool: os.Getenv("YDB_GO_SDK_QUERY_SERVICE_USE_SESSION_POOL") != "",
4748
}
4849
}
4950

internal/query/session.go

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package query
22

33
import (
44
"context"
5-
"io"
65
"sync/atomic"
76

87
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
@@ -82,7 +81,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
8281
checks: []func(s *Session) bool{
8382
func(s *Session) bool {
8483
switch s.status() {
85-
case statusClosed, statusClosing, statusError:
84+
case statusClosed, statusClosing:
8685
return false
8786
default:
8887
return true
@@ -92,7 +91,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
9291
}
9392
defer func() {
9493
if finalErr != nil && s != nil {
95-
s.setStatus(statusError)
94+
panic("abnormal result")
9695
}
9796
}()
9897

@@ -135,6 +134,11 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
135134
}()
136135

137136
attachCtx, cancelAttach := xcontext.WithCancel(xcontext.ValueOnly(ctx))
137+
defer func() {
138+
if finalErr != nil {
139+
cancelAttach()
140+
}
141+
}()
138142

139143
attach, err := s.grpcClient.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{
140144
SessionId: s.id,
@@ -145,8 +149,6 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
145149

146150
_, err = attach.Recv()
147151
if err != nil {
148-
cancelAttach()
149-
150152
return xerrors.WithStackTrace(err)
151153
}
152154

@@ -157,20 +159,11 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
157159
_ = s.closeOnce(xcontext.ValueOnly(ctx))
158160
}()
159161

160-
for {
161-
if !s.IsAlive() {
162-
return
163-
}
162+
for func() bool {
164163
_, recvErr := attach.Recv()
165-
if recvErr != nil {
166-
if xerrors.Is(recvErr, io.EOF) {
167-
s.setStatus(statusClosed)
168-
} else {
169-
s.setStatus(statusError)
170-
}
171164

172-
return
173-
}
165+
return recvErr == nil
166+
}() {
174167
}
175168
}()
176169

internal/query/session_status.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ const (
1414
statusInUse
1515
statusClosing
1616
statusClosed
17-
statusError
1817
)
1918

2019
func (s statusCode) String() string {
@@ -29,8 +28,6 @@ func (s statusCode) String() string {
2928
return session.StatusClosing
3029
case statusClosed:
3130
return session.StatusClosed
32-
case statusError:
33-
return session.StatusError
3431
default:
3532
return fmt.Sprintf("Unknown%d", s)
3633
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os/signal"
7+
"sync"
8+
"syscall"
9+
"time"
10+
11+
"golang.org/x/sync/errgroup"
12+
"golang.org/x/time/rate"
13+
14+
"slo/internal/config"
15+
"slo/internal/generator"
16+
"slo/internal/workers"
17+
)
18+
19+
func Main(
20+
label string,
21+
jobName string,
22+
) {
23+
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
24+
defer cancel()
25+
26+
cfg, err := config.New()
27+
if err != nil {
28+
panic(fmt.Errorf("create config failed: %w", err))
29+
}
30+
31+
fmt.Println("program started")
32+
defer fmt.Println("program finished")
33+
34+
ctx, cancel = context.WithTimeout(ctx, time.Duration(cfg.Time)*time.Second)
35+
defer cancel()
36+
37+
s, err := NewStorage(ctx, cfg, cfg.ReadRPS+cfg.WriteRPS, label)
38+
if err != nil {
39+
panic(fmt.Errorf("create storage failed: %w", err))
40+
}
41+
defer func() {
42+
var (
43+
shutdownCtx context.Context
44+
shutdownCancel context.CancelFunc
45+
)
46+
if cfg.ShutdownTime > 0 {
47+
shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(),
48+
time.Duration(cfg.ShutdownTime)*time.Second)
49+
} else {
50+
shutdownCtx, shutdownCancel = context.WithCancel(context.Background())
51+
}
52+
defer shutdownCancel()
53+
54+
_ = s.Close(shutdownCtx)
55+
}()
56+
57+
fmt.Println("db init ok")
58+
59+
switch cfg.Mode {
60+
case config.CreateMode:
61+
err = s.CreateTable(ctx)
62+
if err != nil {
63+
panic(fmt.Errorf("create table failed: %w", err))
64+
}
65+
fmt.Println("create table ok")
66+
67+
gen := generator.New(0)
68+
69+
g := errgroup.Group{}
70+
71+
for i := uint64(0); i < cfg.InitialDataCount; i++ {
72+
g.Go(func() (err error) {
73+
e, err := gen.Generate()
74+
if err != nil {
75+
return err
76+
}
77+
78+
_, err = s.Write(ctx, e)
79+
if err != nil {
80+
return err
81+
}
82+
83+
return nil
84+
})
85+
}
86+
87+
err = g.Wait()
88+
if err != nil {
89+
panic(err)
90+
}
91+
92+
fmt.Println("entries write ok")
93+
case config.CleanupMode:
94+
err = s.DropTable(ctx)
95+
if err != nil {
96+
panic(fmt.Errorf("create table failed: %w", err))
97+
}
98+
99+
fmt.Println("cleanup table ok")
100+
case config.RunMode:
101+
gen := generator.New(cfg.InitialDataCount)
102+
103+
w, err := workers.New(cfg, s, label, jobName)
104+
if err != nil {
105+
panic(fmt.Errorf("create workers failed: %w", err))
106+
}
107+
defer func() {
108+
err := w.Close()
109+
if err != nil {
110+
panic(fmt.Errorf("workers close failed: %w", err))
111+
}
112+
fmt.Println("workers close ok")
113+
}()
114+
115+
wg := sync.WaitGroup{}
116+
117+
readRL := rate.NewLimiter(rate.Limit(cfg.ReadRPS), 1)
118+
wg.Add(cfg.ReadRPS)
119+
for i := 0; i < cfg.ReadRPS; i++ {
120+
go w.Read(ctx, &wg, readRL)
121+
}
122+
123+
writeRL := rate.NewLimiter(rate.Limit(cfg.WriteRPS), 1)
124+
wg.Add(cfg.WriteRPS)
125+
for i := 0; i < cfg.WriteRPS; i++ {
126+
go w.Write(ctx, &wg, writeRL, gen)
127+
}
128+
129+
metricsRL := rate.NewLimiter(rate.Every(time.Duration(cfg.ReportPeriod)*time.Millisecond), 1)
130+
wg.Add(1)
131+
go w.Metrics(ctx, &wg, metricsRL)
132+
133+
wg.Wait()
134+
default:
135+
panic(fmt.Errorf("unknown mode: %v", cfg.Mode))
136+
}
137+
}

tests/slo/native/query/storage.go renamed to tests/slo/native/query/internal/storage.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package main
1+
package internal
22

33
import (
44
"context"
@@ -69,7 +69,7 @@ const dropTableQuery = `
6969
DROP TABLE %s
7070
`
7171

72-
func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (*Storage, error) {
72+
func NewStorage(ctx context.Context, cfg *config.Config, poolSize int, label string) (*Storage, error) {
7373
ctx, cancel := context.WithTimeout(ctx, time.Minute*5) //nolint:gomnd
7474
defer cancel()
7575

@@ -219,7 +219,7 @@ func (s *Storage) Write(ctx context.Context, e generator.Row) (attempts int, fin
219219
return attempts, err
220220
}
221221

222-
func (s *Storage) createTable(ctx context.Context) error {
222+
func (s *Storage) CreateTable(ctx context.Context) error {
223223
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.cfg.WriteTimeout)*time.Millisecond)
224224
defer cancel()
225225

@@ -237,7 +237,7 @@ func (s *Storage) createTable(ctx context.Context) error {
237237
)
238238
}
239239

240-
func (s *Storage) dropTable(ctx context.Context) error {
240+
func (s *Storage) DropTable(ctx context.Context) error {
241241
err := ctx.Err()
242242
if err != nil {
243243
return err
@@ -260,7 +260,7 @@ func (s *Storage) dropTable(ctx context.Context) error {
260260
)
261261
}
262262

263-
func (s *Storage) close(ctx context.Context) error {
263+
func (s *Storage) Close(ctx context.Context) error {
264264
s.retryBudget.Stop()
265265

266266
var (

0 commit comments

Comments
 (0)