Skip to content

Commit 8333802

Browse files
committed
Enhance Redis configuration for better cluster support; implement stale-while-revalidate caching in chat controller; improve rate limiting middleware with Redis fallback; add exponential backoff for 429 errors in API client; implement throttling and debouncing in chat context for improved performance.
1 parent da9a81a commit 8333802

File tree

10 files changed

+684
-173
lines changed

10 files changed

+684
-173
lines changed

server/config/redis.js

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ const buildRedisConfig = () => {
2525
port: parseInt(process.env.REDIS_PORT, 10) || 6379,
2626
maxRetriesPerRequest: null, // Required for BullMQ compatibility
2727
enableReadyCheck: true,
28-
enableOfflineQueue: false, // Prevent queuing commands when disconnected (important for cluster)
29-
lazyConnect: true, // Don't connect immediately
28+
enableOfflineQueue: true, // Queue commands while connecting
3029
retryStrategy: (times) => {
3130
if (times > 3) {
3231
if (isDev) {
@@ -62,6 +61,7 @@ const buildRedisConfig = () => {
6261
if (process.env.REDIS_TLS === 'true' || process.env.ELASTICACHE_TLS === 'true') {
6362
config.tls = {
6463
rejectUnauthorized: process.env.REDIS_TLS_REJECT_UNAUTHORIZED !== 'false',
64+
servername: process.env.REDIS_HOST || 'localhost', // Required for TLS handshake with ElastiCache
6565
};
6666
console.log('Redis: TLS enabled for ElastiCache connection');
6767
}
@@ -144,13 +144,6 @@ if (REDIS_ENABLED) {
144144
redis.on('reconnecting', (delay) => {
145145
console.log(`Redis: Reconnecting in ${delay}ms...`);
146146
});
147-
148-
// Attempt initial connection (non-blocking)
149-
redis.connect().catch(() => {
150-
if (isDev) {
151-
console.warn('Redis: Not available in development. Queued features disabled.');
152-
}
153-
});
154147
} else {
155148
console.log('Redis: Disabled via REDIS_ENABLED=false');
156149
}

server/controllers/chatController.js

Lines changed: 126 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,45 @@ import redis from '../config/redis.js';
55
import { notificationQueue } from '../config/queue.js';
66
import { emitToGroup } from '../utils/socketEmitter.js';
77

8-
// Cache TTL constants
9-
const CACHE_TTL_MESSAGES = 60; // 60 seconds
10-
const CACHE_TTL_UNREAD = 30; // 30 seconds
8+
// Cache TTL constants (increased for better performance under load)
9+
const CACHE_TTL_MESSAGES = 120; // 120 seconds (2 minutes)
10+
const CACHE_TTL_UNREAD = 60; // 60 seconds (1 minute)
11+
12+
// Stale-while-revalidate threshold (serve stale data while refreshing in background)
13+
const STALE_THRESHOLD_MESSAGES = 60; // Consider stale after 60 seconds
14+
const STALE_THRESHOLD_UNREAD = 30; // Consider stale after 30 seconds
15+
16+
/**
17+
* Non-blocking cache write helper - fire and forget
18+
*/
19+
const cacheSetAsync = (key, ttl, value) => {
20+
redis.setex(key, ttl, typeof value === 'string' ? value : JSON.stringify(value))
21+
.catch(err => console.error('Cache write error:', err));
22+
};
23+
24+
/**
25+
* Get cache with timestamp for stale-while-revalidate
26+
* Returns { data, isStale } or null if not cached
27+
*/
28+
const getCacheWithStaleness = async (key, staleThreshold) => {
29+
try {
30+
const ttl = await redis.ttl(key);
31+
if (ttl <= 0) return null;
32+
33+
const cached = await redis.get(key);
34+
if (cached === null) return null;
35+
36+
// Calculate age based on remaining TTL
37+
// If TTL is less than (maxTTL - staleThreshold), data is stale
38+
const maxTTL = staleThreshold * 2; // Approximate max TTL
39+
const isStale = ttl < (maxTTL - staleThreshold);
40+
41+
return { data: cached, isStale };
42+
} catch (err) {
43+
console.error('Cache read error:', err);
44+
return null;
45+
}
46+
};
1147

1248
// Rate limiting tracking (in-memory, backed by Redis for persistence)
1349
const MESSAGE_RATE_LIMIT = 100; // messages per minute per user per group
@@ -77,11 +113,49 @@ export const getMessages = async (req, res) => {
77113
// Parse and validate limit
78114
const parsedLimit = Math.min(Math.max(parseInt(limit) || 50, 1), 100);
79115

80-
// Try cache for first page (no 'before' cursor)
116+
// Try cache for first page (no 'before' cursor) with stale-while-revalidate
81117
if (!before) {
82-
const cached = await redis.get(`chat:${groupId}:latest`);
83-
if (cached) {
84-
return res.json(JSON.parse(cached));
118+
const cacheKey = `chat:${groupId}:latest`;
119+
const cacheResult = await getCacheWithStaleness(cacheKey, STALE_THRESHOLD_MESSAGES);
120+
121+
if (cacheResult) {
122+
const { data, isStale } = cacheResult;
123+
const parsedData = JSON.parse(data);
124+
125+
// If stale, trigger background refresh (non-blocking)
126+
if (isStale) {
127+
setImmediate(async () => {
128+
try {
129+
const freshMessages = await Message.find({ groupId, deletedAt: null })
130+
.populate('senderId', 'name email')
131+
.populate('metadata.expenseId', 'description amount currency')
132+
.populate('metadata.settlementId', 'amount currency')
133+
.sort({ _id: -1 })
134+
.limit(51)
135+
.lean();
136+
137+
const hasMore = freshMessages.length > 50;
138+
if (hasMore) freshMessages.pop();
139+
140+
const safeMessages = freshMessages.map(msg =>
141+
msg.deletedAt ? { ...msg, content: '[Message deleted]', metadata: {} } : msg
142+
);
143+
144+
const freshResult = {
145+
messages: safeMessages.reverse(),
146+
hasMore,
147+
oldestMessageId: safeMessages.length > 0 ? safeMessages[0]._id : null,
148+
};
149+
150+
cacheSetAsync(cacheKey, CACHE_TTL_MESSAGES, freshResult);
151+
} catch (err) {
152+
console.error('Background message refresh error:', err);
153+
}
154+
});
155+
}
156+
157+
// Return cached data immediately
158+
return res.json(parsedData);
85159
}
86160
}
87161

@@ -128,9 +202,9 @@ export const getMessages = async (req, res) => {
128202
oldestMessageId: safeMessages.length > 0 ? safeMessages[0]._id : null,
129203
};
130204

131-
// Cache first page
205+
// Cache first page (non-blocking)
132206
if (!before) {
133-
await redis.setex(`chat:${groupId}:latest`, CACHE_TTL_MESSAGES, JSON.stringify(result));
207+
cacheSetAsync(`chat:${groupId}:latest`, CACHE_TTL_MESSAGES, result);
134208
}
135209

136210
res.json(result);
@@ -447,23 +521,46 @@ export const getUnreadCount = async (req, res) => {
447521
return res.status(403).json({ message: 'Not authorized' });
448522
}
449523

450-
// Try cache first
524+
// Try cache first with stale-while-revalidate
451525
const cacheKey = `chat:${groupId}:unread:${req.user._id}`;
452-
const cached = await redis.get(cacheKey);
453-
if (cached !== null) {
454-
return res.json({ count: parseInt(cached) });
526+
const cacheResult = await getCacheWithStaleness(cacheKey, STALE_THRESHOLD_UNREAD);
527+
528+
if (cacheResult) {
529+
const { data, isStale } = cacheResult;
530+
const cachedCount = parseInt(data);
531+
532+
// If stale, trigger background refresh (non-blocking)
533+
if (isStale) {
534+
const userId = req.user._id;
535+
setImmediate(async () => {
536+
try {
537+
const freshCount = await Message.countDocuments({
538+
groupId,
539+
deletedAt: null,
540+
senderId: { $ne: userId },
541+
readBy: { $ne: userId },
542+
});
543+
cacheSetAsync(cacheKey, CACHE_TTL_UNREAD, freshCount.toString());
544+
} catch (err) {
545+
console.error('Background unread count refresh error:', err);
546+
}
547+
});
548+
}
549+
550+
// Return cached data immediately
551+
return res.json({ count: cachedCount });
455552
}
456553

457-
// Count unread messages
554+
// Count unread messages (cache miss)
458555
const count = await Message.countDocuments({
459556
groupId,
460557
deletedAt: null,
461558
senderId: { $ne: req.user._id },
462559
readBy: { $ne: req.user._id },
463560
});
464561

465-
// Cache the result
466-
await redis.setex(cacheKey, CACHE_TTL_UNREAD, count.toString());
562+
// Cache the result (non-blocking)
563+
cacheSetAsync(cacheKey, CACHE_TTL_UNREAD, count.toString());
467564

468565
res.json({ count });
469566
} catch (error) {
@@ -529,15 +626,25 @@ export const getBatchUnreadCounts = async (req, res) => {
529626

530627
const results = await Message.aggregate(pipeline);
531628

532-
// Process results and cache them
629+
// Process results and warm individual caches (non-blocking)
533630
for (const groupId of uncachedGroupIds) {
534631
const result = results.find(r => r._id.toString() === groupId);
535632
const count = result ? result.count : 0;
536633
counts[groupId] = count;
537634

538-
// Cache the result
635+
// Warm individual cache (non-blocking) - enables subsequent individual lookups to hit cache
636+
const cacheKey = `chat:${groupId}:unread:${userId}`;
637+
cacheSetAsync(cacheKey, CACHE_TTL_UNREAD, count.toString());
638+
}
639+
}
640+
641+
// Also warm cache for groups that had cache hits (extend TTL for frequently accessed)
642+
// This is non-blocking and helps keep hot data fresh
643+
for (const groupId of validGroupIds) {
644+
if (!uncachedGroupIds.includes(groupId)) {
539645
const cacheKey = `chat:${groupId}:unread:${userId}`;
540-
await redis.setex(cacheKey, CACHE_TTL_UNREAD, count.toString());
646+
// Touch the cache to extend TTL for frequently accessed groups
647+
redis.expire(cacheKey, CACHE_TTL_UNREAD).catch(() => {});
541648
}
542649
}
543650

server/middleware/ddosProtection.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ export const ddosProtection = async (req, res, next) => {
2121
}
2222

2323
if (requests > 50) { // 50 requests per second
24-
return res.status(429).json({ message: 'Too many requests' });
24+
res.setHeader('Retry-After', '1');
25+
return res.status(429).json({
26+
message: 'Too many requests. Please wait 1 second.',
27+
retryAfter: 1,
28+
});
2529
}
2630

2731
next();

0 commit comments

Comments (0)