Skip to content

Commit 9f0584a

Browse files
authored
feat(redis): added redis option for rate limiter, 10x speed improvement in rate limit checks & reduction of DB load (#2263)
* feat(redis): added redis option for rate limiter, 10x speed improvement in rate limit checks & reduction of DB load * ack PR comments * improvements
1 parent 6b4d762 commit 9f0584a

File tree

13 files changed

+414
-237
lines changed

13 files changed

+414
-237
lines changed

apps/sim/app/api/users/me/usage-limits/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import { checkServerSideUsageLimits } from '@/lib/billing'
44
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
55
import { getEffectiveCurrentPeriodCost } from '@/lib/billing/core/usage'
66
import { getUserStorageLimit, getUserStorageUsage } from '@/lib/billing/storage'
7+
import { RateLimiter } from '@/lib/core/rate-limiter'
78
import { createLogger } from '@/lib/logs/console/logger'
89
import { createErrorResponse } from '@/app/api/workflows/utils'
9-
import { RateLimiter } from '@/services/queue'
1010

1111
const logger = createLogger('UsageLimitsAPI')
1212

apps/sim/app/api/v1/logs/meta.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { checkServerSideUsageLimits } from '@/lib/billing'
22
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
33
import { getEffectiveCurrentPeriodCost } from '@/lib/billing/core/usage'
4-
import { RateLimiter } from '@/services/queue'
4+
import { RateLimiter } from '@/lib/core/rate-limiter'
55

66
export interface UserLimits {
77
workflowExecutionRateLimit: {

apps/sim/app/api/v1/middleware.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { type NextRequest, NextResponse } from 'next/server'
22
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
3+
import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
34
import { createLogger } from '@/lib/logs/console/logger'
45
import { authenticateV1Request } from '@/app/api/v1/auth'
5-
import { RateLimiter } from '@/services/queue/RateLimiter'
66

77
const logger = createLogger('V1Middleware')
88
const rateLimiter = new RateLimiter()

apps/sim/app/api/webhooks/trigger/[path]/route.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ vi.mock('@/lib/workspaces/utils', async () => {
140140
}
141141
})
142142

143-
vi.mock('@/services/queue', () => ({
143+
vi.mock('@/lib/core/rate-limiter', () => ({
144144
RateLimiter: vi.fn().mockImplementation(() => ({
145145
checkRateLimit: vi.fn().mockResolvedValue({
146146
allowed: true,

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,8 +395,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
395395
triggerType: loggingTriggerType,
396396
executionId,
397397
requestId,
398-
checkRateLimit: false, // Manual executions bypass rate limits
399-
checkDeployment: !shouldUseDraftState, // Check deployment unless using draft
398+
checkDeployment: !shouldUseDraftState,
400399
loggingSession,
401400
})
402401

apps/sim/background/workspace-notification-delivery.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ import { and, eq, isNull, lte, or, sql } from 'drizzle-orm'
1111
import { v4 as uuidv4 } from 'uuid'
1212
import { checkUsageStatus } from '@/lib/billing/calculations/usage-monitor'
1313
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
14+
import { RateLimiter } from '@/lib/core/rate-limiter'
1415
import { decryptSecret } from '@/lib/core/security/encryption'
1516
import { getBaseUrl } from '@/lib/core/utils/urls'
1617
import { createLogger } from '@/lib/logs/console/logger'
1718
import type { TraceSpan, WorkflowExecutionLog } from '@/lib/logs/types'
1819
import { sendEmail } from '@/lib/messaging/email/mailer'
1920
import type { AlertConfig } from '@/lib/notifications/alert-rules'
20-
import { RateLimiter } from '@/services/queue'
2121

2222
const logger = createLogger('WorkspaceNotificationDelivery')
2323

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
2+
export type {
3+
RateLimitConfig,
4+
SubscriptionPlan,
5+
TriggerType,
6+
} from '@/lib/core/rate-limiter/types'
7+
export { RATE_LIMITS, RateLimitError } from '@/lib/core/rate-limiter/types'
Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
import { beforeEach, describe, expect, it, vi } from 'vitest'
2+
import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
3+
import { MANUAL_EXECUTION_LIMIT, RATE_LIMITS } from '@/lib/core/rate-limiter/types'
4+
5+
vi.mock('@sim/db', () => ({
6+
db: {
7+
select: vi.fn(),
8+
insert: vi.fn(),
9+
update: vi.fn(),
10+
delete: vi.fn(),
11+
},
12+
}))
13+
14+
vi.mock('drizzle-orm', () => ({
15+
eq: vi.fn((field, value) => ({ field, value })),
16+
sql: vi.fn((strings, ...values) => ({ sql: strings.join('?'), values })),
17+
and: vi.fn((...conditions) => ({ and: conditions })),
18+
}))
19+
20+
vi.mock('@/lib/core/config/redis', () => ({
21+
getRedisClient: vi.fn().mockReturnValue(null),
22+
}))
23+
24+
import { db } from '@sim/db'
25+
import { getRedisClient } from '@/lib/core/config/redis'
26+
27+
describe('RateLimiter', () => {
28+
const rateLimiter = new RateLimiter()
29+
const testUserId = 'test-user-123'
30+
const freeSubscription = { plan: 'free', referenceId: testUserId }
31+
32+
beforeEach(() => {
33+
vi.clearAllMocks()
34+
vi.mocked(getRedisClient).mockReturnValue(null)
35+
})
36+
37+
describe('checkRateLimitWithSubscription', () => {
38+
it('should allow unlimited requests for manual trigger type', async () => {
39+
const result = await rateLimiter.checkRateLimitWithSubscription(
40+
testUserId,
41+
freeSubscription,
42+
'manual',
43+
false
44+
)
45+
46+
expect(result.allowed).toBe(true)
47+
expect(result.remaining).toBe(MANUAL_EXECUTION_LIMIT)
48+
expect(result.resetAt).toBeInstanceOf(Date)
49+
expect(db.select).not.toHaveBeenCalled()
50+
})
51+
52+
it('should allow first API request for sync execution (DB fallback)', async () => {
53+
vi.mocked(db.select).mockReturnValue({
54+
from: vi.fn().mockReturnValue({
55+
where: vi.fn().mockReturnValue({
56+
limit: vi.fn().mockResolvedValue([]),
57+
}),
58+
}),
59+
} as any)
60+
61+
vi.mocked(db.insert).mockReturnValue({
62+
values: vi.fn().mockReturnValue({
63+
onConflictDoUpdate: vi.fn().mockReturnValue({
64+
returning: vi.fn().mockResolvedValue([
65+
{
66+
syncApiRequests: 1,
67+
asyncApiRequests: 0,
68+
apiEndpointRequests: 0,
69+
windowStart: new Date(),
70+
},
71+
]),
72+
}),
73+
}),
74+
} as any)
75+
76+
const result = await rateLimiter.checkRateLimitWithSubscription(
77+
testUserId,
78+
freeSubscription,
79+
'api',
80+
false
81+
)
82+
83+
expect(result.allowed).toBe(true)
84+
expect(result.remaining).toBe(RATE_LIMITS.free.syncApiExecutionsPerMinute - 1)
85+
expect(result.resetAt).toBeInstanceOf(Date)
86+
})
87+
88+
it('should allow first API request for async execution (DB fallback)', async () => {
89+
vi.mocked(db.select).mockReturnValue({
90+
from: vi.fn().mockReturnValue({
91+
where: vi.fn().mockReturnValue({
92+
limit: vi.fn().mockResolvedValue([]),
93+
}),
94+
}),
95+
} as any)
96+
97+
vi.mocked(db.insert).mockReturnValue({
98+
values: vi.fn().mockReturnValue({
99+
onConflictDoUpdate: vi.fn().mockReturnValue({
100+
returning: vi.fn().mockResolvedValue([
101+
{
102+
syncApiRequests: 0,
103+
asyncApiRequests: 1,
104+
apiEndpointRequests: 0,
105+
windowStart: new Date(),
106+
},
107+
]),
108+
}),
109+
}),
110+
} as any)
111+
112+
const result = await rateLimiter.checkRateLimitWithSubscription(
113+
testUserId,
114+
freeSubscription,
115+
'api',
116+
true
117+
)
118+
119+
expect(result.allowed).toBe(true)
120+
expect(result.remaining).toBe(RATE_LIMITS.free.asyncApiExecutionsPerMinute - 1)
121+
expect(result.resetAt).toBeInstanceOf(Date)
122+
})
123+
124+
it('should work for all trigger types except manual (DB fallback)', async () => {
125+
const triggerTypes = ['api', 'webhook', 'schedule', 'chat'] as const
126+
127+
for (const triggerType of triggerTypes) {
128+
vi.mocked(db.select).mockReturnValue({
129+
from: vi.fn().mockReturnValue({
130+
where: vi.fn().mockReturnValue({
131+
limit: vi.fn().mockResolvedValue([]),
132+
}),
133+
}),
134+
} as any)
135+
136+
vi.mocked(db.insert).mockReturnValue({
137+
values: vi.fn().mockReturnValue({
138+
onConflictDoUpdate: vi.fn().mockReturnValue({
139+
returning: vi.fn().mockResolvedValue([
140+
{
141+
syncApiRequests: 1,
142+
asyncApiRequests: 0,
143+
apiEndpointRequests: 0,
144+
windowStart: new Date(),
145+
},
146+
]),
147+
}),
148+
}),
149+
} as any)
150+
151+
const result = await rateLimiter.checkRateLimitWithSubscription(
152+
testUserId,
153+
freeSubscription,
154+
triggerType,
155+
false
156+
)
157+
158+
expect(result.allowed).toBe(true)
159+
expect(result.remaining).toBe(RATE_LIMITS.free.syncApiExecutionsPerMinute - 1)
160+
}
161+
})
162+
163+
it('should use Redis when available', async () => {
164+
const mockRedis = {
165+
eval: vi.fn().mockResolvedValue(1), // Lua script returns count after INCR
166+
}
167+
vi.mocked(getRedisClient).mockReturnValue(mockRedis as any)
168+
169+
const result = await rateLimiter.checkRateLimitWithSubscription(
170+
testUserId,
171+
freeSubscription,
172+
'api',
173+
false
174+
)
175+
176+
expect(result.allowed).toBe(true)
177+
expect(result.remaining).toBe(RATE_LIMITS.free.syncApiExecutionsPerMinute - 1)
178+
expect(mockRedis.eval).toHaveBeenCalled()
179+
expect(db.select).not.toHaveBeenCalled()
180+
})
181+
182+
it('should deny requests when Redis rate limit exceeded', async () => {
183+
const mockRedis = {
184+
eval: vi.fn().mockResolvedValue(RATE_LIMITS.free.syncApiExecutionsPerMinute + 1),
185+
}
186+
vi.mocked(getRedisClient).mockReturnValue(mockRedis as any)
187+
188+
const result = await rateLimiter.checkRateLimitWithSubscription(
189+
testUserId,
190+
freeSubscription,
191+
'api',
192+
false
193+
)
194+
195+
expect(result.allowed).toBe(false)
196+
expect(result.remaining).toBe(0)
197+
})
198+
199+
it('should fall back to DB when Redis fails', async () => {
200+
const mockRedis = {
201+
eval: vi.fn().mockRejectedValue(new Error('Redis connection failed')),
202+
}
203+
vi.mocked(getRedisClient).mockReturnValue(mockRedis as any)
204+
205+
vi.mocked(db.select).mockReturnValue({
206+
from: vi.fn().mockReturnValue({
207+
where: vi.fn().mockReturnValue({
208+
limit: vi.fn().mockResolvedValue([]),
209+
}),
210+
}),
211+
} as any)
212+
213+
vi.mocked(db.insert).mockReturnValue({
214+
values: vi.fn().mockReturnValue({
215+
onConflictDoUpdate: vi.fn().mockReturnValue({
216+
returning: vi.fn().mockResolvedValue([
217+
{
218+
syncApiRequests: 1,
219+
asyncApiRequests: 0,
220+
apiEndpointRequests: 0,
221+
windowStart: new Date(),
222+
},
223+
]),
224+
}),
225+
}),
226+
} as any)
227+
228+
const result = await rateLimiter.checkRateLimitWithSubscription(
229+
testUserId,
230+
freeSubscription,
231+
'api',
232+
false
233+
)
234+
235+
expect(result.allowed).toBe(true)
236+
expect(db.select).toHaveBeenCalled()
237+
})
238+
})
239+
240+
describe('getRateLimitStatusWithSubscription', () => {
241+
it('should return unlimited for manual trigger type', async () => {
242+
const status = await rateLimiter.getRateLimitStatusWithSubscription(
243+
testUserId,
244+
freeSubscription,
245+
'manual',
246+
false
247+
)
248+
249+
expect(status.used).toBe(0)
250+
expect(status.limit).toBe(MANUAL_EXECUTION_LIMIT)
251+
expect(status.remaining).toBe(MANUAL_EXECUTION_LIMIT)
252+
expect(status.resetAt).toBeInstanceOf(Date)
253+
})
254+
255+
it('should return sync API limits for API trigger type (DB fallback)', async () => {
256+
vi.mocked(db.select).mockReturnValue({
257+
from: vi.fn().mockReturnValue({
258+
where: vi.fn().mockReturnValue({
259+
limit: vi.fn().mockResolvedValue([]),
260+
}),
261+
}),
262+
} as any)
263+
264+
const status = await rateLimiter.getRateLimitStatusWithSubscription(
265+
testUserId,
266+
freeSubscription,
267+
'api',
268+
false
269+
)
270+
271+
expect(status.used).toBe(0)
272+
expect(status.limit).toBe(RATE_LIMITS.free.syncApiExecutionsPerMinute)
273+
expect(status.remaining).toBe(RATE_LIMITS.free.syncApiExecutionsPerMinute)
274+
expect(status.resetAt).toBeInstanceOf(Date)
275+
})
276+
277+
it('should use Redis for status when available', async () => {
278+
const mockRedis = {
279+
get: vi.fn().mockResolvedValue('5'),
280+
}
281+
vi.mocked(getRedisClient).mockReturnValue(mockRedis as any)
282+
283+
const status = await rateLimiter.getRateLimitStatusWithSubscription(
284+
testUserId,
285+
freeSubscription,
286+
'api',
287+
false
288+
)
289+
290+
expect(status.used).toBe(5)
291+
expect(status.limit).toBe(RATE_LIMITS.free.syncApiExecutionsPerMinute)
292+
expect(status.remaining).toBe(RATE_LIMITS.free.syncApiExecutionsPerMinute - 5)
293+
expect(mockRedis.get).toHaveBeenCalled()
294+
expect(db.select).not.toHaveBeenCalled()
295+
})
296+
})
297+
298+
describe('resetRateLimit', () => {
299+
it('should delete rate limit record for user', async () => {
300+
vi.mocked(db.delete).mockReturnValue({
301+
where: vi.fn().mockResolvedValue({}),
302+
} as any)
303+
304+
await rateLimiter.resetRateLimit(testUserId)
305+
306+
expect(db.delete).toHaveBeenCalled()
307+
})
308+
})
309+
})

0 commit comments

Comments
 (0)