Skip to content

Commit f6c8591

Browse files
Merge pull request #12 from agencyenterprise/staging
Refactor rate limiting and session management
2 parents 14d615f + 6d2d0b3 commit f6c8591

File tree

4 files changed

+165
-14
lines changed

4 files changed

+165
-14
lines changed

src/middleware/rateLimit.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import { ipKeyGenerator, rateLimit } from "express-rate-limit";
2+
import {
3+
MCP_RATE_LIMIT_MAX,
4+
OAUTH_RATE_LIMIT_MAX,
5+
RATE_LIMIT_WINDOW_MS,
6+
} from "../shared/constants";
27

38
export const userRateLimiter = rateLimit({
4-
windowMs: 60 * 1000,
5-
max: 60,
9+
windowMs: RATE_LIMIT_WINDOW_MS,
10+
max: MCP_RATE_LIMIT_MAX,
611
keyGenerator: (req) => {
712
if (req.userId) {
813
return req.userId;
@@ -11,15 +16,15 @@ export const userRateLimiter = rateLimit({
1116
},
1217
message: {
1318
error: "rate_limit_exceeded",
14-
error_description: "Too many requests. Limit: 60 requests per minute.",
19+
error_description: `Too many requests. Limit: ${MCP_RATE_LIMIT_MAX} requests per minute.`,
1520
},
1621
standardHeaders: true,
1722
legacyHeaders: false,
1823
});
1924

2025
export const oauthRateLimiter = rateLimit({
21-
windowMs: 60 * 1000,
22-
max: 10,
26+
windowMs: RATE_LIMIT_WINDOW_MS,
27+
max: OAUTH_RATE_LIMIT_MAX,
2328
keyGenerator: (req) => ipKeyGenerator(req.ip ?? "unknown"),
2429
message: {
2530
error: "rate_limit_exceeded",

src/modules/sessions/manager.ts

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
22
import { randomUUID } from "crypto";
33
import { logger } from "../../middleware/logger";
4-
import { SESSION_CLEANUP_INTERVAL_MS } from "../../shared/constants";
4+
import {
5+
IDLE_TRANSPORT_REAP_INTERVAL_MS,
6+
IDLE_TRANSPORT_TTL_MS,
7+
MAX_ACTIVE_TRANSPORTS_WARN,
8+
SESSION_CLEANUP_INTERVAL_MS,
9+
} from "../../shared/constants";
510
import { AppError } from "../../shared/errors";
611
import { ToolServer } from "../../tools/server";
712
import { cleanupExpiredMcpServerOAuthData } from "../oauth/service";
@@ -22,6 +27,7 @@ export class SessionManager {
2227
private activeTransports: Map<string, ActiveTransport>;
2328
private toolServer: ToolServer;
2429
private cleanupIntervalId: NodeJS.Timeout | null = null;
30+
private reapIntervalId: NodeJS.Timeout | null = null;
2531

2632
constructor() {
2733
this.activeTransports = new Map();
@@ -79,8 +85,22 @@ export class SessionManager {
7985
lastAccessedAt: new Date(),
8086
});
8187

88+
const activeCount = this.activeTransports.size;
89+
90+
if (activeCount > MAX_ACTIVE_TRANSPORTS_WARN) {
91+
logger.warn(
92+
{
93+
sessionId,
94+
userId,
95+
activeCount,
96+
threshold: MAX_ACTIVE_TRANSPORTS_WARN,
97+
},
98+
"Active transport count exceeds warning threshold",
99+
);
100+
}
101+
82102
logger.info(
83-
{ sessionId, userId, activeCount: this.activeTransports.size },
103+
{ sessionId, userId, activeCount },
84104
"Transport stored in memory",
85105
);
86106
}
@@ -132,6 +152,44 @@ export class SessionManager {
132152
this.activeTransports.delete(sessionId);
133153
}
134154

155+
async reapIdleTransports(): Promise<void> {
156+
const now = Date.now();
157+
const idleSessions: string[] = [];
158+
159+
for (const [sessionId, entry] of this.activeTransports) {
160+
const idleMs = now - entry.lastAccessedAt.getTime();
161+
if (idleMs > IDLE_TRANSPORT_TTL_MS) {
162+
idleSessions.push(sessionId);
163+
}
164+
}
165+
166+
if (idleSessions.length === 0) return;
167+
168+
const closePromises = idleSessions.map(async (sessionId) => {
169+
const entry = this.activeTransports.get(sessionId);
170+
if (entry) {
171+
try {
172+
await entry.transport.close();
173+
} catch (error) {
174+
logger.error({ sessionId, error }, "Error closing idle transport");
175+
}
176+
this.activeTransports.delete(sessionId);
177+
}
178+
try {
179+
await markSessionTerminated(sessionId);
180+
} catch {
181+
// Best-effort DB update
182+
}
183+
});
184+
185+
await Promise.all(closePromises);
186+
187+
logger.info(
188+
{ reaped: idleSessions.length, remaining: this.activeTransports.size },
189+
"Reaped idle transports",
190+
);
191+
}
192+
135193
async cleanupExpiredData(): Promise<void> {
136194
try {
137195
const expiredSessionIds = await findExpiredSessionIds();
@@ -186,18 +244,31 @@ export class SessionManager {
186244
SESSION_CLEANUP_INTERVAL_MS,
187245
);
188246

247+
this.reapIntervalId = setInterval(
248+
() => this.reapIdleTransports(),
249+
IDLE_TRANSPORT_REAP_INTERVAL_MS,
250+
);
251+
189252
logger.info(
190-
{ intervalMs: SESSION_CLEANUP_INTERVAL_MS },
191-
"Data cleanup scheduler started",
253+
{
254+
cleanupIntervalMs: SESSION_CLEANUP_INTERVAL_MS,
255+
reapIntervalMs: IDLE_TRANSPORT_REAP_INTERVAL_MS,
256+
idleTtlMs: IDLE_TRANSPORT_TTL_MS,
257+
},
258+
"Data cleanup and idle reaper schedulers started",
192259
);
193260
}
194261

195262
stopCleanupScheduler(): void {
196263
if (this.cleanupIntervalId) {
197264
clearInterval(this.cleanupIntervalId);
198265
this.cleanupIntervalId = null;
199-
logger.info("Session cleanup scheduler stopped");
200266
}
267+
if (this.reapIntervalId) {
268+
clearInterval(this.reapIntervalId);
269+
this.reapIntervalId = null;
270+
}
271+
logger.info("Cleanup and reaper schedulers stopped");
201272
}
202273

203274
async shutdown(): Promise<void> {

src/shared/constants.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,16 @@ export const MCP_SERVER_DEFAULT_SCOPE = "fathom:read";
88
export const SESSION_TTL_MS = 24 * 60 * 60 * 1000;
99
export const SESSION_CLEANUP_INTERVAL_MS = 60 * 60 * 1000;
1010
export const STALE_SESSION_CUTOFF_MS = 24 * 60 * 60 * 1000;
11+
export const IDLE_TRANSPORT_TTL_MS = 5 * 60 * 1000;
12+
export const IDLE_TRANSPORT_REAP_INTERVAL_MS = 60 * 1000;
13+
export const MAX_ACTIVE_TRANSPORTS_WARN = 100;
1114
export const GRACEFUL_SHUTDOWN_TIMEOUT_MS = 10 * 1000;
1215

16+
// Rate Limiting
17+
export const RATE_LIMIT_WINDOW_MS = 60 * 1000;
18+
export const MCP_RATE_LIMIT_MAX = 200;
19+
export const OAUTH_RATE_LIMIT_MAX = 10;
20+
1321
// Fathom API
1422
export const FATHOM_API_TIMEOUT_MS = 30 * 1000;
1523
export const FATHOM_API_SCOPE = "public_api";

src/test/modules/sessions/manager.test.ts

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,23 +159,90 @@ describe("SessionManager", () => {
159159
});
160160
});
161161

162+
describe("reapIdleTransports", () => {
163+
it("reaps transports idle beyond IDLE_TRANSPORT_TTL_MS", async () => {
164+
vi.mocked(insertSession).mockResolvedValue(undefined);
165+
vi.mocked(markSessionTerminated).mockResolvedValue(undefined);
166+
167+
await sessionManager.createSession("user-123");
168+
await vi.advanceTimersByTimeAsync(0);
169+
170+
expect(
171+
sessionManager.getActiveTransport("mock-session-id"),
172+
).toBeDefined();
173+
174+
// Advance past idle TTL (5 minutes)
175+
vi.advanceTimersByTime(6 * 60 * 1000);
176+
177+
await sessionManager.reapIdleTransports();
178+
179+
expect(
180+
sessionManager.getActiveTransport("mock-session-id"),
181+
).toBeUndefined();
182+
expect(markSessionTerminated).toHaveBeenCalledWith("mock-session-id");
183+
});
184+
185+
it("leaves recently accessed transports alone", async () => {
186+
vi.mocked(insertSession).mockResolvedValue(undefined);
187+
188+
await sessionManager.createSession("user-123");
189+
await vi.advanceTimersByTimeAsync(0);
190+
191+
// Advance only 2 minutes (under 5 min TTL)
192+
vi.advanceTimersByTime(2 * 60 * 1000);
193+
194+
await sessionManager.reapIdleTransports();
195+
196+
expect(
197+
sessionManager.getActiveTransport("mock-session-id"),
198+
).toBeDefined();
199+
});
200+
201+
it("does nothing when no transports are idle", async () => {
202+
await sessionManager.reapIdleTransports();
203+
204+
expect(markSessionTerminated).not.toHaveBeenCalled();
205+
});
206+
207+
it("handles transport close errors gracefully", async () => {
208+
vi.mocked(insertSession).mockResolvedValue(undefined);
209+
vi.mocked(markSessionTerminated).mockResolvedValue(undefined);
210+
211+
const transport = await sessionManager.createSession("user-123");
212+
await vi.advanceTimersByTimeAsync(0);
213+
214+
vi.mocked(transport.close).mockRejectedValueOnce(
215+
new Error("close failed"),
216+
);
217+
218+
vi.advanceTimersByTime(6 * 60 * 1000);
219+
220+
await expect(sessionManager.reapIdleTransports()).resolves.not.toThrow();
221+
222+
expect(
223+
sessionManager.getActiveTransport("mock-session-id"),
224+
).toBeUndefined();
225+
});
226+
});
227+
162228
describe("startCleanupScheduler", () => {
163-
it("starts cleanup interval", () => {
229+
it("starts cleanup and reaper intervals", () => {
164230
sessionManager.startCleanupScheduler();
165231

166-
expect(vi.getTimerCount()).toBe(1);
232+
// 2 intervals: hourly cleanup + 60s reaper
233+
expect(vi.getTimerCount()).toBe(2);
167234
});
168235

169236
it("does not start duplicate scheduler", () => {
170237
sessionManager.startCleanupScheduler();
171238
sessionManager.startCleanupScheduler();
172239

173-
expect(vi.getTimerCount()).toBe(1);
240+
expect(vi.getTimerCount()).toBe(2);
174241
});
175242
});
176243

177244
describe("stopCleanupScheduler", () => {
178-
it("stops cleanup interval", () => {
245+
it("stops both cleanup and reaper intervals", () => {
179246
sessionManager.startCleanupScheduler();
180247
sessionManager.stopCleanupScheduler();
181248

0 commit comments

Comments
 (0)