Skip to content

Commit 0152573

Browse files
xelavopelkKlepov Alexasmyasnikovkprokopenko
authored
feat: expose query warnings (#1882)
Co-authored-by: Klepov Alex <[email protected]> Co-authored-by: Aleksey Myasnikov <[email protected]> Co-authored-by: Konstantin Prokopenko <[email protected]>
1 parent f90a68f commit 0152573

File tree

6 files changed

+274
-2
lines changed

6 files changed

+274
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added method `query.WithIssuesHandler` to get query issues
2+
13
## v3.117.2
24
* Added support for `Result.RowsAffected()` for YDB `database/sql` driver
35
* Upgraded minimal version of Go to 1.23.9

internal/query/client.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,8 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
348348
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
349349
settings := options.ExecuteSettings(opts...)
350350
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
351-
streamResult, err := s.execute(ctx, q, settings, withStreamResultTrace(s.trace))
351+
streamResult, err := s.execute(ctx, q, settings,
352+
withStreamResultTrace(s.trace), withIssuesHandler(settings.IssuesOpts()))
352353
if err != nil {
353354
return xerrors.WithStackTrace(err)
354355
}
@@ -397,7 +398,8 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
397398
) {
398399
settings := options.ExecuteSettings(opts...)
399400
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
400-
streamResult, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withStreamResultTrace(s.trace))
401+
streamResult, err := s.execute(ctx, q, settings,
402+
withStreamResultTrace(s.trace), withIssuesHandler(settings.IssuesOpts()))
401403
if err != nil {
402404
return xerrors.WithStackTrace(err)
403405
}

internal/query/options/execute.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package options
22

33
import (
4+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
45
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
56
"google.golang.org/grpc"
67

@@ -42,6 +43,7 @@ type (
4243
callOptions []grpc.CallOption
4344
txControl *tx.Control
4445
retryOptions []retry.Option
46+
issueCallback func(issues []*Ydb_Issue.IssueMessage)
4547
responsePartLimitBytes int64
4648
label string
4749
}
@@ -70,6 +72,9 @@ type (
7072
}
7173
execModeOption = ExecMode
7274
responsePartLimitBytes int64
75+
issuesOption struct {
76+
callback func([]*Ydb_Issue.IssueMessage)
77+
}
7378
)
7479

7580
func (poolID resourcePool) applyExecuteOption(s *executeSettings) {
@@ -80,6 +85,10 @@ func (s *executeSettings) RetryOpts() []retry.Option {
8085
return s.retryOptions
8186
}
8287

88+
func (s *executeSettings) IssuesOpts() func([]*Ydb_Issue.IssueMessage) {
89+
return s.issueCallback
90+
}
91+
8392
func (s *executeSettings) StatsCallback() func(stats.QueryStats) {
8493
return s.statsCallback
8594
}
@@ -119,6 +128,10 @@ func (mode ExecMode) applyExecuteOption(s *executeSettings) {
119128
s.execMode = mode
120129
}
121130

131+
func (opts issuesOption) applyExecuteOption(s *executeSettings) {
132+
s.issueCallback = opts.callback
133+
}
134+
122135
const (
123136
ExecModeParse = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_PARSE)
124137
ExecModeValidate = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_VALIDATE)
@@ -244,6 +257,12 @@ func WithStatsMode(mode StatsMode, callback func(stats.QueryStats)) statsModeOpt
244257
}
245258
}
246259

260+
func WithIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) issuesOption {
261+
return issuesOption{
262+
callback: callback,
263+
}
264+
}
265+
247266
func WithCallOptions(opts ...grpc.CallOption) callOptionsOption {
248267
return opts
249268
}

internal/query/result.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
11+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
1112
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
1213

1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
@@ -38,6 +39,7 @@ type (
3839
resultSetIndex int64
3940
trace *trace.Query
4041
statsCallback func(queryStats stats.QueryStats)
42+
issuesCallback func(issues []*Ydb_Issue.IssueMessage)
4143
onNextPartErr []func(err error)
4244
onTxMeta []func(txMeta *Ydb_Query.TransactionMeta)
4345
closeTimeout time.Duration
@@ -92,6 +94,12 @@ func withStreamResultTrace(t *trace.Query) resultOption {
9294
}
9395
}
9496

97+
func withIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) resultOption {
98+
return func(s *streamResult) {
99+
s.issuesCallback = callback
100+
}
101+
}
102+
95103
func withStreamResultStatsCallback(callback func(queryStats stats.QueryStats)) resultOption {
96104
return func(s *streamResult) {
97105
s.statsCallback = callback
@@ -193,6 +201,12 @@ func (r *streamResult) nextPart(ctx context.Context) (
193201
}()
194202

195203
part, err = nextPart(r.stream)
204+
if part != nil {
205+
issues := part.GetIssues()
206+
if r.issuesCallback != nil && len(issues) > 0 {
207+
r.issuesCallback(issues)
208+
}
209+
}
196210
if err != nil {
197211
for _, callback := range r.onNextPartErr {
198212
callback(err)

query/execute_options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package query
22

33
import (
4+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
45
"google.golang.org/grpc"
56

67
"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
@@ -57,6 +58,12 @@ func WithStatsMode(mode options.StatsMode, callback func(Stats)) ExecuteOption {
5758
return options.WithStatsMode(mode, callback)
5859
}
5960

61+
// WithIssuesHandler is the option which helps collect issues generated during query execution
62+
// May be more than one call of callback during query execution
63+
func WithIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) ExecuteOption {
64+
return options.WithIssuesHandler(callback)
65+
}
66+
6067
// WithResponsePartLimitSizeBytes limit size of each part (data portion) in stream for query service resoponse
6168
// it isn't limit total size of answer
6269
func WithResponsePartLimitSizeBytes(size int64) ExecuteOption {

tests/integration/query_execute_test.go

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"github.com/google/uuid"
2121
"github.com/stretchr/testify/require"
22+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
2223

2324
"github.com/ydb-platform/ydb-go-sdk/v3"
2425
"github.com/ydb-platform/ydb-go-sdk/v3/internal/decimal"
@@ -771,3 +772,230 @@ func TestIssue1785FillDecimalFields(t *testing.T) {
771772
require.EqualValues(t, expectedVal, rd.DecimalVal)
772773
})
773774
}
775+
776+
// https://github.com/ydb-platform/ydb-go-sdk/issues/1872
777+
func TestIssue1872QueryWarning(t *testing.T) {
778+
ctx, cancel := context.WithCancel(xtest.Context(t))
779+
defer cancel()
780+
db, err := ydb.Open(ctx,
781+
os.Getenv("YDB_CONNECTION_STRING"),
782+
ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")),
783+
ydb.WithTraceQuery(
784+
log.Query(
785+
log.Default(os.Stdout,
786+
log.WithLogQuery(),
787+
log.WithColoring(),
788+
log.WithMinLevel(log.INFO),
789+
),
790+
trace.QueryEvents,
791+
),
792+
),
793+
)
794+
require.NoError(t, err)
795+
_ = db.Query().Exec(ctx,
796+
`drop table TestIssue1872QueryWarning;`,
797+
)
798+
err = db.Query().Exec(ctx,
799+
`create table TestIssue1872QueryWarning
800+
(Id uint64, Amount decimal(22,9) , primary key(Id));`,
801+
query.WithParameters(
802+
ydb.ParamsBuilder().
803+
Param("$p1").Text("test1").
804+
Build(),
805+
),
806+
)
807+
require.NoError(t, err)
808+
t.Run("Query with declare", func(t *testing.T) {
809+
collector := make([]*Ydb_Issue.IssueMessage, 0)
810+
q := db.Query()
811+
_, err := q.Query(ctx, `
812+
DECLARE $x as String;
813+
SELECT 42;
814+
SELECT 43;
815+
`,
816+
query.WithSyntax(query.SyntaxYQL),
817+
query.WithIdempotent(),
818+
query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) {
819+
collector = append(collector, issueList...)
820+
}),
821+
)
822+
require.NoError(t, err)
823+
require.Equal(t, 1, len(collector))
824+
require.Equal(t, "Symbol $x is not used", collector[0].Message)
825+
})
826+
t.Run("Exec with declare", func(t *testing.T) {
827+
collector := make([]*Ydb_Issue.IssueMessage, 0)
828+
q := db.Query()
829+
_, err := q.Query(ctx, `
830+
DECLARE $x as String;
831+
SELECT 42;
832+
SELECT 43;
833+
`,
834+
query.WithSyntax(query.SyntaxYQL),
835+
query.WithIdempotent(),
836+
query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) {
837+
collector = append(collector, issueList...)
838+
}),
839+
)
840+
require.NoError(t, err)
841+
require.Equal(t, 1, len(collector))
842+
require.Equal(t, "Symbol $x is not used", collector[0].Message)
843+
})
844+
issueCount := -1
845+
t.Run("Query no issues", func(t *testing.T) {
846+
q := db.Query()
847+
_, err := q.Query(ctx, `
848+
SELECT 42;
849+
SELECT 43;
850+
`,
851+
query.WithSyntax(query.SyntaxYQL),
852+
query.WithIdempotent(),
853+
query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) {
854+
issueCount = len(issueList)
855+
}),
856+
)
857+
require.NoError(t, err)
858+
})
859+
require.Equal(t, -1, issueCount)
860+
issueCount = -1
861+
t.Run("Exec no issues", func(t *testing.T) {
862+
q := db.Query()
863+
_, err := q.Query(ctx, `
864+
SELECT 42;
865+
SELECT 43;
866+
`,
867+
query.WithSyntax(query.SyntaxYQL),
868+
query.WithIdempotent(),
869+
query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) {
870+
issueCount = len(issueList)
871+
}),
872+
)
873+
require.NoError(t, err)
874+
})
875+
require.Equal(t, -1, issueCount)
876+
t.Run("Exec insert", func(t *testing.T) {
877+
var issueList []*Ydb_Issue.IssueMessage
878+
q := db.Query()
879+
err := q.Exec(ctx, `
880+
insert into TestIssue1872QueryWarning (Id, Amount)
881+
values (-7, Decimal("37.01",22,9));
882+
`,
883+
query.WithIssuesHandler(func(issues []*Ydb_Issue.IssueMessage) {
884+
issueList = issues
885+
}),
886+
)
887+
require.NoError(t, err)
888+
require.Equal(t, 1, len(issueList))
889+
})
890+
891+
t.Run("Exec complex", func(t *testing.T) {
892+
collector := make([]*Ydb_Issue.IssueMessage, 0)
893+
err = db.Query().Exec(ctx,
894+
`DECLARE $x as String;
895+
DECLARE $x1 as String;
896+
SELECT 42;
897+
insert into TestIssue1872QueryWarning (Id, Amount) values (-3, Decimal("3.01",22,9));
898+
SELECT 43;`,
899+
query.WithParameters(
900+
ydb.ParamsBuilder().
901+
Param("$p1").Text("test1").
902+
Build(),
903+
),
904+
query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) {
905+
collector = append(collector, issueList...)
906+
}),
907+
)
908+
require.NoError(t, err)
909+
require.Equal(t, 3, len(collector))
910+
require.Equal(t, "Symbol $x is not used", collector[0].Message)
911+
require.Equal(t, "Symbol $x1 is not used", collector[1].Message)
912+
require.Equal(t, "Type annotation", collector[2].Message)
913+
require.Equal(t, 1, len(collector[2].Issues))
914+
require.Equal(t, "At function: KiWriteTable!", collector[2].Issues[0].Message)
915+
require.Equal(t,
916+
"Failed to convert type: Struct<'Amount':Decimal(22,9),'Id':Int32> to Struct<'Amount':Decimal(22,9)?,'Id':Uint64?>",
917+
collector[2].Issues[0].Issues[0].Message)
918+
})
919+
t.Run("Query complex", func(t *testing.T) {
920+
collector := make([]*Ydb_Issue.IssueMessage, 0)
921+
err = db.Query().Exec(ctx,
922+
` DECLARE $x as String;
923+
DECLARE $x1 as String;
924+
SELECT 42;
925+
insert into TestIssue1872QueryWarning (Id, Amount) values (-6, Decimal("3.01",22,9));
926+
SELECT 43;`,
927+
query.WithParameters(
928+
ydb.ParamsBuilder().
929+
Param("$p1").Text("test1").
930+
Build(),
931+
),
932+
query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) {
933+
collector = append(collector, issueList...)
934+
}),
935+
)
936+
require.NoError(t, err)
937+
require.Equal(t, 3, len(collector))
938+
require.Equal(t, "Symbol $x is not used", collector[0].Message)
939+
require.Equal(t, "Symbol $x1 is not used", collector[1].Message)
940+
require.Equal(t, "Type annotation", collector[2].Message)
941+
require.Equal(t, 1, len(collector[2].Issues))
942+
require.Equal(t, "At function: KiWriteTable!", collector[2].Issues[0].Message)
943+
require.Equal(t,
944+
"Failed to convert type: Struct<'Amount':Decimal(22,9),'Id':Int32> to Struct<'Amount':Decimal(22,9)?,'Id':Uint64?>",
945+
collector[2].Issues[0].Issues[0].Message)
946+
})
947+
t.Run("Query 2 inserts", func(t *testing.T) {
948+
var issueList []*Ydb_Issue.IssueMessage
949+
q := db.Query()
950+
_, err := q.Query(ctx, `
951+
insert into TestIssue1872QueryWarning (Id, Amount) values (-9, Decimal("3.01",22,9));
952+
insert into TestIssue1872QueryWarning (Id, Amount) values (-5, Decimal("5.01",22,9));
953+
`,
954+
query.WithParameters(
955+
ydb.ParamsBuilder().
956+
Param("$p1").Text("test").
957+
Build(),
958+
),
959+
query.WithSyntax(query.SyntaxYQL),
960+
query.WithIdempotent(),
961+
query.WithIssuesHandler(func(issues []*Ydb_Issue.IssueMessage) {
962+
issueList = issues
963+
}),
964+
)
965+
require.NoError(t, err)
966+
require.Equal(t, 1, len(issueList))
967+
require.Equal(t, "Type annotation", issueList[0].Message)
968+
require.Equal(t, 2, len(issueList[0].Issues))
969+
require.Equal(t, "At function: KiWriteTable!", issueList[0].Issues[0].Message)
970+
require.Equal(t,
971+
"Failed to convert type: Struct<'Amount':Decimal(22,9),'Id':Int32> to Struct<'Amount':Decimal(22,9)?,'Id':Uint64?>",
972+
issueList[0].Issues[0].Issues[0].Message)
973+
})
974+
t.Run("Exec 2 inserts", func(t *testing.T) {
975+
var issueList []*Ydb_Issue.IssueMessage
976+
q := db.Query()
977+
_, err := q.Query(ctx, `
978+
insert into TestIssue1872QueryWarning (Id, Amount) values (-19, Decimal("3.01",22,9));
979+
insert into TestIssue1872QueryWarning (Id, Amount) values (-15, Decimal("5.01",22,9));
980+
`,
981+
query.WithParameters(
982+
ydb.ParamsBuilder().
983+
Param("$p1").Text("test").
984+
Build(),
985+
),
986+
query.WithSyntax(query.SyntaxYQL),
987+
query.WithIdempotent(),
988+
query.WithIssuesHandler(func(issues []*Ydb_Issue.IssueMessage) {
989+
issueList = issues
990+
}),
991+
)
992+
require.NoError(t, err)
993+
require.Equal(t, 1, len(issueList))
994+
require.Equal(t, "Type annotation", issueList[0].Message)
995+
require.Equal(t, 2, len(issueList[0].Issues))
996+
require.Equal(t, "At function: KiWriteTable!", issueList[0].Issues[0].Message)
997+
require.Equal(t,
998+
"Failed to convert type: Struct<'Amount':Decimal(22,9),'Id':Int32> to Struct<'Amount':Decimal(22,9)?,'Id':Uint64?>",
999+
issueList[0].Issues[0].Issues[0].Message)
1000+
})
1001+
}

0 commit comments

Comments
 (0)