Skip to content

Commit 5844e8c

Browse files
committed
fix connection problems
1 parent fda4b9a commit 5844e8c

File tree

4 files changed

+69
-114
lines changed

4 files changed

+69
-114
lines changed

Config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"github.com/go-while/go-loggedrwmutex"
88
)
99

10-
const UseSharedCC = false // experimental devel flag, to test sharedConn between routines
10+
const UseSharedCC = true // experimental devel flag, to test sharedConn between routines
1111
const UseReadDeadConn = false // experimental devel flag, to test reading from dead connections
1212
const UseNoDeadline = true // experimental devel flag, to test connections without deadlines
1313

@@ -39,9 +39,9 @@ const (
3939
// DefaultConnectTimeout defines the timeout for connecting to a server
4040
DefaultConnectTimeout = 9 * time.Second
4141
// DefaultConnectErrSleep defines the time to wait before retrying a connection after an error
42-
DefaultConnectErrSleep = 9 * time.Second
42+
DefaultConnectErrSleep = 3 * time.Second
4343
// DefaultRequeueDelay defines the delay before requeuing an item in the segment channel
44-
DefaultRequeueDelay = 9 * time.Second
44+
//DefaultRequeueDelay = 9 * time.Second
4545
)
4646

4747
type Config struct {

ConnPool.go

Lines changed: 29 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,7 @@ func (c *ConnPool) connect() (connitem *ConnItem, err error) {
208208
if isNetworkUnreachable(err) {
209209
return nil, fmt.Errorf("error connect Unreachable network! '%s' @ '%s' err='%v'", c.provider.Host, c.provider.Name, err)
210210
}
211-
dlog(always, "ERROR connect Dial '%s' retry in %.0fs wants_ssl=%t err='%v'", c.provider.Name, DefaultConnectErrSleep.Seconds(), c.provider.SSL, err)
212-
//time.Sleep(DefaultConnectErrSleep)
211+
dlog(always, "ERROR connect Dial '%s' no retry err='%v'", c.provider.Name, err)
213212
return nil, err
214213
}
215214
case true:
@@ -225,8 +224,7 @@ func (c *ConnPool) connect() (connitem *ConnItem, err error) {
225224
if isNetworkUnreachable(err) {
226225
return nil, fmt.Errorf("error connect Unreachable network! '%s' @ '%s' err='%v'", c.provider.Host, c.provider.Name, err)
227226
}
228-
dlog(always, "ERROR connect Dial '%s' retry in %.0fs wants_ssl=%t err='%v'", c.provider.Name, DefaultConnectErrSleep.Seconds(), c.provider.SSL, err)
229-
//time.Sleep(DefaultConnectErrSleep)
227+
dlog(always, "ERROR connect Dial '%s' no-retry err='%v'", c.provider.Name, err)
230228
return nil, err
231229
}
232230
} // end switch
@@ -244,8 +242,7 @@ func (c *ConnPool) connect() (connitem *ConnItem, err error) {
244242
if conn != nil {
245243
conn.Close()
246244
}
247-
dlog(always, "ERROR connect welcome '%s' retry in %.0fs code=%d msg='%s' err='%v'", c.provider.Name, DefaultConnectErrSleep.Seconds(), code, msg, err)
248-
//time.Sleep(DefaultConnectErrSleep)
245+
dlog(always, "ERROR in connect '%s' welcome banner code=%d err='%v'", c.provider.Name, code, err)
249246
return nil, err
250247
}
251248
dlog(cfg.opt.DebugConnPool, "ConnPool connect welcome '%s' time0=(%d ms) time00=(%d ms) time000=(%d ms)", c.provider.Name, time.Since(time0).Milliseconds(), time.Since(time00).Milliseconds(), time.Since(time000).Milliseconds())
@@ -256,30 +253,28 @@ func (c *ConnPool) connect() (connitem *ConnItem, err error) {
256253
} // end func connect
257254

258255
func (c *ConnPool) auth(srvtp *textproto.Conn, conn net.Conn, start time.Time) (connitem *ConnItem, err error) {
259-
var code int
260-
var msg string
256+
//var code int
257+
//var msg string
261258
time1 := time.Now()
262259
// send auth user sequence
263260
id, err := srvtp.Cmd("AUTHINFO USER %s", c.provider.Username)
264261
if err != nil {
265262
if conn != nil {
266263
conn.Close()
267264
}
268-
dlog(always, "ERROR AUTH#1 Cmd(AUTHINFO USER ...) '%s' retry in %.0fs err='%v' ", c.provider.Name, DefaultConnectErrSleep.Seconds(), err)
269-
time.Sleep(DefaultConnectErrSleep)
265+
//dlog(always, "ERROR AUTH#1 Cmd(AUTHINFO USER ...) '%s' no retry err='%v' ", c.provider.Name, err)
270266
return nil, err
271267
}
272268
time2 := time.Now()
273269
// await response from server
274270
srvtp.StartResponse(id)
275-
code, msg, err = srvtp.ReadCodeLine(nntpMoreInfoCode) // 381 is the code for "more information required"
271+
_, _, err = srvtp.ReadCodeLine(nntpMoreInfoCode) // 381 is the code for "more information required"
276272
srvtp.EndResponse(id)
277273
if err != nil {
278274
if conn != nil {
279275
conn.Close()
280276
}
281-
dlog(always, "ERROR AUTH#2 ReadCodeLine(381) step#2 '%s' retry in %.0fs code=%d msg='%s' err='%v'", c.provider.Name, DefaultConnectErrSleep.Seconds(), code, msg, err)
282-
time.Sleep(DefaultConnectErrSleep)
277+
//dlog(always, "ERROR AUTH#2 ReadCodeLine(381) step#2 '%s' code=%d msg='%s' err='%v'", c.provider.Name, code, msg, err)
283278
return nil, err
284279
}
285280
time3 := time.Now()
@@ -289,21 +284,19 @@ func (c *ConnPool) auth(srvtp *textproto.Conn, conn net.Conn, start time.Time) (
289284
if conn != nil {
290285
conn.Close()
291286
}
292-
dlog(always, "ERROR AUTH#3 Cmd(AUTHINFO PASS ...) '%s' retry in %.0fs err='%v'", c.provider.Name, DefaultConnectErrSleep.Seconds(), err)
293-
time.Sleep(DefaultConnectErrSleep)
287+
//dlog(always, "ERROR AUTH#3 Cmd(AUTHINFO PASS ...) '%s' no retry err='%v'", c.provider.Name, err)
294288
return nil, err
295289
}
296290
time4 := time.Now()
297291
// await response from server
298292
srvtp.StartResponse(id)
299-
code, msg, err = srvtp.ReadCodeLine(nntpAuthSuccess) // 281 is the code for "authentication successful"
293+
_, _, err = srvtp.ReadCodeLine(nntpAuthSuccess) // 281 is the code for "authentication successful"
300294
srvtp.EndResponse(id)
301295
if err != nil {
302296
if conn != nil {
303297
conn.Close()
304298
}
305-
dlog(always, "ERROR AUTH#4 ReadCodeLine(281) '%s' retry in %.0fs code=%d msg='%s' err='%v'", c.provider.Name, DefaultConnectErrSleep.Seconds(), code, msg, err)
306-
time.Sleep(DefaultConnectErrSleep)
299+
//dlog(always, "ERROR AUTH#4 ReadCodeLine(281) '%s' no retry err='%v'", c.provider.Name, err)
307300
return nil, err
308301
}
309302
time5 := time.Now()
@@ -329,38 +322,20 @@ func (c *ConnPool) NewConn() (connitem *ConnItem, err error) {
329322
c.mux.Unlock() // mutex c459a
330323
start := time.Now()
331324
// connect to the provider
332-
if c.provider.MaxConnErrors <= 0 {
333-
for {
334-
connitem, err = c.connect()
335-
if err != nil || connitem == nil || connitem.conn == nil {
336-
continue // retry connect
337-
}
338-
c.mux.RLock() // mutex c461
339-
dlog(cfg.opt.DebugConnPool, "ConnPool NewConn: got new connid=%d '%s' openConns after=%d/%d before=%d connectTook=(%d ms) err='%v", connitem.connid, c.provider.Name, c.openConns, c.provider.MaxConns, openConnsBefore, time.Since(start).Milliseconds(), err)
340-
c.mux.RUnlock() // mutex c461
341-
// we have a new connection!
342-
GCounter.Incr("TOTAL_NewConns")
343-
return // established new connection and returns connitem
344-
}
345-
} else {
346-
for retried := 0; retried < c.provider.MaxConnErrors; retried++ {
347-
connitem, err = c.connect()
348-
if err != nil || connitem == nil || connitem.conn == nil {
349-
continue // retry connect
350-
}
351-
c.mux.RLock() // mutex c461
352-
dlog(cfg.opt.DebugConnPool, "ConnPool NewConn: got new connid=%d '%s' openConns after=%d/%d before=%d connectTook=(%d ms) err='%v", connitem.connid, c.provider.Name, c.openConns, c.provider.MaxConns, openConnsBefore, time.Since(start).Milliseconds(), err)
353-
c.mux.RUnlock() // mutex c461
354-
// we have a new connection!
355-
GCounter.Incr("TOTAL_NewConns")
356-
return connitem, nil // established new connection and returns connitem
357-
}
325+
connitem, err = c.connect()
326+
if err != nil || connitem == nil || connitem.conn == nil {
327+
c.mux.Lock() // mutex c460
328+
c.openConns--
329+
dlog(always, "ERROR in ConnPool NewConn: connect failed '%s' openConns=%d connitem='%v' err='%v'", c.provider.Name, c.openConns, connitem, err)
330+
c.mux.Unlock() // mutex c460
331+
return nil, fmt.Errorf("error in ConnPool NewConn: connect failed '%s' err='%v'", c.provider.Name, err)
358332
}
359-
c.mux.Lock() // mutex c460
360-
c.openConns--
361-
dlog(always, "ERROR in ConnPool NewConn: connect failed '%s' openConns=%d connitem='%v' err='%v'", c.provider.Name, c.openConns, connitem, err)
362-
c.mux.Unlock() // mutex c460
363-
return nil, fmt.Errorf("error in ConnPool NewConn: connect failed '%s' err='%v'", c.provider.Name, err)
333+
c.mux.RLock() // mutex c461
334+
dlog(cfg.opt.DebugConnPool, "ConnPool NewConn: got new connid=%d '%s' openConns after=%d/%d before=%d connectTook=(%d ms) err='%v", connitem.connid, c.provider.Name, c.openConns, c.provider.MaxConns, openConnsBefore, time.Since(start).Milliseconds(), err)
335+
c.mux.RUnlock() // mutex c461
336+
// we have a new connection!
337+
GCounter.Incr("TOTAL_NewConns")
338+
return connitem, nil // established new connection and returns connitem
364339
}
365340
c.mux.Unlock() // mutex c459a
366341
return nil, fmt.Errorf("error in ConnPool NewConn: openConns=%d > provider.MaxConns", c.openConns)
@@ -419,7 +394,7 @@ getConnFromPool:
419394
dlog(always, "INFO ConnPool GetConn: got long idle=(%d ms) '%s', close and get NewConn", time.Since(connitem.parktime).Milliseconds(), c.provider.Name)
420395
c.CloseConn(connitem, nil)
421396
connitem, err = c.NewConn()
422-
if connitem == nil || err != nil {
397+
if err != nil || connitem == nil || connitem.conn == nil {
423398
continue getConnFromPool
424399
}
425400
// extend the read deadline of the new connection
@@ -466,16 +441,13 @@ getConnFromPool:
466441

467442
// try to open a new connection
468443
newconnitem, aerr := c.NewConn()
469-
if newconnitem == nil || aerr != nil {
444+
if aerr != nil || newconnitem == nil || newconnitem.conn == nil {
470445
dlog(cfg.opt.DebugConnPool, "WARN in ConnPool GetConn: NewConn failed '%s' connitem='%v' aerr='%v'", c.provider.Name, newconnitem, aerr)
471446
time.Sleep(DefaultConnectErrSleep) // wait a bit before retrying
472447
retried++
473448
if retried >= c.provider.MaxConnErrors {
474449
// we have retried too often
475-
dlog(always, "ERROR in ConnPool GetConn: retried %d times to get a new conn '%s' giving up! aerr='%v'", retried, c.provider.Name, aerr)
476-
//c.mux.Lock() // mutex c457
477-
//c.openConns-- // decrease open conns as we failed to get a new one
478-
//c.mux.Unlock() // mutex c457
450+
dlog(cfg.opt.DebugConnPool, "ERROR in ConnPool GetConn: retried %d times to get a new conn '%s' giving up! aerr='%v'", retried, c.provider.Name, aerr)
479451
err = aerr
480452
break getConnFromPool // break out of the for loop
481453
}
@@ -729,13 +701,12 @@ func isNetworkUnreachable(err error) bool {
729701
// The sharedCC channel is used to share connections between (anonymous) goroutines
730702
// which will work on the same item.
731703
func GetNewSharedConnChannel(wid int, provider *Provider) (sharedCC chan *ConnItem, err error) {
732-
sharedCC = make(chan *ConnItem, 1) // buffered channel with size 1
733704
connitem, err := provider.ConnPool.GetConn() // get an idle or new connection from the pool
734705
if err != nil {
735-
dlog(always, "ERROR a GoWorker (%d) failed to connect '%s' err='%v'", wid, provider.Name, err)
706+
dlog(cfg.opt.DebugWorker, "ERROR in GoWorker (%d) GetNewSharedConnChannel failed to connect '%s' err='%v'", wid, provider.Name, err)
736707
return nil, err
737708
}
738-
//sharedCC <- nil // put a nil item into the channel to signal that no connection is available yet
709+
sharedCC = make(chan *ConnItem, 1) // shares 1 connection between goroutines
739710
sharedCC <- connitem
740711
return sharedCC, nil
741712
}

Session.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -367,17 +367,20 @@ func (p *PROCESSOR) LaunchSession(s *SESSION, nzbfilepath string, waitSession *s
367367
s.segmentCheckStartTime = time.Now()
368368
// booting work divider
369369
go s.GoWorkDivider(&waitDivider, &waitDividerDone)
370-
dlog(cfg.opt.Debug, "sess: waitDividerDone.Wait()")
370+
dlog(always, "sess: waiting on waitDividerDone.Wait()")
371371
waitDividerDone.Wait()
372-
dlog(cfg.opt.Debug, "sess: waitDividerDone.Wait() released")
372+
dlog(always, "sess: released waitDividerDone.Wait()")
373373

374374
s.StopRoutines()
375375

376-
dlog(cfg.opt.Debug, "sess: waitWorker.Wait()")
376+
dlog(always, "sess: waiting on waitWorker.Wait()")
377377
waitWorker.Wait()
378-
dlog(cfg.opt.Debug, "sess: waitWorker.Wait() released, waiting on waitPool.Wait()")
378+
dlog(always, "sess: released waitWorker.Wait(), waiting on waitPool.Wait()")
379379
waitPool.Wait()
380-
dlog(cfg.opt.Debug, "sess: waitPool.Wait() released")
380+
dlog(always, "sess: released waitPool.Wait() closing all provider connections")
381+
for _, provider := range s.providerList {
382+
KillConnPool(provider) // close the connection pool for this provider
383+
}
381384

382385
result, runtime_info := s.Results(s.preparationStartTime)
383386

0 commit comments

Comments
 (0)