Skip to content

Commit f8a6ad4

Browse files
committed
Mitigate ooms
1 parent 727c780 commit f8a6ad4

File tree

1 file changed

+104
-98
lines changed

1 file changed

+104
-98
lines changed

src/index.ts

Lines changed: 104 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import http from 'node:http'
44
import https from 'node:https'
55
import path from 'node:path'
66
import readline from 'node:readline'
7+
import { Readable } from 'node:stream'
78

89
import abortSignal from '@socketsecurity/registry/lib/constants/abort-signal'
10+
import { debugLog, isDebug } from '@socketsecurity/registry/lib/debug'
911
import { hasOwn, isObjectObject } from '@socketsecurity/registry/lib/objects'
1012
import { pRetry } from '@socketsecurity/registry/lib/promises'
1113

@@ -162,7 +164,7 @@ function createRequestBodyForFilepaths(
162164
requestBody.push(
163165
`Content-Disposition: form-data; name="${relPath}"; filename="${filename}"\r\n`,
164166
`Content-Type: application/octet-stream\r\n\r\n`,
165-
createReadStream(absPath)
167+
createReadStream(absPath, { highWaterMark: 1024 * 1024 })
166168
)
167169
}
168170
return requestBody
@@ -171,50 +173,53 @@ function createRequestBodyForFilepaths(
171173
function createRequestBodyForJson(
172174
jsonData: any,
173175
basename = 'data.json'
174-
): Array<string | ReadStream> {
176+
): Array<string | Readable> {
175177
const ext = path.extname(basename)
176178
const name = path.basename(basename, ext)
177179
return [
178-
`Content-Disposition: form-data; name="${name}"; filename="${basename}"\r\n`,
179-
'Content-Type: application/json\r\n\r\n',
180-
JSON.stringify(jsonData),
181-
// New line after file content.
180+
`Content-Disposition: form-data; name="${name}"; filename="${basename}"\r\n` +
181+
`Content-Type: application/json\r\n\r\n`,
182+
Readable.from(JSON.stringify(jsonData), { highWaterMark: 1024 * 1024 }),
182183
'\r\n'
183184
]
184185
}
185186

186187
async function createUploadRequest(
187188
baseUrl: string,
188189
urlPath: string,
189-
requestBodyNoBoundaries: Array<
190-
string | ReadStream | Array<string | ReadStream>
191-
>,
190+
requestBodyNoBoundaries: Array<string | Readable | Array<string | Readable>>,
192191
options: RequestOptions
193192
): Promise<IncomingMessage> {
194-
// Note: this will create a regular http request and stream in the file content
195-
// implicitly. The outgoing buffer is (implicitly) flushed periodically
196-
// by node. When this happens first it will send the headers to the server
197-
// which may decide to reject the request, immediately send a response and
198-
// then cut the connection (EPIPE or ECONNRESET errors may follow while
199-
// writing the files).
200-
// We have to make sure to guard for sudden reject responses because if we
201-
// don't then the file streaming will fail with random errors and it gets
202-
// hard to debug what's going on why.
203-
// Example : `socket scan create --org badorg` should fail gracefully.
193+
// This function constructs and sends a multipart/form-data HTTP POST request
194+
// where each part is streamed to the server. It supports string payloads
195+
// and readable streams (e.g., large file uploads).
196+
197+
// The body is streamed manually with proper backpressure support to avoid
198+
// overwhelming Node.js memory (i.e., avoiding out-of-memory crashes for large inputs).
199+
200+
// We call `flushHeaders()` early to ensure headers are sent before body transmission
201+
// begins. If the server rejects the request (e.g., bad org or auth), it will likely
202+
// respond immediately. We listen for that response while still streaming the body.
203+
//
204+
// This protects against cases where the server closes the connection (EPIPE/ECONNRESET)
205+
// mid-stream, which would otherwise cause hard-to-diagnose failures during file upload.
206+
//
207+
// Example failure this mitigates: `socket scan create --org badorg`
204208

205209
// eslint-disable-next-line no-async-promise-executor
206210
return await new Promise(async (pass, fail) => {
207-
// Generate a unique boundary for multipart encoding.
208211
const boundary = `NodeMultipartBoundary${Date.now()}`
209212
const boundarySep = `--${boundary}\r\n`
210213
const finalBoundary = `--${boundary}--\r\n`
214+
211215
const requestBody = [
212216
...requestBodyNoBoundaries.flatMap(part => [
213217
boundarySep,
214218
...(Array.isArray(part) ? part : [part])
215219
]),
216220
finalBoundary
217221
]
222+
218223
const url = new URL(urlPath, baseUrl)
219224
const req: ClientRequest = getHttpModule(baseUrl).request(url, {
220225
method: 'POST',
@@ -225,81 +230,62 @@ async function createUploadRequest(
225230
}
226231
})
227232

228-
// Send the headers now. If the server would reject this request, it should
229-
// do so asap. This prevents us from sending more data to it then necessary.
230-
// If it will reject we could just await the `req.on(response, ...` now but
231-
// if it accepts the request then the response will not come until after the
232-
// final file. So we can't await the response at this time. Just proceed,
233-
// carefully.
233+
// Send headers early to prompt server validation (auth, URL, quota, etc.).
234234
req.flushHeaders()
235235

236-
// Wait for the response. It may arrive at any point during the request or
237-
// afterwards. Node will flush the output buffer at some point, initiating
238-
// the request, and the server can decide to reject the request immediately
239-
// or at any point later (ike a timeout). We should handle those cases.
240-
getResponse(req).then(
241-
// Note: this returns the response to the caller to createUploadRequest.
242-
pass,
243-
async err => {
244-
// Note: this will throw an error for the caller to createUploadRequest
245-
if (err.response && !isResponseOk(err.response)) {
246-
fail(new ResponseError(err.response, `${err.method} request failed`))
247-
}
248-
fail(err)
236+
// Concurrently wait for response while we stream body.
237+
getResponse(req).then(pass, async err => {
238+
if (err.response && !isResponseOk(err.response)) {
239+
fail(new ResponseError(err.response, `${err.method} request failed`))
249240
}
250-
)
241+
fail(err)
242+
})
251243

252244
let aborted = false
253-
req.on('error', _err => {
254-
aborted = true
255-
})
256-
req.on('close', () => {
257-
aborted = true
258-
})
245+
req.on('error', () => (aborted = true))
246+
req.on('close', () => (aborted = true))
247+
259248
try {
260-
// Send the request body (headers + files).
261249
for (const part of requestBody) {
262250
if (aborted) {
263251
break
264252
}
265253
if (typeof part === 'string') {
266-
req.write(part)
254+
if (!req.write(part)) {
255+
// Wait for 'drain' if backpressure is signaled.
256+
// eslint-disable-next-line no-await-in-loop
257+
await events.once(req, 'drain')
258+
}
267259
} else if (typeof part?.pipe === 'function') {
268-
part.pipe(req, { end: false })
269-
// Wait for file streaming to complete.
260+
// Stream data chunk-by-chunk with backpressure support
261+
const stream = part as Readable
270262
// eslint-disable-next-line no-await-in-loop
271-
await new Promise<void>((resolve, reject) => {
272-
const cleanup = () => {
273-
part.off('end', onEnd)
274-
part.off('error', onError)
275-
}
276-
const onEnd = () => {
277-
cleanup()
278-
resolve()
263+
for await (const chunk of stream) {
264+
if (aborted) {
265+
break
279266
}
280-
const onError = (e: Error) => {
281-
cleanup()
282-
reject(e)
267+
if (!req.write(chunk)) {
268+
await events.once(req, 'drain')
283269
}
284-
part.on('end', onEnd)
285-
part.on('error', onError)
286-
})
287-
if (!aborted) {
288-
// Ensure a new line after file content.
289-
req.write('\r\n')
270+
}
271+
// Ensure trailing CRLF after file part.
272+
if (!aborted && !req.write('\r\n')) {
273+
// eslint-disable-next-line no-await-in-loop
274+
await events.once(req, 'drain')
275+
}
276+
// Cleanup stream to free memory buffers/
277+
if (typeof part.destroy === 'function') {
278+
part.destroy()
290279
}
291280
} else {
292-
throw new TypeError(
293-
'Socket API - Invalid multipart part, expected string or stream'
294-
)
281+
throw new TypeError('Expected string or stream')
295282
}
296283
}
297284
} catch (e) {
298285
req.destroy(e as Error)
299286
fail(e)
300287
} finally {
301288
if (!aborted) {
302-
// Close request after writing all data.
303289
req.end()
304290
}
305291
}
@@ -310,28 +296,36 @@ async function getErrorResponseBody(
310296
response: IncomingMessage
311297
): Promise<string> {
312298
const chunks: Buffer[] = []
313-
response.on('data', (chunk: Buffer) => chunks.push(chunk))
314-
try {
315-
await new Promise<void>((resolve, reject) => {
316-
const cleanup = () => {
317-
response.off('end', onEnd)
318-
response.off('error', onError)
319-
}
320-
const onEnd = () => {
321-
cleanup()
322-
resolve()
323-
}
324-
const onError = (e: Error) => {
299+
let size = 0
300+
const MAX = 5 * 1024 * 1024
301+
return await new Promise<string>((resolve, reject) => {
302+
const cleanup = () => {
303+
response.off('end', onEnd)
304+
response.off('error', onError)
305+
response.off('data', onData)
306+
}
307+
const onData = (chunk: Buffer) => {
308+
size += chunk.length
309+
if (size > MAX) {
310+
response.destroy()
325311
cleanup()
326-
reject(e)
312+
reject(new Error('Response body too large'))
313+
} else {
314+
chunks.push(chunk)
327315
}
328-
response.on('end', onEnd)
329-
response.on('error', onError)
330-
})
331-
return Buffer.concat(chunks).toString('utf8')
332-
} catch {
333-
return '(there was an error reading the body content)'
334-
}
316+
}
317+
const onEnd = () => {
318+
cleanup()
319+
resolve(Buffer.concat(chunks).toString('utf8'))
320+
}
321+
const onError = (e: unknown) => {
322+
cleanup()
323+
reject(e)
324+
}
325+
response.on('data', onData)
326+
response.on('end', onEnd)
327+
response.on('error', onError)
328+
})
335329
}
336330

337331
function desc(value: any) {
@@ -345,7 +339,7 @@ function desc(value: any) {
345339

346340
function getHttpModule(baseUrl: string): typeof http | typeof https {
347341
const { protocol } = new URL(baseUrl)
348-
return protocol === 'https:' ? https : http
342+
return protocol === 'https:' ? require('node:https') : require('node:http')
349343
}
350344

351345
async function getResponse(req: ClientRequest): Promise<IncomingMessage> {
@@ -379,18 +373,24 @@ async function getResponse(req: ClientRequest): Promise<IncomingMessage> {
379373
return res
380374
}
381375

382-
async function getResponseJson(
383-
response: IncomingMessage
384-
): Promise<ReturnType<typeof JSON.parse>> {
385-
let data = ''
376+
async function getResponseJson(response: IncomingMessage) {
377+
const chunks = []
378+
let size = 0
379+
const MAX = 50 * 1024 * 1024
386380
for await (const chunk of response) {
387-
data += chunk
381+
size += chunk.length
382+
if (size > MAX) {
383+
throw new Error('JSON body too large')
384+
}
385+
chunks.push(chunk)
388386
}
387+
const data = Buffer.concat(chunks).toString('utf8')
389388
try {
390389
return JSON.parse(data)
391390
} catch (e) {
391+
const message = (e as Error)?.['message'] || 'Unknown error'
392392
throw new SyntaxError(
393-
`Socket API - Invalid JSON response:\n${data}\n→ ${(e as Error)?.message || 'Unknown error'}`,
393+
`Socket API - Invalid JSON response:\n${data}\n→ ${message}`,
394394
{ cause: e }
395395
)
396396
}
@@ -1290,3 +1290,9 @@ Object.defineProperties(SocketSdk.prototype, {
12901290
getReportSupportedFiles: desc(SocketSdk.prototype.getSupportedScanFiles),
12911291
getScoreByNPMPackage: desc(SocketSdk.prototype.getScoreByNpmPackage)
12921292
})
1293+
1294+
// Optional live heap trace.
1295+
if (isDebug('heap')) {
1296+
const used = process.memoryUsage()
1297+
debugLog('heap', `heap used: ${Math.round(used.heapUsed / 1024 / 1024)}MB`)
1298+
}

0 commit comments

Comments
 (0)