Skip to content

Commit d9ed993

Browse files
🐛 [RUM-11439] Fix deflate encoder sending data twice in some cases (#3806)
When calling `finishSync()`, pending data was not cleared, which lead to those problematic scenarios: * Pending data can be send multiple times: ``` encoder.write('data1') encoder.finishSync().pendingData === 'data1' encoder.finishSync().pendingData === 'data1' // duplicated data ``` * Pending data can be corrupted: ``` encoder.write('{"type":"view"}') encoder.finishSync().pendingData === '{"type":"view"}' encoder.write('{"type":"action"}') encoder.finishSync().pendingData === '{"type":"view"}{"type":"action"}' // corrupted data, missing newline between events ``` * Compressed data can be corrupted: ``` encoder.write('data1') encoder.finishSync() encoder.write('data2') encoder.finish(({ result }) => { // result contains compressed bytes corresponding to 'data1', which // might lack the zlib header. }) ``` By clearing any pending data/write action when calling `finishSync()`, we ensure that data is taken into account only once.
1 parent ee10a9b commit d9ed993

File tree

2 files changed

+65
-26
lines changed

2 files changed

+65
-26
lines changed

packages/rum/src/domain/deflate/deflateEncoder.spec.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,4 +213,44 @@ describe('createDeflateEncoder', () => {
213213
},
214214
])
215215
})
216+
217+
it('do not notify data twice when calling finishSync() then finish()', () => {
218+
const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY)
219+
const finishCallbackSpy = jasmine.createSpy<(result: EncoderResult<Uint8ArrayBuffer>) => void>()
220+
221+
encoder.write('foo')
222+
encoder.finishSync()
223+
224+
encoder.write('bar')
225+
encoder.finish(finishCallbackSpy)
226+
227+
worker.processAllMessages()
228+
229+
expect(finishCallbackSpy).toHaveBeenCalledOnceWith({
230+
rawBytesCount: 3,
231+
output: new Uint8Array([...ENCODED_BAR, ...TRAILER]),
232+
outputBytesCount: 4,
233+
encoding: 'deflate',
234+
})
235+
})
236+
237+
it('do not notify data twice when calling finishSync() then finishSync()', () => {
238+
const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY)
239+
240+
encoder.write('foo')
241+
encoder.finishSync()
242+
243+
encoder.write('bar')
244+
expect(encoder.finishSync().pendingData).toBe('bar')
245+
})
246+
247+
it('does not unsubscribe when there is no pending action', () => {
248+
const encoder = createDeflateEncoder(configuration, worker, DeflateEncoderStreamId.REPLAY)
249+
250+
encoder.write('foo')
251+
encoder.finishSync()
252+
worker.processAllMessages()
253+
254+
expect(worker.messageListenersCount).toBe(1)
255+
})
216256
})

packages/rum/src/domain/deflate/deflateEncoder.ts

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export function createDeflateEncoder(
1818
let compressedData: Uint8ArrayBuffer[] = []
1919
let compressedDataTrailer: Uint8ArrayBuffer
2020

21+
let isEmpty = true
2122
let nextWriteActionId = 0
2223
const pendingWriteActions: Array<{
2324
writeCallback?: (additionalEncodedBytesCount: number) => void
@@ -35,20 +36,24 @@ export function createDeflateEncoder(
3536
return
3637
}
3738

38-
rawBytesCount += workerResponse.additionalBytesCount
39-
compressedData.push(workerResponse.result)
40-
compressedDataTrailer = workerResponse.trailer
41-
42-
const nextPendingAction = pendingWriteActions.shift()
43-
if (nextPendingAction && nextPendingAction.id === workerResponse.id) {
44-
if (nextPendingAction.writeCallback) {
45-
nextPendingAction.writeCallback(workerResponse.result.byteLength)
46-
} else if (nextPendingAction.finishCallback) {
47-
nextPendingAction.finishCallback()
39+
const nextPendingAction = pendingWriteActions[0]
40+
if (nextPendingAction) {
41+
if (nextPendingAction.id === workerResponse.id) {
42+
pendingWriteActions.shift()
43+
44+
rawBytesCount += workerResponse.additionalBytesCount
45+
compressedData.push(workerResponse.result)
46+
compressedDataTrailer = workerResponse.trailer
47+
48+
if (nextPendingAction.writeCallback) {
49+
nextPendingAction.writeCallback(workerResponse.result.byteLength)
50+
} else if (nextPendingAction.finishCallback) {
51+
nextPendingAction.finishCallback()
52+
}
53+
} else if (nextPendingAction.id < workerResponse.id) {
54+
removeMessageListener()
55+
addTelemetryDebug('Worker responses received out of order.')
4856
}
49-
} else {
50-
removeMessageListener()
51-
addTelemetryDebug('Worker responses received out of order.')
5257
}
5358
}
5459
)
@@ -68,20 +73,20 @@ export function createDeflateEncoder(
6873
}
6974

7075
function sendResetIfNeeded() {
71-
if (nextWriteActionId > 0) {
76+
if (!isEmpty) {
7277
worker.postMessage({
7378
action: 'reset',
7479
streamId,
7580
})
76-
nextWriteActionId = 0
81+
isEmpty = true
7782
}
7883
}
7984

8085
return {
8186
isAsync: true,
8287

8388
get isEmpty() {
84-
return nextWriteActionId === 0
89+
return isEmpty
8590
},
8691

8792
write(data, callback) {
@@ -96,6 +101,7 @@ export function createDeflateEncoder(
96101
writeCallback: callback,
97102
data,
98103
})
104+
isEmpty = false
99105
nextWriteActionId += 1
100106
},
101107

@@ -117,16 +123,9 @@ export function createDeflateEncoder(
117123

118124
finishSync() {
119125
sendResetIfNeeded()
120-
121-
const pendingData = pendingWriteActions
122-
.map((pendingWriteAction) => {
123-
// Make sure we do not call any write or finish callback
124-
delete pendingWriteAction.writeCallback
125-
delete pendingWriteAction.finishCallback
126-
return pendingWriteAction.data
127-
})
128-
.join('')
129-
126+
const pendingData = pendingWriteActions.map((pendingWriteAction) => pendingWriteAction.data).join('')
127+
// Ignore all pending write actions responses from the worker
128+
pendingWriteActions.length = 0
130129
return { ...consumeResult(), pendingData }
131130
},
132131

0 commit comments

Comments
 (0)