Skip to content

Commit 304cde8

Browse files
authored
Merge pull request #1178 Fixed timers leaks
2 parents f41615b + 894d91b commit 304cde8

File tree

7 files changed

+42
-10
lines changed

7 files changed

+42
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed leak timers
12
* Changed default StartTime (time of retries for connect to server) for topic writer from 1 minute to infinite (can be overrided by WithWriterStartTimeout topic option)
23
* Added `Struct` support for `Variant` in `ydb.ParamsBuilder()`
34
* Added `go` with anonymous function case in `gstack`

internal/coordination/session.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,12 @@ func (s *session) newStream(
139139

140140
var client Ydb_Coordination_V1.CoordinationService_SessionClient
141141
if lastChance {
142+
timer := time.NewTimer(s.options.SessionKeepAliveTimeout)
142143
select {
143-
case <-time.After(s.options.SessionKeepAliveTimeout):
144+
case <-timer.C:
144145
case client = <-result:
145146
}
147+
timer.Stop()
146148

147149
if client != nil {
148150
return client, nil
@@ -175,10 +177,12 @@ func (s *session) newStream(
175177
}
176178

177179
// Waiting for some time before trying to reconnect.
180+
sessionReconnectDelay := time.NewTimer(s.options.SessionReconnectDelay)
178181
select {
179-
case <-time.After(s.options.SessionReconnectDelay):
182+
case <-sessionReconnectDelay.C:
180183
case <-s.ctx.Done():
181184
}
185+
sessionReconnectDelay.Stop()
182186

183187
if s.ctx.Err() != nil {
184188
// Give this session the last chance to stop gracefully if the session is canceled in the reconnect cycle.
@@ -247,6 +251,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
247251

248252
// Wait for the session started response unless the stream context is done. We intentionally do not take into
249253
// account stream context cancellation in order to proceed with the graceful shutdown if it requires reconnect.
254+
sessionStartTimer := time.NewTimer(s.options.SessionStartTimeout)
250255
select {
251256
case start := <-sessionStarted:
252257
trace.CoordinationOnSessionStarted(s.client.config.Trace(), start.GetSessionId(), s.sessionID)
@@ -258,13 +263,14 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
258263
cancelStream()
259264
}
260265
close(startSending)
261-
case <-time.After(s.options.SessionStartTimeout):
266+
case <-sessionStartTimer.C:
262267
// Reconnect if no response was received before the timeout occurred.
263268
trace.CoordinationOnSessionStartTimeout(s.client.config.Trace(), s.options.SessionStartTimeout)
264269
cancelStream()
265270
case <-streamCtx.Done():
266271
case <-s.ctx.Done():
267272
}
273+
sessionStartTimer.Stop()
268274

269275
for {
270276
// Respect the failure reason priority: if the session context is done, we must stop the session, even
@@ -280,8 +286,9 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
280286
}
281287

282288
keepAliveTime := time.Until(s.getLastGoodResponseTime().Add(s.options.SessionKeepAliveTimeout))
289+
keepAliveTimeTimer := time.NewTimer(keepAliveTime)
283290
select {
284-
case <-time.After(keepAliveTime):
291+
case <-keepAliveTimeTimer.C:
285292
last := s.getLastGoodResponseTime()
286293
if time.Since(last) > s.options.SessionKeepAliveTimeout {
287294
// Reconnect if the underlying stream is likely to be dead.
@@ -295,6 +302,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
295302
case <-streamCtx.Done():
296303
case <-s.ctx.Done():
297304
}
305+
keepAliveTimeTimer.Stop()
298306
}
299307

300308
if closing {
@@ -318,8 +326,10 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
318326
)
319327

320328
// Wait for the session stopped response unless the stream context is done.
329+
sessionStopTimeout := time.NewTimer(s.options.SessionStopTimeout)
321330
select {
322331
case stop := <-sessionStopped:
332+
sessionStopTimeout.Stop()
323333
trace.CoordinationOnSessionStopped(s.client.config.Trace(), stop.GetSessionId(), s.sessionID)
324334
if stop.GetSessionId() == s.sessionID {
325335
cancelStream()
@@ -329,15 +339,19 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
329339

330340
// Reconnect if the server response is invalid.
331341
cancelStream()
332-
case <-time.After(s.options.SessionStopTimeout):
342+
case <-sessionStopTimeout.C:
343+
sessionStopTimeout.Stop() // no really need, call stop for common style only
344+
333345
// Reconnect if no response was received before the timeout occurred.
334346
trace.CoordinationOnSessionStopTimeout(s.client.config.Trace(), s.options.SessionStopTimeout)
335347
cancelStream()
336348
case <-s.ctx.Done():
349+
sessionStopTimeout.Stop()
337350
cancelStream()
338351

339352
return
340353
case <-streamCtx.Done():
354+
sessionStopTimeout.Stop()
341355
}
342356
}
343357

internal/table/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,10 @@ func (c *Client) internalPoolWaitFromCh(ctx context.Context, t *trace.Table) (s
489489

490490
var createSessionTimeoutCh <-chan time.Time
491491
if timeout := c.config.CreateSessionTimeout(); timeout > 0 {
492-
createSessionTimeoutCh = c.clock.After(timeout)
492+
createSessionTimeoutChTimer := c.clock.NewTimer(timeout)
493+
defer createSessionTimeoutChTimer.Stop()
494+
495+
createSessionTimeoutCh = createSessionTimeoutChTimer.Chan()
493496
}
494497

495498
select {

internal/topic/topicreaderinternal/committer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,10 @@ func (c *committer) waitSendTrigger(ctx context.Context) {
166166
return
167167
}
168168

169-
finish := c.clock.After(c.BufferTimeLagTrigger)
169+
bufferTimeLagTriggerTimer := c.clock.NewTimer(c.BufferTimeLagTrigger)
170+
defer bufferTimeLagTriggerTimer.Stop()
171+
172+
finish := bufferTimeLagTriggerTimer.Chan()
170173
if c.BufferCountTrigger == 0 {
171174
select {
172175
case <-ctxDone:

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,9 +328,12 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
328328
result <- connectResult{stream: stream, err: err}
329329
}()
330330

331+
connectionTimoutTimer := r.clock.NewTimer(r.connectTimeout)
332+
defer connectionTimoutTimer.Stop()
333+
331334
var res connectResult
332335
select {
333-
case <-r.clock.After(r.connectTimeout):
336+
case <-connectionTimoutTimer.Chan():
334337
// cancel connection context only if timeout exceed while connection
335338
// because if cancel context after connect - it will break
336339
cancel()

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,10 +383,14 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
383383
retryDuration := w.clock.Since(startOfRetries)
384384
if backoff, retry := topic.CheckRetryMode(reconnectReason, w.retrySettings, retryDuration); retry {
385385
delay := backoff.Delay(attempt)
386+
delayTimer := w.clock.NewTimer(delay)
386387
select {
387388
case <-doneCtx:
389+
delayTimer.Stop()
390+
388391
return
389-
case <-w.clock.After(delay):
392+
case <-delayTimer.Chan():
393+
delayTimer.Stop() // no really need, stop for common style only
390394
// pass
391395
}
392396
} else {

internal/xsql/connector.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,10 +262,14 @@ func (c *Connector) idleCloser() (idleStopper func()) {
262262
ctx, idleStopper = xcontext.WithCancel(context.Background())
263263
go func() {
264264
for {
265+
idleThresholdTimer := c.clock.NewTimer(c.idleThreshold)
265266
select {
266267
case <-ctx.Done():
268+
idleThresholdTimer.Stop()
269+
267270
return
268-
case <-c.clock.After(c.idleThreshold):
271+
case <-idleThresholdTimer.Chan():
272+
idleThresholdTimer.Stop() // no really need, stop for common style only
269273
c.connsMtx.RLock()
270274
conns := make([]*conn, 0, len(c.conns))
271275
for cc := range c.conns {

0 commit comments

Comments
 (0)