Skip to content

Commit eeb49d3

Browse files
authored
Merge pull request #2358 from monkey92t/fifo
feat: hook mode changed to FIFO
2 parents e314cd9 + 767109c commit eeb49d3

File tree

7 files changed

+190
-121
lines changed

7 files changed

+190
-121
lines changed

cluster.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -838,7 +838,7 @@ type ClusterClient struct {
838838
state *clusterStateHolder
839839
cmdsInfoCache *cmdsInfoCache
840840
cmdable
841-
hooks
841+
hooksMixin
842842
}
843843

844844
// NewClusterClient returns a Redis Cluster client as described in
@@ -855,9 +855,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
855855
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
856856
c.cmdable = c.Process
857857

858-
c.hooks.setProcess(c.process)
859-
c.hooks.setProcessPipeline(c.processPipeline)
860-
c.hooks.setProcessTxPipeline(c.processTxPipeline)
858+
c.initHooks(hooks{
859+
dial: nil,
860+
process: c.process,
861+
pipeline: c.processPipeline,
862+
txPipeline: c.processTxPipeline,
863+
})
861864

862865
return c
863866
}
@@ -889,7 +892,7 @@ func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
889892
}
890893

891894
func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
892-
err := c.hooks.process(ctx, cmd)
895+
err := c.processHook(ctx, cmd)
893896
cmd.SetErr(err)
894897
return err
895898
}
@@ -1187,7 +1190,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
11871190

11881191
func (c *ClusterClient) Pipeline() Pipeliner {
11891192
pipe := Pipeline{
1190-
exec: pipelineExecer(c.hooks.processPipeline),
1193+
exec: pipelineExecer(c.processPipelineHook),
11911194
}
11921195
pipe.init()
11931196
return &pipe
@@ -1276,7 +1279,7 @@ func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool
12761279
func (c *ClusterClient) processPipelineNode(
12771280
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
12781281
) {
1279-
_ = node.Client.hooks.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
1282+
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
12801283
cn, err := node.Client.getConn(ctx)
12811284
if err != nil {
12821285
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
@@ -1380,7 +1383,7 @@ func (c *ClusterClient) TxPipeline() Pipeliner {
13801383
pipe := Pipeline{
13811384
exec: func(ctx context.Context, cmds []Cmder) error {
13821385
cmds = wrapMultiExec(ctx, cmds)
1383-
return c.hooks.processTxPipeline(ctx, cmds)
1386+
return c.processTxPipelineHook(ctx, cmds)
13841387
},
13851388
}
13861389
pipe.init()
@@ -1453,7 +1456,7 @@ func (c *ClusterClient) processTxPipelineNode(
14531456
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
14541457
) {
14551458
cmds = wrapMultiExec(ctx, cmds)
1456-
_ = node.Client.hooks.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
1459+
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
14571460
cn, err := node.Client.getConn(ctx)
14581461
if err != nil {
14591462
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)

cluster_commands.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd {
1010
cmd := NewIntCmd(ctx, "dbsize")
11-
_ = c.hooks.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
11+
_ = c.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
1212
var size int64
1313
err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
1414
n, err := master.DBSize(ctx).Result()
@@ -30,8 +30,8 @@ func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd {
3030

3131
func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCmd {
3232
cmd := NewStringCmd(ctx, "script", "load", script)
33-
_ = c.hooks.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
34-
mu := &sync.Mutex{}
33+
_ = c.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
34+
var mu sync.Mutex
3535
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
3636
val, err := shard.ScriptLoad(ctx, script).Result()
3737
if err != nil {
@@ -56,7 +56,7 @@ func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCm
5656

5757
func (c *ClusterClient) ScriptFlush(ctx context.Context) *StatusCmd {
5858
cmd := NewStatusCmd(ctx, "script", "flush")
59-
_ = c.hooks.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
59+
_ = c.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
6060
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
6161
return shard.ScriptFlush(ctx).Err()
6262
})
@@ -82,7 +82,7 @@ func (c *ClusterClient) ScriptExists(ctx context.Context, hashes ...string) *Boo
8282
result[i] = true
8383
}
8484

85-
_ = c.hooks.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
85+
_ = c.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
8686
var mu sync.Mutex
8787
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
8888
val, err := shard.ScriptExists(ctx, hashes...).Result()

0 commit comments

Comments
 (0)