Skip to content

Commit f8fe420

Browse files
committed
feat: add inspector
1 parent 1032733 commit f8fe420

File tree

10 files changed

+473
-224
lines changed

10 files changed

+473
-224
lines changed

pkg/sasynq/README.md

Lines changed: 256 additions & 167 deletions
Large diffs are not rendered by default.

pkg/sasynq/client.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ func NewClient(cfg RedisConfig) *Client {
1818
}
1919
}
2020

21+
// NewFromClient creates a new producer client from an existing asynq.Client.
22+
func NewFromClient(c *asynq.Client) *Client {
23+
return &Client{
24+
Client: c,
25+
}
26+
}
27+
2128
// Enqueue enqueues the given task to a queue.
2229
func (c *Client) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error) {
2330
return c.Client.Enqueue(task, opts...)
@@ -57,3 +64,14 @@ func (c *Client) EnqueueAt(t time.Time, typeName string, payload any, opts ...as
5764
info, err := c.Client.Enqueue(task, opts...)
5865
return task, info, err
5966
}
67+
68+
// EnqueueUnique enqueues a task with unique in the queue for a specified duration.
69+
func (c *Client) EnqueueUnique(keepTime time.Duration, typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error) {
70+
task, err := NewTask(typeName, payload)
71+
if err != nil {
72+
return nil, nil, err
73+
}
74+
opts = append(opts, asynq.Unique(keepTime))
75+
info, err := c.Client.Enqueue(task, opts...)
76+
return task, info, err
77+
}

pkg/sasynq/client_test.go

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,105 @@
11
package sasynq
22

33
import (
4+
"fmt"
45
"log"
6+
"math/rand"
57
"testing"
68
"time"
9+
10+
"github.com/hibiken/asynq"
711
)
812

913
func runProducer(client *Client) error {
1014
userPayload1 := &EmailPayload{UserID: 101, Message: "Critical Update"}
11-
_, info, err := client.EnqueueNow(TypeEmailSend, userPayload1, WithQueue("critical"), WithRetry(5))
15+
_, info1, err := client.EnqueueNow(TypeEmailSend, userPayload1, WithQueue("critical"), WithMaxRetry(5), asynq.Retention(60*time.Second))
1216
if err != nil {
1317
return err
1418
}
15-
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeEmailSend, info.ID, info.Queue)
19+
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeEmailSend, info1.ID, info1.Queue)
1620

1721
userPayload2 := &SMSPayload{UserID: 202, Message: "Weekly Newsletter"}
18-
_, info, err = client.EnqueueIn(time.Second*5, TypeSMSSend, userPayload2, WithQueue("default"), WithRetry(3))
22+
_, info2, err := client.EnqueueIn(time.Second*5, TypeSMSSend, userPayload2, WithQueue("default"), WithMaxRetry(3), asynq.Retention(60*time.Second))
1923
if err != nil {
2024
return err
2125
}
22-
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeSMSSend, info.ID, info.Queue)
26+
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeSMSSend, info2.ID, info2.Queue)
27+
cancelTask("default", info2.ID, true) // cancel task will succeed
2328

2429
userPayload3 := &MsgNotificationPayload{UserID: 303, Message: "Promotional Offer"}
25-
_, info, err = client.EnqueueAt(time.Now().Add(time.Second*10), TypeMsgNotification, userPayload3, WithQueue("low"), WithRetry(1))
30+
_, info3, err := client.EnqueueAt(time.Now().Add(time.Second*10), TypeMsgNotification, userPayload3, WithQueue("low"), WithMaxRetry(1), asynq.Retention(60*time.Second))
2631
if err != nil {
2732
return err
2833
}
29-
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeMsgNotification, info.ID, info.Queue)
34+
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeMsgNotification, info3.ID, info3.Queue)
35+
cancelTask("low", info3.ID, true) // cancel task will succeed
36+
37+
userPayload4 := &UniqueTaskPayload{UserID: 404, Message: "unique task"}
38+
_, info4, err := client.EnqueueUnique(time.Minute, TypeUniqueTask, userPayload4, WithQueue("default"), WithMaxRetry(2))
39+
if err != nil {
40+
return err
41+
}
42+
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeUniqueTask, info4.ID, info4.Queue)
43+
_, _, err = client.EnqueueUnique(time.Minute, TypeUniqueTask, userPayload4, WithQueue("default"), WithMaxRetry(2))
44+
if err != nil {
45+
log.Printf("triggered duplicate task error:%v", err)
46+
}
3047

3148
// Equivalent EnqueueNow function
32-
userPayload4 := &EmailPayload{UserID: 404, Message: "Important Notification"}
33-
task, err := NewTask(TypeEmailSend, userPayload4)
49+
userPayload5 := &EmailPayload{UserID: 505, Message: "Important Notification"}
50+
task, err := NewTask(TypeEmailSend, userPayload5)
3451
if err != nil {
3552
return err
3653
}
37-
info, err = client.Enqueue(task, WithQueue("low"), WithRetry(1), WithDeadline(time.Now().Add(time.Second*15)), WithUniqueID("custom-id-xxxx-xxxx"))
54+
info5, err := client.Enqueue(task, WithQueue("low"), WithMaxRetry(3), WithDeadline(time.Now().Add(time.Second*15)), WithTaskID(fmt.Sprintf("unique-%d", rand.Int63n(1e10))), asynq.Retention(60*time.Second))
3855
if err != nil {
3956
return err
4057
}
41-
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeEmailSend, info.ID, info.Queue)
58+
log.Printf("enqueued task: type=%s, id=%s queue=%s", TypeEmailSend, info5.ID, info5.Queue)
4259

4360
return nil
4461
}
4562

46-
func TestNewClient(t *testing.T) {
63+
func cancelTask(queue string, taskID string, isScheduled bool) {
64+
fmt.Println()
65+
defer fmt.Println()
66+
time.Sleep(time.Second)
67+
68+
inspector := NewInspector(getRedisConfig())
69+
70+
info, err := inspector.GetTaskInfo(queue, taskID)
71+
if err != nil {
72+
log.Printf("get task info failed: %s, queue=%s, taskID=%s", err, queue, taskID)
73+
return
74+
}
75+
log.Printf("task status: type=%s, id=%s queue=%s, status=%s", info.Type, info.ID, info.Queue, info.State.String())
76+
if info.State == asynq.TaskStateCompleted {
77+
return
78+
}
79+
80+
time.Sleep(time.Millisecond * 100)
81+
if isScheduled {
82+
err = inspector.CancelTask(queue, info.ID)
83+
} else {
84+
err = inspector.CancelTask("", info.ID) // queue is empty string for non-scheduled tasks
85+
}
86+
87+
if err != nil {
88+
log.Printf("cancel task failed: %s, queue=%s, taskID=%s", err, queue, info.ID)
89+
return
90+
}
91+
log.Printf("cancel task succeeded: type=%s, id=%s queue=%s", info.Type, info.ID, info.Queue)
92+
93+
time.Sleep(time.Millisecond * 100)
94+
info2, err := inspector.GetTaskInfo(queue, info.ID)
95+
if err != nil {
96+
log.Printf("get task info after cancel failed: %s, queue=%s, taskID=%s", err, queue, info.ID)
97+
return
98+
}
99+
log.Printf("get task status after cancel: type=%s, id=%s queue=%s, status=%s", info2.Type, info2.ID, info2.Queue, info2.State.String())
100+
}
101+
102+
func TestProducer(t *testing.T) {
47103
client := NewClient(getRedisConfig())
48104

49105
err := runProducer(client)

pkg/sasynq/config.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package sasynq
22

33
import (
4+
"crypto/tls"
5+
46
"github.com/hibiken/asynq"
5-
"go.uber.org/zap"
67
)
78

89
// RedisMode defines the Redis connection mode.
@@ -35,11 +36,13 @@ type RedisConfig struct {
3536
Username string `yaml:"username"`
3637
Password string `yaml:"password"`
3738
DB int `yaml:"db"`
39+
40+
TLSConfig *tls.Config `yaml:"tlsConfig"`
3841
}
3942

4043
// GetAsynqRedisConnOpt converts RedisConfig to asynq's RedisConnOpt interface.
4144
// This is the core of the high-availability switching logic.
42-
func (c *RedisConfig) GetAsynqRedisConnOpt() asynq.RedisConnOpt {
45+
func (c RedisConfig) GetAsynqRedisConnOpt() asynq.RedisConnOpt {
4346
switch c.Mode {
4447
case RedisModeSentinel:
4548
return asynq.RedisFailoverClientOpt{
@@ -48,21 +51,24 @@ func (c *RedisConfig) GetAsynqRedisConnOpt() asynq.RedisConnOpt {
4851
Username: c.Username,
4952
Password: c.Password,
5053
DB: c.DB,
54+
TLSConfig: c.TLSConfig,
5155
}
5256
case RedisModeCluster:
5357
return asynq.RedisClusterClientOpt{
54-
Addrs: c.ClusterAddrs,
55-
Username: c.Username,
56-
Password: c.Password,
58+
Addrs: c.ClusterAddrs,
59+
Username: c.Username,
60+
Password: c.Password,
61+
TLSConfig: c.TLSConfig,
5762
}
5863
case RedisModeSingle:
5964
fallthrough
6065
default:
6166
return asynq.RedisClientOpt{
62-
Addr: c.Addr,
63-
Username: c.Username,
64-
Password: c.Password,
65-
DB: c.DB,
67+
Addr: c.Addr,
68+
Username: c.Username,
69+
Password: c.Password,
70+
DB: c.DB,
71+
TLSConfig: c.TLSConfig,
6672
}
6773
}
6874
}
@@ -73,13 +79,9 @@ type ServerConfig struct {
7379
}
7480

7581
// DefaultServerConfig returns a default server configuration.
76-
func DefaultServerConfig(l ...*zap.Logger) ServerConfig {
77-
var zapLogger *zap.Logger
78-
if len(l) == 0 {
79-
zapLogger = defaultLogger
80-
} else {
81-
zapLogger = l[0]
82-
}
82+
func DefaultServerConfig(opts ...LoggerOption) ServerConfig {
83+
o := defaultLoggerOptions()
84+
o.apply(opts...)
8385

8486
cfg := &asynq.Config{
8587
Concurrency: 10,
@@ -88,7 +90,7 @@ func DefaultServerConfig(l ...*zap.Logger) ServerConfig {
8890
"default": 3,
8991
"low": 1,
9092
},
91-
Logger: NewZapLogger(zapLogger),
93+
Logger: NewZapLogger(o.logger, o.zapSkip),
9294
}
9395

9496
return ServerConfig{cfg}

pkg/sasynq/inspector.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package sasynq
2+
3+
import (
4+
"github.com/hibiken/asynq"
5+
)
6+
7+
// Inspector provides access to the Redis backend used by asynq.
8+
type Inspector struct {
9+
*asynq.Inspector
10+
}
11+
12+
// NewInspector creates a new Inspector instance.
13+
// Note: A new Redis connection will be created, in actual use, only once
14+
func NewInspector(cfg RedisConfig) *Inspector {
15+
return &Inspector{asynq.NewInspector(cfg.GetAsynqRedisConnOpt())}
16+
}
17+
18+
// CancelTask cancels the processing of a task.
19+
func (i *Inspector) CancelTask(queue string, taskID string) error {
20+
if queue == "" {
21+
return i.Inspector.CancelProcessing(taskID)
22+
}
23+
24+
return i.Inspector.DeleteTask(queue, taskID)
25+
}
26+
27+
// GetTaskInfo returns information about a task.
28+
func (i *Inspector) GetTaskInfo(queue string, taskID string) (*asynq.TaskInfo, error) {
29+
return i.Inspector.GetTaskInfo(queue, taskID)
30+
}
31+
32+
// Close closes the inspector.
33+
func (i *Inspector) Close() error {
34+
if i.Inspector == nil {
35+
return nil
36+
}
37+
return i.Inspector.Close()
38+
}

pkg/sasynq/logger_middleware.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ type LoggerOption func(*loggerOptions)
2121

2222
type loggerOptions struct {
2323
logger *zap.Logger
24-
maxLength int
24+
zapSkip int // default is 2
25+
maxLength int // default is 300
2526
}
2627

2728
func (o *loggerOptions) apply(opts ...LoggerOption) {
@@ -34,6 +35,7 @@ func defaultLoggerOptions() *loggerOptions {
3435
return &loggerOptions{
3536
logger: defaultLogger,
3637
maxLength: defaultMaxLength,
38+
zapSkip: 2,
3739
}
3840
}
3941

@@ -55,6 +57,15 @@ func WithMaxLength(l int) LoggerOption {
5557
}
5658
}
5759

60+
// WithZapSkip sets the number of callers to skip when logging.
61+
func WithZapSkip(s int) LoggerOption {
62+
return func(o *loggerOptions) {
63+
if s >= 0 {
64+
o.zapSkip = s
65+
}
66+
}
67+
}
68+
5869
// LoggingMiddleware logs information about each processed task.
5970
func LoggingMiddleware(opts ...LoggerOption) func(next asynq.Handler) asynq.Handler {
6071
o := defaultLoggerOptions()
@@ -116,9 +127,8 @@ type ZapLogger struct {
116127
zLog *zap.Logger
117128
}
118129

119-
func NewZapLogger(l *zap.Logger) asynq.Logger {
120-
//zLog := l.WithOptions(zap.AddCallerSkip(5)).With(zap.String("component", "asynq"))
121-
zLog := l.WithOptions(zap.AddCallerSkip(2)).With(zap.String("component", "asynq"))
130+
func NewZapLogger(l *zap.Logger, skip int) asynq.Logger {
131+
zLog := l.WithOptions(zap.AddCallerSkip(skip)).With(zap.String("asynq", "true"))
122132
return &ZapLogger{
123133
zLog: zLog,
124134
}

pkg/sasynq/scheduler.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55

66
"github.com/hibiken/asynq"
7-
"go.uber.org/zap"
87
)
98

109
// SchedulerOption set options.
@@ -43,12 +42,11 @@ func WithSchedulerLogLevel(level asynq.LogLevel) SchedulerOption {
4342
}
4443

4544
// WithSchedulerLogger sets the logger for the scheduler.
46-
func WithSchedulerLogger(l *zap.Logger) SchedulerOption {
45+
func WithSchedulerLogger(opts ...LoggerOption) SchedulerOption {
46+
opt := defaultLoggerOptions()
47+
opt.apply(opts...)
4748
return func(o *schedulerOptions) {
48-
if l == nil {
49-
return
50-
}
51-
o.logger = NewZapLogger(l)
49+
o.logger = NewZapLogger(opt.logger, opt.zapSkip)
5250
}
5351
}
5452

@@ -94,7 +92,7 @@ func (s *Scheduler) RegisterTask(cronSpec string, typeName string, payload any,
9492
return s.Scheduler.Register(cronSpec, task, opts...)
9593
}
9694

97-
// Unregister removes a periodic task.
95+
// Unregister removes a periodic task, cancel task execution.
9896
func (s *Scheduler) Unregister(entryID string) error {
9997
return s.Scheduler.Unregister(entryID)
10098
}

0 commit comments

Comments
 (0)