Skip to content

Commit 25f4889

Browse files
authored
Merge pull request #1251 from ydb-platform/query-execute
* Added experimental method `query/Client.Execute` for execute query …
2 parents f8d8299 + 32e7e09 commit 25f4889

File tree

11 files changed

+751
-240
lines changed

11 files changed

+751
-240
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added experimental method `query/Client.Execute` for execute query and read materialized result
2+
13
## v3.69.0
24
* Added experimental method for execute query and read only one row from result:
35
* `query/Client.ReadRow`

internal/query/client.go

Lines changed: 87 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,8 @@ func do(
5959
ctx context.Context,
6060
pool *pool.Pool[*Session, Session],
6161
op query.Operation,
62-
t *trace.Query,
63-
opts ...options.DoOption,
64-
) (attempts int, finalErr error) {
65-
doOpts := options.ParseDoOpts(t, opts...)
66-
62+
opts ...retry.Option,
63+
) (finalErr error) {
6764
err := pool.With(ctx, func(ctx context.Context, s *Session) error {
6865
s.setStatus(statusInUse)
6966

@@ -79,13 +76,32 @@ func do(
7976
s.setStatus(statusIdle)
8077

8178
return nil
82-
}, append(doOpts.RetryOpts(), retry.WithTrace(&trace.Retry{
83-
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
84-
return func(info trace.RetryLoopDoneInfo) {
85-
attempts = info.Attempts
86-
}
87-
},
88-
}))...)
79+
}, opts...)
80+
if err != nil {
81+
return xerrors.WithStackTrace(err)
82+
}
83+
84+
return nil
85+
}
86+
87+
func doWithAttempts(
88+
ctx context.Context,
89+
pool *pool.Pool[*Session, Session],
90+
op query.Operation,
91+
t *trace.Query,
92+
opts ...options.DoOption,
93+
) (attempts int, finalErr error) {
94+
doOpts := options.ParseDoOpts(t, opts...)
95+
96+
err := do(ctx, pool, op,
97+
append(doOpts.RetryOpts(), retry.WithTrace(&trace.Retry{
98+
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
99+
return func(info trace.RetryLoopDoneInfo) {
100+
attempts = info.Attempts
101+
}
102+
},
103+
}))...,
104+
)
89105
if err != nil {
90106
return attempts, xerrors.WithStackTrace(err)
91107
}
@@ -100,13 +116,13 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO
100116
onDone := trace.QueryOnDo(c.config.Trace(), &ctx,
101117
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).Do"),
102118
)
103-
attempts, err := do(ctx, c.pool, op, c.config.Trace(), opts...)
119+
attempts, err := doWithAttempts(ctx, c.pool, op, c.config.Trace(), opts...)
104120
onDone(attempts, err)
105121

106122
return err
107123
}
108124

109-
func doTx(
125+
func doTxWithAttempts(
110126
ctx context.Context,
111127
pool *pool.Pool[*Session, Session],
112128
op query.TxOperation,
@@ -115,7 +131,7 @@ func doTx(
115131
) (attempts int, err error) {
116132
doTxOpts := options.ParseDoTxOpts(t, opts...)
117133

118-
attempts, err = do(ctx, pool, func(ctx context.Context, s query.Session) (err error) {
134+
attempts, err = doWithAttempts(ctx, pool, func(ctx context.Context, s query.Session) (err error) {
119135
tx, err := s.Begin(ctx, doTxOpts.TxSettings())
120136
if err != nil {
121137
return xerrors.WithStackTrace(err)
@@ -161,21 +177,72 @@ func (c *Client) ReadRow(ctx context.Context, q string, opts ...options.ExecuteO
161177
onDone(err)
162178
}()
163179

164-
_, err = do(ctx, c.pool, func(ctx context.Context, s query.Session) (err error) {
180+
err = do(ctx, c.pool, func(ctx context.Context, s query.Session) (err error) {
165181
row, err = s.ReadRow(ctx, q, opts...)
166182
if err != nil {
167183
return xerrors.WithStackTrace(err)
168184
}
169185

170186
return nil
171-
}, c.config.Trace())
187+
})
172188
if err != nil {
173189
return nil, xerrors.WithStackTrace(err)
174190
}
175191

176192
return row, nil
177193
}
178194

195+
func clientExecute(ctx context.Context,
196+
pool *pool.Pool[*Session, Session],
197+
q string, opts ...options.ExecuteOption,
198+
) (r query.Result, err error) {
199+
err = do(ctx, pool, func(ctx context.Context, s query.Session) (err error) {
200+
_, r, err = s.Execute(ctx, q, opts...)
201+
if err != nil {
202+
return xerrors.WithStackTrace(err)
203+
}
204+
defer func() {
205+
_ = r.Close(ctx)
206+
}()
207+
208+
r, err = resultToMaterializedResult(ctx, r)
209+
if err != nil {
210+
return xerrors.WithStackTrace(err)
211+
}
212+
213+
if err = r.Err(); err != nil {
214+
return xerrors.WithStackTrace(err)
215+
}
216+
217+
return nil
218+
})
219+
if err != nil {
220+
return nil, xerrors.WithStackTrace(err)
221+
}
222+
223+
return r, nil
224+
}
225+
226+
func (c *Client) Execute(ctx context.Context, q string, opts ...options.ExecuteOption) (_ query.Result, err error) {
227+
ctx, cancel := xcontext.WithDone(ctx, c.done)
228+
defer cancel()
229+
230+
onDone := trace.QueryOnExecute(c.config.Trace(), &ctx,
231+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).Execute"),
232+
q,
233+
)
234+
defer func() {
235+
onDone(err)
236+
}()
237+
238+
r, err := clientExecute(ctx, c.pool, q, opts...)
239+
if err != nil {
240+
return nil, xerrors.WithStackTrace(err)
241+
}
242+
243+
return r, nil
244+
}
245+
179246
// ReadResultSet is a helper which read all rows from first result set in result
180247
func (c *Client) ReadResultSet(
181248
ctx context.Context, q string, opts ...options.ExecuteOption,
@@ -191,14 +258,14 @@ func (c *Client) ReadResultSet(
191258
onDone(err)
192259
}()
193260

194-
_, err = do(ctx, c.pool, func(ctx context.Context, s query.Session) (err error) {
261+
err = do(ctx, c.pool, func(ctx context.Context, s query.Session) (err error) {
195262
rs, err = s.ReadResultSet(ctx, q, opts...)
196263
if err != nil {
197264
return xerrors.WithStackTrace(err)
198265
}
199266

200267
return nil
201-
}, c.config.Trace())
268+
})
202269
if err != nil {
203270
return nil, xerrors.WithStackTrace(err)
204271
}
@@ -213,7 +280,7 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
213280
onDone := trace.QueryOnDoTx(c.config.Trace(), &ctx,
214281
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).DoTx"),
215282
)
216-
attempts, err := doTx(ctx, c.pool, op, c.config.Trace(), opts...)
283+
attempts, err := doTxWithAttempts(ctx, c.pool, op, c.config.Trace(), opts...)
217284
onDone(attempts, err)
218285

219286
return err

0 commit comments

Comments
 (0)