Skip to content

Commit 6aa48a4

Browse files
authored
Merge pull request #1254 from brojeg/master
Enable funlen linter (part 8)
2 parents e1c6de0 + 026ee72 commit 6aa48a4

File tree

3 files changed

+218
-186
lines changed

3 files changed

+218
-186
lines changed

internal/table/client.go

Lines changed: 68 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ type createSessionOptions struct {
108108
onClose []func(s *session)
109109
}
110110

111+
type sessionResult struct {
112+
s *session
113+
err error
114+
}
115+
111116
type createSessionOption func(o *createSessionOptions)
112117

113118
func withCreateSessionOnCreate(onCreate func(s *session)) createSessionOption {
@@ -123,101 +128,106 @@ func withCreateSessionOnClose(onClose func(s *session)) createSessionOption {
123128
}
124129

125130
func (c *Client) createSession(ctx context.Context, opts ...createSessionOption) (s *session, err error) {
131+
options := gatherOptions(opts)
132+
133+
defer func() {
134+
if s != nil {
135+
applyOptions(s, options)
136+
}
137+
}()
138+
139+
resultCh := make(chan sessionResult)
140+
if err := c.initiateSessionCreation(ctx, resultCh); err != nil {
141+
return nil, xerrors.WithStackTrace(err)
142+
}
143+
144+
return c.waitForSessionCreation(ctx, resultCh)
145+
}
146+
147+
func gatherOptions(opts []createSessionOption) createSessionOptions {
126148
options := createSessionOptions{}
127149
for _, opt := range opts {
128150
if opt != nil {
129151
opt(&options)
130152
}
131153
}
132154

133-
defer func() {
134-
if s == nil {
135-
return
136-
}
137-
for _, onCreate := range options.onCreate {
138-
onCreate(s)
139-
}
140-
s.onClose = append(s.onClose, options.onClose...)
141-
}()
155+
return options
156+
}
142157

143-
type result struct {
144-
s *session
145-
err error
158+
func applyOptions(s *session, options createSessionOptions) {
159+
for _, onCreate := range options.onCreate {
160+
onCreate(s)
146161
}
162+
s.onClose = append(s.onClose, options.onClose...)
163+
}
147164

148-
ch := make(chan result)
149-
165+
func (c *Client) initiateSessionCreation(ctx context.Context, resultCh chan<- sessionResult) error {
150166
select {
151167
case <-c.done:
152-
return nil, xerrors.WithStackTrace(errClosedClient)
168+
return errClosedClient
153169

154170
case <-ctx.Done():
155-
return nil, xerrors.WithStackTrace(ctx.Err())
171+
return ctx.Err()
156172

157173
default:
158174
c.mu.WithLock(func() {
159175
if c.isClosed() {
160176
return
161177
}
162178
c.wg.Add(1)
163-
go func() {
164-
defer c.wg.Done()
165-
166-
var (
167-
s *session
168-
err error
169-
)
170-
171-
createSessionCtx := xcontext.ValueOnly(ctx)
172-
173-
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
174-
var cancel context.CancelFunc
175-
createSessionCtx, cancel = xcontext.WithTimeout(createSessionCtx, timeout)
176-
defer cancel()
177-
}
178-
179-
closeSession := func(s *session) {
180-
if s == nil {
181-
return
182-
}
179+
go c.createSessionWorker(ctx, resultCh)
180+
})
183181

184-
closeSessionCtx := xcontext.ValueOnly(ctx)
182+
return nil
183+
}
184+
}
185185

186-
if timeout := c.config.DeleteTimeout(); timeout > 0 {
187-
var cancel context.CancelFunc
188-
createSessionCtx, cancel = xcontext.WithTimeout(closeSessionCtx, timeout)
189-
defer cancel()
190-
}
186+
func (c *Client) createSessionWorker(ctx context.Context, resultCh chan<- sessionResult) {
187+
defer c.wg.Done()
191188

192-
_ = s.Close(closeSessionCtx)
193-
}
189+
createSessionCtx := xcontext.ValueOnly(ctx)
190+
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
191+
var cancel context.CancelFunc
192+
createSessionCtx, cancel = xcontext.WithTimeout(createSessionCtx, timeout)
193+
defer cancel()
194+
}
194195

195-
s, err = c.build(createSessionCtx)
196+
s, err := c.build(createSessionCtx)
196197

197-
select {
198-
case ch <- result{
199-
s: s,
200-
err: err,
201-
}: // nop
198+
select {
199+
case resultCh <- sessionResult{s: s, err: err}:
200+
case <-c.done:
201+
c.closeSession(ctx, s)
202+
case <-ctx.Done():
203+
c.closeSession(ctx, s)
204+
}
205+
}
202206

203-
case <-c.done:
204-
closeSession(s)
207+
func (c *Client) closeSession(ctx context.Context, s *session) {
208+
if s == nil {
209+
return
210+
}
205211

206-
case <-ctx.Done():
207-
closeSession(s)
208-
}
209-
}()
210-
})
212+
closeSessionCtx := xcontext.ValueOnly(ctx)
213+
if timeout := c.config.DeleteTimeout(); timeout > 0 {
214+
var cancel context.CancelFunc
215+
closeSessionCtx, cancel = xcontext.WithTimeout(closeSessionCtx, timeout)
216+
defer cancel()
211217
}
212218

219+
_ = s.Close(closeSessionCtx)
220+
}
221+
222+
func (c *Client) waitForSessionCreation(ctx context.Context, resultCh <-chan sessionResult) (*session, error) {
213223
select {
214224
case <-c.done:
215225
return nil, xerrors.WithStackTrace(errClosedClient)
216226

217227
case <-ctx.Done():
218228
return nil, xerrors.WithStackTrace(ctx.Err())
219229

220-
case r := <-ch:
230+
case r := <-resultCh:
221231
if r.err != nil {
222232
return nil, xerrors.WithStackTrace(r.err)
223233
}

internal/xsql/conn.go

Lines changed: 78 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -319,95 +319,100 @@ func (c *conn) queryContext(ctx context.Context, query string, args []driver.Nam
319319
}
320320

321321
var (
322-
m = queryModeFromContext(ctx, c.defaultQueryMode)
323-
onDone = trace.DatabaseSQLOnConnQuery(
322+
queryMode = queryModeFromContext(ctx, c.defaultQueryMode)
323+
onDone = trace.DatabaseSQLOnConnQuery(
324324
c.trace, &ctx,
325325
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*conn).queryContext"),
326-
query, m.String(), xcontext.IsIdempotent(ctx), c.sinceLastUsage(),
326+
query, queryMode.String(), xcontext.IsIdempotent(ctx), c.sinceLastUsage(),
327327
)
328328
)
329329
defer func() {
330330
onDone(finalErr)
331331
}()
332332

333-
switch m {
334-
case DataQueryMode:
335-
normalizedQuery, parameters, err := c.normalize(query, args...)
336-
if err != nil {
337-
return nil, xerrors.WithStackTrace(err)
338-
}
339-
_, res, err := c.session.Execute(ctx,
340-
txControl(ctx, c.defaultTxControl),
341-
normalizedQuery, &parameters, c.dataQueryOptions(ctx)...,
342-
)
343-
if err != nil {
344-
return nil, badconn.Map(xerrors.WithStackTrace(err))
345-
}
346-
if err = res.Err(); err != nil {
347-
return nil, badconn.Map(xerrors.WithStackTrace(err))
348-
}
333+
normalizedQuery, parameters, err := c.normalize(query, args...)
334+
if err != nil {
335+
return nil, xerrors.WithStackTrace(err)
336+
}
349337

350-
return &rows{
351-
conn: c,
352-
result: res,
353-
}, nil
338+
switch queryMode {
339+
case DataQueryMode:
340+
return c.execDataQuery(ctx, normalizedQuery, parameters)
354341
case ScanQueryMode:
355-
normalizedQuery, parameters, err := c.normalize(query, args...)
356-
if err != nil {
357-
return nil, xerrors.WithStackTrace(err)
358-
}
359-
res, err := c.session.StreamExecuteScanQuery(ctx,
360-
normalizedQuery, &parameters, c.scanQueryOptions(ctx)...,
361-
)
362-
if err != nil {
363-
return nil, badconn.Map(xerrors.WithStackTrace(err))
364-
}
365-
if err = res.Err(); err != nil {
366-
return nil, badconn.Map(xerrors.WithStackTrace(err))
367-
}
368-
369-
return &rows{
370-
conn: c,
371-
result: res,
372-
}, nil
342+
return c.execScanQuery(ctx, normalizedQuery, parameters)
373343
case ExplainQueryMode:
374-
normalizedQuery, _, err := c.normalize(query, args...)
375-
if err != nil {
376-
return nil, xerrors.WithStackTrace(err)
377-
}
378-
exp, err := c.session.Explain(ctx, normalizedQuery)
379-
if err != nil {
380-
return nil, badconn.Map(xerrors.WithStackTrace(err))
381-
}
382-
383-
return &single{
384-
values: []sql.NamedArg{
385-
sql.Named("AST", exp.AST),
386-
sql.Named("Plan", exp.Plan),
387-
},
388-
}, nil
344+
return c.explainQuery(ctx, normalizedQuery)
389345
case ScriptingQueryMode:
390-
normalizedQuery, parameters, err := c.normalize(query, args...)
391-
if err != nil {
392-
return nil, xerrors.WithStackTrace(err)
393-
}
394-
res, err := c.connector.parent.Scripting().StreamExecute(ctx, normalizedQuery, &parameters)
395-
if err != nil {
396-
return nil, badconn.Map(xerrors.WithStackTrace(err))
397-
}
398-
if err = res.Err(); err != nil {
399-
return nil, badconn.Map(xerrors.WithStackTrace(err))
400-
}
401-
402-
return &rows{
403-
conn: c,
404-
result: res,
405-
}, nil
346+
return c.execScriptingQuery(ctx, normalizedQuery, parameters)
406347
default:
407-
return nil, fmt.Errorf("unsupported query mode '%s' on conn query", m)
348+
return nil, fmt.Errorf("unsupported query mode '%s' on conn query", queryMode)
408349
}
409350
}
410351

352+
func (c *conn) execDataQuery(ctx context.Context, query string, params params.Parameters) (driver.Rows, error) {
353+
_, res, err := c.session.Execute(ctx,
354+
txControl(ctx, c.defaultTxControl),
355+
query, &params, c.dataQueryOptions(ctx)...,
356+
)
357+
if err != nil {
358+
return nil, badconn.Map(xerrors.WithStackTrace(err))
359+
}
360+
if err = res.Err(); err != nil {
361+
return nil, badconn.Map(xerrors.WithStackTrace(err))
362+
}
363+
364+
return &rows{
365+
conn: c,
366+
result: res,
367+
}, nil
368+
}
369+
370+
func (c *conn) execScanQuery(ctx context.Context, query string, params params.Parameters) (driver.Rows, error) {
371+
res, err := c.session.StreamExecuteScanQuery(ctx,
372+
query, &params, c.scanQueryOptions(ctx)...,
373+
)
374+
if err != nil {
375+
return nil, badconn.Map(xerrors.WithStackTrace(err))
376+
}
377+
if err = res.Err(); err != nil {
378+
return nil, badconn.Map(xerrors.WithStackTrace(err))
379+
}
380+
381+
return &rows{
382+
conn: c,
383+
result: res,
384+
}, nil
385+
}
386+
387+
func (c *conn) explainQuery(ctx context.Context, query string) (driver.Rows, error) {
388+
exp, err := c.session.Explain(ctx, query)
389+
if err != nil {
390+
return nil, badconn.Map(xerrors.WithStackTrace(err))
391+
}
392+
393+
return &single{
394+
values: []sql.NamedArg{
395+
sql.Named("AST", exp.AST),
396+
sql.Named("Plan", exp.Plan),
397+
},
398+
}, nil
399+
}
400+
401+
func (c *conn) execScriptingQuery(ctx context.Context, query string, params params.Parameters) (driver.Rows, error) {
402+
res, err := c.connector.parent.Scripting().StreamExecute(ctx, query, &params)
403+
if err != nil {
404+
return nil, badconn.Map(xerrors.WithStackTrace(err))
405+
}
406+
if err = res.Err(); err != nil {
407+
return nil, badconn.Map(xerrors.WithStackTrace(err))
408+
}
409+
410+
return &rows{
411+
conn: c,
412+
result: res,
413+
}, nil
414+
}
415+
411416
func (c *conn) Ping(ctx context.Context) (finalErr error) {
412417
onDone := trace.DatabaseSQLOnConnPing(c.trace, &ctx,
413418
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*conn).Ping"),

0 commit comments

Comments
 (0)