Skip to content

Commit 5647624

Browse files
authored
AJS Retry improvements for 500 and 429, normal and batch (#1084)
1 parent 87811dc commit 5647624

File tree

9 files changed

+244
-16
lines changed

9 files changed

+244
-16
lines changed

.changeset/flat-dryers-wink.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@segment/analytics-next': minor
3+
'@segment/analytics-core': minor
4+
---
5+
6+
Adding support for 429 response from the server

packages/browser/src/plugins/segmentio/__tests__/batched-dispatcher.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ jest.mock('unfetch', () => {
44
return fetch
55
})
66

7+
import { createSuccess } from '../../../test-helpers/factories'
78
import batch from '../batched-dispatcher'
89

910
const fatEvent = {
@@ -52,6 +53,7 @@ describe('Batching', () => {
5253
jest.useFakeTimers({
5354
now: new Date('9 Jun 1993 00:00:00Z').getTime(),
5455
})
56+
fetch.mockReturnValue(createSuccess({}))
5557
})
5658

5759
afterEach(() => {

packages/browser/src/plugins/segmentio/__tests__/retries.test.ts

Lines changed: 133 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
const fetch = jest.fn()
2+
jest.mock('unfetch', () => {
3+
return fetch
4+
})
5+
16
import { segmentio, SegmentioSettings } from '..'
27
import { Analytics } from '../../../core/analytics'
38
// @ts-ignore isOffline mocked dependency is accused as unused
@@ -8,11 +13,135 @@ import { scheduleFlush } from '../schedule-flush'
813
import * as PPQ from '../../../lib/priority-queue/persisted'
914
import * as PQ from '../../../lib/priority-queue'
1015
import { Context } from '../../../core/context'
16+
import { createError, createSuccess } from '../../../test-helpers/factories'
1117

12-
jest.mock('../schedule-flush')
18+
//jest.mock('../schedule-flush')
1319

1420
type QueueType = 'priority' | 'persisted'
1521

22+
describe('Segment.io retries 500s and 429', () => {
23+
let options: SegmentioSettings
24+
let analytics: Analytics
25+
let segment: Plugin
26+
beforeEach(async () => {
27+
jest.useRealTimers()
28+
jest.resetAllMocks()
29+
jest.restoreAllMocks()
30+
31+
options = { apiKey: 'foo' }
32+
analytics = new Analytics(
33+
{ writeKey: options.apiKey },
34+
{
35+
retryQueue: true,
36+
}
37+
)
38+
segment = await segmentio(analytics, options, {})
39+
await analytics.register(segment, envEnrichment)
40+
})
41+
42+
test('retries on 500', async () => {
43+
jest.useFakeTimers({ advanceTimers: true })
44+
fetch.mockReturnValue(createError({ status: 500 }))
45+
// .mockReturnValue(createSuccess({}))
46+
const ctx = await analytics.track('event')
47+
jest.runAllTimers()
48+
49+
expect(ctx.attempts).toBeGreaterThanOrEqual(3) // Gets incremented after use
50+
expect(fetch.mock.calls.length).toBeGreaterThanOrEqual(2)
51+
expect(fetch.mock.lastCall[1].body).toContain('"retryCount":')
52+
})
53+
54+
test('delays retry on 429', async () => {
55+
const headers = new Headers()
56+
const resetTime = 1234
57+
headers.set('x-ratelimit-reset', resetTime.toString())
58+
fetch
59+
.mockReturnValueOnce(
60+
createError({
61+
status: 429,
62+
statusText: 'Too Many Requests',
63+
headers: headers,
64+
})
65+
)
66+
.mockReturnValue(createSuccess({}))
67+
const spy = jest.spyOn(PQ.PriorityQueue.prototype, 'pushWithBackoff')
68+
await analytics.track('event')
69+
expect(spy).toHaveBeenLastCalledWith(expect.anything(), resetTime * 1000)
70+
})
71+
})
72+
73+
describe('Batches retry 500s and 429', () => {
74+
let options: SegmentioSettings
75+
let analytics: Analytics
76+
let segment: Plugin
77+
beforeEach(async () => {
78+
jest.useRealTimers()
79+
jest.resetAllMocks()
80+
jest.restoreAllMocks()
81+
82+
options = {
83+
apiKey: 'foo',
84+
deliveryStrategy: {
85+
strategy: 'batching',
86+
// timeout is set very low to get consistent behavior out of scheduleflush
87+
config: { size: 3, timeout: 1, maxRetries: 2 },
88+
},
89+
}
90+
analytics = new Analytics(
91+
{ writeKey: options.apiKey },
92+
{
93+
retryQueue: true,
94+
}
95+
)
96+
segment = await segmentio(analytics, options, {})
97+
await analytics.register(segment, envEnrichment)
98+
})
99+
100+
test('retries on 500', async () => {
101+
fetch
102+
.mockReturnValueOnce(createError({ status: 500 }))
103+
.mockReturnValue(createSuccess({}))
104+
105+
await analytics.track('event1')
106+
const ctx = await analytics.track('event2')
107+
// wait a bit for retries - timeout is only 1 ms
108+
await new Promise((resolve) => setTimeout(resolve, 100))
109+
110+
expect(ctx.attempts).toBe(2)
111+
expect(analytics.queue.queue.getAttempts(ctx)).toBe(1)
112+
expect(fetch).toHaveBeenCalledTimes(2)
113+
})
114+
115+
test('delays retry on 429', async () => {
116+
const headers = new Headers()
117+
const resetTime = 1
118+
headers.set('x-ratelimit-reset', resetTime.toString())
119+
fetch.mockReturnValue(
120+
createError({
121+
status: 429,
122+
statusText: 'Too Many Requests',
123+
headers: headers,
124+
})
125+
)
126+
127+
await analytics.track('event1')
128+
const ctx = await analytics.track('event2')
129+
130+
await new Promise((resolve) => setTimeout(resolve, 100))
131+
132+
expect(ctx.attempts).toBe(2)
133+
expect(fetch).toHaveBeenCalledTimes(1)
134+
await new Promise((resolve) => setTimeout(resolve, 1000))
135+
expect(fetch).toHaveBeenCalledTimes(2)
136+
await new Promise((resolve) => setTimeout(resolve, 1000))
137+
expect(fetch).toHaveBeenCalledTimes(3)
138+
await new Promise((resolve) => setTimeout(resolve, 1000))
139+
expect(fetch).toHaveBeenCalledTimes(3) // capped at 2 retries (+ intial attempt)
140+
// Check the metadata about retry count
141+
expect(fetch.mock.lastCall[1].body).toContain('"retryCount":2')
142+
})
143+
})
144+
16145
describe('Segment.io retries', () => {
17146
let options: SegmentioSettings
18147
let analytics: Analytics
@@ -23,11 +152,14 @@ describe('Segment.io retries', () => {
23152
;[false, true].forEach((persistenceIsDisabled) => {
24153
describe(`disableClientPersistence: ${persistenceIsDisabled}`, () => {
25154
beforeEach(async () => {
155+
jest.useRealTimers()
26156
jest.resetAllMocks()
27157
jest.restoreAllMocks()
28158

29159
// @ts-expect-error reassign import
30160
isOffline = jest.fn().mockImplementation(() => true)
161+
// @ts-expect-error reassign import
162+
scheduleFlush = jest.fn().mockImplementation(() => {})
31163

32164
options = { apiKey: 'foo' }
33165
analytics = new Analytics(
@@ -58,7 +190,6 @@ describe('Segment.io retries', () => {
58190
}
59191

60192
segment = await segmentio(analytics, options, {})
61-
62193
await analytics.register(segment, envEnrichment)
63194
})
64195

packages/browser/src/plugins/segmentio/batched-dispatcher.ts

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
import { SegmentEvent } from '../../core/events'
22
import { fetch } from '../../lib/fetch'
33
import { onPageChange } from '../../lib/on-page-change'
4+
import { SegmentFacade } from '../../lib/to-facade'
5+
import { RateLimitError } from './ratelimit-error'
6+
import { Context } from '../../core/context'
47

58
export type BatchingDispatchConfig = {
69
size?: number
710
timeout?: number
11+
maxRetries?: number
812
keepalive?: boolean
913
}
1014

@@ -63,6 +67,7 @@ export default function batch(
6367

6468
const limit = config?.size ?? 10
6569
const timeout = config?.timeout ?? 5000
70+
let rateLimitTimeout = 0
6671

6772
function sendBatch(batch: object[]) {
6873
if (batch.length === 0) {
@@ -88,28 +93,66 @@ export default function batch(
8893
batch: updatedBatch,
8994
sentAt: new Date().toISOString(),
9095
}),
96+
}).then((res) => {
97+
if (res.status >= 500) {
98+
throw new Error(`Bad response from server: ${res.status}`)
99+
}
100+
if (res.status === 429) {
101+
const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset')
102+
const retryTimeoutMS =
103+
typeof retryTimeoutStringSecs == 'string'
104+
? parseInt(retryTimeoutStringSecs) * 1000
105+
: timeout
106+
throw new RateLimitError(
107+
`Rate limit exceeded: ${res.status}`,
108+
retryTimeoutMS
109+
)
110+
}
91111
})
92112
}
93113

94-
async function flush(): Promise<unknown> {
114+
async function flush(attempt = 1): Promise<unknown> {
95115
if (buffer.length) {
96116
const batch = buffer
97117
buffer = []
98-
return sendBatch(batch)
118+
return sendBatch(batch)?.catch((error) => {
119+
const ctx = Context.system()
120+
ctx.log('error', 'Error sending batch', error)
121+
if (attempt <= (config?.maxRetries ?? 10)) {
122+
if (error.name === 'RateLimitError') {
123+
rateLimitTimeout = error.retryTimeout
124+
}
125+
buffer.push(...batch)
126+
buffer.map((event) => {
127+
if ('_metadata' in event) {
128+
const segmentEvent = event as ReturnType<SegmentFacade['json']>
129+
segmentEvent._metadata = {
130+
...segmentEvent._metadata,
131+
retryCount: attempt,
132+
}
133+
}
134+
})
135+
scheduleFlush(attempt + 1)
136+
}
137+
})
99138
}
100139
}
101140

102141
let schedule: NodeJS.Timeout | undefined
103142

104-
function scheduleFlush(): void {
143+
function scheduleFlush(attempt = 1): void {
105144
if (schedule) {
106145
return
107146
}
108147

109-
schedule = setTimeout(() => {
110-
schedule = undefined
111-
flush().catch(console.error)
112-
}, timeout)
148+
schedule = setTimeout(
149+
() => {
150+
schedule = undefined
151+
flush(attempt).catch(console.error)
152+
},
153+
rateLimitTimeout ? rateLimitTimeout : timeout
154+
)
155+
rateLimitTimeout = 0
113156
}
114157

115158
onPageChange((unloaded) => {

packages/browser/src/plugins/segmentio/fetch-dispatcher.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { fetch } from '../../lib/fetch'
2+
import { RateLimitError } from './ratelimit-error'
23

34
export type Dispatcher = (url: string, body: object) => Promise<unknown>
45

@@ -15,6 +16,20 @@ export default function (config?: StandardDispatcherConfig): {
1516
headers: { 'Content-Type': 'text/plain' },
1617
method: 'post',
1718
body: JSON.stringify(body),
19+
}).then((res) => {
20+
if (res.status >= 500) {
21+
throw new Error(`Bad response from server: ${res.status}`)
22+
}
23+
if (res.status === 429) {
24+
const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset')
25+
const retryTimeoutMS = retryTimeoutStringSecs
26+
? parseInt(retryTimeoutStringSecs) * 1000
27+
: 5000
28+
throw new RateLimitError(
29+
`Rate limit exceeded: ${res.status}`,
30+
retryTimeoutMS
31+
)
32+
}
1833
})
1934
}
2035

packages/browser/src/plugins/segmentio/index.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,17 @@ export function segmentio(
109109
return client
110110
.dispatch(
111111
`${remote}/${path}`,
112-
normalize(analytics, json, settings, integrations)
112+
normalize(analytics, json, settings, integrations, ctx)
113113
)
114114
.then(() => ctx)
115-
.catch(() => {
116-
buffer.pushWithBackoff(ctx)
115+
.catch((error) => {
116+
ctx.log('error', 'Error sending event', error)
117+
if (error.name === 'RateLimitError') {
118+
const timeout = error.retryTimeout
119+
buffer.pushWithBackoff(ctx, timeout)
120+
} else {
121+
buffer.pushWithBackoff(ctx)
122+
}
117123
// eslint-disable-next-line @typescript-eslint/no-use-before-define
118124
scheduleFlush(flushing, buffer, segmentio, scheduleFlush)
119125
return ctx

packages/browser/src/plugins/segmentio/normalize.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ import { Analytics } from '../../core/analytics'
22
import { CDNSettings } from '../../browser'
33
import { SegmentFacade } from '../../lib/to-facade'
44
import { SegmentioSettings } from './index'
5+
import { Context } from '../../core/context'
56

67
export function normalize(
78
analytics: Analytics,
89
json: ReturnType<SegmentFacade['json']>,
910
settings?: SegmentioSettings,
10-
integrations?: CDNSettings['integrations']
11+
integrations?: CDNSettings['integrations'],
12+
ctx?: Context
1113
): object {
1214
const user = analytics.user()
1315

@@ -25,6 +27,16 @@ export function normalize(
2527
json._metadata = { failedInitializations: failed }
2628
}
2729

30+
if (ctx != null) {
31+
if (ctx.attempts > 1) {
32+
json._metadata = {
33+
...json._metadata,
34+
retryCount: ctx.attempts,
35+
}
36+
}
37+
ctx.attempts++
38+
}
39+
2840
const bundled: string[] = []
2941
const unbundled: string[] = []
3042

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
export class RateLimitError extends Error {
2+
retryTimeout: number
3+
4+
constructor(message: string, retryTimeout: number) {
5+
super(message)
6+
this.retryTimeout = retryTimeout
7+
this.name = 'RateLimitError'
8+
}
9+
}

0 commit comments

Comments
 (0)