Skip to content

Commit ff6941a

Browse files
committed
fix comments
1 parent dfd26b4 commit ff6941a

File tree

11 files changed

+148
-141
lines changed

11 files changed

+148
-141
lines changed

apps/api/src/hooks/duplicate.hook.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,19 @@ export async function duplicateHook(
88
}>,
99
reply: FastifyReply,
1010
) {
11-
const isDuplicate = await isDuplicatedEvent({
12-
ip: req.clientIp ?? '',
13-
origin: req.headers.origin ?? '',
14-
payload: req.body,
15-
projectId: (req.headers['openpanel-client-id'] as string) || '',
16-
});
11+
const ip = req.clientIp;
12+
const origin = req.headers.origin;
13+
const clientId = req.headers['openpanel-client-id'];
14+
const shouldCheck = ip && origin && clientId;
15+
16+
const isDuplicate = shouldCheck
17+
? await isDuplicatedEvent({
18+
ip,
19+
origin,
20+
payload: req.body,
21+
projectId: clientId as string,
22+
})
23+
: false;
1724

1825
if (isDuplicate) {
1926
return reply.status(200).send('Duplicate event');

apps/api/src/routes/track.router.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ import { handler } from '@/controllers/track.controller';
22
import type { FastifyPluginCallback } from 'fastify';
33

44
import { clientHook } from '@/hooks/client.hook';
5+
import { duplicateHook } from '@/hooks/duplicate.hook';
56
import { isBotHook } from '@/hooks/is-bot.hook';
67

78
const trackRouter: FastifyPluginCallback = async (fastify) => {
8-
fastify.addHook('preHandler', clientHook);
99
fastify.addHook('preHandler', isBotHook);
10+
fastify.addHook('preValidation', duplicateHook);
11+
fastify.addHook('preHandler', clientHook);
1012

1113
fastify.route({
1214
method: 'POST',

apps/api/src/utils/auth.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ export async function validateSdkRequest(
137137

138138
if (client.secret && clientSecret) {
139139
const isVerified = await getCache(
140-
`client:auth:${clientId}:${clientSecret.slice(0, 5)}`,
140+
`client:auth:${clientId}:${Buffer.from(clientSecret).toString('base64')}`,
141141
60 * 5,
142142
async () => await verifyPassword(clientSecret, client.secret!),
143143
true,

apps/worker/scripts/cleanup-old-event-buffer-keys.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,12 @@ async function cleanupOldEventBufferKeys(): Promise<CleanupStats> {
104104
const events = await redis.lrange(sessionKey, 0, -1);
105105

106106
if (events.length > 0) {
107-
// Move events to new queue
108-
await redis.rpush(newQueueKey, ...events);
107+
// Move events to new queue in safe batches to avoid exceeding V8 arg limits
108+
const chunkSize = 1000;
109+
for (let offset = 0; offset < events.length; offset = chunkSize) {
110+
const chunk = events.slice(offset, offset + chunkSize);
111+
await redis.rpush(newQueueKey, ...chunk);
112+
}
109113
// Update buffer counter
110114
await redis.incrby('event_buffer:total_count', events.length);
111115
totalEventsMigrated += events.length;

apps/worker/src/boot-cron.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export async function bootCron() {
5858
// TODO: Switch to getJobSchedulers
5959
const repeatableJobs = await cronQueue.getRepeatableJobs();
6060
for (const repeatableJob of repeatableJobs) {
61-
cronQueue.removeRepeatableByKey(repeatableJob.key);
61+
await cronQueue.removeRepeatableByKey(repeatableJob.key);
6262
}
6363

6464
// Add repeatable jobs

apps/worker/src/boot-workers.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { setTimeout as sleep } from 'node:timers/promises';
1919
import { Worker as GroupWorker } from 'groupmq';
2020

2121
import { cronJob } from './jobs/cron';
22-
import { incomingEventPure } from './jobs/events.incoming-event';
22+
import { incomingEvent } from './jobs/events.incoming-event';
2323
import { importJob } from './jobs/import';
2424
import { miscJob } from './jobs/misc';
2525
import { notificationJob } from './jobs/notification';
@@ -122,7 +122,7 @@ export async function bootWorkers() {
122122
process.env.EVENT_BLOCKING_TIMEOUT_SEC || '1',
123123
),
124124
handler: async (job) => {
125-
return await incomingEventPure(job.data);
125+
return await incomingEvent(job.data);
126126
},
127127
});
128128

@@ -184,7 +184,7 @@ export async function bootWorkers() {
184184
concurrency,
185185
});
186186
workers.push(importWorker);
187-
logger.info('Started worker for misc', { concurrency });
187+
logger.info('Started worker for import', { concurrency });
188188
}
189189

190190
if (workers.length === 0) {

apps/worker/src/jobs/events.incoming-event.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,8 @@ async function createEventAndNotify(
4545
}
4646

4747
export async function incomingEvent(
48-
job: Job<EventsQueuePayloadIncomingEvent>,
49-
token?: string,
50-
) {
51-
return incomingEventPure(job.data.payload, job, token);
52-
}
53-
54-
export async function incomingEventPure(
5548
jobPayload: EventsQueuePayloadIncomingEvent['payload'],
56-
job?: Job<EventsQueuePayloadIncomingEvent>,
57-
token?: string,
5849
) {
59-
await getRedisCache().incr('queue:counter');
6050
const {
6151
geo,
6252
event: body,

0 commit comments

Comments
 (0)