Skip to content

Commit 3397b6f

Browse files
committed
cleanup, added logic to refresh the cache
1 parent 64245f8 commit 3397b6f

File tree

5 files changed

+33
-180
lines changed

5 files changed

+33
-180
lines changed

command.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4343,8 +4343,9 @@ func (cmd *CommandsInfoCmd) Clone() Cmder {
43434343
type cmdsInfoCache struct {
43444344
fn func(ctx context.Context) (map[string]*CommandInfo, error)
43454345

4346-
once internal.Once
4347-
cmds map[string]*CommandInfo
4346+
once internal.Once
4347+
refreshLock sync.Mutex
4348+
cmds map[string]*CommandInfo
43484349
}
43494350

43504351
func newCmdsInfoCache(fn func(ctx context.Context) (map[string]*CommandInfo, error)) *cmdsInfoCache {
@@ -4354,6 +4355,9 @@ func newCmdsInfoCache(fn func(ctx context.Context) (map[string]*CommandInfo, err
43544355
}
43554356

43564357
func (c *cmdsInfoCache) Get(ctx context.Context) (map[string]*CommandInfo, error) {
4358+
c.refreshLock.Lock()
4359+
defer c.refreshLock.Unlock()
4360+
43574361
err := c.once.Do(func() error {
43584362
cmds, err := c.fn(ctx)
43594363
if err != nil {
@@ -4374,6 +4378,14 @@ func (c *cmdsInfoCache) Get(ctx context.Context) (map[string]*CommandInfo, error
43744378
return c.cmds, err
43754379
}
43764380

4381+
// TODO: Call it on client reconnect
4382+
func (c *cmdsInfoCache) Refresh() {
4383+
c.refreshLock.Lock()
4384+
defer c.refreshLock.Unlock()
4385+
4386+
c.once = internal.Once{}
4387+
}
4388+
43774389
// ------------------------------------------------------------------------------
43784390
const requestPolicy = "request_policy"
43794391
const responsePolicy = "response_policy"

command_policy_manager.go

Lines changed: 0 additions & 169 deletions
This file was deleted.

osscluster.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -922,11 +922,10 @@ func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, er
922922
// or more underlying connections. It's safe for concurrent use by
923923
// multiple goroutines.
924924
type ClusterClient struct {
925-
opt *ClusterOptions
926-
nodes *clusterNodes
927-
state *clusterStateHolder
928-
cmdsInfoCache *cmdsInfoCache
929-
cmdPolicyManager *commandPolicyManager
925+
opt *ClusterOptions
926+
nodes *clusterNodes
927+
state *clusterStateHolder
928+
cmdsInfoCache *cmdsInfoCache
930929
cmdable
931930
hooksMixin
932931
}
@@ -942,9 +941,9 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
942941
}
943942

944943
c.state = newClusterStateHolder(c.loadState)
944+
// TODO: execute on handshake, should be called again on reconnect
945945
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
946946
c.cmdable = c.Process
947-
c.cmdPolicyManager = newCommandPolicyManager(nil)
948947
c.initHooks(hooks{
949948
dial: nil,
950949
process: c.process,
@@ -1336,7 +1335,7 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
13361335

13371336
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
13381337
for _, cmd := range cmds {
1339-
policy := c.cmdPolicyManager.getCmdPolicy(cmd)
1338+
policy := c.getCommandPolicy(ctx, cmd)
13401339
if policy != nil && !policy.CanBeUsedInPipeline() {
13411340
return fmt.Errorf(
13421341
"redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard; Note: This behavior is subject to change in the future", cmd.Name(),
@@ -1353,7 +1352,7 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
13531352
}
13541353

13551354
for _, cmd := range cmds {
1356-
policy := c.cmdPolicyManager.getCmdPolicy(cmd)
1355+
policy := c.getCommandPolicy(ctx, cmd)
13571356
if policy != nil && !policy.CanBeUsedInPipeline() {
13581357
return fmt.Errorf(
13591358
"redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard; Note: This behavior is subject to change in the future", cmd.Name(),
@@ -1863,6 +1862,7 @@ func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo,
18631862
return nil, firstErr
18641863
}
18651864

1865+
// cmdInfo will fetch and cache the command policies after the first execution
18661866
func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
18671867
// Use a separate context that won't be canceled to ensure command info lookup
18681868
// doesn't fail due to original context cancellation

osscluster_router.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type slotResult struct {
2121

2222
// routeAndRun routes a command to the appropriate cluster nodes and executes it
2323
func (c *ClusterClient) routeAndRun(ctx context.Context, cmd Cmder, node *clusterNode) error {
24-
policy := c.cmdPolicyManager.getCmdPolicy(cmd)
24+
policy := c.getCommandPolicy(ctx, cmd)
2525

2626
switch {
2727
case policy != nil && policy.Request == routing.ReqAllNodes:
@@ -37,6 +37,14 @@ func (c *ClusterClient) routeAndRun(ctx context.Context, cmd Cmder, node *cluste
3737
}
3838
}
3939

40+
// getCommandPolicy retrieves the routing policy for a command
41+
func (c *ClusterClient) getCommandPolicy(ctx context.Context, cmd Cmder) *routing.CommandPolicy {
42+
if cmdInfo := c.cmdInfo(ctx, cmd.Name()); cmdInfo != nil && cmdInfo.Tips != nil {
43+
return cmdInfo.Tips
44+
}
45+
return nil
46+
}
47+
4048
// executeDefault handles standard command routing based on keys
4149
func (c *ClusterClient) executeDefault(ctx context.Context, cmd Cmder, node *clusterNode) error {
4250
if c.hasKeys(cmd) {

osscluster_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"net"
9+
"reflect"
910
"slices"
1011
"strconv"
1112
"strings"
@@ -1567,6 +1568,7 @@ var _ = Describe("ClusterClient timeout", func() {
15671568
return nil
15681569
})
15691570
Expect(err).To(HaveOccurred())
1571+
fmt.Println("qko greshki male", reflect.TypeOf(err).String(), reflect.TypeOf(err).Kind().String())
15701572
Expect(err.(net.Error).Timeout()).To(BeTrue())
15711573
})
15721574

0 commit comments

Comments
 (0)