Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions plugin-server/src/ingestion/cookieless/cookieless-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,114 @@ describe('CookielessManager', () => {
}
})
})
describe('timestamp out of range', () => {
beforeEach(async () => {
await setModeForTeam(CookielessServerHashMode.Stateful)
})

it('should drop only the event with out-of-range timestamp, not other events in batch', async () => {
// Create an event with a timestamp that's too old (more than 72h + timezone buffer in the past)
const oldTimestamp = new Date('2025-01-05T11:00:00') // 5 days before "now" (2025-01-10)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const oldTimestamp = new Date('2025-01-05T11:00:00') // 5 days before "now" (2025-01-10)
const oldTimestamp = new Date('2025-01-05T11:00:00Z') // 5 days before "now" (2025-01-10)

to match the constants above.

Though, to keep this test file consistent, you might want to move this constant next to `aBitLater` above and call it `fiveDaysAgo` or similar.

const eventWithOldTimestamp = deepFreeze({
...event,
now: oldTimestamp.toISOString(),
uuid: new UUID7(oldTimestamp.getTime()).toString(),
})

const response = await hub.cookielessManager.doBatch([
{
event: eventWithOldTimestamp,
team,
message,
headers: { force_disable_person_processing: false },
},
{ event, team, message, headers: { force_disable_person_processing: false } },
{ event: nonCookielessEvent, team, message, headers: { force_disable_person_processing: false } },
])
expect(response.length).toBe(3)

// Event with old timestamp should be dropped
const oldTimestampResult = response[0]
expect(oldTimestampResult.type).toBe(PipelineResultType.DROP)
if (oldTimestampResult.type === PipelineResultType.DROP) {
expect(oldTimestampResult.reason).toBe('cookieless_timestamp_out_of_range')
}

// Valid cookieless event should pass through
const validCookielessResult = response[1]
expect(validCookielessResult.type).toBe(PipelineResultType.OK)

// Non-cookieless event should pass through
const nonCookielessResult = response[2]
expect(nonCookielessResult.type).toBe(PipelineResultType.OK)
if (nonCookielessResult.type === PipelineResultType.OK) {
expect(nonCookielessResult.value.event).toBe(nonCookielessEvent)
}
})

it('should drop events with timestamps too far in the future', async () => {
// Create an event with a timestamp that's too far in the future
const futureTimestamp = new Date('2025-01-12T11:00:00') // 2 days after "now" (2025-01-10)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const futureTimestamp = new Date('2025-01-12T11:00:00') // 2 days after "now" (2025-01-10)
const futureTimestamp = new Date('2025-01-12T11:00:00Z') // 2 days after "now" (2025-01-10)

const eventWithFutureTimestamp = deepFreeze({
...event,
now: futureTimestamp.toISOString(),
uuid: new UUID7(futureTimestamp.getTime()).toString(),
})

const response = await hub.cookielessManager.doBatch([
{
event: eventWithFutureTimestamp,
team,
message,
headers: { force_disable_person_processing: false },
},
{ event, team, message, headers: { force_disable_person_processing: false } },
])
expect(response.length).toBe(2)

// Event with future timestamp should be dropped
const futureTimestampResult = response[0]
expect(futureTimestampResult.type).toBe(PipelineResultType.DROP)
if (futureTimestampResult.type === PipelineResultType.DROP) {
expect(futureTimestampResult.reason).toBe('cookieless_timestamp_out_of_range')
}

// Valid cookieless event should pass through
const validCookielessResult = response[1]
expect(validCookielessResult.type).toBe(PipelineResultType.OK)
})

it('should include ingestion warning for dropped events', async () => {
    // Five days before the suite's "now" (2025-01-10), so the event is expected to be rejected as out of range.
    // The trailing `Z` pins the instant to UTC: an ISO date-time string without an offset is parsed as
    // local time, which would make this fixture (and the UUID7 derived from it) depend on the machine's
    // timezone.
    const oldTimestamp = new Date('2025-01-05T11:00:00Z')
    const eventWithOldTimestamp = deepFreeze({
        ...event,
        now: oldTimestamp.toISOString(),
        uuid: new UUID7(oldTimestamp.getTime()).toString(),
    })

    // A single-event batch, so the only result is the one under test.
    const response = await hub.cookielessManager.doBatch([
        {
            event: eventWithOldTimestamp,
            team,
            message,
            headers: { force_disable_person_processing: false },
        },
    ])
    expect(response.length).toBe(1)

    const result = response[0]
    expect(result.type).toBe(PipelineResultType.DROP)
    // The `if` narrows the result union so `warnings` is accessible to the type checker;
    // the preceding expect already fails the test if the result is not a DROP.
    if (result.type === PipelineResultType.DROP) {
        expect(result.warnings.length).toBe(1)
        expect(result.warnings[0].type).toBe('cookieless_timestamp_out_of_range')
        // The warning must identify the dropped event.
        expect(result.warnings[0].details).toMatchObject({
            eventUuid: eventWithOldTimestamp.uuid,
            event: eventWithOldTimestamp.event,
            distinctId: eventWithOldTimestamp.distinct_id,
        })
    }
})
})
describe('disabled', () => {
beforeEach(async () => {
await setModeForTeam(CookielessServerHashMode.Disabled)
Expand Down
67 changes: 50 additions & 17 deletions plugin-server/src/ingestion/cookieless/cookieless-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ const MAX_NEGATIVE_TIMEZONE_HOURS = 12
const MAX_POSITIVE_TIMEZONE_HOURS = 14
const MAX_SUPPORTED_INGESTION_LAG_HOURS = 72 // if changing this, you will also need to change the TTLs

/**
 * Outcome of a salt lookup for a calendar day, modelled as a discriminated union on `success`.
 *
 * - `success: true` carries the resulting bytes in `salt`. `getSaltForDay` stores the day's salt there;
 *   `doHashForDay` reuses this shape and stores the output of `CookielessManager.doHash` in the same field.
 * - `success: false` is returned (rather than throwing) when the calendar date fails validation, with
 *   `reason: 'date_out_of_range'`, so callers can handle the affected event on its own.
 */
type SaltResult =
    | { success: true; salt: Buffer }
    | { success: false; reason: 'date_out_of_range' }

interface CookielessConfig {
disabled: boolean
forceStatelessMode: boolean
Expand Down Expand Up @@ -130,22 +133,22 @@ export class CookielessManager {
this.cleanupInterval.unref()
}

getSaltForDay(yyyymmdd: string, timestampMs: number): Promise<Buffer> {
getSaltForDay(yyyymmdd: string, timestampMs: number): Promise<SaltResult> {
if (!isCalendarDateValid(yyyymmdd)) {
throw new Error('Date is out of range')
return Promise.resolve({ success: false, reason: 'date_out_of_range' })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

}

// see if we have it locally
if (this.localSaltMap[yyyymmdd]) {
return Promise.resolve(this.localSaltMap[yyyymmdd])
return Promise.resolve({ success: true, salt: this.localSaltMap[yyyymmdd] })
}

// get the salt for the day from redis, but only do this once for this node process
return this.mutex.run({
fn: async (): Promise<Buffer> => {
fn: async (): Promise<SaltResult> => {
// check if we got the salt while waiting for the mutex
if (this.localSaltMap[yyyymmdd]) {
return this.localSaltMap[yyyymmdd]
return { success: true, salt: this.localSaltMap[yyyymmdd] }
}

// try to get it from redis instead
Expand All @@ -159,7 +162,7 @@ export class CookielessManager {
cookielessCacheHitCounter.labels({ operation: 'getSaltForDay', day: yyyymmdd }).inc()
const salt = Buffer.from(saltBase64, 'base64')
this.localSaltMap[yyyymmdd] = salt
return salt
return { success: true, salt }
}
cookielessCacheMissCounter.labels({ operation: 'getSaltForDay', day: yyyymmdd }).inc()

Expand All @@ -174,7 +177,7 @@ export class CookielessManager {
)
if (setResult === 'OK') {
this.localSaltMap[yyyymmdd] = newSalt
return newSalt
return { success: true, salt: newSalt }
}

// if we couldn't write, it means that it exists in redis already
Expand All @@ -191,7 +194,7 @@ export class CookielessManager {
const salt = Buffer.from(saltBase64Retry, 'base64')
this.localSaltMap[yyyymmdd] = salt

return salt
return { success: true, salt }
},
priority: timestampMs,
})
Expand Down Expand Up @@ -241,11 +244,17 @@ export class CookielessManager {
n?: number
hashExtra?: string
hashCache?: Record<string, Buffer>
}) {
}): Promise<SaltResult> {
const yyyymmdd = toYYYYMMDDInTimezoneSafe(timestampMs, eventTimeZone, teamTimeZone)
const salt = await this.getSaltForDay(yyyymmdd, timestampMs)
const saltResult = await this.getSaltForDay(yyyymmdd, timestampMs)
if (!saltResult.success) {
return saltResult
}
const rootDomain = extractRootDomain(host)
return CookielessManager.doHash(salt, teamId, ip, rootDomain, userAgent, n, hashExtra, hashCache)
return {
success: true,
salt: CookielessManager.doHash(saltResult.salt, teamId, ip, rootDomain, userAgent, n, hashExtra, hashCache),
}
}

static doHash(
Expand Down Expand Up @@ -424,7 +433,7 @@ export class CookielessManager {
continue
}

const baseHash = await this.doHashForDay({
const baseHashResult = await this.doHashForDay({
timestampMs,
eventTimeZone,
teamTimeZone: team.timezone,
Expand All @@ -436,6 +445,24 @@ export class CookielessManager {
hashCache,
})

if (!baseHashResult.success) {
results[i] = drop(
'cookieless_timestamp_out_of_range',
[],
[
{
type: 'cookieless_timestamp_out_of_range',
details: {
eventUuid: event.uuid,
event: event.event,
distinctId: event.distinct_id,
},
},
]
)
continue
}

eventsWithStatus.push({
event,
team,
Expand All @@ -449,7 +476,7 @@ export class CookielessManager {
ip,
host,
hashExtra,
baseHash,
baseHash: baseHashResult.salt,
},
})
}
Expand Down Expand Up @@ -510,7 +537,7 @@ export class CookielessManager {
// Do a third pass to set the distinct and device ID, and find the `sessionRedisKey`s we need to load from redis
const sessionKeys = new Set<string>()
for (const eventWithProcessing of eventsWithStatus) {
const { event, team, firstPass } = eventWithProcessing
const { event, team, firstPass, originalIndex } = eventWithProcessing
if (!firstPass?.secondPass) {
continue
}
Expand All @@ -534,7 +561,7 @@ export class CookielessManager {
n = identifiesCacheItem.identifyEventIds.size - 1
}

const hashValue = await this.doHashForDay({
const hashValueResult = await this.doHashForDay({
timestampMs,
eventTimeZone,
teamTimeZone: team.timezone,
Expand All @@ -545,8 +572,14 @@ export class CookielessManager {
hashExtra,
n,
})
const distinctId = hashToDistinctId(hashValue)
const sessionRedisKey = getRedisSessionsKey(hashValue, team.id)
// This should not fail since we already validated the timestamp in the first pass,
// but if it does, DLQ the event rather than failing the entire batch
if (!hashValueResult.success) {
results[originalIndex] = dlq('cookieless_unexpected_date_validation_failure')
continue
}
const distinctId = hashToDistinctId(hashValueResult.salt)
const sessionRedisKey = getRedisSessionsKey(hashValueResult.salt, team.id)
sessionKeys.add(sessionRedisKey)
secondPass.thirdPass = {
distinctId,
Expand Down
Loading