@@ -34,8 +34,7 @@ import (
3434// with 100 emails and the request will succeed. You can override this default behavior with `DRAIN_OVER_LIMIT`
3535
3636// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
37- func tokenBucket (ctx context.Context , s Store , c Cache , r * RateLimitReq ) (resp * RateLimitResp , err error ) {
38-
37+ func tokenBucket (ctx context.Context , s Store , c Cache , r * RateLimitReq , reqState RateLimitReqState ) (resp * RateLimitResp , err error ) {
3938 tokenBucketTimer := prometheus .NewTimer (metricFuncTimeDuration .WithLabelValues ("tokenBucket" ))
4039 defer tokenBucketTimer .ObserveDuration ()
4140
@@ -100,7 +99,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
10099 s .Remove (ctx , hashKey )
101100 }
102101
103- return tokenBucketNewItem (ctx , s , c , r )
102+ return tokenBucketNewItem (ctx , s , c , r , reqState )
104103 }
105104
106105 // Update the limit if it changed.
@@ -133,12 +132,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
133132 }
134133
135134 // If our new duration means we are currently expired.
136- now := MillisecondNow ()
137- if expire <= now {
135+ createdAt := * r . CreatedAt
136+ if expire <= createdAt {
138137 // Renew item.
139138 span .AddEvent ("Limit has expired" )
140- expire = now + r .Duration
141- t .CreatedAt = now
139+ expire = createdAt + r .Duration
140+ t .CreatedAt = createdAt
142141 t .Remaining = t .Limit
143142 }
144143
@@ -147,7 +146,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
147146 rl .ResetTime = expire
148147 }
149148
150- if s != nil {
149+ if s != nil && reqState . IsOwner {
151150 defer func () {
152151 s .OnChange (ctx , r , item )
153152 }()
@@ -162,7 +161,9 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
162161 // If we are already at the limit.
163162 if rl .Remaining == 0 && r .Hits > 0 {
164163 trace .SpanFromContext (ctx ).AddEvent ("Already over the limit" )
165- metricOverLimitCounter .Add (1 )
164+ if reqState .IsOwner {
165+ metricOverLimitCounter .Add (1 )
166+ }
166167 rl .Status = Status_OVER_LIMIT
167168 t .Status = rl .Status
168169 return rl , nil
@@ -180,7 +181,9 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
180181 // without updating the cache.
181182 if r .Hits > t .Remaining {
182183 trace .SpanFromContext (ctx ).AddEvent ("Over the limit" )
183- metricOverLimitCounter .Add (1 )
184+ if reqState .IsOwner {
185+ metricOverLimitCounter .Add (1 )
186+ }
184187 rl .Status = Status_OVER_LIMIT
185188 if HasBehavior (r .Behavior , Behavior_DRAIN_OVER_LIMIT ) {
186189 // DRAIN_OVER_LIMIT behavior drains the remaining counter.
@@ -196,19 +199,19 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
196199 }
197200
198201 // Item is not found in cache or store, create new.
199- return tokenBucketNewItem (ctx , s , c , r )
202+ return tokenBucketNewItem (ctx , s , c , r , reqState )
200203}
201204
202205// Called by tokenBucket() when adding a new item in the store.
203- func tokenBucketNewItem (ctx context.Context , s Store , c Cache , r * RateLimitReq ) (resp * RateLimitResp , err error ) {
204- now := MillisecondNow ()
205- expire := now + r .Duration
206+ func tokenBucketNewItem (ctx context.Context , s Store , c Cache , r * RateLimitReq , reqState RateLimitReqState ) (resp * RateLimitResp , err error ) {
207+ createdAt := * r . CreatedAt
208+ expire := createdAt + r .Duration
206209
207210 t := & TokenBucketItem {
208211 Limit : r .Limit ,
209212 Duration : r .Duration ,
210213 Remaining : r .Limit - r .Hits ,
211- CreatedAt : now ,
214+ CreatedAt : createdAt ,
212215 }
213216
214217 // Add a new rate limit to the cache.
@@ -236,31 +239,33 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
236239 // Client could be requesting that we always return OVER_LIMIT.
237240 if r .Hits > r .Limit {
238241 trace .SpanFromContext (ctx ).AddEvent ("Over the limit" )
239- metricOverLimitCounter .Add (1 )
242+ if reqState .IsOwner {
243+ metricOverLimitCounter .Add (1 )
244+ }
240245 rl .Status = Status_OVER_LIMIT
241246 rl .Remaining = r .Limit
242247 t .Remaining = r .Limit
243248 }
244249
245250 c .Add (item )
246251
247- if s != nil {
252+ if s != nil && reqState . IsOwner {
248253 s .OnChange (ctx , r , item )
249254 }
250255
251256 return rl , nil
252257}
253258
254259// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
255- func leakyBucket (ctx context.Context , s Store , c Cache , r * RateLimitReq ) (resp * RateLimitResp , err error ) {
260+ func leakyBucket (ctx context.Context , s Store , c Cache , r * RateLimitReq , reqState RateLimitReqState ) (resp * RateLimitResp , err error ) {
256261 leakyBucketTimer := prometheus .NewTimer (metricFuncTimeDuration .WithLabelValues ("V1Instance.getRateLimit_leakyBucket" ))
257262 defer leakyBucketTimer .ObserveDuration ()
258263
259264 if r .Burst == 0 {
260265 r .Burst = r .Limit
261266 }
262267
263- now := MillisecondNow ()
268+ createdAt := * r . CreatedAt
264269
265270 // Get rate limit from cache.
266271 hashKey := r .HashKey ()
@@ -309,7 +314,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
309314 s .Remove (ctx , hashKey )
310315 }
311316
312- return leakyBucketNewItem (ctx , s , c , r )
317+ return leakyBucketNewItem (ctx , s , c , r , reqState )
313318 }
314319
315320 if HasBehavior (r .Behavior , Behavior_RESET_REMAINING ) {
@@ -349,16 +354,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
349354 }
350355
351356 if r .Hits != 0 {
352- c .UpdateExpiration (r .HashKey (), now + duration )
357+ c .UpdateExpiration (r .HashKey (), createdAt + duration )
353358 }
354359
355360 // Calculate how much leaked out of the bucket since the last time we leaked a hit
356- elapsed := now - b .UpdatedAt
361+ elapsed := createdAt - b .UpdatedAt
357362 leak := float64 (elapsed ) / rate
358363
359364 if int64 (leak ) > 0 {
360365 b .Remaining += leak
361- b .UpdatedAt = now
366+ b .UpdatedAt = createdAt
362367 }
363368
364369 if int64 (b .Remaining ) > b .Burst {
@@ -369,20 +374,22 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
369374 Limit : b .Limit ,
370375 Remaining : int64 (b .Remaining ),
371376 Status : Status_UNDER_LIMIT ,
372- ResetTime : now + (b .Limit - int64 (b .Remaining ))* int64 (rate ),
377+ ResetTime : createdAt + (b .Limit - int64 (b .Remaining ))* int64 (rate ),
373378 }
374379
375380 // TODO: Feature missing: check for Duration change between item/request.
376381
377- if s != nil {
382+ if s != nil && reqState . IsOwner {
378383 defer func () {
379384 s .OnChange (ctx , r , item )
380385 }()
381386 }
382387
383388 // If we are already at the limit
384389 if int64 (b .Remaining ) == 0 && r .Hits > 0 {
385- metricOverLimitCounter .Add (1 )
390+ if reqState .IsOwner {
391+ metricOverLimitCounter .Add (1 )
392+ }
386393 rl .Status = Status_OVER_LIMIT
387394 return rl , nil
388395 }
@@ -391,14 +398,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
391398 if int64 (b .Remaining ) == r .Hits {
392399 b .Remaining = 0
393400 rl .Remaining = int64 (b .Remaining )
394- rl .ResetTime = now + (rl .Limit - rl .Remaining )* int64 (rate )
401+ rl .ResetTime = createdAt + (rl .Limit - rl .Remaining )* int64 (rate )
395402 return rl , nil
396403 }
397404
398405 // If requested is more than available, then return over the limit
399406 // without updating the bucket, unless `DRAIN_OVER_LIMIT` is set.
400407 if r .Hits > int64 (b .Remaining ) {
401- metricOverLimitCounter .Add (1 )
408+ if reqState .IsOwner {
409+ metricOverLimitCounter .Add (1 )
410+ }
402411 rl .Status = Status_OVER_LIMIT
403412
404413 // DRAIN_OVER_LIMIT behavior drains the remaining counter.
@@ -417,16 +426,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
417426
418427 b .Remaining -= float64 (r .Hits )
419428 rl .Remaining = int64 (b .Remaining )
420- rl .ResetTime = now + (rl .Limit - rl .Remaining )* int64 (rate )
429+ rl .ResetTime = createdAt + (rl .Limit - rl .Remaining )* int64 (rate )
421430 return rl , nil
422431 }
423432
424- return leakyBucketNewItem (ctx , s , c , r )
433+ return leakyBucketNewItem (ctx , s , c , r , reqState )
425434}
426435
427436// Called by leakyBucket() when adding a new item in the store.
428- func leakyBucketNewItem (ctx context.Context , s Store , c Cache , r * RateLimitReq ) (resp * RateLimitResp , err error ) {
429- now := MillisecondNow ()
437+ func leakyBucketNewItem (ctx context.Context , s Store , c Cache , r * RateLimitReq , reqState RateLimitReqState ) (resp * RateLimitResp , err error ) {
438+ createdAt := * r . CreatedAt
430439 duration := r .Duration
431440 rate := float64 (duration ) / float64 (r .Limit )
432441 if HasBehavior (r .Behavior , Behavior_DURATION_IS_GREGORIAN ) {
@@ -445,36 +454,38 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
445454 Remaining : float64 (r .Burst - r .Hits ),
446455 Limit : r .Limit ,
447456 Duration : duration ,
448- UpdatedAt : now ,
457+ UpdatedAt : createdAt ,
449458 Burst : r .Burst ,
450459 }
451460
452461 rl := RateLimitResp {
453462 Status : Status_UNDER_LIMIT ,
454463 Limit : b .Limit ,
455464 Remaining : r .Burst - r .Hits ,
456- ResetTime : now + (b .Limit - (r .Burst - r .Hits ))* int64 (rate ),
465+ ResetTime : createdAt + (b .Limit - (r .Burst - r .Hits ))* int64 (rate ),
457466 }
458467
459468 // Client could be requesting that we start with the bucket OVER_LIMIT
460469 if r .Hits > r .Burst {
461- metricOverLimitCounter .Add (1 )
470+ if reqState .IsOwner {
471+ metricOverLimitCounter .Add (1 )
472+ }
462473 rl .Status = Status_OVER_LIMIT
463474 rl .Remaining = 0
464- rl .ResetTime = now + (rl .Limit - rl .Remaining )* int64 (rate )
475+ rl .ResetTime = createdAt + (rl .Limit - rl .Remaining )* int64 (rate )
465476 b .Remaining = 0
466477 }
467478
468479 item := & CacheItem {
469- ExpireAt : now + duration ,
480+ ExpireAt : createdAt + duration ,
470481 Algorithm : r .Algorithm ,
471482 Key : r .HashKey (),
472483 Value : & b ,
473484 }
474485
475486 c .Add (item )
476487
477- if s != nil {
488+ if s != nil && reqState . IsOwner {
478489 s .OnChange (ctx , r , item )
479490 }
480491
0 commit comments