Skip to content

Commit d7d46e3

Browse files
committed
scripting tracing
1 parent efd2c32 commit d7d46e3

File tree

7 files changed

+228
-59
lines changed

7 files changed

+228
-59
lines changed

internal/scripting/scripting.go

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/ydb-platform/ydb-go-sdk/v3/table"
2020
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
2121
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
22+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2223
)
2324

2425
type client struct {
@@ -30,21 +31,29 @@ func (c *client) Execute(
3031
ctx context.Context,
3132
query string,
3233
params *table.QueryParameters,
33-
) (result.Result, error) {
34-
request := &Ydb_Scripting.ExecuteYqlRequest{
35-
Script: query,
36-
Parameters: params.Params(),
37-
OperationParams: operation.Params(
38-
c.config.OperationTimeout(),
39-
c.config.OperationCancelAfter(),
40-
operation.ModeSync,
41-
),
42-
}
43-
response, err := c.service.ExecuteYql(ctx, request)
34+
) (r result.Result, err error) {
35+
var (
36+
onDone = trace.ScriptingOnExecute(c.config.Trace(), &ctx, query, params)
37+
request = &Ydb_Scripting.ExecuteYqlRequest{
38+
Script: query,
39+
Parameters: params.Params(),
40+
OperationParams: operation.Params(
41+
c.config.OperationTimeout(),
42+
c.config.OperationCancelAfter(),
43+
operation.ModeSync,
44+
),
45+
}
46+
result = Ydb_Scripting.ExecuteYqlResult{}
47+
response *Ydb_Scripting.ExecuteYqlResponse
48+
)
49+
defer func() {
50+
onDone(r, err)
51+
}()
52+
response, err = c.service.ExecuteYql(ctx, request)
4453
if err != nil {
4554
return nil, err
4655
}
47-
result := Ydb_Scripting.ExecuteYqlResult{}
56+
4857
err = proto.Unmarshal(response.GetOperation().GetResult().GetValue(), &result)
4958
if err != nil {
5059
return nil, err
@@ -69,6 +78,7 @@ func (c *client) Explain(
6978
mode scripting.ExplainMode,
7079
) (e table.ScriptingYQLExplanation, err error) {
7180
var (
81+
onDone = trace.ScriptingOnExplain(c.config.Trace(), &ctx, query)
7282
request = &Ydb_Scripting.ExplainYqlRequest{
7383
Script: query,
7484
Mode: mode2mode(mode),
@@ -81,6 +91,9 @@ func (c *client) Explain(
8191
response *Ydb_Scripting.ExplainYqlResponse
8292
result = Ydb_Scripting.ExplainYqlResult{}
8393
)
94+
defer func() {
95+
onDone(e.Explanation.Plan, err)
96+
}()
8497
response, err = c.service.ExplainYql(ctx, request)
8598
if err != nil {
8699
return
@@ -106,16 +119,24 @@ func (c *client) StreamExecute(
106119
ctx context.Context,
107120
query string,
108121
params *table.QueryParameters,
109-
) (result.StreamResult, error) {
110-
request := &Ydb_Scripting.ExecuteYqlRequest{
111-
Script: query,
112-
Parameters: params.Params(),
113-
OperationParams: operation.Params(
114-
c.config.OperationTimeout(),
115-
c.config.OperationCancelAfter(),
116-
operation.ModeSync,
117-
),
118-
}
122+
) (r result.StreamResult, err error) {
123+
var (
124+
onIntermediate = trace.ScriptingOnStreamExecute(c.config.Trace(), &ctx, query, params)
125+
request = &Ydb_Scripting.ExecuteYqlRequest{
126+
Script: query,
127+
Parameters: params.Params(),
128+
OperationParams: operation.Params(
129+
c.config.OperationTimeout(),
130+
c.config.OperationCancelAfter(),
131+
operation.ModeSync,
132+
),
133+
}
134+
)
135+
defer func() {
136+
if err != nil {
137+
onIntermediate(err)(err)
138+
}
139+
}()
119140

120141
ctx, cancel := context.WithCancel(ctx)
121142

@@ -131,6 +152,9 @@ func (c *client) StreamExecute(
131152
stats *Ydb_TableStats.QueryStats,
132153
err error,
133154
) {
155+
defer func() {
156+
onIntermediate(err)
157+
}()
134158
select {
135159
case <-ctx.Done():
136160
return nil, nil, ctx.Err()
@@ -145,12 +169,17 @@ func (c *client) StreamExecute(
145169
},
146170
func(err error) error {
147171
cancel()
172+
onIntermediate(err)(err)
148173
return err
149174
},
150175
), nil
151176
}
152177

153-
func (c *client) Close(context.Context) error {
178+
func (c *client) Close(ctx context.Context) (err error) {
179+
onDone := trace.ScriptingOnClose(c.config.Trace(), &ctx)
180+
defer func() {
181+
onDone(err)
182+
}()
154183
return nil
155184
}
156185

internal/table/session.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -944,13 +944,26 @@ func (s *session) StreamExecuteScanQuery(
944944
q := new(dataQuery)
945945
q.initFromText(query)
946946
var (
947+
onIntermediate = trace.TableOnSessionQueryStreamExecute(
948+
s.config.Trace(),
949+
&ctx,
950+
s,
951+
q,
952+
params,
953+
)
947954
request = Ydb_Table.ExecuteScanQueryRequest{
948955
Query: &q.query,
949956
Parameters: params.Params(),
950957
Mode: Ydb_Table.ExecuteScanQueryRequest_MODE_EXEC, // set default
951958
}
952959
stream Ydb_Table_V1.TableService_StreamExecuteScanQueryClient
953960
)
961+
defer func() {
962+
if err != nil {
963+
onIntermediate(err)(err)
964+
}
965+
}()
966+
954967
for _, opt := range opts {
955968
opt((*options.ExecuteScanQueryDesc)(&request))
956969
}
@@ -962,17 +975,8 @@ func (s *session) StreamExecuteScanQuery(
962975
&request,
963976
)
964977

965-
onDone := trace.TableOnSessionQueryStreamExecute(
966-
s.config.Trace(),
967-
&ctx,
968-
s,
969-
q,
970-
params,
971-
)
972-
973978
if err != nil {
974979
cancel()
975-
onDone(err)
976980
return nil, err
977981
}
978982

@@ -982,6 +986,9 @@ func (s *session) StreamExecuteScanQuery(
982986
stats *Ydb_TableStats.QueryStats,
983987
err error,
984988
) {
989+
defer func() {
990+
onIntermediate(err)
991+
}()
985992
select {
986993
case <-ctx.Done():
987994
return nil, nil, ctx.Err()
@@ -996,7 +1003,7 @@ func (s *session) StreamExecuteScanQuery(
9961003
},
9971004
func(err error) error {
9981005
cancel()
999-
onDone(err)
1006+
onIntermediate(err)(err)
10001007
if checkHintSessionClose(stream.Trailer()) {
10011008
s.SetStatus(options.SessionClosing)
10021009
}

log/scripting.go

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,13 @@ func Scripting(log Logger, details trace.Details) (t trace.Scripting) {
2727
}
2828
}
2929
}
30-
t.OnExplain = func(info trace.ExplainQueryStartInfo) func(doneInfo trace.ExplainQueryDoneInfo) {
30+
t.OnExplain = func(info trace.ExplainStartInfo) func(trace.ExplainDoneInfo) {
3131
log.Debugf(`explain start`)
3232
start := time.Now()
33-
return func(info trace.ExplainQueryDoneInfo) {
33+
return func(info trace.ExplainDoneInfo) {
3434
if info.Error == nil {
35-
log.Debugf(`explain done {latency:"%v",ast:%v,plan:%v"}`,
35+
log.Debugf(`explain done {latency:"%v",plan:%v"}`,
3636
time.Since(start),
37-
info.AST,
3837
info.Plan,
3938
)
4039
} else {
@@ -45,6 +44,66 @@ func Scripting(log Logger, details trace.Details) (t trace.Scripting) {
4544
}
4645
}
4746
}
47+
t.OnStreamExecute = func(
48+
info trace.StreamExecuteStartInfo,
49+
) func(
50+
trace.StreamExecuteIntermediateInfo,
51+
) func(
52+
trace.StreamExecuteDoneInfo,
53+
) {
54+
query := info.Query
55+
params := info.Parameters
56+
log.Tracef(`stream execute start {query:"%s",params:"%s"}`,
57+
query,
58+
params,
59+
)
60+
start := time.Now()
61+
return func(
62+
info trace.StreamExecuteIntermediateInfo,
63+
) func(
64+
trace.StreamExecuteDoneInfo,
65+
) {
66+
if info.Error == nil {
67+
log.Tracef(`stream execute intermediate`)
68+
} else {
69+
log.Warnf(`stream execute intermediate failed {error:"%v"}`,
70+
info.Error,
71+
)
72+
}
73+
return func(info trace.StreamExecuteDoneInfo) {
74+
if info.Error == nil {
75+
log.Debugf(`stream execute done {latency:"%v",query:"%s",params:"%s"}`,
76+
time.Since(start),
77+
query,
78+
params,
79+
)
80+
} else {
81+
log.Errorf(`stream execute failed {latency:"%v",query:"%s",params:"%s",error:"%v"}`,
82+
time.Since(start),
83+
query,
84+
params,
85+
info.Error,
86+
)
87+
}
88+
}
89+
}
90+
}
91+
t.OnClose = func(info trace.ScriptingCloseStartInfo) func(trace.ScriptingCloseDoneInfo) {
92+
log.Debugf(`close start`)
93+
start := time.Now()
94+
return func(info trace.ScriptingCloseDoneInfo) {
95+
if info.Error == nil {
96+
log.Debugf(`close done {latency:"%v"}`,
97+
time.Since(start),
98+
)
99+
} else {
100+
log.Errorf(`close failed {latency:"%v",error:"%s"}`,
101+
time.Since(start),
102+
info.Error,
103+
)
104+
}
105+
}
106+
}
48107
}
49108
return t
50109
}

log/table.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func Table(log Logger, details trace.Details) (t trace.Table) {
273273
session := info.Session
274274
query := info.Query
275275
params := info.Parameters
276-
log.Tracef(`execute start {id:"%s",status:"%s",query:"%s",params:"%s"}`,
276+
log.Tracef(`stream execute start {id:"%s",status:"%s",query:"%s",params:"%s"}`,
277277
session.ID(),
278278
session.Status(),
279279
query,
@@ -286,23 +286,23 @@ func Table(log Logger, details trace.Details) (t trace.Table) {
286286
trace.SessionQueryStreamExecuteDoneInfo,
287287
) {
288288
if info.Error == nil {
289-
log.Tracef(`intermediate`)
289+
log.Tracef(`stream execute intermediate`)
290290
} else {
291-
log.Warnf(`intermediate failed {error:"%v"}`,
291+
log.Warnf(`stream execute intermediate failed {error:"%v"}`,
292292
info.Error,
293293
)
294294
}
295295
return func(info trace.SessionQueryStreamExecuteDoneInfo) {
296296
if info.Error == nil {
297-
log.Debugf(`execute done {latency:"%v",id:"%s",status:"%s",query:"%s",params:"%s"}`,
297+
log.Debugf(`stream execute done {latency:"%v",id:"%s",status:"%s",query:"%s",params:"%s"}`,
298298
time.Since(start),
299299
session.ID(),
300300
session.Status(),
301301
query,
302302
params,
303303
)
304304
} else {
305-
log.Errorf(`execute failed {latency:"%v",id:"%s",status:"%s",query:"%s",params:"%s",error:"%v"}`,
305+
log.Errorf(`stream execute failed {latency:"%v",id:"%s",status:"%s",query:"%s",params:"%s",error:"%v"}`,
306306
time.Since(start),
307307
session.ID(),
308308
session.Status(),

trace/scripting.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ type (
1414
Scripting struct {
1515
OnExecute func(ExecuteStartInfo) func(ExecuteDoneInfo)
1616
OnStreamExecute func(StreamExecuteStartInfo) func(StreamExecuteIntermediateInfo) func(StreamExecuteDoneInfo)
17-
OnExplain func(info ExplainQueryStartInfo) func(doneInfo ExplainQueryDoneInfo)
17+
OnExplain func(ExplainStartInfo) func(ExplainDoneInfo)
18+
OnClose func(ScriptingCloseStartInfo) func(ScriptingCloseDoneInfo)
1819
}
1920
ExecuteStartInfo struct {
2021
// Context make available context in trace callback function.
@@ -44,4 +45,26 @@ type (
4445
StreamExecuteDoneInfo struct {
4546
Error error
4647
}
48+
ExplainStartInfo struct {
49+
// Context make available context in trace callback function.
50+
// Pointer to context provide replacement of context in trace callback function.
51+
// Warning: concurrent access to pointer on client side must be excluded.
52+
// Safe replacement of context are provided only inside callback function
53+
Context *context.Context
54+
Query string
55+
}
56+
ExplainDoneInfo struct {
57+
Plan string
58+
Error error
59+
}
60+
ScriptingCloseStartInfo struct {
61+
// Context make available context in trace callback function.
62+
// Pointer to context provide replacement of context in trace callback function.
63+
// Warning: concurrent access to pointer on client side must be excluded.
64+
// Safe replacement of context are provided only inside callback function
65+
Context *context.Context
66+
}
67+
ScriptingCloseDoneInfo struct {
68+
Error error
69+
}
4770
)

0 commit comments

Comments
 (0)