Skip to content

Commit e603b21

Browse files
committed
fix: redpanda
1 parent e61b55f commit e603b21

File tree

3 files changed

+82
-353
lines changed

3 files changed

+82
-353
lines changed

infra/ingest/console-config.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,3 @@ authentication:
1313
useSecureCookies: false
1414
basic:
1515
enabled: true
16-
authorization:
17-
roleBindings:
18-
- roleName: admin
19-
users:
20-
- loginType: basic
21-
name: "admin"

packages/redis/cacheable.ts

Lines changed: 65 additions & 248 deletions
Original file line numberDiff line numberDiff line change
@@ -1,309 +1,126 @@
11
import { getRedisCache } from "./redis";
22

3-
const logger = console;
4-
53
const activeRevalidations = new Map<string, Promise<void>>();
6-
7-
const stringifyRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.*Z$/;
4+
const DATE_REGEX = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.*Z$/;
85

96
let redisAvailable = true;
107
let lastRedisCheck = 0;
11-
const REDIS_CHECK_INTERVAL = 30_000;
128

139
type CacheOptions = {
1410
expireInSec: number;
1511
prefix?: string;
16-
serialize?: (data: unknown) => string;
17-
deserialize?: (data: string) => unknown;
1812
staleWhileRevalidate?: boolean;
1913
staleTime?: number;
20-
maxRetries?: number;
2114
};
2215

23-
const defaultSerialize = (data: unknown): string => JSON.stringify(data);
24-
const defaultDeserialize = (data: string): unknown =>
25-
JSON.parse(data, (_, value) => {
26-
if (typeof value === "string" && stringifyRegex.test(value)) {
16+
function deserialize(data: string): unknown {
17+
return JSON.parse(data, (_, value) => {
18+
if (typeof value === "string" && DATE_REGEX.test(value)) {
2719
return new Date(value);
2820
}
2921
return value;
3022
});
23+
}
3124

3225
function shouldSkipRedis(): boolean {
33-
const now = Date.now();
34-
if (!redisAvailable && now - lastRedisCheck < REDIS_CHECK_INTERVAL) {
26+
if (!redisAvailable && Date.now() - lastRedisCheck < 30_000) {
3527
return true;
3628
}
37-
if (!redisAvailable && now - lastRedisCheck >= REDIS_CHECK_INTERVAL) {
29+
if (!redisAvailable) {
3830
redisAvailable = true;
39-
lastRedisCheck = now;
31+
lastRedisCheck = Date.now();
4032
}
4133
return false;
4234
}
4335

44-
export async function getCache<T>(
45-
key: string,
46-
options: CacheOptions | number,
47-
fn: () => Promise<T>
48-
): Promise<T> {
49-
const {
50-
expireInSec,
51-
serialize = defaultSerialize,
52-
deserialize = defaultDeserialize,
53-
staleWhileRevalidate = false,
54-
staleTime = 0,
55-
maxRetries = 1,
56-
} = typeof options === "number" ? { expireInSec: options } : options;
57-
58-
if (shouldSkipRedis()) {
59-
return fn();
36+
function stringify(obj: unknown): string {
37+
if (obj === null) {
38+
return "null";
6039
}
61-
62-
let retries = 0;
63-
while (retries < maxRetries) {
64-
try {
65-
const redis = getRedisCache();
66-
const hit = await redis.get(key);
67-
redisAvailable = true;
68-
lastRedisCheck = Date.now();
69-
70-
if (hit) {
71-
const data = deserialize(hit) as T;
72-
73-
if (staleWhileRevalidate) {
74-
try {
75-
const ttl = await redis.ttl(key);
76-
if (ttl < staleTime && !activeRevalidations.has(key)) {
77-
const revalidationPromise = fn()
78-
.then(async (freshData: T) => {
79-
if (
80-
freshData !== undefined &&
81-
freshData !== null &&
82-
redisAvailable
83-
) {
84-
try {
85-
const redis = getRedisCache();
86-
await redis.setex(key, expireInSec, serialize(freshData));
87-
} catch {
88-
// Ignore SET failure
89-
}
90-
}
91-
})
92-
.catch((error: unknown) => {
93-
logger.error(
94-
`Background revalidation failed for key ${key}:`,
95-
error
96-
);
97-
})
98-
.finally(() => {
99-
activeRevalidations.delete(key);
100-
});
101-
activeRevalidations.set(key, revalidationPromise);
102-
}
103-
} catch {
104-
// Ignore TTL check failure
105-
}
106-
}
107-
108-
return data;
109-
}
110-
111-
const data = await fn();
112-
if (
113-
data !== undefined &&
114-
data !== null &&
115-
redisAvailable &&
116-
!shouldSkipRedis()
117-
) {
118-
try {
119-
await redis.setex(key, expireInSec, serialize(data));
120-
} catch {
121-
redisAvailable = false;
122-
lastRedisCheck = Date.now();
123-
}
124-
}
125-
return data;
126-
} catch (error: unknown) {
127-
retries += 1;
128-
if (retries >= maxRetries) {
129-
redisAvailable = false;
130-
lastRedisCheck = Date.now();
131-
logger.error(`Cache error for key ${key}, skipping Redis:`, error);
132-
return fn();
133-
}
134-
}
40+
if (obj === undefined) {
41+
return "undefined";
13542
}
136-
137-
return fn();
43+
if (typeof obj === "boolean") {
44+
return obj ? "true" : "false";
45+
}
46+
if (typeof obj === "number" || typeof obj === "string") {
47+
return String(obj);
48+
}
49+
if (typeof obj === "function") {
50+
return obj.toString();
51+
}
52+
if (Array.isArray(obj)) {
53+
return `[${obj.map(stringify).join(",")}]`;
54+
}
55+
if (typeof obj === "object") {
56+
return Object.entries(obj)
57+
.sort(([a], [b]) => a.localeCompare(b))
58+
.map(([k, v]) => `${k}:${stringify(v)}`)
59+
.join(":");
60+
}
61+
return String(obj);
13862
}
13963

140-
export function cacheable<T extends (...args: any) => any>(
64+
export function cacheable<T extends (...args: Parameters<T>) => Promise<Awaited<ReturnType<T>>>>(
14165
fn: T,
14266
options: CacheOptions | number
14367
) {
14468
const {
14569
expireInSec,
14670
prefix = fn.name,
147-
serialize = defaultSerialize,
148-
deserialize = defaultDeserialize,
14971
staleWhileRevalidate = false,
15072
staleTime = 0,
15173
} = typeof options === "number" ? { expireInSec: options } : options;
15274

15375
const cachePrefix = `cacheable:${prefix}`;
76+
const getKey = (...args: Parameters<T>) => `${cachePrefix}:${stringify(args)}`;
15477

155-
function stringify(obj: unknown): string {
156-
if (obj === null) {
157-
return "null";
158-
}
159-
if (obj === undefined) {
160-
return "undefined";
161-
}
162-
if (typeof obj === "boolean") {
163-
return obj ? "true" : "false";
164-
}
165-
if (typeof obj === "number") {
166-
return String(obj);
167-
}
168-
if (typeof obj === "string") {
169-
return obj;
170-
}
171-
if (typeof obj === "function") {
172-
return obj.toString();
173-
}
174-
175-
if (Array.isArray(obj)) {
176-
return `[${obj.map(stringify).join(",")}]`;
177-
}
178-
179-
if (typeof obj === "object") {
180-
const pairs = Object.entries(obj)
181-
.sort(([a], [b]) => a.localeCompare(b))
182-
.map(([key, value]) => `${key}:${stringify(value)}`);
183-
return pairs.join(":");
184-
}
185-
186-
return String(obj);
187-
}
188-
189-
const getKey = (...args: Parameters<T>) =>
190-
`${cachePrefix}:${stringify(args)}`;
191-
192-
const cachedFn = async (
193-
...args: Parameters<T>
194-
): Promise<Awaited<ReturnType<T>>> => {
195-
const key = getKey(...args);
196-
const retries = typeof options === "number" ? 1 : (options.maxRetries ?? 1);
197-
78+
const cachedFn = async (...args: Parameters<T>): Promise<Awaited<ReturnType<T>>> => {
19879
if (shouldSkipRedis()) {
19980
return fn(...args);
20081
}
20182

202-
let attempt = 0;
203-
while (attempt < retries) {
204-
try {
205-
const redis = getRedisCache();
206-
const cached = await redis.get(key);
207-
redisAvailable = true;
208-
lastRedisCheck = Date.now();
83+
const key = getKey(...args);
20984

210-
if (cached) {
211-
const data = deserialize(cached) as Awaited<ReturnType<T>>;
85+
try {
86+
const redis = getRedisCache();
87+
const cached = await redis.get(key);
88+
redisAvailable = true;
89+
lastRedisCheck = Date.now();
21290

213-
if (staleWhileRevalidate) {
214-
try {
215-
const ttl = await redis.ttl(key);
216-
if (ttl < staleTime && !activeRevalidations.has(key)) {
217-
const revalidationPromise = fn(...args)
218-
.then(async (freshData: Awaited<ReturnType<T>>) => {
219-
if (
220-
freshData !== undefined &&
221-
freshData !== null &&
222-
redisAvailable
223-
) {
224-
try {
225-
const redis = getRedisCache();
226-
await redis.setex(key, expireInSec, serialize(freshData));
227-
} catch {
228-
// Ignore SET failure
229-
}
230-
}
231-
})
232-
.catch((error: unknown) => {
233-
logger.error(
234-
`Background revalidation failed for function ${fn.name}:`,
235-
error
236-
);
237-
})
238-
.finally(() => {
239-
activeRevalidations.delete(key);
240-
});
241-
activeRevalidations.set(key, revalidationPromise);
242-
}
243-
} catch {
244-
// Ignore TTL check failure
245-
}
91+
if (cached) {
92+
if (staleWhileRevalidate) {
93+
const ttl = await redis.ttl(key).catch(() => expireInSec);
94+
if (ttl < staleTime && !activeRevalidations.has(key)) {
95+
const revalidation = fn(...args)
96+
.then(async (fresh) => {
97+
if (fresh != null && redisAvailable) {
98+
await redis.setex(key, expireInSec, JSON.stringify(fresh)).catch(() => { });
99+
}
100+
})
101+
.catch(() => { })
102+
.finally(() => activeRevalidations.delete(key));
103+
activeRevalidations.set(key, revalidation);
246104
}
247-
248-
return data;
249105
}
106+
return deserialize(cached) as Awaited<ReturnType<T>>;
107+
}
250108

251-
const result = await fn(...args);
252-
if (
253-
result !== undefined &&
254-
result !== null &&
255-
redisAvailable &&
256-
!shouldSkipRedis()
257-
) {
258-
try {
259-
await redis.setex(key, expireInSec, serialize(result));
260-
} catch {
261-
redisAvailable = false;
262-
lastRedisCheck = Date.now();
263-
}
264-
}
265-
return result;
266-
} catch (error: unknown) {
267-
attempt += 1;
268-
if (attempt >= retries) {
109+
const result = await fn(...args);
110+
if (result != null && redisAvailable) {
111+
await redis.setex(key, expireInSec, JSON.stringify(result)).catch(() => {
269112
redisAvailable = false;
270113
lastRedisCheck = Date.now();
271-
logger.error(
272-
`Cache error for function ${fn.name}, skipping Redis:`,
273-
error
274-
);
275-
return fn(...args);
276-
}
114+
});
277115
}
116+
return result;
117+
} catch {
118+
redisAvailable = false;
119+
lastRedisCheck = Date.now();
120+
return fn(...args);
278121
}
279-
280-
return fn(...args);
281122
};
282123

283124
cachedFn.getKey = getKey;
284-
cachedFn.clear = (...args: Parameters<T>) => {
285-
const key = getKey(...args);
286-
const redis = getRedisCache();
287-
return redis.del(key);
288-
};
289-
290-
cachedFn.clearAll = async () => {
291-
const redis = getRedisCache();
292-
const keys = await redis.keys(`${cachePrefix}:*`);
293-
if (keys.length > 0) {
294-
return redis.del(...keys);
295-
}
296-
};
297-
298-
cachedFn.invalidate = async (...args: Parameters<T>) => {
299-
const key = getKey(...args);
300-
const result = await fn(...args);
301-
if (result !== undefined && result !== null) {
302-
const redis = getRedisCache();
303-
await redis.setex(key, expireInSec, serialize(result));
304-
}
305-
return result;
306-
};
307-
308125
return cachedFn;
309126
}

0 commit comments

Comments
 (0)