Skip to content

Commit e6505bf

Browse files
authored
Merge pull request #1121 from ydb-platform/query-metrics
* Added metrics over query service internals
2 parents 46d63c0 + 4ce292c commit e6505bf

File tree

10 files changed

+381
-20
lines changed

10 files changed

+381
-20
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
* Added metrics over query service internals
2+
* Added session create and delete events into `trace.Query`
3+
* Moved public type `query.SessionStatus` into `internal/query` package
4+
15
## v3.57.2
26
* Fixed cases when some option is nil
37

internal/query/client.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func createSession(
214214
id: s.GetSessionId(),
215215
nodeID: s.GetNodeId(),
216216
queryClient: client,
217-
status: query.SessionStatusReady,
217+
status: SessionStatusReady,
218218
}
219219

220220
if cfg.onAttach != nil {
@@ -232,15 +232,15 @@ func createSession(
232232

233233
atomic.StoreUint32(
234234
(*uint32)(&session.status),
235-
uint32(query.SessionStatusClosed),
235+
uint32(SessionStatusClosed),
236236
)
237237
})
238238

239239
go func() {
240240
defer session.close()
241241
for {
242-
switch session.Status() {
243-
case query.SessionStatusReady, query.SessionStatusInUse:
242+
switch session.sessionStatus() {
243+
case SessionStatusReady, SessionStatusInUse:
244244
sessionState, recvErr := attach.Recv()
245245
if recvErr != nil || sessionState.GetStatus() != Ydb.StatusIds_SUCCESS {
246246
return
@@ -262,7 +262,11 @@ func New(ctx context.Context, balancer balancer, config *config.Config) (*Client
262262

263263
client.pool = pool.New(
264264
config.PoolMaxSize(),
265-
func(ctx context.Context, onClose func(s *Session)) (*Session, error) {
265+
func(ctx context.Context, onClose func(s *Session)) (s *Session, err error) {
266+
onDone := trace.QueryOnCreateSession(config.Trace(), &ctx, stack.FunctionID(""))
267+
defer func() {
268+
onDone(s, err)
269+
}()
266270
var cancel context.CancelFunc
267271
if d := config.CreateSessionTimeout(); d > 0 {
268272
ctx, cancel = xcontext.WithTimeout(ctx, d)
@@ -271,7 +275,7 @@ func New(ctx context.Context, balancer balancer, config *config.Config) (*Client
271275
}
272276
defer cancel()
273277

274-
s, err := createSession(ctx, client.grpcClient, createSessionConfig{
278+
s, err = createSession(ctx, client.grpcClient, createSessionConfig{
275279
onClose: onClose,
276280
})
277281
if err != nil {
@@ -280,7 +284,11 @@ func New(ctx context.Context, balancer balancer, config *config.Config) (*Client
280284

281285
return s, nil
282286
},
283-
func(ctx context.Context, s *Session) error {
287+
func(ctx context.Context, s *Session) (err error) {
288+
onDone := trace.QueryOnDeleteSession(config.Trace(), &ctx, stack.FunctionID(""), s)
289+
defer func() {
290+
onDone(err)
291+
}()
284292
var cancel context.CancelFunc
285293
if d := config.CreateSessionTimeout(); d > 0 {
286294
ctx, cancel = xcontext.WithTimeout(ctx, d)
@@ -289,7 +297,7 @@ func New(ctx context.Context, balancer balancer, config *config.Config) (*Client
289297
}
290298
defer cancel()
291299

292-
err := deleteSession(ctx, client.grpcClient, s.id)
300+
err = deleteSession(ctx, client.grpcClient, s.id)
293301
if err != nil {
294302
return xerrors.WithStackTrace(err)
295303
}

internal/query/session.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type Session struct {
2020
id string
2121
nodeID int64
2222
queryClient Ydb_Query_V1.QueryServiceClient
23-
status query.SessionStatus
23+
status sessionStatus
2424
close func()
2525
}
2626

@@ -71,8 +71,12 @@ func (s *Session) NodeID() int64 {
7171
return s.nodeID
7272
}
7373

74-
func (s *Session) Status() query.SessionStatus {
75-
return query.SessionStatus(atomic.LoadUint32((*uint32)(&s.status)))
74+
func (s *Session) sessionStatus() sessionStatus {
75+
return sessionStatus(atomic.LoadUint32((*uint32)(&s.status)))
76+
}
77+
78+
func (s *Session) Status() string {
79+
return s.sessionStatus().String()
7680
}
7781

7882
func (s *Session) Execute(

query/session_status.go renamed to internal/query/session_status.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@ import (
66
"github.com/ydb-platform/ydb-go-sdk/v3/internal/session"
77
)
88

9-
type SessionStatus uint32
9+
type sessionStatus uint32
1010

1111
const (
12-
SessionStatusUnknown = SessionStatus(iota)
12+
SessionStatusUnknown = sessionStatus(iota)
1313
SessionStatusReady
1414
SessionStatusInUse
1515
SessionStatusClosed
1616
)
1717

18-
func (s SessionStatus) String() string {
18+
func (s sessionStatus) String() string {
1919
switch s {
2020
case 0:
2121
return session.StatusUnknown

log/query.go

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func internalQuery(
2323
) func(
2424
trace.QueryDoDoneInfo,
2525
) {
26-
if d.Details()&trace.QueryPoolEvents == 0 {
26+
if d.Details()&trace.QueryEvents == 0 {
2727
return nil
2828
}
2929
ctx := with(*info.Context, TRACE, "ydb", "query", "do")
@@ -58,7 +58,7 @@ func internalQuery(
5858
if !xerrors.IsYdb(info.Error) {
5959
lvl = DEBUG
6060
}
61-
l.Log(WithLevel(ctx, lvl), "done",
61+
l.Log(WithLevel(ctx, lvl), "failed",
6262
latencyField(start),
6363
Error(info.Error),
6464
Int("attempts", info.Attempts),
@@ -75,7 +75,7 @@ func internalQuery(
7575
) func(
7676
trace.QueryDoTxDoneInfo,
7777
) {
78-
if d.Details()&trace.TablePoolAPIEvents == 0 {
78+
if d.Details()&trace.QueryEvents == 0 {
7979
return nil
8080
}
8181
ctx := with(*info.Context, TRACE, "ydb", "query", "do", "tx")
@@ -110,7 +110,7 @@ func internalQuery(
110110
if !xerrors.IsYdb(info.Error) {
111111
lvl = DEBUG
112112
}
113-
l.Log(WithLevel(ctx, lvl), "done",
113+
l.Log(WithLevel(ctx, lvl), "failed",
114114
latencyField(start),
115115
Error(info.Error),
116116
Int("attempts", info.Attempts),
@@ -120,6 +120,63 @@ func internalQuery(
120120
}
121121
}
122122
}
123+
t.OnCreateSession = func(info trace.QueryCreateSessionStartInfo) func(info trace.QueryCreateSessionDoneInfo) {
124+
if d.Details()&trace.QuerySessionEvents == 0 {
125+
return nil
126+
}
127+
ctx := with(*info.Context, TRACE, "ydb", "query", "session", "create")
128+
l.Log(ctx, "start")
129+
start := time.Now()
130+
131+
return func(info trace.QueryCreateSessionDoneInfo) {
132+
if info.Error == nil {
133+
l.Log(ctx, "done",
134+
latencyField(start),
135+
String("session_id", info.Session.ID()),
136+
String("session_status", info.Session.Status()),
137+
)
138+
} else {
139+
lvl := WARN
140+
if !xerrors.IsYdb(info.Error) {
141+
lvl = DEBUG
142+
}
143+
l.Log(WithLevel(ctx, lvl), "done",
144+
latencyField(start),
145+
Error(info.Error),
146+
versionField(),
147+
)
148+
}
149+
}
150+
}
151+
t.OnDeleteSession = func(info trace.QueryDeleteSessionStartInfo) func(info trace.QueryDeleteSessionDoneInfo) {
152+
if d.Details()&trace.QuerySessionEvents == 0 {
153+
return nil
154+
}
155+
ctx := with(*info.Context, TRACE, "ydb", "query", "session", "delete")
156+
l.Log(ctx, "start",
157+
String("session_id", info.Session.ID()),
158+
String("session_status", info.Session.Status()),
159+
)
160+
start := time.Now()
161+
162+
return func(info trace.QueryDeleteSessionDoneInfo) {
163+
if info.Error == nil {
164+
l.Log(ctx, "done",
165+
latencyField(start),
166+
)
167+
} else {
168+
lvl := WARN
169+
if !xerrors.IsYdb(info.Error) {
170+
lvl = DEBUG
171+
}
172+
l.Log(WithLevel(ctx, lvl), "failed",
173+
latencyField(start),
174+
Error(info.Error),
175+
versionField(),
176+
)
177+
}
178+
}
179+
}
123180

124181
return t
125182
}

metrics/query.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package metrics
2+
3+
import (
4+
"time"
5+
6+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
7+
)
8+
9+
func query(config Config) (t trace.Query) {
10+
queryConfig := config.WithSystem("query")
11+
{
12+
doConfig := queryConfig.WithSystem("do")
13+
{
14+
intermediateErrs := doConfig.WithSystem("intermediate").CounterVec("errs", "status")
15+
errs := doConfig.CounterVec("errs", "status")
16+
attempts := doConfig.HistogramVec("attempts", []float64{0, 1, 2, 3, 4, 5, 7, 10})
17+
latency := doConfig.TimerVec("latency")
18+
t.OnDo = func(
19+
info trace.QueryDoStartInfo,
20+
) func(
21+
info trace.QueryDoIntermediateInfo,
22+
) func(
23+
trace.QueryDoDoneInfo,
24+
) {
25+
start := time.Now()
26+
27+
return func(info trace.QueryDoIntermediateInfo) func(trace.QueryDoDoneInfo) {
28+
if info.Error != nil && doConfig.Details()&trace.QueryEvents != 0 {
29+
intermediateErrs.With(map[string]string{
30+
"status": errorBrief(info.Error),
31+
}).Inc()
32+
}
33+
34+
return func(info trace.QueryDoDoneInfo) {
35+
if doConfig.Details()&trace.QueryEvents != 0 {
36+
attempts.With(nil).Record(float64(info.Attempts))
37+
errs.With(map[string]string{
38+
"status": errorBrief(info.Error),
39+
}).Inc()
40+
latency.With(nil).Record(time.Since(start))
41+
}
42+
}
43+
}
44+
}
45+
}
46+
{
47+
doTxConfig := doConfig.WithSystem("tx")
48+
intermediateErrs := doTxConfig.WithSystem("intermediate").CounterVec("errs", "status")
49+
errs := doTxConfig.CounterVec("errs", "status")
50+
attempts := doTxConfig.HistogramVec("attempts", []float64{0, 1, 2, 3, 4, 5, 7, 10})
51+
latency := doTxConfig.TimerVec("latency")
52+
t.OnDoTx = func(
53+
info trace.QueryDoTxStartInfo,
54+
) func(
55+
info trace.QueryDoTxIntermediateInfo,
56+
) func(
57+
trace.QueryDoTxDoneInfo,
58+
) {
59+
start := time.Now()
60+
61+
return func(info trace.QueryDoTxIntermediateInfo) func(trace.QueryDoTxDoneInfo) {
62+
if info.Error != nil && doTxConfig.Details()&trace.QueryEvents != 0 {
63+
intermediateErrs.With(map[string]string{
64+
"status": errorBrief(info.Error),
65+
}).Inc()
66+
}
67+
68+
return func(info trace.QueryDoTxDoneInfo) {
69+
if doTxConfig.Details()&trace.QueryEvents != 0 {
70+
attempts.With(nil).Record(float64(info.Attempts))
71+
errs.With(map[string]string{
72+
"status": errorBrief(info.Error),
73+
}).Inc()
74+
latency.With(nil).Record(time.Since(start))
75+
}
76+
}
77+
}
78+
}
79+
}
80+
}
81+
{
82+
sessionConfig := queryConfig.WithSystem("session")
83+
{
84+
createConfig := sessionConfig.WithSystem("create")
85+
errs := createConfig.CounterVec("errs", "status")
86+
latency := createConfig.TimerVec("latency")
87+
t.OnCreateSession = func(
88+
info trace.QueryCreateSessionStartInfo,
89+
) func(
90+
info trace.QueryCreateSessionDoneInfo,
91+
) {
92+
start := time.Now()
93+
94+
return func(info trace.QueryCreateSessionDoneInfo) {
95+
if info.Error != nil && createConfig.Details()&trace.QuerySessionEvents != 0 {
96+
errs.With(map[string]string{
97+
"status": errorBrief(info.Error),
98+
}).Inc()
99+
latency.With(nil).Record(time.Since(start))
100+
}
101+
}
102+
}
103+
}
104+
{
105+
deleteConfig := sessionConfig.WithSystem("delete")
106+
errs := deleteConfig.CounterVec("errs", "status")
107+
latency := deleteConfig.TimerVec("latency")
108+
t.OnCreateSession = func(
109+
info trace.QueryCreateSessionStartInfo,
110+
) func(
111+
info trace.QueryCreateSessionDoneInfo,
112+
) {
113+
start := time.Now()
114+
115+
return func(info trace.QueryCreateSessionDoneInfo) {
116+
if info.Error != nil && deleteConfig.Details()&trace.QuerySessionEvents != 0 {
117+
errs.With(map[string]string{
118+
"status": errorBrief(info.Error),
119+
}).Inc()
120+
latency.With(nil).Record(time.Since(start))
121+
}
122+
}
123+
}
124+
}
125+
}
126+
127+
return t
128+
}

metrics/traces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ func WithTraces(config Config) ydb.Option {
2020
ydb.WithTraceDiscovery(discovery(config)),
2121
ydb.WithTraceDatabaseSQL(databaseSQL(config)),
2222
ydb.WithTraceRetry(retry(config)),
23+
ydb.WithTraceQuery(query(config)),
2324
)
2425
}

query/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type (
1414
SessionInfo interface {
1515
ID() string
1616
NodeID() int64
17-
Status() SessionStatus
17+
Status() string
1818
}
1919

2020
Session interface {

0 commit comments

Comments
 (0)