Skip to content

Commit bc48e0c

Browse files
authored
feat(consumer): start consumer once automatically. (#1)
1 parent f40b42c commit bc48e0c

File tree

3 files changed

+39
-43
lines changed

3 files changed

+39
-43
lines changed

helper.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package redisdb
2+
3+
import "unsafe"
4+
5+
// BytesToStr converts byte slice to a string without memory allocation.
6+
// See https://groups.google.com/forum/#!msg/Golang-Nuts/ENgbUzYvCuU/90yGx7GUAgAJ .
7+
//
8+
// Note it may break if string and/or slice header will change
9+
// in the future go versions.
10+
func BytesToStr(b []byte) string {
11+
return *(*string)(unsafe.Pointer(&b))
12+
}
13+
14+
// StrToBytes converts string to byte slice without a memory allocation.
15+
func StrToBytes(s string) (b []byte) {
16+
return *(*[]byte)(unsafe.Pointer(
17+
&struct {
18+
string
19+
Cap int
20+
}{s, len(s)},
21+
))
22+
}

options.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ type options struct {
2121
cluster bool
2222
group string
2323
consumer string
24-
disableConsumer bool
2524
}
2625

2726
// WithAddr setup the addr of redis
@@ -66,13 +65,6 @@ func WithConsumer(name string) Option {
6665
}
6766
}
6867

69-
// WithDisableConsumer disable consumer
70-
func WithDisableConsumer() Option {
71-
return func(w *options) {
72-
w.disableConsumer = true
73-
}
74-
}
75-
7668
// WithPassword redis password
7769
func WithPassword(passwd string) Option {
7870
return func(w *options) {

redis.go

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"sync"
88
"sync/atomic"
99
"time"
10-
"unsafe"
1110

1211
"github.com/golang-queue/queue"
1312
"github.com/golang-queue/queue/core"
@@ -20,12 +19,13 @@ var _ core.Worker = (*Worker)(nil)
2019
// Worker for Redis
2120
type Worker struct {
2221
// redis config
23-
rdb redis.Cmdable
24-
tasks chan redis.XMessage
25-
stopFlag int32
26-
stopOnce sync.Once
27-
stop chan struct{}
28-
opts options
22+
rdb redis.Cmdable
23+
tasks chan redis.XMessage
24+
stopFlag int32
25+
stopOnce sync.Once
26+
startOnce sync.Once
27+
stop chan struct{}
28+
opts options
2929
}
3030

3131
// NewWorker for struc
@@ -64,22 +64,22 @@ func NewWorker(opts ...Option) *Worker {
6464
w.opts.logger.Fatal(err)
6565
}
6666

67-
if !w.opts.disableConsumer {
68-
err = w.rdb.XGroupCreateMkStream(
67+
return w
68+
}
69+
70+
func (w *Worker) startConsumer() {
71+
w.startOnce.Do(func() {
72+
if err := w.rdb.XGroupCreateMkStream(
6973
context.Background(),
7074
w.opts.streamName,
7175
w.opts.group,
7276
"$",
73-
).Err()
74-
75-
if err != nil {
76-
w.opts.logger.Fatal(err)
77+
).Err(); err != nil {
78+
w.opts.logger.Error(err)
7779
}
7880

7981
go w.fetchTask()
80-
}
81-
82-
return w
82+
})
8383
}
8484

8585
func (w *Worker) fetchTask() {
@@ -222,6 +222,7 @@ func (w *Worker) Run(task core.QueuedMessage) error {
222222
// Request a new task
223223
func (w *Worker) Request() (core.QueuedMessage, error) {
224224
clock := 0
225+
w.startConsumer()
225226
loop:
226227
for {
227228
select {
@@ -242,22 +243,3 @@ loop:
242243

243244
return nil, queue.ErrNoTaskInQueue
244245
}
245-
246-
// BytesToStr converts byte slice to a string without memory allocation.
247-
// See https://groups.google.com/forum/#!msg/Golang-Nuts/ENgbUzYvCuU/90yGx7GUAgAJ .
248-
//
249-
// Note it may break if string and/or slice header will change
250-
// in the future go versions.
251-
func BytesToStr(b []byte) string {
252-
return *(*string)(unsafe.Pointer(&b))
253-
}
254-
255-
// StrToBytes converts string to byte slice without a memory allocation.
256-
func StrToBytes(s string) (b []byte) {
257-
return *(*[]byte)(unsafe.Pointer(
258-
&struct {
259-
string
260-
Cap int
261-
}{s, len(s)},
262-
))
263-
}

0 commit comments

Comments
 (0)