Skip to content

Commit 89f346a

Browse files
authored
Fix ratelimiter logic (#390)
1 parent ce07a78 commit 89f346a

File tree

2 files changed

+32
-27
lines changed

2 files changed

+32
-27
lines changed

include/RateLimiter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "SimpleMutex.h"
55

66
#include <cstdint>
7+
#include <deque>
78
#include <vector>
89

910
namespace OpenShock {
@@ -33,6 +34,6 @@ namespace OpenShock {
3334
int64_t m_nextSlot;
3435
int64_t m_nextCleanup;
3536
std::vector<Limit> m_limits;
36-
std::vector<int64_t> m_requests;
37+
std::deque<int64_t> m_requests;
3738
};
3839
} // namespace OpenShock

src/RateLimiter.cpp

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,22 @@ OpenShock::RateLimiter::~RateLimiter()
2323

2424
void OpenShock::RateLimiter::addLimit(uint32_t durationMs, uint16_t count)
2525
{
26-
m_mutex.lock(portMAX_DELAY);
26+
OpenShock::ScopedLock lock__(&m_mutex);
2727

2828
// Insert sorted
2929
m_limits.insert(std::upper_bound(m_limits.begin(), m_limits.end(), durationMs, [](int64_t durationMs, const Limit& limit) { return durationMs < limit.durationMs; }), {durationMs, count});
3030

3131
m_nextSlot = 0;
3232
m_nextCleanup = 0;
33-
34-
m_mutex.unlock();
3533
}
3634

3735
void OpenShock::RateLimiter::clearLimits()
3836
{
39-
m_mutex.lock(portMAX_DELAY);
37+
OpenShock::ScopedLock lock__(&m_mutex);
4038

4139
m_limits.clear();
42-
43-
m_mutex.unlock();
40+
m_nextSlot = 0;
41+
m_nextCleanup = 0;
4442
}
4543

4644
bool OpenShock::RateLimiter::tryRequest()
@@ -57,40 +55,49 @@ bool OpenShock::RateLimiter::tryRequest()
5755
return true;
5856
}
5957

58+
// Cleanup based on longest limit
6059
if (m_nextCleanup <= now) {
6160
int64_t longestLimit = m_limits.back().durationMs;
6261
int64_t expiresAt = now - longestLimit;
6362

64-
auto nextToExpire = std::find_if(m_requests.begin(), m_requests.end(), [expiresAt](int64_t requestedAtMs) { return requestedAtMs > expiresAt; });
65-
if (nextToExpire != m_requests.end()) {
66-
m_requests.erase(m_requests.begin(), nextToExpire);
67-
}
63+
// erase everything that’s expired
64+
auto firstAlive = std::upper_bound(m_requests.begin(), m_requests.end(), expiresAt);
65+
m_requests.erase(m_requests.begin(), firstAlive);
6866

69-
m_nextCleanup = m_requests.front() + longestLimit;
67+
if (!m_requests.empty()) {
68+
m_nextCleanup = m_requests.front() + longestLimit;
69+
} else {
70+
// nothing to clean until we add a new request
71+
m_nextCleanup = now + longestLimit;
72+
}
7073
}
7174

7275
if (m_nextSlot > now) {
7376
return false;
7477
}
7578

76-
// Check if we've exceeded any limits, starting with the highest limit first
79+
// Check if we've exceeded any limits, starting from the largest duration
7780
for (std::size_t i = m_limits.size(); i > 0;) {
7881
const auto& limit = m_limits[--i];
7982

8083
// Calculate the window start time
8184
int64_t windowStart = now - limit.durationMs;
8285

83-
// Check how many requests are inside the limit window
86+
// Check how many requests are inside the limit window and track earliest in-window element
8487
std::size_t insideWindow = 0;
85-
for (int64_t request : m_requests) {
86-
if (request > windowStart) {
87-
insideWindow++;
88-
}
88+
auto it = m_requests.rbegin();
89+
for (; it != m_requests.rend(); ++it) {
90+
if (*it < windowStart) break;
91+
++insideWindow;
8992
}
9093

9194
// If the window is full, set the wait time until its available, and reject the request
9295
if (insideWindow >= limit.count) {
93-
m_nextSlot = m_requests.back() + limit.durationMs;
96+
auto firstInWindow = (it == m_requests.rend())
97+
? m_requests.begin()
98+
: it.base();
99+
100+
m_nextSlot = *firstInWindow + limit.durationMs;
94101
return false;
95102
}
96103
}
@@ -102,20 +109,17 @@ bool OpenShock::RateLimiter::tryRequest()
102109
}
103110
void OpenShock::RateLimiter::clearRequests()
104111
{
105-
m_mutex.lock(portMAX_DELAY);
112+
OpenShock::ScopedLock lock__(&m_mutex);
106113

107114
m_requests.clear();
108-
109-
m_mutex.unlock();
115+
m_nextSlot = 0;
116+
m_nextCleanup = 0;
110117
}
111118

112119
void OpenShock::RateLimiter::blockFor(int64_t blockForMs)
113120
{
114-
int64_t blockUntil = OpenShock::millis() + blockForMs;
115-
116-
m_mutex.lock(portMAX_DELAY);
121+
OpenShock::ScopedLock lock__(&m_mutex);
117122

123+
int64_t blockUntil = OpenShock::millis() + blockForMs;
118124
m_nextSlot = std::max(m_nextSlot, blockUntil);
119-
120-
m_mutex.unlock();
121125
}

0 commit comments

Comments
 (0)