Skip to content

Commit 11225cc

Browse files
⚡♻️ simplify replica (#3552)
* ✨ httpRequest can send requests to multiple endpoints * ♻️ use createBatch in telemetry * ♻️ use createBatch in rum * ♻️ use createBatch in logs * 🔥 remove now unused startBatchWithReplica
1 parent 7d53aa5 commit 11225cc

File tree

17 files changed

+137
-302
lines changed

17 files changed

+137
-302
lines changed

packages/core/src/domain/configuration/endpointBuilder.spec.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ describe('endpointBuilder', () => {
4646
expect(url).not.toContain('/rum?ddsource')
4747
expect(url).toContain('ddsource=browser')
4848
})
49+
50+
it('accepts extra parameters', () => {
51+
const extraParameters = ['application.id=1234', 'application.version=1.0.0']
52+
const url = createEndpointBuilder(initConfiguration, 'rum', extraParameters).build('fetch', DEFAULT_PAYLOAD)
53+
expect(url).toContain('application.id=1234')
54+
expect(url).toContain('application.version=1.0.0')
55+
})
4956
})
5057

5158
describe('proxy configuration', () => {

packages/core/src/domain/configuration/endpointBuilder.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@ export type ApiType =
1919

2020
export type EndpointBuilder = ReturnType<typeof createEndpointBuilder>
2121

22-
export function createEndpointBuilder(initConfiguration: InitConfiguration, trackType: TrackType) {
22+
export function createEndpointBuilder(
23+
initConfiguration: InitConfiguration,
24+
trackType: TrackType,
25+
extraParameters?: string[]
26+
) {
2327
const buildUrlWithParameters = createEndpointUrlWithParametersBuilder(initConfiguration, trackType)
2428

2529
return {
2630
build(api: ApiType, payload: Payload) {
27-
const parameters = buildEndpointParameters(initConfiguration, trackType, api, payload)
31+
const parameters = buildEndpointParameters(initConfiguration, trackType, api, payload, extraParameters)
2832
return buildUrlWithParameters(parameters)
2933
},
3034
trackType,
@@ -84,15 +88,16 @@ function buildEndpointParameters(
8488
{ clientToken, internalAnalyticsSubdomain }: InitConfiguration,
8589
trackType: TrackType,
8690
api: ApiType,
87-
{ retry, encoding }: Payload
91+
{ retry, encoding }: Payload,
92+
extraParameters: string[] = []
8893
) {
8994
const parameters = [
9095
'ddsource=browser',
9196
`dd-api-key=${clientToken}`,
9297
`dd-evp-origin-version=${encodeURIComponent(__BUILD_ENV__SDK_VERSION__)}`,
9398
'dd-evp-origin=browser',
9499
`dd-request-id=${generateUUID()}`,
95-
]
100+
].concat(extraParameters)
96101

97102
if (encoding) {
98103
parameters.push(`dd-evp-encoding=${encoding}`)

packages/core/src/domain/configuration/transportConfiguration.spec.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,20 @@ describe('transportConfiguration', () => {
5050
})
5151
})
5252

53+
it('adds the replica application id to the rum replica endpoint', () => {
54+
const replicaApplicationId = 'replica-application-id'
55+
const configuration = computeTransportConfiguration({
56+
clientToken,
57+
replica: {
58+
clientToken: 'replica-client-token',
59+
applicationId: replicaApplicationId,
60+
},
61+
})
62+
expect(configuration.replica!.rumEndpointBuilder.build('fetch', DEFAULT_PAYLOAD)).toContain(
63+
`application.id=${replicaApplicationId}`
64+
)
65+
})
66+
5367
describe('isIntakeUrl', () => {
5468
const v1IntakePath = `/v1/input/${clientToken}`
5569
;[

packages/core/src/domain/configuration/transportConfiguration.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ export interface TransportConfiguration {
1616
}
1717

1818
export interface ReplicaConfiguration {
19-
applicationId?: string
2019
logsEndpointBuilder: EndpointBuilder
2120
rumEndpointBuilder: EndpointBuilder
2221
}
@@ -55,12 +54,12 @@ function computeReplicaConfiguration(initConfiguration: InitConfiguration): Repl
5554
clientToken: initConfiguration.replica.clientToken,
5655
}
5756

58-
const replicaEndpointBuilders = {
57+
return {
5958
logsEndpointBuilder: createEndpointBuilder(replicaConfiguration, 'logs'),
60-
rumEndpointBuilder: createEndpointBuilder(replicaConfiguration, 'rum'),
59+
rumEndpointBuilder: createEndpointBuilder(replicaConfiguration, 'rum', [
60+
`application.id=${initConfiguration.replica.applicationId}`,
61+
]),
6162
}
62-
63-
return { applicationId: initConfiguration.replica.applicationId, ...replicaEndpointBuilders }
6463
}
6564

6665
export function isIntakeUrl(url: string): boolean {

packages/core/src/domain/deflate/deflate.types.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,5 @@ export type DeflateEncoder = Encoder<Uint8ArrayBuffer> & { stop: () => void }
5252
export const enum DeflateEncoderStreamId {
5353
REPLAY = 1,
5454
RUM = 2,
55-
RUM_REPLICA = 3,
5655
TELEMETRY = 4,
57-
TELEMETRY_REPLICA = 5,
5856
}

packages/core/src/domain/telemetry/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ export {
55
addTelemetryError,
66
resetTelemetry,
77
startTelemetry,
8-
isTelemetryReplicationAllowed,
98
addTelemetryConfiguration,
109
addTelemetryUsage,
1110
addTelemetryMetrics,

packages/core/src/domain/telemetry/telemetry.ts

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@ import { NonErrorPrefix } from '../error/error.types'
1717
import type { StackTrace } from '../../tools/stackTrace/computeStackTrace'
1818
import { computeStackTrace } from '../../tools/stackTrace/computeStackTrace'
1919
import { getConnectivity } from '../connectivity'
20-
import { canUseEventBridge, getEventBridge, startBatchWithReplica } from '../../transport'
20+
import {
21+
canUseEventBridge,
22+
createFlushController,
23+
createHttpRequest,
24+
getEventBridge,
25+
createBatch,
26+
} from '../../transport'
2127
import type { Encoder } from '../../tools/encoder'
2228
import type { PageMayExitEvent } from '../../browser/pageMayExitObservable'
2329
import { DeflateEncoderStreamId } from '../deflate'
@@ -190,30 +196,30 @@ function startTelemetryTransport(
190196
if (canUseEventBridge()) {
191197
const bridge = getEventBridge<'internal_telemetry', TelemetryEvent>()!
192198
const telemetrySubscription = telemetryObservable.subscribe((event) => bridge.send('internal_telemetry', event))
193-
cleanupTasks.push(() => telemetrySubscription.unsubscribe())
199+
cleanupTasks.push(telemetrySubscription.unsubscribe)
194200
} else {
195-
const telemetryBatch = startBatchWithReplica(
196-
configuration,
197-
{
198-
endpoint: configuration.rumEndpointBuilder,
199-
encoder: createEncoder(DeflateEncoderStreamId.TELEMETRY),
200-
},
201-
configuration.replica && {
202-
endpoint: configuration.replica.rumEndpointBuilder,
203-
encoder: createEncoder(DeflateEncoderStreamId.TELEMETRY_REPLICA),
204-
},
205-
reportError,
206-
pageMayExitObservable,
207-
208-
// We don't use an actual session expire observable here, to make telemetry collection
209-
// independent of the session. This allows to start and send telemetry events ealier.
210-
new Observable()
211-
)
212-
cleanupTasks.push(() => telemetryBatch.stop())
213-
const telemetrySubscription = telemetryObservable.subscribe((event) => {
214-
telemetryBatch.add(event, isTelemetryReplicationAllowed(configuration))
201+
const endpoints = [configuration.rumEndpointBuilder]
202+
if (configuration.replica && isTelemetryReplicationAllowed(configuration)) {
203+
endpoints.push(configuration.replica.rumEndpointBuilder)
204+
}
205+
const telemetryBatch = createBatch({
206+
encoder: createEncoder(DeflateEncoderStreamId.TELEMETRY),
207+
request: createHttpRequest(endpoints, configuration.batchBytesLimit, reportError),
208+
flushController: createFlushController({
209+
messagesLimit: configuration.batchMessagesLimit,
210+
bytesLimit: configuration.batchBytesLimit,
211+
durationLimit: configuration.flushTimeout,
212+
pageMayExitObservable,
213+
214+
// We don't use an actual session expire observable here, to make telemetry collection
215+
// independent of the session. This allows to start and send telemetry events earlier.
216+
sessionExpireObservable: new Observable(),
217+
}),
218+
messageBytesLimit: configuration.messageBytesLimit,
215219
})
216-
cleanupTasks.push(() => telemetrySubscription.unsubscribe())
220+
cleanupTasks.push(telemetryBatch.stop)
221+
const telemetrySubscription = telemetryObservable.subscribe(telemetryBatch.add)
222+
cleanupTasks.push(telemetrySubscription.unsubscribe)
217223
}
218224

219225
return {
@@ -236,7 +242,7 @@ export function resetTelemetry() {
236242
* Avoid mixing telemetry events from different data centers
237243
* but keep replicating staging events for reliability
238244
*/
239-
export function isTelemetryReplicationAllowed(configuration: Configuration) {
245+
function isTelemetryReplicationAllowed(configuration: Configuration) {
240246
return configuration.site === INTAKE_SITE_STAGING
241247
}
242248

packages/core/src/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ export {
4747
addTelemetryError,
4848
resetTelemetry,
4949
TelemetryService,
50-
isTelemetryReplicationAllowed,
5150
addTelemetryConfiguration,
5251
addTelemetryUsage,
5352
addTelemetryMetrics,
@@ -69,7 +68,7 @@ export {
6968
getEventBridge,
7069
bridgeSupports,
7170
BridgeCapability,
72-
startBatchWithReplica,
71+
createBatch,
7372
createFlushController,
7473
} from './transport'
7574
export * from './tools/display'

packages/core/src/transport/httpRequest.spec.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ describe('httpRequest', () => {
2626
interceptor = interceptRequests()
2727
requests = interceptor.requests
2828
endpointBuilder = mockEndpointBuilder(ENDPOINT_URL)
29-
request = createHttpRequest(endpointBuilder, BATCH_BYTES_LIMIT, noop)
29+
request = createHttpRequest([endpointBuilder], BATCH_BYTES_LIMIT, noop)
3030
})
3131

3232
describe('send', () => {
@@ -90,6 +90,24 @@ describe('httpRequest', () => {
9090
await interceptor.waitForAllFetchCalls()
9191
await collectAsyncCalls(fetchSpy, 2)
9292
})
93+
94+
it('sends the payload to multiple endpoints', async () => {
95+
const endpointBuilder2 = mockEndpointBuilder('http://my.website2')
96+
97+
request = createHttpRequest([endpointBuilder, endpointBuilder2], BATCH_BYTES_LIMIT, noop)
98+
99+
interceptor.withFetch(DEFAULT_FETCH_MOCK, DEFAULT_FETCH_MOCK)
100+
101+
const payloadData = '{"foo":"bar1"}\n{"foo":"bar2"}'
102+
request.send({ data: payloadData, bytesCount: 10 })
103+
104+
await interceptor.waitForAllFetchCalls()
105+
expect(requests.length).toEqual(2)
106+
expect(requests[0].url).toContain('http://my.website')
107+
expect(requests[0].body).toEqual(payloadData)
108+
expect(requests[1].url).toContain('http://my.website2')
109+
expect(requests[1].body).toEqual(payloadData)
110+
})
93111
})
94112

95113
describe('fetchKeepAliveStrategy onResponse', () => {
@@ -313,7 +331,7 @@ describe('httpRequest intake parameters', () => {
313331
interceptor = interceptRequests()
314332
requests = interceptor.requests
315333
endpointBuilder = createEndpointBuilder({ clientToken }, 'logs')
316-
request = createHttpRequest(endpointBuilder, BATCH_BYTES_LIMIT, noop)
334+
request = createHttpRequest([endpointBuilder], BATCH_BYTES_LIMIT, noop)
317335
})
318336

319337
it('should have a unique request id', async () => {

packages/core/src/transport/httpRequest.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -63,33 +63,35 @@ export interface RetryInfo {
6363
}
6464

6565
export function createHttpRequest<Body extends Payload = Payload>(
66-
endpointBuilder: EndpointBuilder,
66+
endpointBuilders: EndpointBuilder[],
6767
bytesLimit: number,
6868
reportError: (error: RawError) => void
6969
): HttpRequest<Body> {
7070
const observable = new Observable<HttpRequestEvent<Body>>()
7171
const retryState = newRetryState<Body>()
72-
const sendStrategyForRetry = (payload: Body, onResponse: (r: HttpResponse) => void) =>
73-
fetchKeepAliveStrategy(endpointBuilder, bytesLimit, payload, onResponse)
7472

7573
return {
7674
observable,
7775
send: (payload: Body) => {
78-
sendWithRetryStrategy(
79-
payload,
80-
retryState,
81-
sendStrategyForRetry,
82-
endpointBuilder.trackType,
83-
reportError,
84-
observable
85-
)
76+
for (const endpointBuilder of endpointBuilders) {
77+
sendWithRetryStrategy(
78+
payload,
79+
retryState,
80+
(payload, onResponse) => fetchKeepAliveStrategy(endpointBuilder, bytesLimit, payload, onResponse),
81+
endpointBuilder.trackType,
82+
reportError,
83+
observable
84+
)
85+
}
8686
},
8787
/**
8888
* Since fetch keepalive behaves like regular fetch on Firefox,
8989
* keep using sendBeaconStrategy on exit
9090
*/
9191
sendOnExit: (payload: Body) => {
92-
sendBeaconStrategy(endpointBuilder, bytesLimit, payload)
92+
for (const endpointBuilder of endpointBuilders) {
93+
sendBeaconStrategy(endpointBuilder, bytesLimit, payload)
94+
}
9395
},
9496
}
9597
}

0 commit comments

Comments
 (0)