Skip to content

Commit fd8435a

Browse files
committed
logs over query service + fix conns pessimization
1 parent 76e07ce commit fd8435a

File tree

11 files changed

+372
-112
lines changed

11 files changed

+372
-112
lines changed

driver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e
348348
for _, opt := range []Option{
349349
WithTraceDriver(log.Driver(d.logger, d.loggerDetails, d.loggerOpts...)), //nolint:contextcheck
350350
WithTraceTable(log.Table(d.logger, d.loggerDetails, d.loggerOpts...)), //nolint:contextcheck
351+
WithTraceQuery(log.Query(d.logger, d.loggerDetails, d.loggerOpts...)), //nolint:contextcheck
351352
WithTraceScripting(log.Scripting(d.logger, d.loggerDetails, d.loggerOpts...)), //nolint:contextcheck
352353
WithTraceScheme(log.Scheme(d.logger, d.loggerDetails, d.loggerOpts...)),
353354
WithTraceCoordination(log.Coordination(d.logger, d.loggerDetails, d.loggerOpts...)),

internal/conn/pool.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"google.golang.org/grpc"
10+
grpcCodes "google.golang.org/grpc/codes"
1011

1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
@@ -79,6 +80,15 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) {
7980
return
8081
}
8182

83+
if xerrors.IsTransportError(cause,
84+
grpcCodes.OK,
85+
grpcCodes.Canceled,
86+
grpcCodes.ResourceExhausted,
87+
grpcCodes.OutOfRange,
88+
) {
89+
return
90+
}
91+
8292
e := cc.Endpoint().Copy()
8393

8494
p.mtx.RLock()

internal/pool/pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (p *Pool[PT, T]) produce(ctx context.Context) {
177177
onDone(&ProduceDoneInfo{})
178178
}()
179179

180-
p.spawn = make(chan PT, p.producersCount)
180+
p.spawn = make(chan PT, p.maxSize)
181181

182182
var wg, started sync.WaitGroup
183183
wg.Add(p.producersCount)

internal/query/client.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55

66
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
7-
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
8-
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
97
"google.golang.org/grpc"
108

119
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
@@ -142,22 +140,6 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
142140
return doTx(ctx, c.pool, op, c.config.Trace(), opts...)
143141
}
144142

145-
func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string) error {
146-
response, err := client.DeleteSession(ctx,
147-
&Ydb_Query.DeleteSessionRequest{
148-
SessionId: sessionID,
149-
},
150-
)
151-
if err != nil {
152-
return xerrors.WithStackTrace(xerrors.Transport(err))
153-
}
154-
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
155-
return xerrors.WithStackTrace(xerrors.FromOperation(response))
156-
}
157-
158-
return nil
159-
}
160-
161143
func New(ctx context.Context, balancer balancer, cfg *config.Config) (_ *Client, err error) {
162144
onDone := trace.QueryOnNew(cfg.Trace(), &ctx, stack.FunctionID(""))
163145
defer func() {

internal/query/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@ func TestCreateSession(t *testing.T) {
3131
attachStream.EXPECT().Recv().Return(&Ydb_Query.SessionState{
3232
Status: Ydb.StatusIds_SUCCESS,
3333
}, nil).AnyTimes()
34-
attachStream.EXPECT().CloseSend().Return(nil)
3534
service := NewMockQueryServiceClient(ctrl)
3635
service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{
3736
Status: Ydb.StatusIds_SUCCESS,
3837
SessionId: "test",
3938
}, nil)
4039
service.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil)
40+
service.EXPECT().DeleteSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.DeleteSessionResponse{
41+
Status: Ydb.StatusIds_SUCCESS,
42+
}, nil)
4143
t.Log("execute")
4244
attached := 0
4345
s, err := createSession(ctx, service, withSessionTrace(
@@ -101,7 +103,6 @@ func TestCreateSession(t *testing.T) {
101103
ctrl := gomock.NewController(t)
102104
attachStream := NewMockQueryService_AttachSessionClient(ctrl)
103105
attachStream.EXPECT().Recv().Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")).AnyTimes()
104-
attachStream.EXPECT().CloseSend().Return(nil)
105106
service := NewMockQueryServiceClient(ctrl)
106107
service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{
107108
Status: Ydb.StatusIds_SUCCESS,
@@ -141,7 +142,6 @@ func TestCreateSession(t *testing.T) {
141142
attachStream.EXPECT().Recv().Return(&Ydb_Query.SessionState{
142143
Status: Ydb.StatusIds_UNAVAILABLE,
143144
}, nil).AnyTimes()
144-
attachStream.EXPECT().CloseSend().Return(nil)
145145
service := NewMockQueryServiceClient(ctrl)
146146
service.EXPECT().CreateSession(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CreateSessionResponse{
147147
Status: Ydb.StatusIds_SUCCESS,

internal/query/session.go

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package query
22

33
import (
44
"context"
5+
"io"
56
"sync/atomic"
67

78
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
@@ -11,7 +12,6 @@ import (
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
14-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/query"
@@ -170,28 +170,14 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
170170
onDone(finalErr)
171171
}()
172172

173-
attachCtx, cancelAttach := xcontext.WithCancel(context.Background())
174-
defer func() {
175-
if finalErr != nil {
176-
cancelAttach()
177-
}
178-
}()
179-
180-
attach, err := s.grpcClient.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{
173+
attach, err := s.grpcClient.AttachSession(context.Background(), &Ydb_Query.AttachSessionRequest{
181174
SessionId: s.id,
182175
})
183176
if err != nil {
184177
return xerrors.WithStackTrace(
185178
xerrors.Transport(err),
186179
)
187180
}
188-
189-
defer func() {
190-
if finalErr != nil {
191-
_ = attach.CloseSend()
192-
}
193-
}()
194-
195181
state, err := attach.Recv()
196182
if err != nil {
197183
return xerrors.WithStackTrace(xerrors.Transport(err))
@@ -202,13 +188,21 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
202188
}
203189

204190
go func() {
205-
defer func() {
206-
_ = s.closeOnce(ctx)
207-
}()
208-
209191
for {
192+
if !s.IsAlive() {
193+
return
194+
}
210195
recv, recvErr := attach.Recv()
211-
if recvErr != nil || recv.GetStatus() != Ydb.StatusIds_SUCCESS {
196+
if recvErr != nil {
197+
if xerrors.Is(recvErr, io.EOF) {
198+
s.setStatus(statusClosed)
199+
} else {
200+
s.setStatus(statusError)
201+
}
202+
203+
return
204+
}
205+
if recv.GetStatus() != Ydb.StatusIds_SUCCESS {
212206
s.setStatus(statusError)
213207

214208
return
@@ -227,16 +221,32 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
227221
}()
228222
}
229223

230-
err = attach.CloseSend()
231-
232-
cancelAttach()
224+
if err = deleteSession(ctx, s.grpcClient, s.id); err != nil {
225+
return xerrors.WithStackTrace(err)
226+
}
233227

234-
return err
228+
return nil
235229
})
236230

237231
return nil
238232
}
239233

234+
func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string) error {
235+
response, err := client.DeleteSession(ctx,
236+
&Ydb_Query.DeleteSessionRequest{
237+
SessionId: sessionID,
238+
},
239+
)
240+
if err != nil {
241+
return xerrors.WithStackTrace(xerrors.Transport(err))
242+
}
243+
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
244+
return xerrors.WithStackTrace(xerrors.FromOperation(response))
245+
}
246+
247+
return nil
248+
}
249+
240250
func (s *Session) IsAlive() bool {
241251
switch s.status() {
242252
case statusIdle, statusInUse:

log/driver.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func internalDriver(l Logger, d trace.Detailer) (t trace.Driver) { //nolint:gocy
264264
latencyField(start),
265265
)
266266
} else {
267-
l.Log(WithLevel(ctx, WARN), "intermediate fail",
267+
l.Log(WithLevel(ctx, DEBUG), "intermediate fail",
268268
Error(info.Error),
269269
Stringer("endpoint", endpoint),
270270
String("method", method),
@@ -300,10 +300,10 @@ func internalDriver(l Logger, d trace.Detailer) (t trace.Driver) { //nolint:gocy
300300
}
301301
ctx := with(*info.Context, TRACE, "ydb", "driver", "conn", "ban")
302302
endpoint := info.Endpoint
303+
cause := info.Cause
303304
l.Log(ctx, "start",
304305
Stringer("endpoint", endpoint),
305-
NamedError("cause", info.Cause),
306-
versionField(),
306+
NamedError("cause", cause),
307307
)
308308
start := time.Now()
309309

@@ -312,6 +312,7 @@ func internalDriver(l Logger, d trace.Detailer) (t trace.Driver) { //nolint:gocy
312312
Stringer("endpoint", endpoint),
313313
latencyField(start),
314314
Stringer("state", info.State),
315+
NamedError("cause", cause),
315316
versionField(),
316317
)
317318
}

log/query.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ func internalQuery(
2727

2828
return func(info trace.QueryNewDoneInfo) {
2929
if info.Error == nil {
30-
l.Log(ctx, "done",
30+
l.Log(WithLevel(ctx, INFO), "done",
3131
latencyField(start),
3232
)
3333
} else {
3434
lvl := FATAL
3535
if !xerrors.IsYdb(info.Error) {
36-
lvl = DEBUG
36+
lvl = ERROR
3737
}
3838
l.Log(WithLevel(ctx, lvl), "failed",
3939
latencyField(start),
@@ -83,7 +83,7 @@ func internalQuery(
8383

8484
return func(info trace.QueryPoolNewDoneInfo) {
8585
if info.Error == nil {
86-
l.Log(ctx, "done",
86+
l.Log(WithLevel(ctx, INFO), "done",
8787
latencyField(start),
8888
Int("MinSize", info.MinSize),
8989
Int("MaxSize", info.MaxSize),
@@ -92,7 +92,7 @@ func internalQuery(
9292
} else {
9393
lvl := FATAL
9494
if !xerrors.IsYdb(info.Error) {
95-
lvl = DEBUG
95+
lvl = ERROR
9696
}
9797
l.Log(WithLevel(ctx, lvl), "failed",
9898
latencyField(start),

tests/slo/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ require (
2525
github.com/jinzhu/now v1.1.5 // indirect
2626
github.com/jonboulle/clockwork v0.4.0 // indirect
2727
github.com/json-iterator/go v1.1.12 // indirect
28+
github.com/mattn/go-sqlite3 v1.14.16 // indirect
2829
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
2930
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
3031
github.com/modern-go/reflect2 v1.0.2 // indirect
@@ -51,4 +52,4 @@ require (
5152

5253
replace github.com/ydb-platform/ydb-go-sdk/v3 => ../../.
5354

54-
replace xorm.io/xorm => github.com/ydb-platform/xorm v0.0.6
55+
replace xorm.io/xorm => github.com/ydb-platform/xorm v0.0.3

0 commit comments

Comments
 (0)