@@ -39,11 +39,15 @@ func NewConcurrentBuffer(locker DistLocker, concurrentStateBackend ConcurrentBuf
3939func (c * ConcurrentBuffer ) Limit (ctx context.Context , key string ) error {
4040 c .mu .Lock ()
4141 defer c .mu .Unlock ()
42- if err := c .locker .Lock (ctx ); err != nil {
42+
43+ err := c .locker .Lock (ctx )
44+ if err != nil {
4345 return err
4446 }
47+
4548 defer func () {
46- if err := c .locker .Unlock (ctx ); err != nil {
49+ err := c .locker .Unlock (ctx )
50+ if err != nil {
4751 c .logger .Log (err )
4852 }
4953 }()
@@ -52,9 +56,11 @@ func (c *ConcurrentBuffer) Limit(ctx context.Context, key string) error {
5256 if err != nil {
5357 return err
5458 }
59+
5560 if counter > c .capacity {
5661 // Rollback the Add() operation.
57- if err = c .backend .Remove (ctx , key ); err != nil {
62+ err = c .backend .Remove (ctx , key )
63+ if err != nil {
5864 c .logger .Log (err )
5965 }
6066
@@ -89,6 +95,7 @@ func NewConcurrentBufferInMemory(registry *Registry, ttl time.Duration, clock Cl
8995func (c * ConcurrentBufferInMemory ) Add (ctx context.Context , key string ) (int64 , error ) {
9096 c .mu .Lock ()
9197 defer c .mu .Unlock ()
98+
9299 now := c .clock .Now ()
93100 c .registry .DeleteExpired (now )
94101 c .registry .GetOrCreate (key , func () interface {} {
@@ -102,6 +109,7 @@ func (c *ConcurrentBufferInMemory) Add(ctx context.Context, key string) (int64,
102109func (c * ConcurrentBufferInMemory ) Remove (_ context.Context , key string ) error {
103110 c .mu .Lock ()
104111 defer c .mu .Unlock ()
112+
105113 c .registry .Delete (key )
106114
107115 return nil
@@ -125,11 +133,16 @@ func NewConcurrentBufferRedis(cli redis.UniversalClient, key string, ttl time.Du
125133// Add adds the request with the given key to the sorted set in Redis and returns the total number of requests in it.
126134// It also removes the keys with expired TTL.
127135func (c * ConcurrentBufferRedis ) Add (ctx context.Context , key string ) (int64 , error ) {
128- var countCmd * redis.IntCmd
129- var err error
136+ var (
137+ countCmd * redis.IntCmd
138+ err error
139+ )
140+
130141 done := make (chan struct {})
142+
131143 go func () {
132144 defer close (done )
145+
133146 _ , err = c .cli .Pipelined (ctx , func (pipeliner redis.Pipeliner ) error {
134147 // Remove expired items.
135148 now := c .clock .Now ()
@@ -186,14 +199,20 @@ type SortedSetNode struct {
186199// It also removes the keys with expired TTL.
187200func (c * ConcurrentBufferMemcached ) Add (ctx context.Context , element string ) (int64 , error ) {
188201 var err error
202+
189203 done := make (chan struct {})
190204 now := c .clock .Now ()
191- var newNodes []SortedSetNode
192- var casId uint64 = 0
205+
206+ var (
207+ newNodes []SortedSetNode
208+ casId uint64 = 0
209+ )
193210
194211 go func () {
195212 defer close (done )
213+
196214 var item * memcache.Item
215+
197216 item , err = c .cli .Get (c .key )
198217 if err != nil {
199218 if ! errors .Is (err , memcache .ErrCacheMiss ) {
@@ -202,17 +221,23 @@ func (c *ConcurrentBufferMemcached) Add(ctx context.Context, element string) (in
202221 } else {
203222 casId = item .CasID
204223 b := bytes .NewBuffer (item .Value )
224+
205225 var oldNodes []SortedSetNode
226+
206227 _ = gob .NewDecoder (b ).Decode (& oldNodes )
207228 for _ , node := range oldNodes {
208229 if node .CreatedAt > now .UnixNano () && node .Value != element {
209230 newNodes = append (newNodes , node )
210231 }
211232 }
212233 }
234+
213235 newNodes = append (newNodes , SortedSetNode {CreatedAt : now .Add (c .ttl ).UnixNano (), Value : element })
236+
214237 var b bytes.Buffer
238+
215239 _ = gob .NewEncoder (& b ).Encode (newNodes )
240+
216241 item = & memcache.Item {
217242 Key : c .key ,
218243 Value : b .Bytes (),
@@ -245,9 +270,14 @@ func (c *ConcurrentBufferMemcached) Add(ctx context.Context, element string) (in
245270// Remove removes the request identified by the key from the slice in Memcached.
246271func (c * ConcurrentBufferMemcached ) Remove (ctx context.Context , key string ) error {
247272 var err error
273+
248274 now := c .clock .Now ()
249- var newNodes []SortedSetNode
250- var casID uint64
275+
276+ var (
277+ newNodes []SortedSetNode
278+ casID uint64
279+ )
280+
251281 item , err := c .cli .Get (c .key )
252282 if err != nil {
253283 if errors .Is (err , memcache .ErrCacheMiss ) {
@@ -256,8 +286,11 @@ func (c *ConcurrentBufferMemcached) Remove(ctx context.Context, key string) erro
256286
257287 return errors .Wrap (err , "failed to Get" )
258288 }
289+
259290 casID = item .CasID
291+
260292 var oldNodes []SortedSetNode
293+
261294 _ = gob .NewDecoder (bytes .NewBuffer (item .Value )).Decode (& oldNodes )
262295 for _ , node := range oldNodes {
263296 if node .CreatedAt > now .UnixNano () && node .Value != key {
@@ -266,12 +299,14 @@ func (c *ConcurrentBufferMemcached) Remove(ctx context.Context, key string) erro
266299 }
267300
268301 var b bytes.Buffer
302+
269303 _ = gob .NewEncoder (& b ).Encode (newNodes )
270304 item = & memcache.Item {
271305 Key : c .key ,
272306 Value : b .Bytes (),
273307 CasID : casID ,
274308 }
309+
275310 err = c .cli .CompareAndSwap (item )
276311 if err != nil && (errors .Is (err , memcache .ErrCASConflict ) || errors .Is (err , memcache .ErrNotStored ) || errors .Is (err , memcache .ErrCacheMiss )) {
277312 return c .Remove (ctx , key )
0 commit comments