File tree Expand file tree Collapse file tree 1 file changed +16
-7
lines changed Expand file tree Collapse file tree 1 file changed +16
-7
lines changed Original file line number Diff line number Diff line change @@ -119,15 +119,24 @@ func (s *Consumer) Queue(task QueuedMessage) error {
119
119
120
120
// Request a new task from channel
121
121
func (s * Consumer ) Request () (QueuedMessage , error ) {
122
- select {
123
- case task , ok := <- s .taskQueue :
124
- if ! ok {
125
- return nil , ErrQueueHasBeenClosed
122
+ clock := 0
123
+ loop:
124
+ for {
125
+ select {
126
+ case task , ok := <- s .taskQueue :
127
+ if ! ok {
128
+ return nil , ErrQueueHasBeenClosed
129
+ }
130
+ return task , nil
131
+ case <- time .After (1 * time .Second ):
132
+ if clock == 5 {
133
+ break loop
134
+ }
135
+ clock += 1
126
136
}
127
- return task , nil
128
- default :
129
- return nil , ErrNoTaskInQueue
130
137
}
138
+
139
+ return nil , ErrNoTaskInQueue
131
140
}
132
141
133
142
// NewConsumer for create new consumer instance
You can’t perform that action at this time.
0 commit comments