Skip to content

Commit 0826d22

Browse files
committed
* Added trace.Query.OnSessionBegin event
* Added `trace.Query.OnResult{New,NextPart,NextResultSet,Close}` events * Added `trace.Query.OnRow{Scan,ScanNamed,ScanStruct}` events
1 parent e699ca1 commit 0826d22

File tree

19 files changed

+1137
-142
lines changed

19 files changed

+1137
-142
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
* Added `trace.Query.OnSessionBegin` event
2+
* Added `trace.Query.OnResult{New,NextPart,NextResultSet,Close}` events
3+
* Added `trace.Query.OnRow{Scan,ScanNamed,ScanStruct}` events
4+
15
## v3.58.1
26
* Dropped all deprecated callbacks and events from traces
37
* Added `trace.Driver.OnConnStream{SendMsg,RecvMsg,CloseSend}` events

internal/query/execute_query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func execute(ctx context.Context, s *Session, c Ydb_Query_V1.QueryServiceClient,
6565
return nil, nil, xerrors.WithStackTrace(err)
6666
}
6767

68-
r, txID, err := newResult(ctx, stream)
68+
r, txID, err := newResult(ctx, stream, s.trace)
6969
if err != nil {
7070
return nil, nil, xerrors.WithStackTrace(err)
7171
}

internal/query/execute_query_test.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -369,37 +369,37 @@ func TestExecute(t *testing.T) {
369369
require.EqualValues(t, 0, rs.index)
370370
{
371371
t.Log("next (row=1)")
372-
_, err := rs.next(ctx)
372+
_, err := rs.nextRow(ctx)
373373
require.NoError(t, err)
374374
require.EqualValues(t, 0, rs.rowIndex)
375375
}
376376
{
377377
t.Log("next (row=2)")
378-
_, err := rs.next(ctx)
378+
_, err := rs.nextRow(ctx)
379379
require.NoError(t, err)
380380
require.EqualValues(t, 1, rs.rowIndex)
381381
}
382382
{
383383
t.Log("next (row=3)")
384-
_, err := rs.next(ctx)
384+
_, err := rs.nextRow(ctx)
385385
require.NoError(t, err)
386386
require.EqualValues(t, 2, rs.rowIndex)
387387
}
388388
{
389389
t.Log("next (row=4)")
390-
_, err := rs.next(ctx)
390+
_, err := rs.nextRow(ctx)
391391
require.NoError(t, err)
392392
require.EqualValues(t, 0, rs.rowIndex)
393393
}
394394
{
395395
t.Log("next (row=5)")
396-
_, err := rs.next(ctx)
396+
_, err := rs.nextRow(ctx)
397397
require.NoError(t, err)
398398
require.EqualValues(t, 1, rs.rowIndex)
399399
}
400400
{
401401
t.Log("next (row=6)")
402-
_, err := rs.next(ctx)
402+
_, err := rs.nextRow(ctx)
403403
require.ErrorIs(t, err, io.EOF)
404404
}
405405
}
@@ -416,37 +416,37 @@ func TestExecute(t *testing.T) {
416416
require.EqualValues(t, 2, rs.index)
417417
{
418418
t.Log("next (row=1)")
419-
_, err := rs.next(ctx)
419+
_, err := rs.nextRow(ctx)
420420
require.NoError(t, err)
421421
require.EqualValues(t, 0, rs.rowIndex)
422422
}
423423
{
424424
t.Log("next (row=2)")
425-
_, err := rs.next(ctx)
425+
_, err := rs.nextRow(ctx)
426426
require.NoError(t, err)
427427
require.EqualValues(t, 1, rs.rowIndex)
428428
}
429429
{
430430
t.Log("next (row=3)")
431-
_, err := rs.next(ctx)
431+
_, err := rs.nextRow(ctx)
432432
require.NoError(t, err)
433433
require.EqualValues(t, 0, rs.rowIndex)
434434
}
435435
{
436436
t.Log("next (row=4)")
437-
_, err := rs.next(ctx)
437+
_, err := rs.nextRow(ctx)
438438
require.NoError(t, err)
439439
require.EqualValues(t, 1, rs.rowIndex)
440440
}
441441
{
442442
t.Log("next (row=5)")
443-
_, err := rs.next(ctx)
443+
_, err := rs.nextRow(ctx)
444444
require.NoError(t, err)
445445
require.EqualValues(t, 2, rs.rowIndex)
446446
}
447447
{
448448
t.Log("next (row=6)")
449-
_, err := rs.next(ctx)
449+
_, err := rs.nextRow(ctx)
450450
require.ErrorIs(t, err, io.EOF)
451451
}
452452
}
@@ -586,37 +586,37 @@ func TestExecute(t *testing.T) {
586586
require.EqualValues(t, 0, rs.index)
587587
{
588588
t.Log("next (row=1)")
589-
_, err := rs.next(ctx)
589+
_, err := rs.nextRow(ctx)
590590
require.NoError(t, err)
591591
require.EqualValues(t, 0, rs.rowIndex)
592592
}
593593
{
594594
t.Log("next (row=2)")
595-
_, err := rs.next(ctx)
595+
_, err := rs.nextRow(ctx)
596596
require.NoError(t, err)
597597
require.EqualValues(t, 1, rs.rowIndex)
598598
}
599599
{
600600
t.Log("next (row=3)")
601-
_, err := rs.next(ctx)
601+
_, err := rs.nextRow(ctx)
602602
require.NoError(t, err)
603603
require.EqualValues(t, 2, rs.rowIndex)
604604
}
605605
{
606606
t.Log("next (row=4)")
607-
_, err := rs.next(ctx)
607+
_, err := rs.nextRow(ctx)
608608
require.NoError(t, err)
609609
require.EqualValues(t, 0, rs.rowIndex)
610610
}
611611
{
612612
t.Log("next (row=5)")
613-
_, err := rs.next(ctx)
613+
_, err := rs.nextRow(ctx)
614614
require.NoError(t, err)
615615
require.EqualValues(t, 1, rs.rowIndex)
616616
}
617617
{
618618
t.Log("next (row=6)")
619-
_, err := rs.next(ctx)
619+
_, err := rs.nextRow(ctx)
620620
require.Error(t, err)
621621
require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable))
622622
}
@@ -726,25 +726,25 @@ func TestExecute(t *testing.T) {
726726
require.EqualValues(t, 0, rs.index)
727727
{
728728
t.Log("next (row=1)")
729-
_, err := rs.next(ctx)
729+
_, err := rs.nextRow(ctx)
730730
require.NoError(t, err)
731731
require.EqualValues(t, 0, rs.rowIndex)
732732
}
733733
{
734734
t.Log("next (row=2)")
735-
_, err := rs.next(ctx)
735+
_, err := rs.nextRow(ctx)
736736
require.NoError(t, err)
737737
require.EqualValues(t, 1, rs.rowIndex)
738738
}
739739
{
740740
t.Log("next (row=3)")
741-
_, err := rs.next(ctx)
741+
_, err := rs.nextRow(ctx)
742742
require.NoError(t, err)
743743
require.EqualValues(t, 2, rs.rowIndex)
744744
}
745745
{
746746
t.Log("next (row=4)")
747-
_, err := rs.next(ctx)
747+
_, err := rs.nextRow(ctx)
748748
require.Error(t, err)
749749
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
750750
}

internal/query/result.go

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,44 +4,59 @@ import (
44
"context"
55
"fmt"
66
"io"
7-
"sync"
87

98
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
109
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
1110

11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/query"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1416
)
1517

1618
var _ query.Result = (*result)(nil)
1719

1820
type result struct {
1921
stream Ydb_Query_V1.QueryService_ExecuteQueryClient
20-
closeOnce func()
22+
closeOnce func(ctx context.Context) error
2123
lastPart *Ydb_Query.ExecuteQueryResponsePart
2224
resultSetIndex int64
2325
errs []error
2426
closed chan struct{}
27+
trace *trace.Query
2528
}
2629

2730
func newResult(
2831
ctx context.Context,
2932
stream Ydb_Query_V1.QueryService_ExecuteQueryClient,
33+
t *trace.Query,
3034
) (_ *result, txID string, err error) {
35+
if t == nil {
36+
t = &trace.Query{}
37+
}
38+
39+
onDone := trace.QueryOnResultNew(t, &ctx, stack.FunctionID(""))
40+
defer func() {
41+
onDone(err)
42+
}()
43+
3144
select {
3245
case <-ctx.Done():
3346
return nil, txID, xerrors.WithStackTrace(ctx.Err())
3447
default:
35-
part, err := nextPart(stream)
48+
part, err := nextPart(ctx, stream, t)
3649
if err != nil {
3750
return nil, txID, xerrors.WithStackTrace(err)
3851
}
3952
var (
4053
interrupted = make(chan struct{})
4154
closed = make(chan struct{})
42-
closeOnce = sync.OnceFunc(func() {
55+
closeOnce = xsync.OnceFunc(func(ctx context.Context) error {
4356
close(interrupted)
4457
close(closed)
58+
59+
return nil
4560
})
4661
)
4762

@@ -51,13 +66,25 @@ func newResult(
5166
lastPart: part,
5267
closed: closed,
5368
closeOnce: closeOnce,
69+
trace: t,
5470
}, part.GetTxMeta().GetId(), nil
5571
}
5672
}
5773

5874
func nextPart(
75+
ctx context.Context,
5976
stream Ydb_Query_V1.QueryService_ExecuteQueryClient,
77+
t *trace.Query,
6078
) (_ *Ydb_Query.ExecuteQueryResponsePart, finalErr error) {
79+
if t == nil {
80+
t = &trace.Query{}
81+
}
82+
83+
onDone := trace.QueryOnResultNextPart(t, &ctx, stack.FunctionID(""))
84+
defer func() {
85+
onDone(finalErr)
86+
}()
87+
6188
part, err := stream.Recv()
6289
if err != nil {
6390
return nil, xerrors.WithStackTrace(err)
@@ -66,10 +93,13 @@ func nextPart(
6693
return part, nil
6794
}
6895

69-
func (r *result) Close(ctx context.Context) error {
70-
r.closeOnce()
96+
func (r *result) Close(ctx context.Context) (err error) {
97+
onDone := trace.QueryOnResultClose(r.trace, &ctx, stack.FunctionID(""))
98+
defer func() {
99+
onDone(err)
100+
}()
71101

72-
return nil
102+
return r.closeOnce(ctx)
73103
}
74104

75105
func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) {
@@ -103,10 +133,10 @@ func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) {
103133
case <-r.closed:
104134
return nil, errClosedResult
105135
default:
106-
part, err := nextPart(r.stream)
136+
part, err := nextPart(ctx, r.stream, r.trace)
107137
if err != nil {
108138
if xerrors.Is(err, io.EOF) {
109-
r.closeOnce()
139+
_ = r.closeOnce(ctx)
110140
}
111141

112142
return nil, xerrors.WithStackTrace(err)
@@ -121,9 +151,9 @@ func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) {
121151

122152
return part, nil
123153
}
124-
}, r.lastPart), nil
154+
}, r.lastPart, r.trace), nil
125155
}
126-
part, err := nextPart(r.stream)
156+
part, err := nextPart(ctx, r.stream, r.trace)
127157
if err != nil {
128158
return nil, xerrors.WithStackTrace(err)
129159
}
@@ -139,7 +169,12 @@ func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) {
139169
}
140170
}
141171

142-
func (r *result) NextResultSet(ctx context.Context) (query.ResultSet, error) {
172+
func (r *result) NextResultSet(ctx context.Context) (_ query.ResultSet, err error) {
173+
onDone := trace.QueryOnResultNextResultSet(r.trace, &ctx, stack.FunctionID(""))
174+
defer func() {
175+
onDone(err)
176+
}()
177+
143178
return r.nextResultSet(ctx)
144179
}
145180

internal/query/result_set.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
99
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
1010

11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/query"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1315
)
1416

1517
var _ query.ResultSet = (*resultSet)(nil)
@@ -20,26 +22,31 @@ type resultSet struct {
2022
columns []*Ydb.Column
2123
currentPart *Ydb_Query.ExecuteQueryResponsePart
2224
rowIndex int
25+
trace *trace.Query
2326
done chan struct{}
2427
}
2528

2629
func newResultSet(
27-
recv func() (
28-
*Ydb_Query.ExecuteQueryResponsePart, error,
29-
),
30+
recv func() (*Ydb_Query.ExecuteQueryResponsePart, error),
3031
part *Ydb_Query.ExecuteQueryResponsePart,
32+
t *trace.Query,
3133
) *resultSet {
34+
if t == nil {
35+
t = &trace.Query{}
36+
}
37+
3238
return &resultSet{
3339
index: part.GetResultSetIndex(),
3440
recv: recv,
3541
currentPart: part,
3642
rowIndex: -1,
3743
columns: part.GetResultSet().GetColumns(),
44+
trace: t,
3845
done: make(chan struct{}),
3946
}
4047
}
4148

42-
func (rs *resultSet) next(ctx context.Context) (*row, error) {
49+
func (rs *resultSet) nextRow(ctx context.Context) (*row, error) {
4350
rs.rowIndex++
4451
select {
4552
case <-rs.done:
@@ -73,10 +80,15 @@ func (rs *resultSet) next(ctx context.Context) (*row, error) {
7380
))
7481
}
7582

76-
return newRow(rs.columns, rs.currentPart.GetResultSet().GetRows()[rs.rowIndex])
83+
return newRow(ctx, rs.columns, rs.currentPart.GetResultSet().GetRows()[rs.rowIndex], rs.trace)
7784
}
7885
}
7986

80-
func (rs *resultSet) NextRow(ctx context.Context) (query.Row, error) {
81-
return rs.next(ctx)
87+
func (rs *resultSet) NextRow(ctx context.Context) (_ query.Row, err error) {
88+
onDone := trace.QueryOnResultSetNextRow(rs.trace, &ctx, stack.FunctionID(""))
89+
defer func() {
90+
onDone(err)
91+
}()
92+
93+
return rs.nextRow(ctx)
8294
}

0 commit comments

Comments
 (0)