Skip to content

Commit 0f30526

Browse files
authored
chore: close queue after shutdown. (#22)
1 parent a4ac305 commit 0f30526

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

queue.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ func (q *Queue) Wait() {
128128

129129
// Queue to queue all job
130130
func (q *Queue) Queue(job QueuedMessage) error {
131+
if atomic.LoadInt32(&q.stopFlag) == 1 {
132+
return ErrQueueShutdown
133+
}
134+
131135
return q.worker.Queue(Job{
132136
Timeout: q.timeout,
133137
Body: job.Bytes(),
@@ -136,6 +140,10 @@ func (q *Queue) Queue(job QueuedMessage) error {
136140

137141
// Queue to queue all job
138142
func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error {
143+
if atomic.LoadInt32(&q.stopFlag) == 1 {
144+
return ErrQueueShutdown
145+
}
146+
139147
return q.worker.Queue(Job{
140148
Timeout: timeout,
141149
Body: job.Bytes(),

queue_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,31 @@ func TestCapacityReached(t *testing.T) {
146146
message: "foobar",
147147
}))
148148
}
149+
150+
func TestCloseQueueAfterShutdown(t *testing.T) {
151+
w := &queueWorker{
152+
messages: make(chan QueuedMessage, 10),
153+
}
154+
q, err := NewQueue(
155+
WithWorker(w),
156+
WithWorkerCount(5),
157+
WithLogger(NewEmptyLogger()),
158+
)
159+
assert.NoError(t, err)
160+
assert.NotNil(t, q)
161+
162+
assert.NoError(t, q.Queue(mockMessage{
163+
message: "foobar",
164+
}))
165+
q.Shutdown()
166+
err = q.Queue(mockMessage{
167+
message: "foobar",
168+
})
169+
assert.Error(t, err)
170+
assert.Equal(t, ErrQueueShutdown, err)
171+
err = q.QueueWithTimeout(10*time.Millisecond, mockMessage{
172+
message: "foobar",
173+
})
174+
assert.Error(t, err)
175+
assert.Equal(t, ErrQueueShutdown, err)
176+
}

0 commit comments

Comments
 (0)