diff --git a/go.mod b/go.mod index 8118d231e..f2943ce45 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,10 @@ require ( github.com/antchfx/jsonquery v1.1.4 github.com/antchfx/xmlquery v1.3.1 github.com/antchfx/xpath v1.1.10 - github.com/go-redis/redis/v7 v7.0.0-beta.3.0.20190824101152-d19aba07b476 + github.com/go-redis/redis/v7 v7.4.1 github.com/godbus/dbus/v5 v5.1.0 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b - github.com/google/go-cmp v0.4.0 + github.com/google/go-cmp v0.7.0 github.com/kylelemons/godebug v1.1.0 github.com/maruel/natural v1.1.1 github.com/openconfig/gnmi v0.0.0-20200617225440-d2b4e6a45802 @@ -16,12 +16,15 @@ require ( github.com/openconfig/ygot v0.7.1 github.com/philopon/go-toposort v0.0.0-20170620085441-9be86dbd762f github.com/pkg/profile v1.7.0 + github.com/redis/go-redis/v9 v9.6.1 golang.org/x/text v0.3.3 google.golang.org/grpc v1.28.0 inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a ) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/felixge/fgprof v0.9.3 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0 // indirect diff --git a/go.sum b/go.sum index 404176218..df2751790 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,12 @@ github.com/antchfx/xmlquery v1.3.1/go.mod h1:64w0Xesg2sTaawIdNqMB+7qaW/bSqkQm+ss github.com/antchfx/xpath v1.1.7/go.mod h1:Yee4kTMuNiPYJ7nSNorELQMr1J33uOpXDMByNYhvtNk= github.com/antchfx/xpath v1.1.10 h1:cJ0pOvEdN/WvYXxvRrzQH9x5QWKpzHacYO8qzCcDYAg= github.com/antchfx/xpath v1.1.10/go.mod h1:Yee4kTMuNiPYJ7nSNorELQMr1J33uOpXDMByNYhvtNk= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -19,6 +23,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dvyukov/go-fuzz v0.0.0-20210103155950-6a8e9d1f2415/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -27,8 +33,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g= github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/go-redis/redis/v7 v7.0.0-beta.3.0.20190824101152-d19aba07b476 h1:WNSiFp8Ww4ZP7XUzW56zDYv5roKQ4VfsdHCLoh8oDj4= -github.com/go-redis/redis/v7 v7.0.0-beta.3.0.20190824101152-d19aba07b476/go.mod h1:xhhSbUMTsleRPur+Vgx9sUHtyN33bdjxY+9/0n9Ig8s= +github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI= +github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= @@ -49,23 +55,27 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC+syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y= github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/protobuf v3.11.4+incompatible/go.mod h1:lUQ9D1ePzbH2PrIS7ob/bjm9HXyH5WHB0Akwh7URreM= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/maruel/natural v1.1.1 h1:Hja7XhhmvEFhcByqDoHz9QZbkWey+COd9xWfCfn1ioo= github.com/maruel/natural v1.1.1/go.mod h1:v+Rfd79xlw1AgVBjbO0BEQmptqb5HvL/k9GRHB7ZKEg= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= -github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= -github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/openconfig/gnmi v0.0.0-20200307010808-e7106f7f5493/go.mod h1:jMSUQIR4z9WTtM58/QBHbElXAwbUnomFdty1aund1uY= github.com/openconfig/gnmi v0.0.0-20200617225440-d2b4e6a45802 h1:WXFwJlWOJINlwlyAZuNo4GdYZS6qPX36+rRUncLmN8Q= github.com/openconfig/gnmi v0.0.0-20200617225440-d2b4e6a45802/go.mod h1:M/EcuapNQgvzxo1DDXHK4tx3QpYM/uG4l591v33jG2A= @@ -82,6 +92,8 @@ github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDj github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -109,6 +121,7 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= @@ -122,12 +135,14 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -140,7 +155,6 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -163,12 +177,14 @@ google.golang.org/protobuf v1.20.1/go.mod h1:KqelGeouBkcbcuB3HCk4/YH2tmNLk6YSWA5 google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/translib/app_utils_test.go b/translib/app_utils_test.go index e92ad2302..677b890da 100644 --- a/translib/app_utils_test.go +++ b/translib/app_utils_test.go @@ -151,9 +151,10 @@ func processDeleteRequest(url string) func(*testing.T) { func getConfigDb() *db.DB { configDb, _ := db.NewDB(db.Options{ - DBNo: db.ConfigDB, - TableNameSeparator: "|", - KeySeparator: "|", + DBNo: db.ConfigDB, + TableNameSeparator: "|", + KeySeparator: "|", + ForceNewRedisConnection: false, }) return configDb diff --git a/translib/cs/cs.go b/translib/cs/cs.go index 3704ceef0..0748ab876 100644 --- a/translib/cs/cs.go +++ b/translib/cs/cs.go @@ -181,8 +181,10 @@ func newCS(name string, username string, roles []string, pid int32) (*configSess } ccDB, err := db.NewDB(db.Options{DBNo: db.ConfigDB, - IsSession: true, - TxCmdsLim: ccDbTxCmdsLim}) + IsSession: true, + TxCmdsLim: ccDbTxCmdsLim, + ForceNewRedisConnection: true, + }) if err != nil { glog.Errorf("newCS: db.NewDB err %s", err) return nil, err diff --git a/translib/cs/cs_getdb.go b/translib/cs/cs_getdb.go index 633f4e685..e60db97cb 100644 --- a/translib/cs/cs_getdb.go +++ b/translib/cs/cs_getdb.go @@ -43,7 +43,7 @@ func (sess *Session) GetConfigDB(opts *db.Options) (*db.DB, bool, func(), if opts != nil { dopts = opts } else { - dopts = &(db.Options{DBNo: db.ConfigDB}) + dopts = &(db.Options{DBNo: db.ConfigDB, ForceNewRedisConnection: true}) } dopts.DBNo = db.ConfigDB d, err = db.NewDB(*dopts) diff --git a/translib/cs/cs_session_test.go b/translib/cs/cs_session_test.go index 5f3f65017..6fbe34849 100644 --- a/translib/cs/cs_session_test.go +++ b/translib/cs/cs_session_test.go @@ -157,7 +157,7 @@ func TestCSGetConfigDB(t *testing.T) { } t.Logf("Ensure GetConfigDB gets Candidate Config DB.") - dOpts := db.Options{DBNo: db.ConfigDB} + dOpts := db.Options{DBNo: db.ConfigDB, ForceNewRedisConnection: true} d, isCS, cleanup, err := sess.GetConfigDB(&dOpts) if d == nil || !isCS || err != nil { @@ -337,7 +337,7 @@ func TestCSTxWithSession(t *testing.T) { } t.Logf("Ensure GetConfigDB gets Candidate Config DB.") - dOpts := db.Options{DBNo: db.ConfigDB} + dOpts := db.Options{DBNo: db.ConfigDB, ForceNewRedisConnection: true} d, isCS, cleanup, err := sess.GetConfigDB(&dOpts) defer cleanup() @@ -378,7 +378,7 @@ func TestCSTxNoSession(t *testing.T) { } t.Logf("Ensure GetConfigDB gets Candidate Config DB.") - dOpts := db.Options{DBNo: db.ConfigDB} + dOpts := db.Options{DBNo: db.ConfigDB, ForceNewRedisConnection: true} d, isCS, cleanup, err := sess.GetConfigDB(&dOpts) defer cleanup() diff --git a/translib/db/cvl_db_access.go b/translib/db/cvl_db_access.go index 0baf12878..fca9fc159 100644 --- a/translib/db/cvl_db_access.go +++ b/translib/db/cvl_db_access.go @@ -20,6 +20,7 @@ package db import ( + "context" "encoding/json" "strconv" "strings" @@ -27,8 +28,8 @@ import ( "github.com/Azure/sonic-mgmt-common/cvl" ctypes "github.com/Azure/sonic-mgmt-common/cvl/common" "github.com/Azure/sonic-mgmt-common/translib/tlerr" - "github.com/go-redis/redis/v7" log "github.com/golang/glog" + "github.com/redis/go-redis/v9" ) type cvlDBAccess struct { @@ -294,7 +295,7 @@ func (p *dbAccessPipe) Keys(pattern string) ctypes.StrSliceResult { log.Infof("dbAccessPipe: Keys: for the given pattern: %v", pattern) } - pr := &pipeKeysResult{pattern: pattern, rsRes: p.rp.Keys(pattern)} + pr := &pipeKeysResult{pattern: pattern, rsRes: p.rp.Keys(context.Background(), pattern)} p.qryResList = append(p.qryResList, pr) return &pr.sRes } @@ -373,7 +374,7 @@ func (p *dbAccessPipe) HMGet(key string, fields ...string) ctypes.SliceResult { } } } else { - pr.rsRes = p.rp.HMGet(key, fields...) + pr.rsRes = p.rp.HMGet(context.Background(), key, fields...) } p.qryResList = append(p.qryResList, pr) @@ -418,7 +419,7 @@ func (p *dbAccessPipe) HGet(key, field string) ctypes.StrResult { if txEntry, ok := p.dbAccess.Db.txTsEntryMap[ts.Name][key]; ok { pr.val, pr.fldExist = txEntry.Field[field] } else { - pr.rsRes = p.rp.HGet(key, field) + pr.rsRes = p.rp.HGet(context.Background(), key, field) } p.qryResList = append(p.qryResList, pr) @@ -443,7 +444,7 @@ func (pr *pipeHGetResult) update(c *cvlDBAccess) { type pipeHGetAllResult struct { key string sRes mapResult - rsRes *redis.StringStringMapCmd + rsRes *redis.MapStringStringCmd fnvMap map[string]string } @@ -464,7 +465,7 @@ func (p *dbAccessPipe) HGetAll(key string) ctypes.StrMapResult { pr.fnvMap[k] = v } } else { - pr.rsRes = p.rp.HGetAll(key) + pr.rsRes = p.rp.HGetAll(context.Background(), key) } p.qryResList = append(p.qryResList, pr) @@ -490,7 +491,7 @@ func (p *dbAccessPipe) Exec() error { log.Infof("dbAccessPipe: Exec: query list: %v", p.qryResList) } - cmder, err := p.rp.Exec() + cmder, err := p.rp.Exec(context.Background()) if err != nil && err != redis.Nil { log.Warningf("dbAccessPipe: Exec: error in pipeline.Exec; error: %v; "+ "cmder: %v; pw.qryMap: %v", err, cmder, p.qryResList) @@ -511,7 +512,6 @@ func (p *dbAccessPipe) Close() { if log.V(5) { log.Infof("dbAccessPipe: Close: redis pipeliner: %v", p.rp) } - p.rp.Close() } //================================== diff --git a/translib/db/db.go b/translib/db/db.go index 81b65bb5e..87b241014 100644 --- a/translib/db/db.go +++ b/translib/db/db.go @@ -102,10 +102,10 @@ Example: package db import ( + "context" "fmt" "strconv" - // "reflect" "errors" "strings" "time" @@ -113,8 +113,8 @@ import ( "github.com/Azure/sonic-mgmt-common/cvl" cmn "github.com/Azure/sonic-mgmt-common/cvl/common" "github.com/Azure/sonic-mgmt-common/translib/tlerr" - "github.com/go-redis/redis/v7" "github.com/golang/glog" + "github.com/redis/go-redis/v9" ) const ( @@ -169,6 +169,12 @@ type Options struct { IsWriteDisabled bool //Is write/set mode disabled ? IsCacheEnabled bool //Is cache (Per Connection) allowed? + // Redis connection pools do not support transactional Redis operations. + // If the DB object will be used to perform transactions, the new connection + // flag must be set to request a unique Redis client. Transactions include + // SCAN and MULTI. + ForceNewRedisConnection bool + // OnChange caching for the DBs passed from Translib's Subscribe Infra // to the Apps. SDB is the SubscribeDB() returned handle on which // notifications of change are received. @@ -195,10 +201,10 @@ type Options struct { func (o Options) String() string { return fmt.Sprintf( - "{ DBNo: %v, InitIndicator: %v, TableNameSeparator: %v, KeySeparator: %v, IsWriteDisabled: %v, IsCacheEnabled: %v, IsOnChangeEnabled: %v, SDB: %v, DisableCVLCheck: %v, IsSession: %v, ConfigDBLazyLock: %v, TxCmdsLim: %v }", + "{ DBNo: %v, InitIndicator: %v, TableNameSeparator: %v, KeySeparator: %v, IsWriteDisabled: %v, IsCacheEnabled: %v, IsOnChangeEnabled: %v, ForceNewRedisConnection: %v, SDB: %v, DisableCVLCheck: %v, IsSession: %v, ConfigDBLazyLock: %v, TxCmdsLim: %v }", o.DBNo, o.InitIndicator, o.TableNameSeparator, o.KeySeparator, - o.IsWriteDisabled, o.IsCacheEnabled, o.IsOnChangeEnabled, o.SDB, - o.DisableCVLCheck, o.IsSession, o.ConfigDBLazyLock, o.TxCmdsLim) + o.IsWriteDisabled, o.IsCacheEnabled, o.IsOnChangeEnabled, o.ForceNewRedisConnection, + o.SDB, o.DisableCVLCheck, o.IsSession, o.ConfigDBLazyLock, o.TxCmdsLim) } type _txState int @@ -402,7 +408,15 @@ func NewDB(opt Options) (*DB, error) { var dur time.Duration now = time.Now() - d := DB{client: redis.NewClient(adjustRedisOpts(&opt)), + var rc *redis.Client + if opt.ForceNewRedisConnection { + rc = TransactionalRedisClient(opt.DBNo) + } else { + rc = RedisClient(opt.DBNo) + } + + d := DB{ + client: rc, Opts: &opt, txState: txStateNone, txCmds: make([]_txCmd, 0, InitialTxPipelineSize), @@ -455,7 +469,7 @@ func NewDB(opt Options) (*DB, error) { if opt.IsSession && opt.IsOnChangeEnabled { glog.Error("NewDB: Subscription on Config Session not supported : ", d.Name()) - d.client.Close() + CloseRedisClient(d.client) e = tlerr.TranslibDBNotSupported{ Description: "Subscription on Config Session not supported"} goto NewDBExit @@ -464,7 +478,7 @@ func NewDB(opt Options) (*DB, error) { if opt.IsSession && opt.DBNo != ConfigDB { glog.Error("NewDB: Non-Config DB on Config Session not supported : ", d.Name()) - d.client.Close() + CloseRedisClient(d.client) e = tlerr.TranslibDBNotSupported{ Description: "Non-Config DB on Config Session not supported"} goto NewDBExit @@ -490,7 +504,7 @@ func NewDB(opt Options) (*DB, error) { } else { glog.V(3).Info("NewDB: RedisCmd: ", d.Name(), ": ", "GET ", d.Opts.InitIndicator) - if init, err := d.client.Get(d.Opts.InitIndicator).Int(); init != 1 { + if init, err := d.client.Get(context.Background(), d.Opts.InitIndicator).Int(); init != 1 { glog.Error("NewDB: Database not inited: ", d.Name(), ": GET ", d.Opts.InitIndicator) @@ -498,7 +512,7 @@ func NewDB(opt Options) (*DB, error) { glog.Error("NewDB: Database not inited: ", d.Name(), ": GET ", d.Opts.InitIndicator, " returns err: ", err) } - d.client.Close() + CloseRedisClient(d.client) e = tlerr.TranslibDBNotInit{} goto NewDBExit } @@ -512,7 +526,7 @@ func NewDB(opt Options) (*DB, error) { if e = ConfigDBTryLock(noSessionToken); e != nil { glog.Errorf("NewDB: ConfigDB possibly locked: %s", e) - d.client.Close() + CloseRedisClient(d.client) goto NewDBExit } d.configDBLocked = true @@ -570,7 +584,7 @@ func (d *DB) DeleteDB() error { d.unRegisterSessionDB() } - err := d.client.Close() + err := CloseRedisClient(d.client) d.client = nil return err } @@ -580,16 +594,15 @@ func (d *DB) IsOpen() bool { } func (d *DB) key2redis(ts *TableSpec, key Key) string { - + rv := ts.Name + fields := strings.Join(key.Comp, d.Opts.KeySeparator) + if fields != "" { + rv = rv + d.Opts.KeySeparator + fields + } if glog.V(5) { - glog.Info("key2redis: Begin: ", - ts.Name+ - d.Opts.TableNameSeparator+ - strings.Join(key.Comp, d.Opts.KeySeparator)) - } - return ts.Name + - d.Opts.TableNameSeparator + - strings.Join(key.Comp, d.Opts.KeySeparator) + glog.Info("key2redis: Begin: ", rv) + } + return rv } func (d *DB) redis2key(ts *TableSpec, redisKey string) Key { @@ -689,7 +702,7 @@ func (d *DB) getEntry(ts *TableSpec, key Key, forceReadDB bool) (Value, error) { if glog.V(3) { glog.Info("getEntry: RedisCmd: ", d.Name(), ": ", "HGETALL ", entry) } - v, e = d.client.HGetAll(entry).Result() + v, e = d.client.HGetAll(context.Background(), entry).Result() value = Value{Field: v} } @@ -831,7 +844,7 @@ func (d *DB) GetKeysPattern(ts *TableSpec, pat Key) ([]Key, error) { glog.Info("GetKeysPattern: RedisCmd: ", d.Name(), ": ", "KEYS ", d.key2redis(ts, pat)) } var redisKeys []string - redisKeys, e = d.client.Keys(d.key2redis(ts, pat)).Result() + redisKeys, e = d.client.Keys(context.Background(), d.key2redis(ts, pat)).Result() keys = make([]Key, 0, len(redisKeys)) // On error, return promptly @@ -1142,7 +1155,7 @@ func (d *DB) doWrite(ts *TableSpec, op _txOp, k Key, val interface{}) error { for k, v := range val.(Value).Field { vintf[k] = v } - e = d.client.HMSet(d.key2redis(ts, key), vintf).Err() + e = d.client.HMSet(context.Background(), d.key2redis(ts, key), vintf).Err() if e != nil { glog.Error("doWrite: ", d.Name(), ": HMSet: ", key, " : ", @@ -1155,14 +1168,14 @@ func (d *DB) doWrite(ts *TableSpec, op _txOp, k Key, val interface{}) error { fields = append(fields, k) } - e = d.client.HDel(d.key2redis(ts, key), fields...).Err() + e = d.client.HDel(context.Background(), d.key2redis(ts, key), fields...).Err() if e != nil { glog.Error("doWrite: ", d.Name(), ": HDel: ", key, " : ", fields, " e: ", e) } case txOpDel: - e = d.client.Del(d.key2redis(ts, key)).Err() + e = d.client.Del(context.Background(), d.key2redis(ts, key)).Err() if e != nil { glog.Error("doWrite: ", d.Name(), ": Del: ", key, " : ", e) } @@ -1195,8 +1208,8 @@ func (d *DB) doWrite(ts *TableSpec, op _txOp, k Key, val interface{}) error { entry := d.key2redis(ts, key) if _, ok := d.txTsEntryMap[ts.Name][entry]; !ok { var v map[string]string - glog.Info("doWrite: RedisCmd: ", d.Name(), ": ", "HGETALL ", d.key2redis(ts, key)) - v, e = d.client.HGetAll(d.key2redis(ts, key)).Result() + glog.V(3).Info("doWrite: RedisCmd: ", d.Name(), ": ", "HGETALL ", d.key2redis(ts, key)) + v, e = d.client.HGetAll(context.Background(), d.key2redis(ts, key)).Result() if len(v) != 0 { d.txTsEntryMap[ts.Name][entry] = Value{Field: v} } else { @@ -1360,7 +1373,7 @@ func (d *DB) Publish(channel string, message interface{}) error { return ConnectionClosed } - e := d.client.Publish(channel, message).Err() + e := d.client.Publish(context.Background(), channel, message).Err() return e } @@ -1374,7 +1387,7 @@ func (d *DB) RunScript(script *redis.Script, keys []string, args ...interface{}) return nil } - return script.Run(d.client, keys, args...) + return script.Run(context.Background(), d.client, keys, args...) } // DeleteEntry deletes an entry(row) in the table. @@ -1679,8 +1692,8 @@ func (d *DB) performWatch(w []WatchKeys, tss []*TableSpec) error { continue } - glog.Info("performWatch: RedisCmd: ", d.Name(), ": ", "KEYS ", redisKey) - redisKeys, e := d.client.Keys(redisKey).Result() + glog.V(3).Info("performWatch: RedisCmd: ", d.Name(), ": ", "KEYS ", redisKey) + redisKeys, e := d.client.Keys(context.Background(), redisKey).Result() if e != nil { glog.Warning("performWatch: Keys: " + e.Error()) if first_e == nil { @@ -1705,8 +1718,8 @@ func (d *DB) performWatch(w []WatchKeys, tss []*TableSpec) error { } // Issue the WATCH - glog.Info("performWatch: Do: ", args) - _, e = d.client.Do(args...).Result() + glog.V(3).Info("performWatch: Do: ", args) + _, e = d.client.Do(context.Background(), args...).Result() if e != nil { glog.Warning("performWatch: Do: WATCH ", args, " e: ", e.Error()) @@ -1762,7 +1775,7 @@ func (d *DB) commitTx() error { // Issue MULTI glog.Info("CommitTx: Do: MULTI") - _, e = d.client.Do("MULTI").Result() + _, e = d.client.Do(context.Background(), "MULTI").Result() if e != nil { glog.Warning("CommitTx: Do: MULTI e: ", e.Error()) @@ -1791,7 +1804,7 @@ func (d *DB) commitTx() error { args = append(args, k, v) } - _, e = d.client.Do(args...).Result() + _, e = d.client.Do(context.Background(), args...).Result() case txOpHDel: @@ -1802,14 +1815,14 @@ func (d *DB) commitTx() error { args = append(args, k) } - _, e = d.client.Do(args...).Result() + _, e = d.client.Do(context.Background(), args...).Result() case txOpDel: args = make([]interface{}, 0, 2) args = append(args, "DEL", redisKey) - _, e = d.client.Do(args...).Result() + _, e = d.client.Do(context.Background(), args...).Result() default: glog.Error("CommitTx: Unknown, op: ", d.txCmds[i].op) @@ -1833,7 +1846,7 @@ func (d *DB) commitTx() error { if glog.V(4) { glog.Info("CommitTx: Do: SET ", d.ts2redisUpdated(&ts), " 1") } - _, e = d.client.Do("SET", d.ts2redisUpdated(&ts), "1").Result() + _, e = d.client.Do(context.Background(), "SET", d.ts2redisUpdated(&ts), "1").Result() if e != nil { glog.Warning("CommitTx: Do: SET ", d.ts2redisUpdated(&ts), " 1: e: ", @@ -1852,7 +1865,7 @@ func (d *DB) commitTx() error { // Issue EXEC glog.Info("CommitTx: Do: EXEC") - _, e = d.client.Do("EXEC").Result() + _, e = d.client.Do(context.Background(), "EXEC").Result() if e != nil { glog.Warning("CommitTx: Do: EXEC e: ", e.Error()) @@ -1914,8 +1927,8 @@ func (d *DB) abortTx() error { } // Issue UNWATCH - glog.Info("AbortTx: Do: UNWATCH") - _, e = d.client.Do("UNWATCH").Result() + glog.V(3).Info("AbortTx: Do: UNWATCH") + _, e = d.client.Do(context.Background(), "UNWATCH").Result() if e != nil { glog.Warning("AbortTx: Do: UNWATCH e: ", e.Error()) @@ -1949,7 +1962,7 @@ func (d *DB) markConfigDBUpdated() error { glog.Info("markConfigDBUpdated: Do: SET ", d.ts2redisUpdated(&TableSpec{Name: "*"}), " 1") } - _, e := d.client.Do("SET", d.ts2redisUpdated(&TableSpec{Name: "*"}), + _, e := d.client.Do(context.Background(), "SET", d.ts2redisUpdated(&TableSpec{Name: "*"}), strconv.FormatInt(time.Now().UnixNano(), 10)).Result() if e != nil { glog.Warning("markConfigDBUpdated: Do: SET ", @@ -2186,29 +2199,28 @@ func setError(e error, idx int, errors *[]error, numKeys int) { } // getMultiEntry retrieves the entries of the given keys using "redis pipeline". -func (d *DB) getMultiEntry(ts *TableSpec, keys []string) ([]*redis.StringStringMapCmd, error) { +func (d *DB) getMultiEntry(ts *TableSpec, keys []string) ([]*redis.MapStringStringCmd, error) { if glog.V(3) { glog.Info("getMultiEntry: Begin: ts: ", ts) } - var results = make([]*redis.StringStringMapCmd, len(keys)) + var results = make([]*redis.MapStringStringCmd, len(keys)) pipe := d.client.Pipeline() - defer pipe.Close() if glog.V(3) { glog.Info("getMultiEntry: RedisCmd: ", d.Name(), ": ", "pipe.HGetAll for the ", keys) } for i, key := range keys { - results[i] = pipe.HGetAll(key) + results[i] = pipe.HGetAll(context.Background(), key) } if glog.V(3) { glog.Info("getMultiEntry: RedisCmd: ", d.Name(), ": ", "pipe.Exec") } - _, err := pipe.Exec() + _, err := pipe.Exec(context.Background()) if glog.V(3) { glog.Info("getMultiEntry: End: ts: ", ts, "results: ", results, "err: ", err) diff --git a/translib/db/db_cursor.go b/translib/db/db_cursor.go index 99ade4b0b..8a3f53594 100644 --- a/translib/db/db_cursor.go +++ b/translib/db/db_cursor.go @@ -20,6 +20,7 @@ package db import ( + "context" "fmt" "reflect" @@ -88,7 +89,7 @@ type fieldScanner struct { } func (scnr *keyScanner) scan(sc *ScanCursor, countHint int64) ([]string, uint64, error) { - return sc.db.client.Scan(sc.cursor, + return sc.db.client.Scan(context.Background(), sc.cursor, sc.db.key2redis(sc.ts, sc.pattern), countHint).Result() } @@ -97,7 +98,7 @@ func (scnr *fieldScanner) scan(sc *ScanCursor, countHint int64) ([]string, uint6 if len(sc.pattern.Comp) > 0 { key = sc.db.key2redis(sc.ts, sc.pattern) } - return sc.db.client.HScan(key, sc.cursor, scnr.fldNamePattern, countHint).Result() + return sc.db.client.HScan(context.Background(), key, sc.cursor, scnr.fldNamePattern, countHint).Result() } //////////////////////////////////////////////////////////////////////////////// diff --git a/translib/db/db_cursor_test.go b/translib/db/db_cursor_test.go index 3885ab241..c93c1e54a 100644 --- a/translib/db/db_cursor_test.go +++ b/translib/db/db_cursor_test.go @@ -92,11 +92,12 @@ func TestNewScanCursor(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: false, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: false, + ForceNewRedisConnection: false, }) if (d == nil) || (e != nil) { diff --git a/translib/db/db_cvl_hint_test.go b/translib/db/db_cvl_hint_test.go index 8fd281c8e..5110fef07 100644 --- a/translib/db/db_cvl_hint_test.go +++ b/translib/db/db_cvl_hint_test.go @@ -28,10 +28,11 @@ var hValue = map[string]string{"a": "1", "b": "2"} func newEnableCVLDB(dBNum DBNum) (*DB, error) { d, e := NewDB(Options{ - DBNo: dBNum, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", + DBNo: dBNum, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + ForceNewRedisConnection: false, }) return d, e } diff --git a/translib/db/db_get.go b/translib/db/db_get.go index f8119a3d4..fbf984e55 100644 --- a/translib/db/db_get.go +++ b/translib/db/db_get.go @@ -20,6 +20,7 @@ package db import ( + "context" "strings" "github.com/Azure/sonic-mgmt-common/translib/tlerr" @@ -49,8 +50,8 @@ func (d *DB) Get(key string) (string, error) { return "", UseGetEntry } - glog.Info("Get: RedisCmd: ", d.Name(), ": ", "GET ", key) - val, e := d.client.Get(key).Result() + glog.V(2).Info("Get: RedisCmd: ", d.Name(), ": ", "GET ", key) + val, e := d.client.Get(context.Background(), key).Result() if glog.V(3) { glog.Info("Get: End: key: ", key, " val: ", val, " e: ", e) diff --git a/translib/db/db_get_config.go b/translib/db/db_get_config.go index 29e83db42..d16c0d826 100644 --- a/translib/db/db_get_config.go +++ b/translib/db/db_get_config.go @@ -20,9 +20,11 @@ package db import ( - "github.com/go-redis/redis/v7" + "context" + "github.com/golang/glog" "github.com/kylelemons/godebug/pretty" + "github.com/redis/go-redis/v9" ) type GetConfigOptions struct { @@ -131,7 +133,7 @@ func (d *DB) GetConfig(tables []*TableSpec, opt *GetConfigOptions) (map[TableSpe pipe := d.client.Pipeline() tss := make([]*TableSpec, 0, len(redisKeys)) - presults := make([]*redis.StringStringMapCmd, 0, len(redisKeys)) + presults := make([]*redis.MapStringStringCmd, 0, len(redisKeys)) keys := make([]Key, 0, len(redisKeys)) for index, redisKey := range redisKeys { @@ -157,7 +159,7 @@ func (d *DB) GetConfig(tables []*TableSpec, opt *GetConfigOptions) (map[TableSpe tss = append(tss, &rKts) keys = append(keys, key) - presults = append(presults, pipe.HGetAll(redisKey)) + presults = append(presults, pipe.HGetAll(context.Background(), redisKey)) } if glog.V(3) { @@ -169,10 +171,7 @@ func (d *DB) GetConfig(tables []*TableSpec, opt *GetConfigOptions) (map[TableSpe if glog.V(3) { glog.Info("GetConfig: RedisCmd: ", d.Name(), ": ", "pipe.Exec") } - _, err = pipe.Exec() // Ignore returned Cmds. If any err, log it. - - // Close the Pipeline - pipe.Close() + _, err = pipe.Exec(context.Background()) // Ignore returned Cmds. If any err, log it. if err != nil { glog.Error("GetConfig: pipe.Exec() err: ", err) diff --git a/translib/db/db_get_config_test.go b/translib/db/db_get_config_test.go index 04a5b621c..d292384b5 100644 --- a/translib/db/db_get_config_test.go +++ b/translib/db/db_get_config_test.go @@ -20,6 +20,7 @@ package db import ( + "context" "fmt" "reflect" "strings" @@ -70,7 +71,7 @@ func verifyGetConfigAllTables(t *testing.T, db *DB, opts *GetConfigOptions) { // Count the keys in all the tables tsM := make(map[TableSpec]int, 10) - redisKeys, e := db.client.Keys("*").Result() + redisKeys, e := db.client.Keys(context.Background(), "*").Result() if e != nil { t.Errorf("client.Keys() returns err: %v", e) } diff --git a/translib/db/db_keys_pattern.go b/translib/db/db_keys_pattern.go index 159147ffe..17f74083d 100644 --- a/translib/db/db_keys_pattern.go +++ b/translib/db/db_keys_pattern.go @@ -20,11 +20,12 @@ package db import ( + "context" "time" "github.com/Azure/sonic-mgmt-common/translib/tlerr" - "github.com/go-redis/redis/v7" "github.com/golang/glog" + "github.com/redis/go-redis/v9" ) //////////////////////////////////////////////////////////////////////////////// @@ -117,7 +118,7 @@ func (d *DB) ExistKeysPattern(ts *TableSpec, pat Key) (bool, error) { if d.Opts.IsWriteDisabled && !exists { var luaExists interface{} - if luaExists, err = luaScriptExistsKeysPatterns.Run(d.client, + if luaExists, err = luaScriptExistsKeysPatterns.Run(context.Background(), d.client, []string{d.key2redis(ts, pat)}).Result(); err == nil { if existsString, ok := luaExists.(string); !ok { diff --git a/translib/db/db_lock.go b/translib/db/db_lock.go index 071dfd854..d343f426e 100644 --- a/translib/db/db_lock.go +++ b/translib/db/db_lock.go @@ -22,6 +22,7 @@ package db // DB Layer Lock import ( + "context" "flag" "os" "path/filepath" @@ -31,9 +32,8 @@ import ( "time" "github.com/Azure/sonic-mgmt-common/translib/tlerr" - - "github.com/go-redis/redis/v7" "github.com/golang/glog" + "github.com/redis/go-redis/v9" ) const ( @@ -76,13 +76,13 @@ func (lt *LockStruct) tryLock() error { if client, err = getStateDB(); err != nil { return err } - defer client.Close() + defer CloseRedisClient(client) // HSETNX: Set Hash Field if Not Exist args := []interface{}{"HSETNX", lockTableKey, lt.Name, lt.comm + ":" + lt.Id} - glog.Info("tryLock: RedisCmd: STATE_DB: ", args) - if reply, err = client.Do(args...).Result(); err == nil { + glog.V(3).Info("tryLock: RedisCmd: STATE_DB: ", args) + if reply, err = client.Do(context.Background(), args...).Result(); err == nil { if intReply, ok := reply.(int64); !ok { glog.Errorf("tryLock: Reply %v Not int64: %v Type: %v", args, reply, reflect.TypeOf(reply)) @@ -116,10 +116,10 @@ func (lt *LockStruct) unlock() error { if client, err = getStateDB(); err != nil { return err } - defer client.Close() + defer CloseRedisClient(client) // Run the LUA Script to HDEL if we set the key - if reply, err = luaScriptUnlock.Run(client, + if reply, err = luaScriptUnlock.Run(context.Background(), client, []string{lockTableKey}, []string{lt.Name, lt.comm, lt.Id}).Result(); err == nil { @@ -152,7 +152,7 @@ func (lt *LockStruct) dbLockedError(c *redis.Client) error { lockId = lt.Id } else if c == nil { glog.Warningf("dbLockedError: nil db connection; assuming generic lock") - } else if v, err := c.HGet(lockTableKey, lt.Name).Result(); err != nil { + } else if v, err := c.HGet(context.Background(), lockTableKey, lt.Name).Result(); err != nil { glog.Warningf("dbLockedError: 'HGET %s %s' failed; err=%v", lockTableKey, lt.Name, err) } else if parts := strings.SplitN(v, ":", 2); len(parts) != 2 { glog.Warningf("dbLockedError: 'HGET %s %s' returned unknown value %q", lockTableKey, lt.Name, v) @@ -265,8 +265,7 @@ func dumpStack(begin, end int) { func getStateDB() (*redis.Client, error) { var client *redis.Client var err error - if client = redis.NewClient(adjustRedisOpts(&Options{ - DBNo: StateDB})); client == nil { + if client = TransactionalRedisClient(StateDB); client == nil { glog.Error("getStateDB: Could not create redis client: STATE_DB") err = tlerr.TranslibDBCannotOpen{} diff --git a/translib/db/db_lock_test.go b/translib/db/db_lock_test.go index 89029e912..5423f46c5 100644 --- a/translib/db/db_lock_test.go +++ b/translib/db/db_lock_test.go @@ -35,7 +35,7 @@ var stateDB *DB func setupKey(t *testing.T, ts *TableSpec, key Key, val Value) { var err error - if stateDB, err = NewDB(Options{DBNo: StateDB}); err != nil { + if stateDB, err = NewDB(Options{DBNo: StateDB, KeySeparator: "|"}); err != nil { t.Errorf("setupKey: NewDB(StateDB) fails: %v", err) } stateDB.DeleteEntry(ts, key) @@ -316,12 +316,13 @@ func TestLockConfigDBDeleteEntryFields(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - ConfigDBLazyLock: true, - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + ConfigDBLazyLock: true, + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if d == nil { diff --git a/translib/db/db_onchangecache_test.go b/translib/db/db_onchangecache_test.go index da6eb5a31..ceea8e863 100644 --- a/translib/db/db_onchangecache_test.go +++ b/translib/db/db_onchangecache_test.go @@ -40,8 +40,10 @@ func TestOnChangeCacheReg(t *testing.T) { ts := &TableSpec{Name: "PORT"} t.Run("occDisable", func(t *testing.T) { d := newTestDB(t, Options{ - DBNo: ConfigDB, - IsWriteDisabled: true, + DBNo: ConfigDB, + IsWriteDisabled: true, + ForceNewRedisConnection: false, + KeySeparator: "|", }) if err := d.RegisterTableForOnChangeCaching(ts); err == nil { t.Fatal("RegisterTableForOnChangeCaching should have failed when IsEnableOnChange=false") @@ -53,9 +55,11 @@ func TestOnChangeCacheReg(t *testing.T) { t.Run("occEnable", func(t *testing.T) { d := newTestDB(t, Options{ - DBNo: ConfigDB, - IsWriteDisabled: true, - IsOnChangeEnabled: true, + DBNo: ConfigDB, + IsWriteDisabled: true, + IsOnChangeEnabled: true, + ForceNewRedisConnection: false, + KeySeparator: "|", }) if err := d.RegisterTableForOnChangeCaching(ts); err != nil { t.Fatal("RegisterTableForOnChangeCaching failed; ", err) @@ -70,8 +74,10 @@ func TestOnChangeCacheReg(t *testing.T) { t.Run("writeEnable", func(t *testing.T) { _, err := NewDB(Options{ - DBNo: ConfigDB, - IsOnChangeEnabled: true, + DBNo: ConfigDB, + IsOnChangeEnabled: true, + ForceNewRedisConnection: false, + KeySeparator: "|", }) if err == nil { t.Error("NewDB should have failed") @@ -81,9 +87,9 @@ func TestOnChangeCacheReg(t *testing.T) { func TestOnChangeCache(t *testing.T) { // OnChange cache enabled db - d := newTestDB(t, Options{DBNo: ConfigDB, IsWriteDisabled: true, IsOnChangeEnabled: true}) + d := newTestDB(t, Options{DBNo: ConfigDB, IsWriteDisabled: true, IsOnChangeEnabled: true, ForceNewRedisConnection: false, KeySeparator: "|"}) // Writale db to write test keys - dw := newTestDB(t, Options{DBNo: ConfigDB, DisableCVLCheck: true}) + dw := newTestDB(t, Options{DBNo: ConfigDB, DisableCVLCheck: true, ForceNewRedisConnection: false, KeySeparator: "|"}) tsA := &TableSpec{Name: "TESTOCC_A"} tsB := &TableSpec{Name: "TESTOCC_B"} diff --git a/translib/db/db_opts_to_test.go b/translib/db/db_opts_to_test.go index e78a85952..5920f55b0 100644 --- a/translib/db/db_opts_to_test.go +++ b/translib/db/db_opts_to_test.go @@ -29,7 +29,9 @@ import ( // "github.com/Azure/sonic-mgmt-common/translib/tlerr" // "os/exec" + "context" "os" + // "reflect" "strconv" "strings" @@ -37,7 +39,7 @@ import ( "testing" "time" - "github.com/go-redis/redis/v7" + "github.com/redis/go-redis/v9" ) func TestDefaultTimeout(t *testing.T) { @@ -47,12 +49,13 @@ func TestDefaultTimeout(t *testing.T) { t.Logf("TestDefaultTimeout: %s: begin", time.Now().String()) d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - IsWriteDisabled: true, - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + IsWriteDisabled: true, + DisableCVLCheck: true, + ForceNewRedisConnection: false, }) if d == nil { @@ -113,12 +116,13 @@ func blockLUAScript(wg *sync.WaitGroup, secs int, t *testing.T) { t.Logf("blockLUAScript: %s: begin: secs: %v", time.Now().String(), secs) d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - IsWriteDisabled: true, - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + IsWriteDisabled: true, + DisableCVLCheck: true, + ForceNewRedisConnection: false, }) if e != nil { @@ -135,7 +139,7 @@ end return i `) - if _, e := luaScript.Run(d.client, []string{}, secs).Int(); e != nil { + if _, e := luaScript.Run(context.Background(), d.client, []string{}, secs).Int(); e != nil { t.Logf("blockLUAScript: luaScript.Run() fails e = %v", e) } diff --git a/translib/db/db_redis_opts.go b/translib/db/db_redis_opts.go index 023fd6072..ecfa74d15 100644 --- a/translib/db/db_redis_opts.go +++ b/translib/db/db_redis_opts.go @@ -20,6 +20,7 @@ package db import ( + "context" "errors" "flag" "reflect" @@ -28,8 +29,8 @@ import ( "sync" "time" - "github.com/go-redis/redis/v7" "github.com/golang/glog" + "github.com/redis/go-redis/v9" ) //////////////////////////////////////////////////////////////////////////////// @@ -144,7 +145,7 @@ func adjustRedisOpts(dbOpt *Options) *redis.Options { return &redisOpts } -func init() { +func initializeRedisOpts() { flag.StringVar(&goRedisOpts, "go_redis_opts", "", "Options for go-redis") } @@ -197,7 +198,9 @@ func (config *_DBRedisOptsConfig) handleReconfigureSignal() error { //////////////////////////////////////////////////////////////////////////////// func (config *_DBRedisOptsConfig) readFromDB() error { - fields, e := readRedis("TRANSLIB_DB|default") + rc := TransactionalRedisClient(ConfigDB) + defer CloseRedisClient(rc) + fields, e := rc.HGetAll(context.Background(), "TRANSLIB_DB|default").Result() if e == nil { if optsString, ok := fields["go_redis_opts"]; ok { // Parse optsString into config.opts @@ -266,8 +269,8 @@ func (config *_DBRedisOptsConfig) parseRedisOptsConfig(optsString string) error } else { config.opts.MinIdleConns = int(intVal) } - case "MaxConnAge": - if config.opts.MaxConnAge, optSAErr = + case "MaxConnAge", "ConnMaxLifetime": + if config.opts.ConnMaxLifetime, optSAErr = time.ParseDuration(optSA[1]); optSAErr != nil { eS += ("Parse Error: " + optSA[0] + " :" + optSAErr.Error()) } @@ -276,13 +279,8 @@ func (config *_DBRedisOptsConfig) parseRedisOptsConfig(optsString string) error time.ParseDuration(optSA[1]); optSAErr != nil { eS += ("Parse Error: " + optSA[0] + " :" + optSAErr.Error()) } - case "IdleTimeout": - if config.opts.IdleTimeout, optSAErr = - time.ParseDuration(optSA[1]); optSAErr != nil { - eS += ("Parse Error: " + optSA[0] + " :" + optSAErr.Error()) - } - case "IdleCheckFrequency": - if config.opts.IdleCheckFrequency, optSAErr = + case "IdleTimeout", "ConnMaxIdleTime": + if config.opts.ConnMaxIdleTime, optSAErr = time.ParseDuration(optSA[1]); optSAErr != nil { eS += ("Parse Error: " + optSA[0] + " :" + optSAErr.Error()) } diff --git a/translib/db/db_redis_opts_test.go b/translib/db/db_redis_opts_test.go index 865917b66..b04c257a8 100644 --- a/translib/db/db_redis_opts_test.go +++ b/translib/db/db_redis_opts_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "github.com/go-redis/redis/v7" + "github.com/redis/go-redis/v9" ) func TestSetGoRedisOpts(t *testing.T) { @@ -55,11 +55,12 @@ func compareRedisOptsString2Struct(t *testing.T, optsS string, opts *redis.Optio func compareRedisOptsDBRead2Struct(t *testing.T, optsS string, opts *redis.Options) { d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: false, }) if d == nil { diff --git a/translib/db/db_redis_pipe_test.go b/translib/db/db_redis_pipe_test.go index 5d399b3a9..dea97b3e1 100644 --- a/translib/db/db_redis_pipe_test.go +++ b/translib/db/db_redis_pipe_test.go @@ -35,11 +35,12 @@ import ( func newDB(dBNum DBNum) (*DB, error) { d, e := NewDB(Options{ - DBNo: dBNum, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: dBNum, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) return d, e } diff --git a/translib/db/db_rpcdb.go b/translib/db/db_rpcdb.go index bb98c7510..73a12d9f8 100644 --- a/translib/db/db_rpcdb.go +++ b/translib/db/db_rpcdb.go @@ -21,11 +21,12 @@ package db import ( // "sync" + "context" "time" "github.com/Azure/sonic-mgmt-common/translib/tlerr" - "github.com/go-redis/redis/v7" "github.com/golang/glog" + "github.com/redis/go-redis/v9" ) //////////////////////////////////////////////////////////////////////////////// @@ -58,8 +59,8 @@ func PubSubRpcDB(opt Options, responseChannel string) (*DB, error) { // responseChannel db.Subscribe() // db.receive () - d.rPubSub = d.client.Subscribe(responseChannel) - msg, e := d.rPubSub.Receive() + d.rPubSub = d.client.Subscribe(context.Background(), responseChannel) + msg, e := d.rPubSub.Receive(context.Background()) if e != nil { glog.Error("PubSubRpcDB: ", d.Name(), ": ", responseChannel, ": Receive() Error: ", e.Error()) @@ -92,7 +93,7 @@ func (d *DB) SendRpcRequest(requestChannel string, message string) (int, error) } // Publish on this channel - listeners, e := d.client.Publish(requestChannel, message).Result() + listeners, e := d.client.Publish(context.Background(), requestChannel, message).Result() if glog.V(3) { glog.Info("SendRpcRequest: End: listeners: ", requestChannel, diff --git a/translib/db/db_rpcdb_test.go b/translib/db/db_rpcdb_test.go index b107b44fe..d437ce089 100644 --- a/translib/db/db_rpcdb_test.go +++ b/translib/db/db_rpcdb_test.go @@ -61,7 +61,7 @@ func getChannelName() { } func openPubSubRpcDB(t *testing.T, nm string) *DB { - d, e := PubSubRpcDB(Options{DBNo: LogLevelDB}, nm) + d, e := PubSubRpcDB(Options{DBNo: LogLevelDB, ForceNewRedisConnection: true}, nm) if (d == nil) || (e != nil) { t.Fatalf("PubSubRpcDB() fails e = %v", e) diff --git a/translib/db/db_savepoint_test.go b/translib/db/db_savepoint_test.go index 161b4650c..3b5df4b78 100644 --- a/translib/db/db_savepoint_test.go +++ b/translib/db/db_savepoint_test.go @@ -97,12 +97,13 @@ func runSP(t *testing.T, tc *spTC) { } ccd, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - IsSession: true, - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + IsSession: true, + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if e != nil { @@ -208,11 +209,12 @@ func runSP(t *testing.T, tc *spTC) { func TestSPDeclareSP(t *testing.T) { ccd, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - IsSession: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + IsSession: true, + ForceNewRedisConnection: true, }) if e != nil { diff --git a/translib/db/db_stats.go b/translib/db/db_stats.go index 47a5c41a7..4af846120 100644 --- a/translib/db/db_stats.go +++ b/translib/db/db_stats.go @@ -20,11 +20,13 @@ package db import ( + "context" + "fmt" "reflect" "sync" "time" - "github.com/go-redis/redis/v7" + "github.com/redis/go-redis/v9" ) //////////////////////////////////////////////////////////////////////////////// @@ -101,11 +103,12 @@ type Stats struct { } type DBStats struct { - Name string `json:"name"` - AllTables Stats `json:"all-tables"` - AllMaps Stats `json:"all-maps"` - Tables map[string]Stats `json:"tables,omitempty"` - Maps map[string]Stats `json:"maps,omitempty"` + Name string `json:"name"` + AllTables Stats `json:"all-tables"` + AllMaps Stats `json:"all-maps"` + Tables map[string]Stats `json:"tables,omitempty"` + Maps map[string]Stats `json:"maps,omitempty"` + RedisPoolStats redis.PoolStats `json:"redis-pool-stats,omitempty"` } type DBGlobalStats struct { @@ -118,6 +121,12 @@ type DBGlobalStats struct { ZeroGetHits uint `json:"zero-get-ops-db"` + // Redis Stats + + CurTransactionalClients uint `json:"cur-transactional-clients,omitempty"` + TotalPoolClientsRequested uint `json:"total-pool-clients-requested,omitempty"` + TotalTransactionalClientsRequested uint `json:"total-transactional-clients-requested,omitempty"` + // TableStats are being collected (true) Databases []DBStats `json:"dbs,omitempty"` @@ -158,6 +167,22 @@ func (d *DB) GetStats() *DBStats { return &(d.stats) } +//////////////////////////////////////////////////////////////////////////////// +// Utility Function to create and return Redis Config DB client // +//////////////////////////////////////////////////////////////////////////////// + +func NewRedisConfigDBClient() (*redis.Client, error) { + client := RedisClient(ConfigDB) + + if client == nil { + return nil, fmt.Errorf("Cannot create Redis Config DB client.") + } + if _, err := client.Ping(context.Background()).Result(); err != nil { + return nil, err + } + return client, nil +} + //////////////////////////////////////////////////////////////////////////////// // Internal Functions // //////////////////////////////////////////////////////////////////////////////// @@ -196,9 +221,11 @@ func (stats *DBGlobalStats) getStats() (*DBGlobalStats, error) { mutexDBGlobalStats.Lock() + rcmCounters := RedisClientManagerCounters() dbGlobalStats = *stats for dbnum, db := range stats.Databases { - dbGlobalStats.Databases[dbnum].Name = DBNum(dbnum).String() + name := DBNum(dbnum).String() + dbGlobalStats.Databases[dbnum].Name = name dbGlobalStats.Databases[dbnum].Tables = make(map[string]Stats, len(db.Tables)) for name, table := range db.Tables { @@ -209,6 +236,10 @@ func (stats *DBGlobalStats) getStats() (*DBGlobalStats, error) { for name, mAP := range db.Maps { dbGlobalStats.Databases[dbnum].Maps[name] = mAP } + + if poolStats := rcmCounters.PoolStatsPerDB[name]; poolStats != nil { + dbGlobalStats.Databases[dbnum].RedisPoolStats = *poolStats + } } mutexDBGlobalStats.Unlock() @@ -222,6 +253,11 @@ func (stats *DBGlobalStats) getStatsTotals() (uint, time.Duration, time.Duration mutexDBGlobalStats.Lock() + rcmCounters := RedisClientManagerCounters() + stats.CurTransactionalClients = uint(rcmCounters.CurTransactionalClients) + stats.TotalPoolClientsRequested = uint(rcmCounters.TotalPoolClientsRequested) + stats.TotalTransactionalClientsRequested = uint(rcmCounters.TotalTransactionalClientsRequested) + for _, db := range stats.Databases { if db.AllTables.Hits != 0 { @@ -523,30 +559,10 @@ func (config *DBStatsConfig) readFromDB() error { //////////////////////////////////////////////////////////////////////////////// func readRedis(key string) (map[string]string, error) { + client := RedisClient(ConfigDB) + defer CloseRedisClient(client) - ipAddr := DefaultRedisLocalTCPEP - dbId := int(ConfigDB) - dbPassword := "" - if dbInstName := getDBInstName(ConfigDB); dbInstName != "" { - if isDbInstPresent(dbInstName) { - ipAddr = getDbTcpAddr(dbInstName) - dbId = getDbId(dbInstName) - dbPassword = getDbPassword(dbInstName) - } - } - - client := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: ipAddr, - Password: dbPassword, - DB: dbId, - DialTimeout: 0, - PoolSize: 1, - }) - - fields, e := client.HGetAll(key).Result() - - client.Close() + fields, e := client.HGetAll(context.Background(), key).Result() return fields, e } diff --git a/translib/db/db_subscribe_test.go b/translib/db/db_subscribe_test.go index be2a83429..9897f4f41 100644 --- a/translib/db/db_subscribe_test.go +++ b/translib/db/db_subscribe_test.go @@ -48,12 +48,13 @@ var unknownCCNotif bool func newCCDB(dBNum DBNum) (*DB, error) { d, e := NewDB(Options{ - DBNo: dBNum, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, - IsSession: true, + DBNo: dBNum, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + IsSession: true, + ForceNewRedisConnection: true, }) return d, e } @@ -155,10 +156,11 @@ func TestSubscribeHFunc(t *testing.T) { }}) subD, e := SubscribeDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + ForceNewRedisConnection: true, }, sKeys, subHdlr) if e != nil { @@ -314,10 +316,11 @@ func TestSubscribeHFuncSA(t *testing.T) { }}) subD, e := SubscribeDBSA(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + ForceNewRedisConnection: true, }, sKeys, subHdlrSA) if e != nil { @@ -439,10 +442,11 @@ func TestSubscribeNoCCNotif2HFuncSA(t *testing.T) { }}) subD, e := SubscribeDBSA(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + ForceNewRedisConnection: true, }, sKeys, subHdlrSA) if e != nil { diff --git a/translib/db/db_table_pattern.go b/translib/db/db_table_pattern.go index d52180746..2fe217c19 100644 --- a/translib/db/db_table_pattern.go +++ b/translib/db/db_table_pattern.go @@ -20,11 +20,12 @@ package db import ( + "context" "time" "github.com/Azure/sonic-mgmt-common/translib/tlerr" - "github.com/go-redis/redis/v7" "github.com/golang/glog" + "github.com/redis/go-redis/v9" ) //////////////////////////////////////////////////////////////////////////////// @@ -120,7 +121,7 @@ func (d *DB) GetTablePattern(ts *TableSpec, pat Key) (Table, error) { } // Run the Lua script - luaTable, err = luaScriptGetTable.Run(d.client, + luaTable, err = luaScriptGetTable.Run(context.Background(), d.client, []string{d.key2redis(ts, pat)}).Result() if err != nil { return table, err diff --git a/translib/db/db_table_pattern_test.go b/translib/db/db_table_pattern_test.go index 175d6a73d..429798360 100644 --- a/translib/db/db_table_pattern_test.go +++ b/translib/db/db_table_pattern_test.go @@ -123,8 +123,10 @@ func TestGetTablePatternCompOrigEmpty(t *testing.T) { func TestGetTablePattern_txCache(t *testing.T) { d := newTestDB(t, Options{ - DBNo: ConfigDB, - DisableCVLCheck: true, + DBNo: ConfigDB, + DisableCVLCheck: true, + ForceNewRedisConnection: true, + KeySeparator: "|", }) setupTestData(t, d.client, map[string]map[string]interface{}{ "TEST_INTERFACE|Ethernet0": {"vrf": "Vrf1"}, @@ -167,6 +169,9 @@ func TestGetTablePattern_txCache(t *testing.T) { d.CreateEntry(testTable, *NewKey("Ethernet2"), nullValue) d.CreateEntry(testTable, *NewKey("Ethernet2", "102.0.0.1/24"), nullValue) d.CreateEntry(testTable, *NewKey("Ethernet2", "102.0.0.2/32"), nullValue) + if err := d.CommitTx(); err != nil { + t.Fatal("CommitTx() failed: ", err) + } // Rerun the tests diff --git a/translib/db/db_test.go b/translib/db/db_test.go index 17d10316d..1a6001dc6 100644 --- a/translib/db/db_test.go +++ b/translib/db/db_test.go @@ -20,14 +20,16 @@ package db import ( + "context" "fmt" "os" "reflect" "strconv" + "sync" "testing" "time" - "github.com/go-redis/redis/v7" + "github.com/redis/go-redis/v9" ) var dbConfig = ` @@ -115,25 +117,27 @@ var dbOnC *DB func newReadOnlyDB(dBNum DBNum) (*DB, error) { d, e := NewDB(Options{ - DBNo: dBNum, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, - IsWriteDisabled: true, + DBNo: dBNum, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + IsWriteDisabled: true, + ForceNewRedisConnection: false, }) return d, e } func newOnCDB(dBNum DBNum) (*DB, error) { d, e := NewDB(Options{ - DBNo: dBNum, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, - IsWriteDisabled: true, - IsOnChangeEnabled: true, + DBNo: dBNum, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + IsWriteDisabled: true, + IsOnChangeEnabled: true, + ForceNewRedisConnection: true, }) return d, e } @@ -142,10 +146,10 @@ func newOnCDB(dBNum DBNum) (*DB, error) { // whne the test case ends. func setupTestData(t *testing.T, redis *redis.Client, data map[string]map[string]interface{}) { keys := make([]string, 0, len(data)) - t.Cleanup(func() { redis.Del(keys...) }) + t.Cleanup(func() { redis.Del(context.Background(), keys...) }) for k, v := range data { keys = append(keys, k) - if _, err := redis.HMSet(k, v).Result(); err != nil { + if _, err := redis.HMSet(context.Background(), k, v).Result(); err != nil { t.Fatalf("HMSET %s failed; err=%v", k, err) } } @@ -233,11 +237,12 @@ func TestMain(m *testing.M) { func TestNewDB(t *testing.T) { d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: false, }) if d == nil { @@ -262,11 +267,12 @@ func TestNoTransaction(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: false, }) if d == nil { @@ -328,11 +334,12 @@ func TestTable(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: false, }) if d == nil { @@ -512,11 +519,12 @@ func TestTransactionCacheWithDBContentKeysPattern(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if d == nil { @@ -609,11 +617,12 @@ func TestTransactionCacheMultiKeysPattern(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if d == nil { @@ -706,11 +715,12 @@ func TestTransactionCacheWithDBContentKeys(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if d == nil { @@ -782,11 +792,12 @@ func TestTransactionCacheWithDBContentDel(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if d == nil { @@ -856,11 +867,12 @@ func TestTransactionCacheWithDBContentDelFields(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if d == nil { @@ -937,11 +949,12 @@ func TestTransactionCacheWithDBContentMod(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if d == nil { @@ -1017,11 +1030,12 @@ func TestTransactionCacheWithDBContentSet(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if d == nil { @@ -1090,11 +1104,12 @@ func testTransactionCache(t *testing.T, transRun TransRun) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if d == nil { @@ -1471,11 +1486,12 @@ func testTransaction(t *testing.T, transRun TransRun) { switch transRun { case TransRunFailWatchKeys, TransRunFailTable: d2, e2 := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if e2 != nil { @@ -1525,11 +1541,12 @@ func TestMap(t *testing.T) { var pid int = os.Getpid() d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: false, }) if d == nil { @@ -1569,11 +1586,12 @@ func TestSubscribe(t *testing.T) { var hSetCalled, hDelCalled, delCalled bool d, e := NewDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }) if (d == nil) || (e != nil) { @@ -1597,11 +1615,12 @@ func TestSubscribe(t *testing.T) { }}) s, e := SubscribeDB(Options{ - DBNo: ConfigDB, - InitIndicator: "", - TableNameSeparator: "|", - KeySeparator: "|", - DisableCVLCheck: true, + DBNo: ConfigDB, + InitIndicator: "", + TableNameSeparator: "|", + KeySeparator: "|", + DisableCVLCheck: true, + ForceNewRedisConnection: true, }, skeys, func(s *DB, skey *SKey, key *Key, event SEvent) error { @@ -1641,3 +1660,503 @@ func TestSubscribe(t *testing.T) { t.Errorf("DeleteDB() fails e = %v", e) } } + +func TestCreateRedisClient(t *testing.T) { + tests := []struct { + name string + db DBNum + }{ + { + name: "ValidDB", + db: ConfigDB, + }, + { + name: "NonexistentDB", + db: 12, + }, + { + name: "InvalidDB", + db: MaxDB, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + rc := createRedisClient(test.db, 1) + if rc == nil { + t.Fatal("Nil client returned!") + } + }) + } +} + +func TestRedisClient(t *testing.T) { + tests := []struct { + name string + db DBNum + valid bool + }{ + { + name: "ValidDB", + db: ConfigDB, + valid: true, + }, + { + name: "NonexistentDB", + db: 12, + valid: false, + }, + { + name: "InvalidDB", + db: MaxDB, + valid: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + rc := RedisClient(test.db) + if test.valid == (rc == nil) { + t.Fatalf("Test expected valid=%v but got rc=%v", test.valid, rc) + } + }) + } +} + +func TestTransactionalRedisClient(t *testing.T) { + tests := []struct { + name string + db DBNum + valid bool + }{ + { + name: "ValidDB", + db: ConfigDB, + valid: true, + }, + { + name: "NonexistentDB", + db: 12, + valid: false, + }, + { + name: "InvalidDB", + db: MaxDB, + valid: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + rc := TransactionalRedisClient(test.db) + if test.valid == (rc == nil) { + t.Fatalf("Test expected valid=%v but got rc=%v", test.valid, rc) + } + }) + } +} + +func TestCloseRedisClient(t *testing.T) { + tests := []struct { + name string + client *redis.Client + closed bool + }{ + { + name: "ValidRedisClient", + client: RedisClient(ConfigDB), + closed: false, + }, + { + name: "NonexistentRedisClient", + client: RedisClient(12), + closed: false, + }, + { + name: "InvalidRedisClient", + client: RedisClient(MaxDB), + closed: false, + }, + { + name: "ValidRedisClientForTransaction", + client: TransactionalRedisClient(ConfigDB), + closed: true, + }, + { + name: "NonexistentRedisClientForTransaction", + client: TransactionalRedisClient(12), + closed: true, + }, + { + name: "InvalidRedisClientForTransaction", + client: TransactionalRedisClient(MaxDB), + closed: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if err := CloseRedisClient(test.client); err != nil { + t.Fatalf("Failed to close Redis client: %v", err) + } + + if test.client != nil { + // The client should only be closed for transactional clients + _, err := test.client.Ping(context.Background()).Result() + /* This if case is added because connections close status changes based on usePools"*/ + if (test.name == "ValidRedisClient") && (*usePools == false) { + if (err == nil) == !test.closed { + t.Fatalf("Expected client closed=%v, but got err=%v", test.closed, err) + } + } else { + if (err == nil) == test.closed { + t.Fatalf("Expected client closed=%v, but got err=%v", test.closed, err) + } + } + } + }) + } + + // Test double close behavior + t.Run("DoubleCloseClient", func(t *testing.T) { + client := TransactionalRedisClient(ConfigDB) + if err := CloseRedisClient(client); err != nil { + t.Fatalf("Failed to close redis client on the first attempt: %v", err) + } + if err := CloseRedisClient(client); err == nil { + t.Fatalf("Second close attempt did not return an error!") + } + }) +} + +func TestIsTransactionalClient(t *testing.T) { + tests := []struct { + name string + client *redis.Client + transactional bool + }{ + { + name: "NonTransactionalRedisClient", + client: RedisClient(ConfigDB), + transactional: false, + }, + { + name: "TransactionalRedisClient", + client: TransactionalRedisClient(ConfigDB), + transactional: true, + }, + { + name: "NilRedisClient", + client: nil, + transactional: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tc := IsTransactionalClient(test.client) + if (test.name == "NonTransactionalRedisClient") && (*usePools == false) { + if tc == test.transactional { + t.Fatalf("Invalid response from IsTransactionalClient! for NonTransactionalRedisClient with usePools as false - got:%v want:%v", tc, !test.transactional) + } + + } else { + + if tc != test.transactional { + t.Fatalf("Invalid response from IsTransactionalClient! got:%v want:%v", tc, test.transactional) + } + } + }) + } +} + +func TestConnectionPoolDisable(t *testing.T) { + origUsePools := *usePools + *usePools = false + defer func() { *usePools = origUsePools }() + client := RedisClient(ConfigDB) + defer CloseRedisClient(client) + + if err := client.Ping(context.Background()).Err(); err != nil { + t.Fatalf("Failed to create client: %v", err) + } + if ps := client.Options().PoolSize; ps != 1 { + t.Fatalf("Incorrect pool size: %v", ps) + } +} + +func TestNilRCM(t *testing.T) { + var client *redis.Client + + rcm = nil + client = RedisClient(ConfigDB) + if rcm == nil { + t.Fatal("RCM is still nil!") + } + if client == nil { + t.Fatalf("Invalid return value for GetRedisClient: %v", client) + } + + client = nil + rcm = nil + client = TransactionalRedisClient(ConfigDB) + if rcm == nil { + t.Fatal("RCM is still nil!") + } + if client == nil { + t.Fatalf("Invalid return value for GetRedisClientForTransaction: %v", client) + } + + rcm = nil + if err := CloseRedisClient(client); err == nil { + t.Fatalf("CloseRedisClient did not return an error!") + } +} + +func TestRedisCounters(t *testing.T) { + t.Logf("usePools is %v", *usePools) + if *usePools { + // Reset RCM + rcm = nil + initializeRedisClientManager() + + if ctc := rcm.curTransactionalClients.Load(); ctc != 0 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 0, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr != 0 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 0, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr != 0 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 0, ttcr) + } + + // Getting a Redis Client from the cache increments correct counter + rc := RedisClient(ConfigDB) + if ctc := rcm.curTransactionalClients.Load(); ctc != 0 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 0, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr != 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr != 0 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 0, ttcr) + } + + // Getting a transactional Redis Client should increment the counter + trc1 := TransactionalRedisClient(ConfigDB) + if ctc := rcm.curTransactionalClients.Load(); ctc != 1 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 1, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr != 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr != 1 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 1, ttcr) + } + + trc2 := TransactionalRedisClient(StateDB) + if ctc := rcm.curTransactionalClients.Load(); ctc != 2 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 2, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr != 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr != 2 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 2, ttcr) + } + + // Closing a Redis Client from the cache should not decrement any counters + if err := CloseRedisClient(rc); err != nil { + t.Fatalf("Got error while closing Redis Client: %v", err) + } + if ctc := rcm.curTransactionalClients.Load(); ctc != 2 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 2, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr != 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr != 2 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 2, ttcr) + } + + // Closing a transactional Redis Client should decrement the right counter + if err := CloseRedisClient(trc1); err != nil { + t.Fatalf("Got error while closing Redis Client: %v", err) + } + if ctc := rcm.curTransactionalClients.Load(); ctc != 1 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 1, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr != 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr != 2 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 2, ttcr) + } + + if err := CloseRedisClient(trc2); err != nil { + t.Fatalf("Got error while closing Redis Client: %v", err) + } + if ctc := rcm.curTransactionalClients.Load(); ctc != 0 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 0, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr != 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr != 2 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 2, ttcr) + } + } else { + // Reset RCM + rcm = nil + initializeRedisClientManager() + + if ctc := rcm.curTransactionalClients.Load(); ctc != 0 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 0, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr != 0 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 0, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr != 0 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 0, ttcr) + } + + // Getting a Redis Client from the cache increments correct counter + rc := RedisClient(ConfigDB) + if ctc := rcm.curTransactionalClients.Load(); ctc == 0 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 0, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr == 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr == 0 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 0, ttcr) + } + + // Getting a transactional Redis Client should increment the counter + trc1 := TransactionalRedisClient(ConfigDB) + if ctc := rcm.curTransactionalClients.Load(); ctc == 1 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 1, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr == 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr == 1 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 1, ttcr) + } + + trc2 := TransactionalRedisClient(StateDB) + if ctc := rcm.curTransactionalClients.Load(); ctc == 0 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 2, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr == 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr == 2 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 2, ttcr) + } + + // Closing a Redis Client from the cache should not decrement any counters + if err := CloseRedisClient(rc); err != nil { + t.Fatalf("Got error while closing Redis Client: %v", err) + } + if ctc := rcm.curTransactionalClients.Load(); ctc != 2 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 2, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr == 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr == 2 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 2, ttcr) + } + + // Closing a transactional Redis Client should decrement the right counter + if err := CloseRedisClient(trc1); err != nil { + t.Fatalf("Got error while closing Redis Client: %v", err) + } + if ctc := rcm.curTransactionalClients.Load(); ctc != 1 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 1, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr == 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr == 2 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 2, ttcr) + } + + if err := CloseRedisClient(trc2); err != nil { + t.Fatalf("Got error while closing Redis Client: %v", err) + } + if ctc := rcm.curTransactionalClients.Load(); ctc != 0 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 0, ctc) + } + if trcr := rcm.totalPoolClientsRequested.Load(); trcr == 1 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 1, trcr) + } + if ttcr := rcm.totalTransactionalClientsRequested.Load(); ttcr == 2 { + t.Fatalf("RCM totalTransactionalRedisClientsRequested expected=%v, got=%v", 2, ttcr) + } + } +} + +func TestRedisClientManagerCounters(t *testing.T) { + if rcm == nil { + initializeRedisClientManager() + } + rcm.curTransactionalClients.Store(10) + rcm.totalPoolClientsRequested.Store(50) + rcm.totalTransactionalClientsRequested.Store(15) + + counters := RedisClientManagerCounters() + if counters.CurTransactionalClients != 10 { + t.Fatalf("RCM curTransactionalClients expected=%v, got=%v", 10, counters.CurTransactionalClients) + } + if counters.TotalPoolClientsRequested != 50 { + t.Fatalf("RCM totalPoolClientsRequested expected=%v, got=%v", 50, counters.TotalPoolClientsRequested) + } + if counters.TotalTransactionalClientsRequested != 15 { + t.Fatalf("RCM totalTransactionalClientsRequested expected=%v, got=%v", 15, counters.TotalTransactionalClientsRequested) + } + + for db, poolStats := range counters.PoolStatsPerDB { + if poolStats == nil { + t.Fatalf("RCM PoolStats is nil for db=%v", db) + } + } +} + +func TestRedisClientPoolExhaustion(t *testing.T) { + testPoolSize := 2 + testClients := 5 + + rc := createRedisClient(ConfigDB, testPoolSize) + defer rc.Close() + if err := rc.HSet(context.Background(), "REDIS_TEST_TABLE|test", map[string]interface{}{"test_field": "test_value"}).Err(); err != nil { + t.Fatalf("Failed to set test data: %v", err) + } + + wg := sync.WaitGroup{} + for i := 0; i < testClients; i++ { + wg.Add(1) + go func() { + defer wg.Done() + entry, err := rc.HGetAll(context.Background(), "REDIS_TEST_TABLE|test").Result() + if err != nil { + t.Fatalf("Failed to read the test table: %v", err) + } + if value := entry["test_field"]; value != "test_value" { + t.Fatalf("Got incorrect data from DB read: want=%v, got=%v", "test_value", value) + } + }() + } + wg.Wait() + + // Verify pool stats + poolStats := rc.PoolStats() + t.Logf("Redis Client PoolStats: %v", poolStats) + if tc := poolStats.TotalConns; tc != uint32(testPoolSize) { + t.Errorf("Invalid TotalConns value: want=%v, got=%v", testPoolSize, tc) + } + if timeouts := poolStats.Timeouts; timeouts != 0 { + t.Errorf("Invalid Timeouts value: want=%v, got=%v", 0, timeouts) + } +} diff --git a/translib/db/map.go b/translib/db/map.go index cf809960f..b67cd2d21 100644 --- a/translib/db/map.go +++ b/translib/db/map.go @@ -23,6 +23,7 @@ Package db implements a wrapper over the go-redis/redis. package db import ( + "context" "time" "github.com/golang/glog" @@ -71,7 +72,7 @@ func (d *DB) GetMap(ts *TableSpec, mapKey string) (string, error) { glog.Info("GetMap: RedisCmd: ", d.Name(), ": ", "HGET ", ts.Name, mapKey) - v, e = d.client.HGet(ts.Name, mapKey).Result() + v, e = d.client.HGet(context.Background(), ts.Name, mapKey).Result() // If cache SetCache (i.e. a cache miss) if d.dbCacheConfig.PerConnection && d.dbCacheConfig.isCacheMap(ts.Name) { @@ -166,8 +167,8 @@ func (d *DB) GetMapAll(ts *TableSpec) (Value, error) { if !cacheHit { - glog.Info("GetMapAll: RedisCmd: ", d.Name(), ": ", "HGETALL ", ts.Name) - v, e = d.client.HGetAll(ts.Name).Result() + glog.V(3).Info("GetMapAll: RedisCmd: ", d.Name(), ": ", "HGETALL ", ts.Name) + v, e = d.client.HGetAll(context.Background(), ts.Name).Result() if len(v) != 0 { @@ -248,7 +249,7 @@ func (d *DB) SetMap(ts *TableSpec, mapKey string, mapValue string) error { ":", mapValue) } - b, e := d.client.HSet(ts.Name, mapKey, mapValue).Result() + b, e := d.client.HSet(context.Background(), ts.Name, mapKey, mapValue).Result() if glog.V(3) { glog.Info("GetMap: End: ", "b: ", b, " e: ", e) @@ -267,7 +268,7 @@ func (d *DB) DeleteMapAll(ts *TableSpec) error { glog.Info("DeleteMapAll: Begin: ", "ts: ", ts) } - e := d.client.Del(ts.Name).Err() + e := d.client.Del(context.Background(), ts.Name).Err() if glog.V(3) { glog.Info("DeleteMapAll: End: ", " e: ", e) diff --git a/translib/db/rcm.go b/translib/db/rcm.go new file mode 100644 index 000000000..99f51a3eb --- /dev/null +++ b/translib/db/rcm.go @@ -0,0 +1,197 @@ +package db + +import ( + "context" + "flag" + "fmt" + "sync" + "sync/atomic" + + log "github.com/golang/glog" + "github.com/redis/go-redis/v9" +) + +var usePools = flag.Bool("use_connection_pools", false, "use connection pools for Redis Clients") + +const ( + POOL_SIZE = 25 +) + +var rcm *redisClientManager +var initMu = &sync.Mutex{} + +type redisClientManager struct { + // clients holds one Redis Client for each DBNum + clients [MaxDB + 1]*redis.Client + mu *sync.RWMutex + curTransactionalClients atomic.Int32 + totalPoolClientsRequested atomic.Uint64 + totalTransactionalClientsRequested atomic.Uint64 +} + +type RedisCounters struct { + CurTransactionalClients uint32 // The number of transactional clients currently opened. + TotalPoolClientsRequested uint64 // The total number of Redis Clients using a connection pool requested. + TotalTransactionalClientsRequested uint64 // The total number of Transactional Redis Clients requested. + PoolStatsPerDB map[string]*redis.PoolStats // The pool counters for each Redis Client in the cache. +} + +func init() { + initializeRedisOpts() + initializeRedisClientManager() +} + +func initializeRedisClientManager() { + initMu.Lock() + defer initMu.Unlock() + if rcm != nil { + return + } + rcm = &redisClientManager{ + clients: [MaxDB + 1]*redis.Client{}, + mu: &sync.RWMutex{}, + curTransactionalClients: atomic.Int32{}, + totalPoolClientsRequested: atomic.Uint64{}, + totalTransactionalClientsRequested: atomic.Uint64{}, + } + rcm.mu.Lock() + defer rcm.mu.Unlock() + for dbNum := DBNum(0); dbNum < MaxDB; dbNum++ { + if len(getDBInstName(dbNum)) == 0 { + continue + } + // Create a Redis Client for each database. + rcm.clients[int(dbNum)] = createRedisClient(dbNum, POOL_SIZE) + } +} + +func createRedisClient(db DBNum, poolSize int) *redis.Client { + opts := adjustRedisOpts(&Options{DBNo: db}) + opts.PoolSize = poolSize + client := redis.NewClient(opts) + if _, err := client.Ping(context.Background()).Result(); err != nil { + log.V(0).Infof("RCM error during Redis Client creation for DBNum=%v: %v", db, err) + } + return client +} + +func createRedisClientWithOpts(opts *redis.Options) *redis.Client { + client := redis.NewClient(opts) + if _, err := client.Ping(context.Background()).Result(); err != nil { + log.V(0).Infof("RCM error during Redis Client creation for DBNum=%v: %v", opts.DB, err) + } + return client +} + +func getClient(db DBNum) *redis.Client { + rcm.mu.RLock() + defer rcm.mu.RUnlock() + return rcm.clients[int(db)] +} + +// RedisClient will return a Redis Client that can be used for non-transactional Redis operations. +// The client returned by this function is shared among many DB readers/writers and uses +// a connection pool. For transactional Redis operations, please use GetRedisClientForTransaction(). +func RedisClient(db DBNum) *redis.Client { + if rcm == nil { + initializeRedisClientManager() + } + if !(*usePools) { // Connection Pooling is disabled. + return TransactionalRedisClient(db) + } + if len(getDBInstName(db)) == 0 { + log.V(0).Infof("Invalid DBNum requested: %v", db) + return nil + } + rcm.totalPoolClientsRequested.Add(1) + rc := getClient(db) + if rc == nil { + log.V(0).Infof("RCM Redis client for DBNum=%v is nil!", db) + rcm.mu.Lock() + defer rcm.mu.Unlock() + if rc = rcm.clients[int(db)]; rc != nil { + return rc + } + rc = createRedisClient(db, POOL_SIZE) + rcm.clients[int(db)] = rc + } + return rc +} + +// TransactionalRedisClient will create and return a unique Redis client. This client can be used +// for transactional operations. These operations include MULTI, PSUBSCRIBE (PubSub), and SCAN. This +// client must be closed using CloseRedisClient when it is no longer needed. +func TransactionalRedisClient(db DBNum) *redis.Client { + if rcm == nil { + initializeRedisClientManager() + } + if len(getDBInstName(db)) == 0 { + log.V(0).Infof("Invalid DBNum requested: %v", db) + return nil + } + rcm.totalTransactionalClientsRequested.Add(1) + client := createRedisClient(db, 1) + rcm.curTransactionalClients.Add(1) + return client +} + +func TransactionalRedisClientWithOpts(opts *redis.Options) *redis.Client { + if rcm == nil { + initializeRedisClientManager() + } + rcm.totalTransactionalClientsRequested.Add(1) + opts.PoolSize = 1 + client := createRedisClientWithOpts(opts) + rcm.curTransactionalClients.Add(1) + return client +} + +// CloseUniqueRedisClient will close the Redis client that is passed in. +func CloseRedisClient(rc *redis.Client) error { + if rcm == nil { + return fmt.Errorf("RCM is nil when trying to close Redis Client: %v", rc) + } + if rc == nil { + return nil + } + // Closing a Redis Client with a connection pool is a no-op because these clients need to stay open. + if !IsTransactionalClient(rc) { + return nil + } + if err := rc.Close(); err != nil { + return err + } + rcm.curTransactionalClients.Add(-1) + return nil +} + +// IsTransactionalClient returns true if rc is a transactional client and false otherwise. +func IsTransactionalClient(rc *redis.Client) bool { + if rc == nil { + return false + } + return rc.Options().PoolSize == 1 +} + +// RedisClientManagerCounters returns the counters stored in the RCM. +func RedisClientManagerCounters() *RedisCounters { + if rcm == nil { + initializeRedisClientManager() + } + counters := &RedisCounters{ + CurTransactionalClients: uint32(rcm.curTransactionalClients.Load()), + TotalPoolClientsRequested: rcm.totalPoolClientsRequested.Load(), + TotalTransactionalClientsRequested: rcm.totalTransactionalClientsRequested.Load(), + PoolStatsPerDB: map[string]*redis.PoolStats{}, + } + rcm.mu.RLock() + defer rcm.mu.RUnlock() + for db, client := range rcm.clients { + dbName := getDBInstName(DBNum(db)) + if dbName == "" || client == nil { + continue + } + counters.PoolStatsPerDB[dbName] = client.PoolStats() + } + return counters +} diff --git a/translib/db/subscribe.go b/translib/db/subscribe.go index 19147c052..f8afddfd0 100644 --- a/translib/db/subscribe.go +++ b/translib/db/subscribe.go @@ -23,12 +23,19 @@ Package db implements a wrapper over the go-redis/redis. package db import ( + "context" "errors" "strconv" "strings" "github.com/Azure/sonic-mgmt-common/translib/tlerr" "github.com/golang/glog" + "github.com/redis/go-redis/v9" +) + +const ( + subscriptionChannelSize = 1000 + channelDepthWarnThreshold = 100 ) // SKey is (TableSpec, Key, []SEvent) 3-tuples to be watched in a Transaction. @@ -140,6 +147,7 @@ func iSubscribeDB(opt Options, skeys []*SKey, handler interface{}) (*DB, error) } opt.IsSubscribeDB = true + opt.ForceNewRedisConnection = false // NewDB d, e := NewDB(opt) @@ -150,7 +158,7 @@ func iSubscribeDB(opt Options, skeys []*SKey, handler interface{}) (*DB, error) // Make sure that the DB is configured for key space notifications // Optimize with LUA scripts to atomically add "Kgshxe". - s, e = d.client.ConfigSet("notify-keyspace-events", "AKE").Result() + s, e = d.client.ConfigSet(context.Background(), "notify-keyspace-events", "AKE").Result() if e != nil { glog.Error("SubscribeDB: ConfigSet(): e: ", e, " s: ", s) @@ -169,7 +177,7 @@ func iSubscribeDB(opt Options, skeys []*SKey, handler interface{}) (*DB, error) glog.Info("SubscribeDB: patterns: ", patterns) - d.sPubSub = d.client.PSubscribe(patterns[:]...) + d.sPubSub = d.client.PSubscribe(context.Background(), patterns[:]...) if d.sPubSub == nil { glog.Error("SubscribeDB: PSubscribe() nil: pats: ", patterns) @@ -180,7 +188,7 @@ func iSubscribeDB(opt Options, skeys []*SKey, handler interface{}) (*DB, error) d.sOnCCacheDB = d.Opts.SDB // Wait for confirmation, of channel creation - _, e = d.sPubSub.Receive() + _, e = d.sPubSub.Receive(context.Background()) if e != nil { glog.Error("SubscribeDB: Receive() fails: e: ", e) @@ -193,9 +201,16 @@ func iSubscribeDB(opt Options, skeys []*SKey, handler interface{}) (*DB, error) // Start a goroutine to read messages and call handler. go func() { - for msg := range d.sPubSub.Channel() { - if glog.V(4) { - glog.Info("SubscribeDB: msg: ", msg) + maxchandepth := 0 + pschan := d.sPubSub.Channel(redis.WithChannelSize(subscriptionChannelSize)) + for msg := range pschan { + glog.V(3).Info("SubscribeDB: msg: ", msg) + curdepth := len(pschan) + if curdepth > maxchandepth { + maxchandepth = curdepth + if maxchandepth > channelDepthWarnThreshold { + glog.V(1).Infof("Max observed depth for len(pschan): %d", maxchandepth) + } } // Should this be a goroutine, in case each notification CB diff --git a/translib/subscribe.go b/translib/subscribe.go index 302570188..816f0c8c3 100644 --- a/translib/subscribe.go +++ b/translib/subscribe.go @@ -173,7 +173,7 @@ func Subscribe(req SubscribeRequest) error { paths := req.Paths log.Infof("[%v] Subscribe: paths = %v", sid, paths) - dbs, err := getAllDbs(withWriteDisable, withOnChange) + dbs, err := getAllDbs(withWriteDisable, withOnChange, withForceNewRedisConnection) if err != nil { return err } diff --git a/translib/transformer/transformer_test.go b/translib/transformer/transformer_test.go index 1a98df0e4..f6f5addc3 100644 --- a/translib/transformer/transformer_test.go +++ b/translib/transformer/transformer_test.go @@ -19,6 +19,7 @@ package transformer_test import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -26,8 +27,8 @@ import ( "github.com/Azure/sonic-mgmt-common/translib/db" "github.com/Azure/sonic-mgmt-common/translib/ocbinds" - "github.com/go-redis/redis/v7" "github.com/openconfig/ygot/ytypes" + "github.com/redis/go-redis/v9" "testing" ) @@ -106,13 +107,13 @@ func clearDb() { for dbNum, tblList := range dbNumTblList { for _, tbl := range tblList { - tblKeys, keysErr := rclientDBNum[dbNum].Keys(tbl + "|*").Result() + tblKeys, keysErr := rclientDBNum[dbNum].Keys(context.Background(), tbl+"|*").Result() if keysErr != nil { fmt.Printf("Couldn't fetch keys for table %v", tbl) continue } for _, key := range tblKeys { - e := rclientDBNum[dbNum].Del(key).Err() + e := rclientDBNum[dbNum].Del(context.Background(), key).Err() if e != nil { fmt.Printf("Couldn't delete key %v", key) } @@ -176,7 +177,7 @@ func teardown() error { clearDb() for dbNum := range rclientDBNum { if rclientDBNum[dbNum] != nil { - rclientDBNum[dbNum].Close() + db.CloseRedisClient(rclientDBNum[dbNum]) } } @@ -191,7 +192,7 @@ func loadDB(dbNum db.DBNum, mpi map[string]interface{}) { case map[string]interface{}: for subKey, subValue := range fv.(map[string]interface{}) { newKey := key + opts.KeySeparator + subKey - _, err := client.HMSet(newKey, subValue.(map[string]interface{})).Result() + _, err := client.HMSet(context.Background(), newKey, subValue.(map[string]interface{})).Result() if err != nil { fmt.Printf("Invalid data for db: %v : %v %v", newKey, subValue, err) @@ -212,7 +213,7 @@ func unloadDB(dbNum db.DBNum, mpi map[string]interface{}) { case map[string]interface{}: for subKey, subValue := range fv.(map[string]interface{}) { newKey := key + opts.KeySeparator + subKey - _, err := client.Del(newKey).Result() + _, err := client.Del(context.Background(), newKey).Result() if err != nil { fmt.Printf("Invalid data for db: %v : %v %v", newKey, subValue, err) @@ -227,30 +228,8 @@ func unloadDB(dbNum db.DBNum, mpi map[string]interface{}) { } func getDbClient(dbNum int) *redis.Client { - addr := "localhost:6379" - pass := "" - for _, d := range dbConfig.Databases { - if id, ok := d["id"]; !ok || int(id.(float64)) != dbNum { - continue - } - - dbi := dbConfig.Instances[d["instance"].(string)] - addr = fmt.Sprintf("%v:%v", dbi["hostname"], dbi["port"]) - if p, ok := dbi["password_path"].(string); ok { - pwd, _ := ioutil.ReadFile(p) - pass = string(pwd) - } - break - } - - rclient := redis.NewClient(&redis.Options{ - Network: "tcp", - Addr: addr, - Password: pass, - DB: dbNum, - DialTimeout: 0, - }) - _, err := rclient.Ping().Result() + rclient := db.TransactionalRedisClient(db.DBNum(dbNum)) + _, err := rclient.Ping(context.Background()).Result() if err != nil { fmt.Printf("failed to connect to redis server %v", err) } diff --git a/translib/transformer/utils_test.go b/translib/transformer/utils_test.go index e54193a6c..cef0decf7 100644 --- a/translib/transformer/utils_test.go +++ b/translib/transformer/utils_test.go @@ -19,6 +19,7 @@ package transformer_test import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -27,7 +28,7 @@ import ( . "github.com/Azure/sonic-mgmt-common/translib" db "github.com/Azure/sonic-mgmt-common/translib/db" - "github.com/go-redis/redis/v7" + "github.com/redis/go-redis/v9" ) type queryParamsUT struct { @@ -216,7 +217,7 @@ func getConfigDb() *db.DB { func verifyDbResult(client *redis.Client, key string, expectedResult map[string]interface{}, errorCase bool) func(*testing.T) { return func(t *testing.T) { - result, err := client.HGetAll(key).Result() + result, err := client.HGetAll(context.Background(), key).Result() if err != nil { t.Fatalf("Error %v hgetall for key: %s", err, key) } diff --git a/translib/transformer/xlate_utils.go b/translib/transformer/xlate_utils.go index e3966dad3..beeb1f4f0 100644 --- a/translib/transformer/xlate_utils.go +++ b/translib/transformer/xlate_utils.go @@ -615,32 +615,14 @@ func findInMap(m map[string]string, str string) string { return "" } -func getDBOptions(dbNo db.DBNum, opts ...func(*db.Options)) db.Options { +func getDBOptions(dbNo db.DBNum) db.Options { var opt db.Options - separator := "" - if dbConfigMap != nil { - dbName := db.GetDBInstName(dbNo) - dbList, ok := dbConfigMap["DATABASES"].(map[string]interface{}) - if ok { - dbSep, ok := dbList[dbName].(map[string]interface{})["separator"] - if ok { - separator = dbSep.(string) - } - } - } - - if separator == "" { - switch dbNo { - case db.ApplDB, db.CountersDB, db.ErrorDB, db.FlexCounterDB, db.AsicDB, db.LogLevelDB: - separator = ":" - case db.SnmpDB, db.ConfigDB, db.StateDB, db.EventDB: - separator = "|" - } - } - opt = getDBOptionsWithSeparator(dbNo, "", separator, separator) - for _, setopt := range opts { - setopt(&opt) + switch dbNo { + case db.ApplDB, db.CountersDB, db.FlexCounterDB, db.AsicDB: + opt = getDBOptionsWithSeparator(dbNo, "", ":", ":") + case db.ConfigDB, db.StateDB: + opt = getDBOptionsWithSeparator(dbNo, "", "|", "|") } return opt diff --git a/translib/translib.go b/translib/translib.go index efa69f6f7..a98c986cb 100644 --- a/translib/translib.go +++ b/translib/translib.go @@ -185,7 +185,7 @@ func Create(req SetRequest) (SetResponse, error) { writeMutex.Lock() defer writeMutex.Unlock() - d, err := db.NewDB(getDBOptions(db.ConfigDB)) + d, err := db.NewDB(getDBOptions(db.ConfigDB, withForceNewRedisConnection)) if err != nil { resp.ErrSrc = ProtoErr @@ -258,7 +258,7 @@ func Update(req SetRequest) (SetResponse, error) { writeMutex.Lock() defer writeMutex.Unlock() - d, err := db.NewDB(getDBOptions(db.ConfigDB)) + d, err := db.NewDB(getDBOptions(db.ConfigDB, withForceNewRedisConnection)) if err != nil { resp.ErrSrc = ProtoErr @@ -332,7 +332,7 @@ func Replace(req SetRequest) (SetResponse, error) { writeMutex.Lock() defer writeMutex.Unlock() - d, err := db.NewDB(getDBOptions(db.ConfigDB)) + d, err := db.NewDB(getDBOptions(db.ConfigDB, withForceNewRedisConnection)) if err != nil { resp.ErrSrc = ProtoErr @@ -405,7 +405,7 @@ func Delete(req SetRequest) (SetResponse, error) { writeMutex.Lock() defer writeMutex.Unlock() - d, err := db.NewDB(getDBOptions(db.ConfigDB)) + d, err := db.NewDB(getDBOptions(db.ConfigDB, withForceNewRedisConnection)) if err != nil { resp.ErrSrc = ProtoErr @@ -474,7 +474,7 @@ func Get(req GetRequest) (GetResponse, error) { return resp, err } - dbs, err := getAllDbs(withWriteDisable) + dbs, err := getAllDbs(withWriteDisable, withForceNewRedisConnection) if err != nil { resp = GetResponse{Payload: payload, ErrSrc: ProtoErr} @@ -530,7 +530,7 @@ func Action(req ActionRequest) (ActionResponse, error) { writeMutex.Lock() defer writeMutex.Unlock() - dbs, err := getAllDbs() + dbs, err := getAllDbs(withForceNewRedisConnection) if err != nil { resp = ActionResponse{Payload: payload, ErrSrc: ProtoErr} @@ -571,7 +571,7 @@ func Bulk(req BulkRequest) (BulkResponse, error) { writeMutex.Lock() defer writeMutex.Unlock() - d, err := db.NewDB(getDBOptions(db.ConfigDB)) + d, err := db.NewDB(getDBOptions(db.ConfigDB, withForceNewRedisConnection)) if err != nil { return resp, err @@ -783,6 +783,14 @@ func getDBOptions(dbNo db.DBNum, opts ...func(*db.Options)) db.Options { for _, setopt := range opts { setopt(&o) } + switch dbNo { + case db.ApplDB, db.CountersDB, db.AsicDB, db.FlexCounterDB, db.LogLevelDB, db.ErrorDB: + o.TableNameSeparator = ":" + o.KeySeparator = ":" + case db.ConfigDB, db.StateDB, db.SnmpDB: + o.TableNameSeparator = "|" + o.KeySeparator = "|" + } return o } @@ -793,6 +801,9 @@ func withWriteDisable(o *db.Options) { func withOnChange(o *db.Options) { o.IsOnChangeEnabled = true } +func withForceNewRedisConnection(o *db.Options) { + o.ForceNewRedisConnection = true +} func getAppModule(path string, clientVer Version) (*appInterface, *appInfo, error) { var app appInterface