|
| 1 | +import { context, propagation } from '@opentelemetry/api'; |
| 2 | +import type { AggregationCounts, Client, Scope } from '@sentry/core'; |
| 3 | +import { |
| 4 | + addNonEnumerableProperty, |
| 5 | + debug, |
| 6 | + generateSpanId, |
| 7 | + getClient, |
| 8 | + getCurrentScope, |
| 9 | + getIsolationScope, |
| 10 | + httpRequestToRequestData, |
| 11 | + stripUrlQueryAndFragment, |
| 12 | + withIsolationScope, |
| 13 | +} from '@sentry/core'; |
| 14 | +import type EventEmitter from 'events'; |
| 15 | +import type { IncomingMessage, OutgoingMessage, Server } from 'http'; |
| 16 | +import { DEBUG_BUILD } from '../../debug-build'; |
| 17 | +import { INSTRUMENTATION_NAME, MAX_BODY_BYTE_LENGTH } from './constants'; |
| 18 | + |
| 19 | +const clientToRequestSessionAggregatesMap = new Map< |
| 20 | + Client, |
| 21 | + { [timestampRoundedToSeconds: string]: { exited: number; crashed: number; errored: number } } |
| 22 | +>(); |
| 23 | + |
| 24 | +/** |
| 25 | + * Instrument a server to capture incoming requests. |
| 26 | + * |
| 27 | + */ |
| 28 | +export function instrumentServer( |
| 29 | + server: Server, |
| 30 | + { |
| 31 | + ignoreIncomingRequestBody, |
| 32 | + maxIncomingRequestBodySize = 'medium', |
| 33 | + trackIncomingRequestsAsSessions = true, |
| 34 | + sessionFlushingDelayMS, |
| 35 | + }: { |
| 36 | + ignoreIncomingRequestBody?: (url: string, request: IncomingMessage) => boolean; |
| 37 | + maxIncomingRequestBodySize?: 'small' | 'medium' | 'always' | 'none'; |
| 38 | + trackIncomingRequestsAsSessions?: boolean; |
| 39 | + sessionFlushingDelayMS: number; |
| 40 | + }, |
| 41 | +): void { |
| 42 | + // eslint-disable-next-line @typescript-eslint/unbound-method |
| 43 | + const originalEmit = server.emit; |
| 44 | + |
| 45 | + // This means it was already patched, do nothing |
| 46 | + if ((originalEmit as { __sentry_patched__?: boolean }).__sentry_patched__) { |
| 47 | + return; |
| 48 | + } |
| 49 | + |
| 50 | + const newEmit = new Proxy(originalEmit, { |
| 51 | + apply(target, thisArg, args: [event: string, ...args: unknown[]]) { |
| 52 | + // Only traces request events |
| 53 | + if (args[0] !== 'request') { |
| 54 | + return target.apply(thisArg, args); |
| 55 | + } |
| 56 | + |
| 57 | + DEBUG_BUILD && debug.log(INSTRUMENTATION_NAME, 'Handling incoming request'); |
| 58 | + |
| 59 | + const isolationScope = getIsolationScope().clone(); |
| 60 | + const request = args[1] as IncomingMessage; |
| 61 | + const response = args[2] as OutgoingMessage; |
| 62 | + |
| 63 | + const normalizedRequest = httpRequestToRequestData(request); |
| 64 | + |
| 65 | + // request.ip is non-standard but some frameworks set this |
| 66 | + const ipAddress = (request as { ip?: string }).ip || request.socket?.remoteAddress; |
| 67 | + |
| 68 | + const url = request.url || '/'; |
| 69 | + if (!ignoreIncomingRequestBody?.(url, request) && maxIncomingRequestBodySize !== 'none') { |
| 70 | + patchRequestToCaptureBody(request, isolationScope, maxIncomingRequestBodySize); |
| 71 | + } |
| 72 | + |
| 73 | + // Update the isolation scope, isolate this request |
| 74 | + isolationScope.setSDKProcessingMetadata({ normalizedRequest, ipAddress }); |
| 75 | + |
| 76 | + // attempt to update the scope's `transactionName` based on the request URL |
| 77 | + // Ideally, framework instrumentations coming after the HttpInstrumentation |
| 78 | + // update the transactionName once we get a parameterized route. |
| 79 | + const httpMethod = (request.method || 'GET').toUpperCase(); |
| 80 | + const httpTarget = stripUrlQueryAndFragment(url); |
| 81 | + |
| 82 | + const bestEffortTransactionName = `${httpMethod} ${httpTarget}`; |
| 83 | + |
| 84 | + isolationScope.setTransactionName(bestEffortTransactionName); |
| 85 | + |
| 86 | + if (trackIncomingRequestsAsSessions !== false) { |
| 87 | + recordRequestSession({ |
| 88 | + requestIsolationScope: isolationScope, |
| 89 | + response, |
| 90 | + sessionFlushingDelayMS: sessionFlushingDelayMS ?? 60_000, |
| 91 | + }); |
| 92 | + } |
| 93 | + |
| 94 | + return withIsolationScope(isolationScope, () => { |
| 95 | + // Set a new propagationSpanId for this request |
| 96 | + // We rely on the fact that `withIsolationScope()` will implicitly also fork the current scope |
| 97 | + // This way we can save an "unnecessary" `withScope()` invocation |
| 98 | + getCurrentScope().getPropagationContext().propagationSpanId = generateSpanId(); |
| 99 | + |
| 100 | + const ctx = propagation.extract(context.active(), normalizedRequest.headers); |
| 101 | + return context.with(ctx, () => { |
| 102 | + return target.apply(thisArg, args); |
| 103 | + }); |
| 104 | + }); |
| 105 | + }, |
| 106 | + }); |
| 107 | + |
| 108 | + addNonEnumerableProperty(newEmit, '__sentry_patched__', true); |
| 109 | + |
| 110 | + server.emit = newEmit; |
| 111 | +} |
| 112 | + |
| 113 | +/** |
| 114 | + * Starts a session and tracks it in the context of a given isolation scope. |
| 115 | + * When the passed response is finished, the session is put into a task and is |
| 116 | + * aggregated with other sessions that may happen in a certain time window |
| 117 | + * (sessionFlushingDelayMs). |
| 118 | + * |
| 119 | + * The sessions are always aggregated by the client that is on the current scope |
| 120 | + * at the time of ending the response (if there is one). |
| 121 | + */ |
| 122 | +// Exported for unit tests |
| 123 | +export function recordRequestSession({ |
| 124 | + requestIsolationScope, |
| 125 | + response, |
| 126 | + sessionFlushingDelayMS, |
| 127 | +}: { |
| 128 | + requestIsolationScope: Scope; |
| 129 | + response: EventEmitter; |
| 130 | + sessionFlushingDelayMS?: number; |
| 131 | +}): void { |
| 132 | + requestIsolationScope.setSDKProcessingMetadata({ |
| 133 | + requestSession: { status: 'ok' }, |
| 134 | + }); |
| 135 | + response.once('close', () => { |
| 136 | + // We need to grab the client off the current scope instead of the isolation scope because the isolation scope doesn't hold any client out of the box. |
| 137 | + const client = getClient(); |
| 138 | + const requestSession = requestIsolationScope.getScopeData().sdkProcessingMetadata.requestSession; |
| 139 | + |
| 140 | + if (client && requestSession) { |
| 141 | + DEBUG_BUILD && debug.log(`Recorded request session with status: ${requestSession.status}`); |
| 142 | + |
| 143 | + const roundedDate = new Date(); |
| 144 | + roundedDate.setSeconds(0, 0); |
| 145 | + const dateBucketKey = roundedDate.toISOString(); |
| 146 | + |
| 147 | + const existingClientAggregate = clientToRequestSessionAggregatesMap.get(client); |
| 148 | + const bucket = existingClientAggregate?.[dateBucketKey] || { exited: 0, crashed: 0, errored: 0 }; |
| 149 | + bucket[({ ok: 'exited', crashed: 'crashed', errored: 'errored' } as const)[requestSession.status]]++; |
| 150 | + |
| 151 | + if (existingClientAggregate) { |
| 152 | + existingClientAggregate[dateBucketKey] = bucket; |
| 153 | + } else { |
| 154 | + DEBUG_BUILD && debug.log('Opened new request session aggregate.'); |
| 155 | + const newClientAggregate = { [dateBucketKey]: bucket }; |
| 156 | + clientToRequestSessionAggregatesMap.set(client, newClientAggregate); |
| 157 | + |
| 158 | + const flushPendingClientAggregates = (): void => { |
| 159 | + clearTimeout(timeout); |
| 160 | + unregisterClientFlushHook(); |
| 161 | + clientToRequestSessionAggregatesMap.delete(client); |
| 162 | + |
| 163 | + const aggregatePayload: AggregationCounts[] = Object.entries(newClientAggregate).map( |
| 164 | + ([timestamp, value]) => ({ |
| 165 | + started: timestamp, |
| 166 | + exited: value.exited, |
| 167 | + errored: value.errored, |
| 168 | + crashed: value.crashed, |
| 169 | + }), |
| 170 | + ); |
| 171 | + client.sendSession({ aggregates: aggregatePayload }); |
| 172 | + }; |
| 173 | + |
| 174 | + const unregisterClientFlushHook = client.on('flush', () => { |
| 175 | + DEBUG_BUILD && debug.log('Sending request session aggregate due to client flush'); |
| 176 | + flushPendingClientAggregates(); |
| 177 | + }); |
| 178 | + const timeout = setTimeout(() => { |
| 179 | + DEBUG_BUILD && debug.log('Sending request session aggregate due to flushing schedule'); |
| 180 | + flushPendingClientAggregates(); |
| 181 | + }, sessionFlushingDelayMS).unref(); |
| 182 | + } |
| 183 | + } |
| 184 | + }); |
| 185 | +} |
| 186 | + |
| 187 | +/** |
| 188 | + * This method patches the request object to capture the body. |
| 189 | + * Instead of actually consuming the streamed body ourselves, which has potential side effects, |
| 190 | + * we monkey patch `req.on('data')` to intercept the body chunks. |
| 191 | + * This way, we only read the body if the user also consumes the body, ensuring we do not change any behavior in unexpected ways. |
| 192 | + */ |
| 193 | +function patchRequestToCaptureBody( |
| 194 | + req: IncomingMessage, |
| 195 | + isolationScope: Scope, |
| 196 | + maxIncomingRequestBodySize: 'small' | 'medium' | 'always', |
| 197 | +): void { |
| 198 | + let bodyByteLength = 0; |
| 199 | + const chunks: Buffer[] = []; |
| 200 | + |
| 201 | + DEBUG_BUILD && debug.log(INSTRUMENTATION_NAME, 'Patching request.on'); |
| 202 | + |
| 203 | + /** |
| 204 | + * We need to keep track of the original callbacks, in order to be able to remove listeners again. |
| 205 | + * Since `off` depends on having the exact same function reference passed in, we need to be able to map |
| 206 | + * original listeners to our wrapped ones. |
| 207 | + */ |
| 208 | + const callbackMap = new WeakMap(); |
| 209 | + |
| 210 | + const maxBodySize = |
| 211 | + maxIncomingRequestBodySize === 'small' |
| 212 | + ? 1_000 |
| 213 | + : maxIncomingRequestBodySize === 'medium' |
| 214 | + ? 10_000 |
| 215 | + : MAX_BODY_BYTE_LENGTH; |
| 216 | + |
| 217 | + try { |
| 218 | + // eslint-disable-next-line @typescript-eslint/unbound-method |
| 219 | + req.on = new Proxy(req.on, { |
| 220 | + apply: (target, thisArg, args: Parameters<typeof req.on>) => { |
| 221 | + const [event, listener, ...restArgs] = args; |
| 222 | + |
| 223 | + if (event === 'data') { |
| 224 | + DEBUG_BUILD && |
| 225 | + debug.log(INSTRUMENTATION_NAME, `Handling request.on("data") with maximum body size of ${maxBodySize}b`); |
| 226 | + |
| 227 | + const callback = new Proxy(listener, { |
| 228 | + apply: (target, thisArg, args: Parameters<typeof listener>) => { |
| 229 | + try { |
| 230 | + const chunk = args[0] as Buffer | string; |
| 231 | + const bufferifiedChunk = Buffer.from(chunk); |
| 232 | + |
| 233 | + if (bodyByteLength < maxBodySize) { |
| 234 | + chunks.push(bufferifiedChunk); |
| 235 | + bodyByteLength += bufferifiedChunk.byteLength; |
| 236 | + } else if (DEBUG_BUILD) { |
| 237 | + debug.log( |
| 238 | + INSTRUMENTATION_NAME, |
| 239 | + `Dropping request body chunk because maximum body length of ${maxBodySize}b is exceeded.`, |
| 240 | + ); |
| 241 | + } |
| 242 | + } catch (err) { |
| 243 | + DEBUG_BUILD && debug.error(INSTRUMENTATION_NAME, 'Encountered error while storing body chunk.'); |
| 244 | + } |
| 245 | + |
| 246 | + return Reflect.apply(target, thisArg, args); |
| 247 | + }, |
| 248 | + }); |
| 249 | + |
| 250 | + callbackMap.set(listener, callback); |
| 251 | + |
| 252 | + return Reflect.apply(target, thisArg, [event, callback, ...restArgs]); |
| 253 | + } |
| 254 | + |
| 255 | + return Reflect.apply(target, thisArg, args); |
| 256 | + }, |
| 257 | + }); |
| 258 | + |
| 259 | + // Ensure we also remove callbacks correctly |
| 260 | + // eslint-disable-next-line @typescript-eslint/unbound-method |
| 261 | + req.off = new Proxy(req.off, { |
| 262 | + apply: (target, thisArg, args: Parameters<typeof req.off>) => { |
| 263 | + const [, listener] = args; |
| 264 | + |
| 265 | + const callback = callbackMap.get(listener); |
| 266 | + if (callback) { |
| 267 | + callbackMap.delete(listener); |
| 268 | + |
| 269 | + const modifiedArgs = args.slice(); |
| 270 | + modifiedArgs[1] = callback; |
| 271 | + return Reflect.apply(target, thisArg, modifiedArgs); |
| 272 | + } |
| 273 | + |
| 274 | + return Reflect.apply(target, thisArg, args); |
| 275 | + }, |
| 276 | + }); |
| 277 | + |
| 278 | + req.on('end', () => { |
| 279 | + try { |
| 280 | + const body = Buffer.concat(chunks).toString('utf-8'); |
| 281 | + if (body) { |
| 282 | + // Using Buffer.byteLength here, because the body may contain characters that are not 1 byte long |
| 283 | + const bodyByteLength = Buffer.byteLength(body, 'utf-8'); |
| 284 | + const truncatedBody = |
| 285 | + bodyByteLength > maxBodySize |
| 286 | + ? `${Buffer.from(body) |
| 287 | + .subarray(0, maxBodySize - 3) |
| 288 | + .toString('utf-8')}...` |
| 289 | + : body; |
| 290 | + |
| 291 | + isolationScope.setSDKProcessingMetadata({ normalizedRequest: { data: truncatedBody } }); |
| 292 | + } |
| 293 | + } catch (error) { |
| 294 | + if (DEBUG_BUILD) { |
| 295 | + debug.error(INSTRUMENTATION_NAME, 'Error building captured request body', error); |
| 296 | + } |
| 297 | + } |
| 298 | + }); |
| 299 | + } catch (error) { |
| 300 | + if (DEBUG_BUILD) { |
| 301 | + debug.error(INSTRUMENTATION_NAME, 'Error patching request to capture body', error); |
| 302 | + } |
| 303 | + } |
| 304 | +} |
0 commit comments