Skip to content

Commit 4343a22

Browse files
authored
feat(worker): add timer in Request method (#12)
1 parent 8e59ab4 commit 4343a22

File tree

4 files changed

+22
-11
lines changed

4 files changed

+22
-11
lines changed

_example/single/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.16
44

55
require (
66
github.com/golang-queue/nsq v0.0.0-00010101000000-000000000000 // indirect
7-
github.com/golang-queue/queue v0.0.12-0.20220122071422-2c41650f045a
7+
github.com/golang-queue/queue v0.0.13-0.20220330060848-d1a0d31ce747
88
)
99

1010
replace github.com/golang-queue/nsq => ../../

_example/single/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
33
github.com/golang-queue/queue v0.0.12-0.20220122071422-2c41650f045a h1:olgbdu53u5A6Babr1w+CUO19UdZASYPPqqIBfdFWrWE=
44
github.com/golang-queue/queue v0.0.12-0.20220122071422-2c41650f045a/go.mod h1:ku8iyjYffqYY6Duts+xl+QYfN3/KDK4MEvXMZUkHyio=
5+
github.com/golang-queue/queue v0.0.13-0.20220330060848-d1a0d31ce747 h1:uNTbCoWORAcna89KcKgP22WFGv5fsij05e70DCnLrUU=
6+
github.com/golang-queue/queue v0.0.13-0.20220330060848-d1a0d31ce747/go.mod h1:ku8iyjYffqYY6Duts+xl+QYfN3/KDK4MEvXMZUkHyio=
57
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
68
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
79
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=

_example/single/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,6 @@ func main() {
6565
// wait until all tasks done
6666
for i := 0; i < taskN; i++ {
6767
fmt.Println("message:", <-rets)
68-
time.Sleep(50 * time.Millisecond)
68+
time.Sleep(10 * time.Millisecond)
6969
}
7070
}

nsq.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -198,15 +198,24 @@ func (w *Worker) Queue(job queue.QueuedMessage) error {
198198

199199
// Request fetch new task from queue
200200
func (w *Worker) Request() (queue.QueuedMessage, error) {
201-
select {
202-
case task, ok := <-w.tasks:
203-
if !ok {
204-
return nil, queue.ErrQueueHasBeenClosed
201+
clock := 0
202+
loop:
203+
for {
204+
select {
205+
case task, ok := <-w.tasks:
206+
if !ok {
207+
return nil, queue.ErrQueueHasBeenClosed
208+
}
209+
var data queue.Job
210+
_ = json.Unmarshal(task.Body, &data)
211+
return data, nil
212+
case <-time.After(1 * time.Second):
213+
if clock == 5 {
214+
break loop
215+
}
216+
clock += 1
205217
}
206-
var data queue.Job
207-
_ = json.Unmarshal(task.Body, &data)
208-
return data, nil
209-
default:
210-
return nil, queue.ErrNoTaskInQueue
211218
}
219+
220+
return nil, queue.ErrNoTaskInQueue
212221
}

0 commit comments

Comments
 (0)