Skip to content

Commit 606cbe2

Browse files
committed
refactor: memory usage improvements
1 parent 2302051 commit 606cbe2

File tree

8 files changed

+97
-58
lines changed

8 files changed

+97
-58
lines changed

apps/basket/src/index.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,37 @@ import "./polyfills/compression";
22

33
import { Elysia } from "elysia";
44
import { logger } from "./lib/logger";
5-
import { getProducerStats } from "./lib/producer";
5+
import { disconnectProducer, getProducerStats } from "./lib/producer";
66
import {
77
endRequestSpan,
88
initTracing,
9+
shutdownTracing,
910
startRequestSpan,
1011
} from "./lib/tracing";
1112
import basketRouter from "./routes/basket";
1213
import emailRouter from "./routes/email";
14+
import { closeGeoIPReader } from "./utils/ip-geo";
1315

1416
initTracing();
1517

18+
process.on("SIGTERM", async () => {
19+
logger.info("SIGTERM received, shutting down gracefully...");
20+
await Promise.all([disconnectProducer(), shutdownTracing()]).catch((error) =>
21+
logger.error({ error }, "Shutdown error")
22+
);
23+
closeGeoIPReader();
24+
process.exit(0);
25+
});
26+
27+
process.on("SIGINT", async () => {
28+
logger.info("SIGINT received, shutting down gracefully...");
29+
await Promise.all([disconnectProducer(), shutdownTracing()]).catch((error) =>
30+
logger.error({ error }, "Shutdown error")
31+
);
32+
closeGeoIPReader();
33+
process.exit(0);
34+
});
35+
1636
function getKafkaHealth() {
1737
const stats = getProducerStats();
1838

@@ -99,7 +119,7 @@ const app = new Elysia()
99119

100120
const port = process.env.PORT || 4000;
101121

102-
console.log(`Starting basket service on port ${port}`);
122+
logger.info(`Starting basket service on port ${port}`);
103123

104124
export default {
105125
fetch: app.fetch,

apps/basket/src/lib/blocked-traffic.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ async function _logBlockedTrafficAsync(
2424
) || "";
2525

2626

27-
const [geo, ua, now] = await Promise.all([
27+
const [geo, ua] = await Promise.all([
2828
getGeo(ip),
2929
parseUserAgent(userAgent),
30-
Date.now(),
3130
]);
31+
const now = Date.now();
3232
const { anonymizedIP, country, region, city } = geo;
3333
const { browserName, browserVersion, osName, osVersion, deviceType } = ua;
3434

apps/basket/src/lib/event-service.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ export function insertError(
5959

6060
const { anonymizedIP, country, region } = geoData;
6161
const { browserName, browserVersion, osName, osVersion, deviceType } =
62-
parseUserAgent(userAgent);
62+
await parseUserAgent(userAgent);
6363

6464
const errorEvent: ErrorEvent = {
6565
id: randomUUID(),
@@ -141,7 +141,7 @@ export async function insertWebVitals(
141141

142142
const { country, region } = await getGeo(ip);
143143
const { browserName, browserVersion, osName, osVersion, deviceType } =
144-
parseUserAgent(userAgent);
144+
await parseUserAgent(userAgent);
145145

146146
const webVitalsEvent: WebVitalsEvent = {
147147
id: randomUUID(),
@@ -331,7 +331,7 @@ export function insertTrackEvent(
331331
deviceType,
332332
deviceBrand,
333333
deviceModel,
334-
} = parseUserAgent(userAgent);
334+
} = await parseUserAgent(userAgent);
335335
const now = Date.now();
336336

337337
const trackEvent: AnalyticsEvent = {

apps/basket/src/lib/producer.ts

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ export class EventProducer {
193193
}
194194

195195
this.flushing = true;
196-
const items = this.buffer.splice(0);
196+
const batchSize = Math.min(this.buffer.length, this.config.bufferMax);
197+
const items = this.buffer.splice(0, batchSize);
197198

198199
try {
199200
const grouped = items.reduce(
@@ -238,7 +239,7 @@ export class EventProducer {
238239

239240
for (const { event, retries, timestamp } of items) {
240241
const age = Date.now() - timestamp;
241-
if (retries < this.config.maxRetries && age < 300_000) {
242+
if (retries < this.config.maxRetries && age < 300_000 && this.buffer.length < this.config.bufferHardMax) {
242243
this.buffer.push({
243244
table,
244245
event,
@@ -249,7 +250,7 @@ export class EventProducer {
249250
this.stats.dropped += 1;
250251
logger.error(
251252
{ error },
252-
`Dropped event (retries: ${retries}, age: ${age}ms)`,
253+
`Dropped event (retries: ${retries}, age: ${age}ms, buffer: ${this.buffer.length})`,
253254
{ table, eventId: (event as { event_id?: string }).event_id }
254255
);
255256
}
@@ -611,17 +612,3 @@ export const disconnectProducer = async (): Promise<void> => {
611612
};
612613

613614
export const getProducerStats = () => getDefaultProducer().getStats();
614-
615-
process.on("SIGTERM", async () => {
616-
await disconnectProducer().catch((error) =>
617-
logger.error({ error }, "SIGTERM error")
618-
);
619-
process.exit(0);
620-
});
621-
622-
process.on("SIGINT", async () => {
623-
await disconnectProducer().catch((error) =>
624-
logger.error({ error }, "SIGINT error")
625-
);
626-
process.exit(0);
627-
});

apps/basket/src/lib/tracing.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,37 +19,37 @@ export function initTracing(): void {
1919
return;
2020
}
2121

22+
const exporter = new OTLPTraceExporter({
23+
url: "https://api.axiom.co/v1/traces",
24+
headers: {
25+
Authorization: `Bearer ${process.env.AXIOM_TOKEN}`,
26+
"X-Axiom-Dataset": process.env.AXIOM_DATASET ?? "basket",
27+
},
28+
});
29+
2230
sdk = new NodeSDK({
2331
resource: resourceFromAttributes({
2432
[ATTR_SERVICE_NAME]: "basket",
2533
[ATTR_SERVICE_VERSION]: pkg.version,
2634
}),
27-
traceExporter: new OTLPTraceExporter({
28-
url: "https://api.axiom.co/v1/traces",
29-
headers: {
30-
Authorization: `Bearer ${process.env.AXIOM_TOKEN}`,
31-
"X-Axiom-Dataset": process.env.AXIOM_DATASET ?? "basket",
32-
},
35+
spanProcessor: new BatchSpanProcessor(exporter, {
36+
scheduledDelayMillis: 1000,
37+
exportTimeoutMillis: 30_000,
38+
maxExportBatchSize: 512,
39+
maxQueueSize: 2048,
3340
}),
34-
spanProcessor: new BatchSpanProcessor(
35-
new OTLPTraceExporter({
36-
url: "https://api.axiom.co/v1/traces",
37-
headers: {
38-
Authorization: `Bearer ${process.env.AXIOM_TOKEN}`,
39-
"X-Axiom-Dataset": process.env.AXIOM_DATASET ?? "basket",
40-
},
41-
}),
42-
{
43-
scheduledDelayMillis: 1000,
44-
exportTimeoutMillis: 30_000,
45-
maxExportBatchSize: 512,
46-
}
47-
),
4841
});
4942

5043
sdk.start();
5144
}
5245

46+
export async function shutdownTracing(): Promise<void> {
47+
if (sdk) {
48+
await sdk.shutdown();
49+
sdk = null;
50+
}
51+
}
52+
5353
/**
5454
* Get tracer
5555
*/

apps/basket/src/utils/ip-geo.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ let reader: GeoIPReader | null = null;
1919
let isLoading = false;
2020
let loadPromise: Promise<void> | null = null;
2121
let loadError: Error | null = null;
22+
let dbBuffer: Buffer | null = null;
2223

2324
async function loadDatabaseFromCdn(): Promise<Buffer> {
2425
try {
@@ -61,12 +62,13 @@ function loadDatabase() {
6162
isLoading = true;
6263
loadPromise = (async () => {
6364
try {
64-
const dbBuffer = await loadDatabaseFromCdn();
65+
dbBuffer = await loadDatabaseFromCdn();
6566
reader = Reader.openBuffer(dbBuffer) as GeoIPReader;
6667
} catch (error) {
6768
logger.error({ error }, "Failed to load GeoIP database");
6869
loadError = error as Error;
6970
reader = null;
71+
dbBuffer = null;
7072
} finally {
7173
isLoading = false;
7274
}
@@ -206,3 +208,15 @@ export function extractIpFromRequest(request: Request): string {
206208

207209
return "";
208210
}
211+
212+
export function closeGeoIPReader(): void {
213+
if (reader) {
214+
reader = null;
215+
}
216+
if (dbBuffer) {
217+
dbBuffer = null;
218+
}
219+
loadPromise = null;
220+
loadError = null;
221+
isLoading = false;
222+
}

apps/basket/src/utils/user-agent.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* and platform identification.
66
*/
77

8+
import { cacheable } from "@databuddy/redis";
89
import { bots } from "@databuddy/shared/lists/bots";
910
import { UAParser } from "ua-parser-js";
1011
import { logger } from "../lib/logger";
@@ -21,9 +22,9 @@ export type UserAgentInfo = {
2122
};
2223

2324
/**
24-
* Parse user agent to extract useful information
25+
* Parse user agent to extract useful information (uncached)
2526
*/
26-
export function parseUserAgent(userAgent: string): {
27+
function _parseUserAgent(userAgent: string): {
2728
browserName?: string;
2829
browserVersion?: string;
2930
osName?: string;
@@ -58,10 +59,7 @@ export function parseUserAgent(userAgent: string): {
5859
deviceModel: result.device.model || undefined,
5960
};
6061
} catch (error) {
61-
logger.error(
62-
{ error, userAgent },
63-
"Failed to parse user agent"
64-
);
62+
logger.error({ error, userAgent }, "Failed to parse user agent");
6563
return {
6664
browserName: undefined,
6765
browserVersion: undefined,
@@ -74,6 +72,16 @@ export function parseUserAgent(userAgent: string): {
7472
}
7573
}
7674

75+
/**
76+
* Parse user agent to extract useful information (cached)
77+
*/
78+
export const parseUserAgent = cacheable(_parseUserAgent, {
79+
expireInSec: 3600,
80+
prefix: "ua_parse",
81+
staleWhileRevalidate: true,
82+
staleTime: 1800,
83+
});
84+
7785
export function detectBot(
7886
userAgent: string,
7987
request: Request

packages/redis/cacheable.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ import { getRedisCache } from "./redis";
22

33
const logger = console;
44

5+
const activeRevalidations = new Map<string, Promise<void>>();
6+
57
const stringifyRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.*Z$/;
68

7-
interface CacheOptions {
9+
type CacheOptions = {
810
expireInSec: number;
911
prefix?: string;
1012
serialize?: (data: unknown) => string;
@@ -47,9 +49,9 @@ export async function getCache<T>(
4749

4850
if (staleWhileRevalidate) {
4951
const ttl = await redis.ttl(key);
50-
if (ttl < staleTime) {
52+
if (ttl < staleTime && !activeRevalidations.has(key)) {
5153
// Return stale data and revalidate in background
52-
fn()
54+
const revalidationPromise = fn()
5355
.then(async (freshData: T) => {
5456
if (freshData !== undefined && freshData !== null) {
5557
const redis = getRedisCache();
@@ -61,7 +63,11 @@ export async function getCache<T>(
6163
`Background revalidation failed for key ${key}:`,
6264
error
6365
);
66+
})
67+
.finally(() => {
68+
activeRevalidations.delete(key);
6469
});
70+
activeRevalidations.set(key, revalidationPromise);
6571
}
6672
}
6773

@@ -74,7 +80,7 @@ export async function getCache<T>(
7480
}
7581
return data;
7682
} catch (error: unknown) {
77-
retries++;
83+
retries += 1;
7884
if (retries === maxRetries) {
7985
logger.error(
8086
`Cache error for key ${key} after ${maxRetries} retries:`,
@@ -157,9 +163,9 @@ export function cacheable<T extends (...args: any) => any>(
157163

158164
if (staleWhileRevalidate) {
159165
const ttl = await redis.ttl(key);
160-
if (ttl < staleTime) {
166+
if (ttl < staleTime && !activeRevalidations.has(key)) {
161167
// Return stale data and revalidate in background
162-
fn(...args)
168+
const revalidationPromise = fn(...args)
163169
.then(async (freshData: Awaited<ReturnType<T>>) => {
164170
if (freshData !== undefined && freshData !== null) {
165171
const redis = getRedisCache();
@@ -171,7 +177,11 @@ export function cacheable<T extends (...args: any) => any>(
171177
`Background revalidation failed for function ${fn.name}:`,
172178
error
173179
);
180+
})
181+
.finally(() => {
182+
activeRevalidations.delete(key);
174183
});
184+
activeRevalidations.set(key, revalidationPromise);
175185
}
176186
}
177187

@@ -184,7 +194,7 @@ export function cacheable<T extends (...args: any) => any>(
184194
}
185195
return result;
186196
} catch (error: unknown) {
187-
retries++;
197+
retries += 1;
188198
if (retries === maxRetries) {
189199
logger.error(
190200
`Cache error for function ${fn.name} after ${maxRetries} retries:`,

0 commit comments

Comments
 (0)