@@ -2,6 +2,8 @@ import crypto, { createHash, randomUUID } from 'node:crypto';
22import {
33 type AnalyticsEvent ,
44 type BlockedTraffic ,
5+ type CustomEvent ,
6+ type CustomOutgoingLink ,
57 clickHouse ,
68 type ErrorEvent ,
79 type WebVitalsEvent ,
@@ -13,7 +15,9 @@ import { getWebsiteByIdV2, isValidOrigin } from '../hooks/auth';
1315import { logger } from '../lib/logger' ;
1416import {
1517 analyticsEventSchema ,
18+ customEventSchema ,
1619 errorEventSchema ,
20+ outgoingLinkSchema ,
1721 webVitalsEventSchema ,
1822} from '../utils/event-schema' ;
1923import { extractIpFromRequest , getGeo } from '../utils/ip-geo' ;
@@ -288,6 +292,104 @@ async function insertWebVitals(
288292 }
289293}
290294
295+ async function insertCustomEvent (
296+ customData : any ,
297+ clientId : string ,
298+ userAgent : string ,
299+ ip : string
300+ ) : Promise < void > {
301+ const eventId = sanitizeString (
302+ customData . eventId ,
303+ VALIDATION_LIMITS . SHORT_STRING_MAX_LENGTH
304+ ) ;
305+ if ( await checkDuplicate ( eventId , 'custom' ) ) {
306+ return ;
307+ }
308+
309+ const now = Date . now ( ) ;
310+
311+ const customEvent : CustomEvent = {
312+ id : randomUUID ( ) ,
313+ client_id : clientId ,
314+ event_name : sanitizeString (
315+ customData . name ,
316+ VALIDATION_LIMITS . SHORT_STRING_MAX_LENGTH
317+ ) ,
318+ anonymous_id : sanitizeString (
319+ customData . anonymousId ,
320+ VALIDATION_LIMITS . SHORT_STRING_MAX_LENGTH
321+ ) ,
322+ session_id : validateSessionId ( customData . sessionId ) ,
323+ properties : customData . properties
324+ ? JSON . stringify ( customData . properties )
325+ : '{}' ,
326+ timestamp :
327+ typeof customData . timestamp === 'number' ? customData . timestamp : now ,
328+ } ;
329+
330+ try {
331+ await clickHouse . insert ( {
332+ table : 'analytics.custom_events' ,
333+ values : [ customEvent ] ,
334+ format : 'JSONEachRow' ,
335+ } ) ;
336+ } catch ( err ) {
337+ logger . error ( 'Failed to insert custom event' , {
338+ error : err as Error ,
339+ eventId,
340+ } ) ;
341+ throw err ;
342+ }
343+ }
344+
345+ async function insertOutgoingLink (
346+ linkData : any ,
347+ clientId : string ,
348+ userAgent : string ,
349+ ip : string
350+ ) : Promise < void > {
351+ const eventId = sanitizeString (
352+ linkData . eventId ,
353+ VALIDATION_LIMITS . SHORT_STRING_MAX_LENGTH
354+ ) ;
355+ if ( await checkDuplicate ( eventId , 'outgoing_link' ) ) {
356+ return ;
357+ }
358+
359+ const now = Date . now ( ) ;
360+
361+ const outgoingLinkEvent : CustomOutgoingLink = {
362+ id : randomUUID ( ) ,
363+ client_id : clientId ,
364+ anonymous_id : sanitizeString (
365+ linkData . anonymousId ,
366+ VALIDATION_LIMITS . SHORT_STRING_MAX_LENGTH
367+ ) ,
368+ session_id : validateSessionId ( linkData . sessionId ) ,
369+ href : sanitizeString ( linkData . href , VALIDATION_LIMITS . PATH_MAX_LENGTH ) ,
370+ text : sanitizeString ( linkData . text , VALIDATION_LIMITS . TEXT_MAX_LENGTH ) ,
371+ properties : linkData . properties
372+ ? JSON . stringify ( linkData . properties )
373+ : '{}' ,
374+ timestamp :
375+ typeof linkData . timestamp === 'number' ? linkData . timestamp : now ,
376+ } ;
377+
378+ try {
379+ await clickHouse . insert ( {
380+ table : 'analytics.outgoing_links' ,
381+ values : [ outgoingLinkEvent ] ,
382+ format : 'JSONEachRow' ,
383+ } ) ;
384+ } catch ( err ) {
385+ logger . error ( 'Failed to insert outgoing link event' , {
386+ error : err as Error ,
387+ eventId,
388+ } ) ;
389+ throw err ;
390+ }
391+ }
392+
291393async function insertTrackEvent (
292394 trackData : any ,
293395 clientId : string ,
@@ -649,6 +751,62 @@ const app = new Elysia()
649751 return { status : 'success' , type : 'web_vitals' } ;
650752 }
651753
754+ if ( eventType === 'custom' ) {
755+ const parseResult = customEventSchema . safeParse ( body ) ;
756+ if ( ! parseResult . success ) {
757+ console . error (
758+ 'Blocked event schema errors:' ,
759+ parseResult . error . issues ,
760+ 'Payload:' ,
761+ body
762+ ) ;
763+ await logBlockedTraffic (
764+ request ,
765+ body ,
766+ query ,
767+ 'invalid_schema' ,
768+ 'Schema Validation' ,
769+ undefined ,
770+ clientId
771+ ) ;
772+ return {
773+ status : 'error' ,
774+ message : 'Invalid event schema' ,
775+ errors : parseResult . error . issues ,
776+ } ;
777+ }
778+ insertCustomEvent ( body , clientId , userAgent , ip ) ;
779+ return { status : 'success' , type : 'custom' } ;
780+ }
781+
782+ if ( eventType === 'outgoing_link' ) {
783+ const parseResult = outgoingLinkSchema . safeParse ( body ) ;
784+ if ( ! parseResult . success ) {
785+ console . error (
786+ 'Blocked event schema errors:' ,
787+ parseResult . error . issues ,
788+ 'Payload:' ,
789+ body
790+ ) ;
791+ await logBlockedTraffic (
792+ request ,
793+ body ,
794+ query ,
795+ 'invalid_schema' ,
796+ 'Schema Validation' ,
797+ undefined ,
798+ clientId
799+ ) ;
800+ return {
801+ status : 'error' ,
802+ message : 'Invalid event schema' ,
803+ errors : parseResult . error . issues ,
804+ } ;
805+ }
806+ insertOutgoingLink ( body , clientId , userAgent , ip ) ;
807+ return { status : 'success' , type : 'outgoing_link' } ;
808+ }
809+
652810 return { status : 'error' , message : 'Unknown event type' } ;
653811 }
654812 )
@@ -818,6 +976,90 @@ const app = new Elysia()
818976 } ;
819977 }
820978 }
979+ if ( eventType === 'custom' ) {
980+ const parseResult = customEventSchema . safeParse ( event ) ;
981+ if ( ! parseResult . success ) {
982+ console . error (
983+ 'Blocked event schema errors:' ,
984+ parseResult . error . issues ,
985+ 'Payload:' ,
986+ event
987+ ) ;
988+ await logBlockedTraffic (
989+ request ,
990+ event ,
991+ query ,
992+ 'invalid_schema' ,
993+ 'Schema Validation' ,
994+ undefined ,
995+ clientId
996+ ) ;
997+ return {
998+ status : 'error' ,
999+ message : 'Invalid event schema' ,
1000+ eventType,
1001+ errors : parseResult . error . issues ,
1002+ eventId : event . eventId ,
1003+ } ;
1004+ }
1005+ try {
1006+ await insertCustomEvent ( event , clientId , userAgent , ip ) ;
1007+ return {
1008+ status : 'success' ,
1009+ type : 'custom' ,
1010+ eventId : event . eventId ,
1011+ } ;
1012+ } catch ( error ) {
1013+ return {
1014+ status : 'error' ,
1015+ message : 'Processing failed' ,
1016+ eventType,
1017+ error : String ( error ) ,
1018+ } ;
1019+ }
1020+ }
1021+ if ( eventType === 'outgoing_link' ) {
1022+ const parseResult = outgoingLinkSchema . safeParse ( event ) ;
1023+ if ( ! parseResult . success ) {
1024+ console . error (
1025+ 'Blocked event schema errors:' ,
1026+ parseResult . error . issues ,
1027+ 'Payload:' ,
1028+ event
1029+ ) ;
1030+ await logBlockedTraffic (
1031+ request ,
1032+ event ,
1033+ query ,
1034+ 'invalid_schema' ,
1035+ 'Schema Validation' ,
1036+ undefined ,
1037+ clientId
1038+ ) ;
1039+ return {
1040+ status : 'error' ,
1041+ message : 'Invalid event schema' ,
1042+ eventType,
1043+ errors : parseResult . error . issues ,
1044+ eventId : event . eventId ,
1045+ } ;
1046+ }
1047+ try {
1048+ await insertOutgoingLink ( event , clientId , userAgent , ip ) ;
1049+ return {
1050+ status : 'success' ,
1051+ type : 'outgoing_link' ,
1052+ eventId : event . eventId ,
1053+ } ;
1054+ } catch ( error ) {
1055+ return {
1056+ status : 'error' ,
1057+ message : 'Processing failed' ,
1058+ eventType,
1059+ error : String ( error ) ,
1060+ } ;
1061+ }
1062+ }
8211063 return {
8221064 status : 'error' ,
8231065 message : 'Unknown event type' ,
0 commit comments