Skip to content

Commit a7f790a

Browse files
authored
Merge pull request #1412 from ydb-platform/query-script
* Added experimental support for executing scripts over query service…
2 parents c840e4a + 4279f1d commit a7f790a

26 files changed

+830
-697
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Added experimental support for executing scripts over query service client (`query.Client.ExecuteScript` and `query.CLient.FetchScriptResults`)
12
* Removed tx result from `query.Session.Execute` (tx can be obtained from `query.Session.Begin`)
23
* Changed behaviour of `query.Session.Begin` to `noop` for lazy initialization with first call `query.TxActor.Execute`
34
* Splitted experimental method `query.Client.Execute` to methods `query.Client.Exec` without result and `query.Client.Query` with result

internal/coordination/conversation/conversation.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,13 +315,11 @@ func (c *Controller) OnSend(ctx context.Context) (*Ydb_Coordination.SessionReque
315315
// OnRecv consumes a new conversation response and process with the corresponding conversation if any exists for it. The
316316
// returned value indicates if any conversation considers the incoming message part of it or the controller is closed.
317317
// You should call this method in the goroutine that handles gRPC stream Recv method.
318-
//
319-
//nolint:ifshort // false-positive this var is used outside if statement in switch-case up
320318
func (c *Controller) OnRecv(resp *Ydb_Coordination.SessionResponse) bool {
321319
c.mutex.Lock()
322320
defer c.mutex.Unlock()
323321

324-
notify := false
322+
notify := false //nolint:ifshort
325323
handled := false
326324
for i := len(c.queue) - 1; i >= 0; i-- {
327325
req := c.queue[i]

internal/discovery/discovery_test.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,6 @@ import (
1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
2020
)
2121

22-
func must[T any](t T, err error) T {
23-
if err != nil {
24-
panic(err)
25-
}
26-
27-
return t
28-
}
29-
3022
func TestDiscover(t *testing.T) {
3123
t.Run("HappyWay", func(t *testing.T) {
3224
ctx := xtest.Context(t)
@@ -39,7 +31,7 @@ func TestDiscover(t *testing.T) {
3931
Operation: &Ydb_Operations.Operation{
4032
Ready: true,
4133
Status: Ydb.StatusIds_SUCCESS,
42-
Result: must(anypb.New(&Ydb_Discovery.ListEndpointsResult{
34+
Result: xtest.Must(anypb.New(&Ydb_Discovery.ListEndpointsResult{
4335
Endpoints: []*Ydb_Discovery.EndpointInfo{
4436
{
4537
Address: "node1",
@@ -133,7 +125,7 @@ func TestDiscover(t *testing.T) {
133125
Operation: &Ydb_Operations.Operation{
134126
Ready: true,
135127
Status: Ydb.StatusIds_SUCCESS,
136-
Result: must(anypb.New(&Ydb_Discovery.ListEndpointsResult{
128+
Result: xtest.Must(anypb.New(&Ydb_Discovery.ListEndpointsResult{
137129
Endpoints: []*Ydb_Discovery.EndpointInfo{
138130
{
139131
Address: "node1",

internal/query/client.go

Lines changed: 276 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,25 @@ package query
33
import (
44
"context"
55
"sync/atomic"
6+
"time"
67

8+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
79
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
10+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
11+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
12+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
813
"google.golang.org/grpc"
914

15+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
1016
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
1119
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1220
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
1321
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
1422
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
23+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stats"
24+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
1525
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1626
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1727
"github.com/ydb-platform/ydb-go-sdk/v3/query"
@@ -39,14 +49,273 @@ type (
3949
InUse atomic.Int32
4050
}
4151
Client struct {
42-
config *config.Config
43-
queryServiceClient Ydb_Query_V1.QueryServiceClient
44-
pool sessionPool
52+
config *config.Config
53+
queryServiceClient Ydb_Query_V1.QueryServiceClient
54+
operationServiceClient Ydb_Operation_V1.OperationServiceClient
55+
pool sessionPool
4556

4657
done chan struct{}
4758
}
4859
)
4960

61+
func fetchScriptResultsWithFallback(ctx context.Context,
62+
queryServiceClient Ydb_Query_V1.QueryServiceClient,
63+
operationServiceClient Ydb_Operation_V1.OperationServiceClient,
64+
opID string, opts ...options.FetchScriptOption,
65+
) (*options.FetchScriptResult, error) {
66+
r, err := fetchScriptResults(ctx, queryServiceClient, opID, opts...)
67+
if err != nil {
68+
if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_REQUEST) {
69+
r, err = scriptOperationStatus(ctx, operationServiceClient, opID)
70+
if err != nil {
71+
return nil, xerrors.WithStackTrace(err)
72+
}
73+
74+
return r, nil
75+
}
76+
77+
return nil, xerrors.WithStackTrace(err)
78+
}
79+
80+
return r, nil
81+
}
82+
83+
func fetchScriptResults(ctx context.Context,
84+
client Ydb_Query_V1.QueryServiceClient,
85+
opID string, opts ...options.FetchScriptOption,
86+
) (*options.FetchScriptResult, error) {
87+
r, err := retry.RetryWithResult(ctx, func(ctx context.Context) (*options.FetchScriptResult, error) {
88+
request := &options.FetchScriptResultsRequest{
89+
FetchScriptResultsRequest: Ydb_Query.FetchScriptResultsRequest{
90+
OperationId: opID,
91+
},
92+
}
93+
for _, opt := range opts {
94+
if opt != nil {
95+
opt(request)
96+
}
97+
}
98+
99+
response, err := client.FetchScriptResults(ctx, &request.FetchScriptResultsRequest)
100+
if err != nil {
101+
return nil, xerrors.WithStackTrace(err)
102+
}
103+
104+
rs := response.GetResultSet()
105+
columns := rs.GetColumns()
106+
columnNames := make([]string, len(columns))
107+
columnTypes := make([]types.Type, len(columns))
108+
for i := range columns {
109+
columnNames[i] = columns[i].GetName()
110+
columnTypes[i] = types.TypeFromYDB(columns[i].GetType())
111+
}
112+
rows := make([]query.Row, len(rs.GetRows()))
113+
for i, r := range rs.GetRows() {
114+
rows[i] = NewRow(columns, r)
115+
}
116+
117+
return &options.FetchScriptResult{
118+
Ready: true,
119+
Status: response.GetStatus().String(),
120+
Data: &options.FetchScriptResultData{
121+
ResultSetIndex: response.GetResultSetIndex(),
122+
ResultSet: MaterializedResultSet(int(response.GetResultSetIndex()), columnNames, columnTypes, rows),
123+
NextToken: response.GetNextFetchToken(),
124+
},
125+
}, nil
126+
}, retry.WithIdempotent(true))
127+
if err != nil {
128+
return nil, xerrors.WithStackTrace(err)
129+
}
130+
131+
return r, nil
132+
}
133+
134+
func scriptOperationStatus(
135+
ctx context.Context, client Ydb_Operation_V1.OperationServiceClient, opID string,
136+
) (*options.FetchScriptResult, error) {
137+
status, err := retry.RetryWithResult(ctx, func(ctx context.Context) (*options.FetchScriptResult, error) {
138+
response, err := client.GetOperation(conn.WithoutWrapping(ctx), &Ydb_Operations.GetOperationRequest{
139+
Id: opID,
140+
})
141+
if err != nil {
142+
return nil, xerrors.WithStackTrace(err)
143+
}
144+
145+
var md Ydb_Query.ExecuteScriptMetadata
146+
err = response.GetOperation().GetMetadata().UnmarshalTo(&md)
147+
if err != nil {
148+
return nil, xerrors.WithStackTrace(err)
149+
}
150+
151+
return &options.FetchScriptResult{
152+
Ready: response.GetOperation().GetReady(),
153+
Status: response.GetOperation().GetStatus().String(),
154+
}, nil
155+
})
156+
if err != nil {
157+
return nil, xerrors.WithStackTrace(err)
158+
}
159+
160+
return status, nil
161+
}
162+
163+
func (c *Client) FetchScriptResults(ctx context.Context,
164+
opID string, opts ...options.FetchScriptOption,
165+
) (*options.FetchScriptResult, error) {
166+
r, err := retry.RetryWithResult(ctx, func(ctx context.Context) (*options.FetchScriptResult, error) {
167+
r, err := fetchScriptResultsWithFallback(ctx, c.queryServiceClient, c.operationServiceClient, opID,
168+
append(opts, func(request *options.FetchScriptResultsRequest) {
169+
request.Trace = c.config.Trace()
170+
})...,
171+
)
172+
if err != nil {
173+
return nil, xerrors.WithStackTrace(err)
174+
}
175+
176+
return r, nil
177+
}, retry.WithIdempotent(true))
178+
if err != nil {
179+
return nil, xerrors.WithStackTrace(err)
180+
}
181+
182+
return r, nil
183+
}
184+
185+
type executeScriptSettings struct {
186+
executeSettings
187+
ttl time.Duration
188+
operationParams *Ydb_Operations.OperationParams
189+
}
190+
191+
func (s *executeScriptSettings) OperationParams() *Ydb_Operations.OperationParams {
192+
return s.operationParams
193+
}
194+
195+
func (s *executeScriptSettings) ResultsTTL() time.Duration {
196+
return s.ttl
197+
}
198+
199+
func executeScript(ctx context.Context, //nolint:funlen
200+
client Ydb_Query_V1.QueryServiceClient, request *Ydb_Query.ExecuteScriptRequest, grpcOpts ...grpc.CallOption,
201+
) (*options.ExecuteScriptOperation, error) {
202+
op, err := retry.RetryWithResult(ctx, func(ctx context.Context) (*options.ExecuteScriptOperation, error) {
203+
response, err := client.ExecuteScript(ctx, request, grpcOpts...)
204+
if err != nil {
205+
return nil, xerrors.WithStackTrace(err)
206+
}
207+
208+
var md Ydb_Query.ExecuteScriptMetadata
209+
err = response.GetMetadata().UnmarshalTo(&md)
210+
if err != nil {
211+
return nil, xerrors.WithStackTrace(err)
212+
}
213+
214+
return &options.ExecuteScriptOperation{
215+
ID: response.GetId(),
216+
ConsumedUnits: response.GetCostInfo().GetConsumedUnits(),
217+
Metadata: struct {
218+
ID string
219+
Script struct {
220+
Syntax options.Syntax
221+
Query string
222+
}
223+
Mode options.ExecMode
224+
Stats stats.QueryStats
225+
ResultSetsMeta []struct {
226+
Columns []struct {
227+
Name string
228+
Type query.Type
229+
}
230+
}
231+
}{
232+
ID: md.GetExecutionId(),
233+
Script: struct {
234+
Syntax options.Syntax
235+
Query string
236+
}{
237+
Syntax: options.Syntax(md.GetScriptContent().GetSyntax()),
238+
Query: md.GetScriptContent().GetText(),
239+
},
240+
Mode: options.ExecMode(md.GetExecMode()),
241+
Stats: stats.FromQueryStats(md.GetExecStats()),
242+
ResultSetsMeta: func() (
243+
resultSetsMeta []struct {
244+
Columns []struct {
245+
Name string
246+
Type query.Type
247+
}
248+
},
249+
) {
250+
for _, rs := range md.GetResultSetsMeta() {
251+
resultSetsMeta = append(resultSetsMeta, struct {
252+
Columns []struct {
253+
Name string
254+
Type query.Type
255+
}
256+
}{
257+
Columns: func() (
258+
columns []struct {
259+
Name string
260+
Type types.Type
261+
},
262+
) {
263+
for _, c := range rs.GetColumns() {
264+
columns = append(columns, struct {
265+
Name string
266+
Type types.Type
267+
}{
268+
Name: c.GetName(),
269+
Type: types.TypeFromYDB(c.GetType()),
270+
})
271+
}
272+
273+
return columns
274+
}(),
275+
})
276+
}
277+
278+
return resultSetsMeta
279+
}(),
280+
},
281+
}, nil
282+
}, retry.WithIdempotent(true))
283+
if err != nil {
284+
return op, xerrors.WithStackTrace(err)
285+
}
286+
287+
return op, nil
288+
}
289+
290+
func (c *Client) ExecuteScript(
291+
ctx context.Context, q string, ttl time.Duration, opts ...options.Execute,
292+
) (
293+
op *options.ExecuteScriptOperation, err error,
294+
) {
295+
a := allocator.New()
296+
defer a.Free()
297+
298+
settings := &executeScriptSettings{
299+
executeSettings: options.ExecuteSettings(opts...),
300+
ttl: ttl,
301+
operationParams: operation.Params(
302+
ctx,
303+
c.config.OperationTimeout(),
304+
c.config.OperationCancelAfter(),
305+
operation.ModeSync,
306+
),
307+
}
308+
309+
request, grpcOpts := executeQueryScriptRequest(a, q, settings)
310+
311+
op, err = executeScript(ctx, c.queryServiceClient, request, grpcOpts...)
312+
if err != nil {
313+
return op, xerrors.WithStackTrace(err)
314+
}
315+
316+
return op, nil
317+
}
318+
50319
func (p *poolStub) Close(ctx context.Context) error {
51320
return nil
52321
}
@@ -419,9 +688,10 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con
419688
grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer)
420689

421690
client := &Client{
422-
config: cfg,
423-
queryServiceClient: grpcClient,
424-
done: make(chan struct{}),
691+
config: cfg,
692+
queryServiceClient: grpcClient,
693+
operationServiceClient: Ydb_Operation_V1.NewOperationServiceClient(balancer),
694+
done: make(chan struct{}),
425695
pool: newPool(ctx, cfg, func(ctx context.Context) (_ *Session, err error) {
426696
var (
427697
createCtx context.Context

0 commit comments

Comments
 (0)