@@ -59,7 +59,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
5959 if s , ok := cfg .([]byte ); ok {
6060 cfg = string (s )
6161 }
62- return fmt .Errorf ("Timedout creating queue %v with cfg %#v in %s" , q .underlying , cfg , q .name )
62+ return fmt .Errorf ("timedout creating queue %v with cfg %#v in %s" , q .underlying , cfg , q .name )
6363 default :
6464 queue , err := NewQueue (q .underlying , handle , q .cfg , exemplar )
6565 if err == nil {
@@ -76,9 +76,9 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
7676 i ++
7777 if q .maxAttempts > 0 && i > q .maxAttempts {
7878 if bs , ok := q .cfg .([]byte ); ok {
79- return fmt .Errorf ("Unable to create queue %v for %s with cfg %s by max attempts: error: %v" , q .underlying , q .name , string (bs ), err )
79+ return fmt .Errorf ("unable to create queue %v for %s with cfg %s by max attempts: error: %v" , q .underlying , q .name , string (bs ), err )
8080 }
81- return fmt .Errorf ("Unable to create queue %v for %s with cfg %#v by max attempts: error: %v" , q .underlying , q .name , q .cfg , err )
81+ return fmt .Errorf ("unable to create queue %v for %s with cfg %#v by max attempts: error: %v" , q .underlying , q .name , q .cfg , err )
8282 }
8383 sleepTime := 100 * time .Millisecond
8484 if q .timeout > 0 && q .maxAttempts > 0 {
@@ -271,6 +271,46 @@ func (q *WrappedQueue) Terminate() {
271271 log .Debug ("WrappedQueue: %s Terminated" , q .name )
272272}
273273
274+ // IsPaused will return if the pool or queue is paused
275+ func (q * WrappedQueue ) IsPaused () bool {
276+ q .lock .Lock ()
277+ defer q .lock .Unlock ()
278+ pausable , ok := q .internal .(Pausable )
279+ return ok && pausable .IsPaused ()
280+ }
281+
282+ // Pause will pause the pool or queue
283+ func (q * WrappedQueue ) Pause () {
284+ q .lock .Lock ()
285+ defer q .lock .Unlock ()
286+ if pausable , ok := q .internal .(Pausable ); ok {
287+ pausable .Pause ()
288+ }
289+ }
290+
291+ // Resume will resume the pool or queue
292+ func (q * WrappedQueue ) Resume () {
293+ q .lock .Lock ()
294+ defer q .lock .Unlock ()
295+ if pausable , ok := q .internal .(Pausable ); ok {
296+ pausable .Resume ()
297+ }
298+ }
299+
300+ // IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
301+ func (q * WrappedQueue ) IsPausedIsResumed () (paused , resumed <- chan struct {}) {
302+ q .lock .Lock ()
303+ defer q .lock .Unlock ()
304+ if pausable , ok := q .internal .(Pausable ); ok {
305+ return pausable .IsPausedIsResumed ()
306+ }
307+ return context .Background ().Done (), closedChan
308+ }
309+
310+ var closedChan chan struct {}
311+
274312func init () {
275313 queuesMap [WrappedQueueType ] = NewWrappedQueue
314+ closedChan = make (chan struct {})
315+ close (closedChan )
276316}
0 commit comments