Skip to content

Commit d98aada

Browse files
authored
Fix PayloadTooLargeError in analytics collector (#9604)
Signed-off-by: Alexander Platov <[email protected]>
1 parent e005c60 commit d98aada

File tree

4 files changed: +72 / -18 lines changed

dev/docker-compose.yaml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -414,6 +414,7 @@ services:
414414
# - STATS_URL=http://huly.local:4900
415415
# - POSTHOG_HOST=${POSTHOG_HOST}
416416
# - POSTHOG_API_KEY=${POSTHOG_API_KEY}
417+
# - MAX_PAYLOAD_SIZE=10mb
417418
msg2file:
418419
image: hardcoreeng/msg2file
419420
ports:

packages/analytics-providers/src/analyticsCollector.ts

Lines changed: 48 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,8 @@ import { type QueuedEvent } from './types'
2323
export class AnalyticsCollectorProvider implements AnalyticProvider {
2424
private readonly collectIntervalMs = 5000
2525
private readonly maxRetries = 3
26+
private readonly maxBatchSize = 100
27+
private readonly maxBatchSizeBytes = 5 * 1024 * 1024 // 5MB
2628
private readonly events: QueuedEvent[] = []
2729
private collectTimer: any = null
2830
private url: string = ''
@@ -70,24 +72,55 @@ export class AnalyticsCollectorProvider implements AnalyticProvider {
7072
const token = getMetadata(presentation.metadata.Token) ?? ''
7173
if (token === '') return
7274

73-
const eventsToSend = this.events.splice(0, this.events.length)
74-
75-
try {
76-
const response = await fetch(`${this.url}/collect`, {
77-
method: 'POST',
78-
headers: {
79-
Authorization: 'Bearer ' + token,
80-
'Content-Type': 'application/json'
81-
},
82-
body: JSON.stringify(eventsToSend)
83-
})
75+
const batches = this.createBatches(this.events)
76+
this.events.length = 0
77+
78+
for (const batch of batches) {
79+
try {
80+
const response = await fetch(`${this.url}/collect`, {
81+
method: 'POST',
82+
headers: {
83+
Authorization: 'Bearer ' + token,
84+
'Content-Type': 'application/json'
85+
},
86+
body: JSON.stringify(batch)
87+
})
88+
89+
if (!response.ok) {
90+
this.handleFailedEvents(batch)
91+
}
92+
} catch (err) {
93+
this.handleFailedEvents(batch)
94+
}
95+
}
96+
}
8497

85-
if (!response.ok) {
86-
this.handleFailedEvents(eventsToSend)
98+
private createBatches (events: QueuedEvent[]): QueuedEvent[][] {
99+
const batches: QueuedEvent[][] = []
100+
let currentBatch: QueuedEvent[] = []
101+
let currentBatchSize = 0
102+
103+
for (const event of events) {
104+
const eventSize = JSON.stringify(event).length
105+
106+
if (
107+
currentBatch.length >= this.maxBatchSize ||
108+
(currentBatchSize + eventSize > this.maxBatchSizeBytes && currentBatch.length > 0)
109+
) {
110+
batches.push(currentBatch)
111+
currentBatch = []
112+
currentBatchSize = 0
87113
}
88-
} catch (err) {
89-
this.handleFailedEvents(eventsToSend)
114+
115+
currentBatch.push(event)
116+
currentBatchSize += eventSize
90117
}
118+
119+
if (currentBatch.length > 0) {
120+
batches.push(currentBatch)
121+
}
122+
123+
return batches
91124
}
92125

93126
private handleFailedEvents (failedEvents: QueuedEvent[]): void {

services/analytics-collector/pod-analytics-collector/src/config.ts

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ export interface Config {
2121
PostHogHost: string
2222
PostHogAPI: string
2323
SentryDSN?: string
24+
MaxPayloadSize?: string
2425
}
2526

2627
const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
@@ -33,7 +34,8 @@ const config: Config = (() => {
3334
AccountsUrl: process.env.ACCOUNTS_URL,
3435
PostHogHost: process.env.POSTHOG_HOST,
3536
PostHogAPI: process.env.POSTHOG_API_KEY,
36-
SentryDSN: process.env.SENTRY_DSN ?? ''
37+
SentryDSN: process.env.SENTRY_DSN ?? '',
38+
MaxPayloadSize: process.env.MAX_PAYLOAD_SIZE ?? '10mb'
3739
}
3840

3941
const missingEnv = (Object.keys(params) as Array<keyof Config>).filter((key) => params[key] === undefined)

services/analytics-collector/pod-analytics-collector/src/server.ts

Lines changed: 20 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -50,7 +50,16 @@ function isContentValid (body: any[]): boolean {
5050
if (it == null) return true
5151
if (!('event' in it)) return true
5252
if (!('properties' in it)) return true
53-
return !('timestamp' in it)
53+
if (!('timestamp' in it)) return true
54+
55+
const eventSize = JSON.stringify(it).length
56+
if (eventSize > 1024 * 1024) {
57+
// maximum 1MB per event
58+
console.warn(`Event too large: ${eventSize} bytes, event: ${it.event}`)
59+
return true
60+
}
61+
62+
return false
5463
})
5564
}
5665

@@ -212,7 +221,7 @@ function preparePostHogEvent (event: AnalyticEvent, req: Request): Record<string
212221
export function createServer (): Express {
213222
const app = express()
214223
app.use(cors())
215-
app.use(express.json())
224+
app.use(express.json({ limit: config.MaxPayloadSize }))
216225

217226
app.post(
218227
'/collect',
@@ -226,6 +235,9 @@ export function createServer (): Express {
226235
}
227236

228237
const events: AnalyticEvent[] = req.body
238+
const payloadSize = JSON.stringify(req.body).length
239+
240+
console.log(`Received batch: ${events.length} events, ${payloadSize} bytes`)
229241

230242
const posthogEvents = events.map((event) => {
231243
return preparePostHogEvent(event, req)
@@ -236,6 +248,9 @@ export function createServer (): Express {
236248
batch: posthogEvents.reverse()
237249
}
238250

251+
const posthogPayloadSize = JSON.stringify(payload).length
252+
console.log(`Sending to PostHog: ${posthogEvents.length} events, ${posthogPayloadSize} bytes`)
253+
239254
try {
240255
const response = await fetch(`${config.PostHogHost}/batch/`, {
241256
method: 'POST',
@@ -249,6 +264,8 @@ export function createServer (): Express {
249264
if (!response.ok) {
250265
const errorText = await response.text()
251266
console.error(`PostHog API error: ${response.status} ${response.statusText}`, errorText)
267+
} else {
268+
console.log(`Successfully sent ${posthogEvents.length} events to PostHog`)
252269
}
253270
} catch (error) {
254271
console.error('Failed to send events to PostHog:', error)
@@ -258,6 +275,7 @@ export function createServer (): Express {
258275
res.json({
259276
received: events.length,
260277
processed: posthogEvents.length,
278+
payloadSize,
261279
timestamp: new Date().toISOString()
262280
})
263281
})

0 commit comments

Comments
 (0)