Skip to content

Commit 8749d66

Browse files
author
Gleb Brozhe
committed
funlen rework
1 parent 25f4889 commit 8749d66

File tree

4 files changed

+113
-81
lines changed

4 files changed

+113
-81
lines changed

internal/conn/conn.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ func (c *conn) Close(ctx context.Context) (err error) {
299299
return c.wrapError(err)
300300
}
301301

302+
//nolint:funlen
302303
func (c *conn) Invoke(
303304
ctx context.Context,
304305
method string,

internal/coordination/session.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ func (s *session) updateCancelStream(cancel context.CancelFunc) {
109109
}
110110

111111
// Create a new gRPC stream using an independent context.
112+
//
113+
//nolint:funlen
112114
func (s *session) newStream(
113115
streamCtx context.Context,
114116
cancelStream context.CancelFunc,
@@ -197,6 +199,7 @@ func (s *session) newStream(
197199
}
198200
}
199201

202+
//nolint:funlen
200203
func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
201204
defer s.client.sessionClosed(s)
202205
defer close(s.sessionClosedChan)
@@ -363,6 +366,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
363366
}
364367
}
365368

369+
//nolint:funlen
366370
func (s *session) receiveLoop(
367371
wg *sync.WaitGroup,
368372
sessionClient Ydb_Coordination_V1.CoordinationService_SessionClient,
@@ -441,7 +445,7 @@ func (s *session) receiveLoop(
441445
}
442446
}
443447

444-
//nolint:revive
448+
//nolint:revive,funlen
445449
func (s *session) sendLoop(
446450
wg *sync.WaitGroup,
447451
sessionClient Ydb_Coordination_V1.CoordinationService_SessionClient,
@@ -765,6 +769,7 @@ func convertSemaphoreSession(
765769
return &result
766770
}
767771

772+
//nolint:funlen
768773
func (s *session) AcquireSemaphore(
769774
ctx context.Context,
770775
name string,

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,6 @@ func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr err
374374
}
375375

376376
func (w *WriterReconnector) connectionLoop(ctx context.Context) {
377-
doneCtx := ctx.Done()
378377
attempt := 0
379378

380379
createStreamContext := func() (context.Context, context.CancelFunc) {
@@ -409,22 +408,7 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
409408
prevAttemptTime = now
410409

411410
if reconnectReason != nil {
412-
retryDuration := w.cfg.clock.Since(startOfRetries)
413-
if backoff, retry := topic.CheckRetryMode(reconnectReason, w.retrySettings, retryDuration); retry {
414-
delay := backoff.Delay(attempt)
415-
delayTimer := w.cfg.clock.NewTimer(delay)
416-
select {
417-
case <-doneCtx:
418-
delayTimer.Stop()
419-
420-
return
421-
case <-delayTimer.Chan():
422-
delayTimer.Stop() // no really need, stop for common style only
423-
// pass
424-
}
425-
} else {
426-
_ = w.close(ctx, fmt.Errorf("%w, was retried (%v)", reconnectReason, retryDuration))
427-
411+
if w.handleReconnectRetry(ctx, reconnectReason, attempt, startOfRetries) {
428412
return
429413
}
430414
}
@@ -440,6 +424,34 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
440424
}
441425
}
442426

427+
func (w *WriterReconnector) handleReconnectRetry(
428+
ctx context.Context,
429+
reconnectReason error,
430+
attempt int,
431+
startOfRetries time.Time,
432+
) bool {
433+
retryDuration := w.cfg.clock.Since(startOfRetries)
434+
if backoff, retry := topic.CheckRetryMode(reconnectReason, w.retrySettings, retryDuration); retry {
435+
delay := backoff.Delay(attempt)
436+
delayTimer := w.cfg.clock.NewTimer(delay)
437+
select {
438+
case <-ctx.Done():
439+
delayTimer.Stop()
440+
441+
return true
442+
case <-delayTimer.Chan():
443+
delayTimer.Stop() // no really need, stop for common style only
444+
// pass
445+
}
446+
} else {
447+
_ = w.close(ctx, fmt.Errorf("%w, was retried (%v)", reconnectReason, retryDuration))
448+
449+
return true
450+
}
451+
452+
return false
453+
}
454+
443455
func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context, attempt int) (
444456
writer *SingleStreamWriter,
445457
err error,

internal/xsql/conn.go

Lines changed: 77 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
2222
"github.com/ydb-platform/ydb-go-sdk/v3/table"
2323
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
24-
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
2524
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2625
)
2726

@@ -179,9 +178,11 @@ func (c *conn) sinceLastUsage() time.Duration {
179178
return time.Since(time.Unix(c.lastUsage.Load(), 0))
180179
}
181180

182-
func (c *conn) execContext(ctx context.Context, query string, args []driver.NamedValue) (
183-
_ driver.Result, finalErr error,
184-
) {
181+
func (c *conn) execContext(
182+
ctx context.Context,
183+
query string,
184+
args []driver.NamedValue,
185+
) (_ driver.Result, finalErr error) {
185186
defer func() {
186187
c.lastUsage.Store(time.Now().Unix())
187188
}()
@@ -194,79 +195,92 @@ func (c *conn) execContext(ctx context.Context, query string, args []driver.Name
194195
return c.currentTx.ExecContext(ctx, query, args)
195196
}
196197

197-
var (
198-
m = queryModeFromContext(ctx, c.defaultQueryMode)
199-
onDone = trace.DatabaseSQLOnConnExec(
200-
c.trace, &ctx,
201-
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*conn).execContext"),
202-
query, m.String(), xcontext.IsIdempotent(ctx), c.sinceLastUsage(),
203-
)
198+
m := queryModeFromContext(ctx, c.defaultQueryMode)
199+
onDone := trace.DatabaseSQLOnConnExec(
200+
c.trace, &ctx,
201+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/xsql.(*conn).execContext"),
202+
query, m.String(), xcontext.IsIdempotent(ctx), c.sinceLastUsage(),
204203
)
205204
defer func() {
206205
onDone(finalErr)
207206
}()
208207

209208
switch m {
210209
case DataQueryMode:
211-
normalizedQuery, parameters, err := c.normalize(query, args...)
212-
if err != nil {
213-
return nil, xerrors.WithStackTrace(err)
214-
}
215-
_, res, err := c.session.Execute(ctx,
216-
txControl(ctx, c.defaultTxControl),
217-
normalizedQuery, &parameters, c.dataQueryOptions(ctx)...,
218-
)
219-
if err != nil {
220-
return nil, badconn.Map(xerrors.WithStackTrace(err))
221-
}
222-
defer func() {
223-
_ = res.Close()
224-
}()
225-
if err = res.NextResultSetErr(ctx); !xerrors.Is(err, nil, io.EOF) {
226-
return nil, badconn.Map(xerrors.WithStackTrace(err))
227-
}
228-
if err = res.Err(); err != nil {
229-
return nil, badconn.Map(xerrors.WithStackTrace(err))
230-
}
231-
232-
return resultNoRows{}, nil
210+
return c.executeDataQuery(ctx, query, args)
233211
case SchemeQueryMode:
234-
normalizedQuery, _, err := c.normalize(query)
235-
if err != nil {
236-
return nil, xerrors.WithStackTrace(err)
237-
}
238-
err = c.session.ExecuteSchemeQuery(ctx, normalizedQuery)
239-
if err != nil {
240-
return nil, badconn.Map(xerrors.WithStackTrace(err))
241-
}
242-
243-
return resultNoRows{}, nil
212+
return c.executeSchemeQuery(ctx, query)
244213
case ScriptingQueryMode:
245-
var res result.StreamResult
246-
normalizedQuery, parameters, err := c.normalize(query, args...)
247-
if err != nil {
248-
return nil, xerrors.WithStackTrace(err)
249-
}
250-
res, err = c.connector.parent.Scripting().StreamExecute(ctx, normalizedQuery, &parameters)
251-
if err != nil {
252-
return nil, badconn.Map(xerrors.WithStackTrace(err))
253-
}
254-
defer func() {
255-
_ = res.Close()
256-
}()
257-
if err = res.NextResultSetErr(ctx); !xerrors.Is(err, nil, io.EOF) {
258-
return nil, badconn.Map(xerrors.WithStackTrace(err))
259-
}
260-
if err = res.Err(); err != nil {
261-
return nil, badconn.Map(xerrors.WithStackTrace(err))
262-
}
263-
264-
return resultNoRows{}, nil
214+
return c.executeScriptingQuery(ctx, query, args)
265215
default:
266216
return nil, fmt.Errorf("unsupported query mode '%s' for execute query", m)
267217
}
268218
}
269219

220+
func (c *conn) executeDataQuery(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
221+
normalizedQuery, parameters, err := c.normalize(query, args...)
222+
if err != nil {
223+
return nil, xerrors.WithStackTrace(err)
224+
}
225+
226+
_, res, err := c.session.Execute(ctx,
227+
txControl(ctx, c.defaultTxControl),
228+
normalizedQuery, &parameters, c.dataQueryOptions(ctx)...,
229+
)
230+
if err != nil {
231+
return nil, badconn.Map(xerrors.WithStackTrace(err))
232+
}
233+
defer res.Close()
234+
235+
if err := res.NextResultSetErr(ctx); err != nil && !xerrors.Is(err, nil, io.EOF) {
236+
return nil, badconn.Map(xerrors.WithStackTrace(err))
237+
}
238+
if err := res.Err(); err != nil {
239+
return nil, badconn.Map(xerrors.WithStackTrace(err))
240+
}
241+
242+
return resultNoRows{}, nil
243+
}
244+
245+
func (c *conn) executeSchemeQuery(ctx context.Context, query string) (driver.Result, error) {
246+
normalizedQuery, _, err := c.normalize(query)
247+
if err != nil {
248+
return nil, xerrors.WithStackTrace(err)
249+
}
250+
251+
if err := c.session.ExecuteSchemeQuery(ctx, normalizedQuery); err != nil {
252+
return nil, badconn.Map(xerrors.WithStackTrace(err))
253+
}
254+
255+
return resultNoRows{}, nil
256+
}
257+
258+
func (c *conn) executeScriptingQuery(
259+
ctx context.Context,
260+
query string,
261+
args []driver.NamedValue,
262+
) (driver.Result, error) {
263+
normalizedQuery, parameters, err := c.normalize(query, args...)
264+
if err != nil {
265+
return nil, xerrors.WithStackTrace(err)
266+
}
267+
268+
res, err := c.connector.parent.Scripting().StreamExecute(ctx, normalizedQuery, &parameters)
269+
if err != nil {
270+
return nil, badconn.Map(xerrors.WithStackTrace(err))
271+
}
272+
defer res.Close()
273+
274+
if err := res.NextResultSetErr(ctx); err != nil && !xerrors.Is(err, nil, io.EOF) {
275+
return nil, badconn.Map(xerrors.WithStackTrace(err))
276+
}
277+
if err := res.Err(); err != nil {
278+
return nil, badconn.Map(xerrors.WithStackTrace(err))
279+
}
280+
281+
return resultNoRows{}, nil
282+
}
283+
270284
func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (_ driver.Result, _ error) {
271285
if !c.isReady() {
272286
return nil, badconn.Map(xerrors.WithStackTrace(errNotReadyConn))

0 commit comments

Comments
 (0)