-
Notifications
You must be signed in to change notification settings - Fork 19
fix: decompress gzip responses for Anthropic token extraction #1550
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
981b298
7414046
b68b7ef
7344aef
fb09c82
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -377,6 +377,177 @@ function trackTokenUsage(proxyRes, opts) { | |||||
| }); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Parse WebSocket frames from a buffer (server→client direction, unmasked). | ||||||
| * | ||||||
| * Returns an object with: | ||||||
| * - messages: Array of decoded text frame payloads (strings) | ||||||
| * - consumed: Number of bytes consumed from the buffer | ||||||
| * | ||||||
| * Only handles non-fragmented text frames (FIN=1, opcode=1). | ||||||
| * Other frame types (binary, ping, pong, close, continuation) are consumed | ||||||
| * but their payloads are not returned. | ||||||
| * | ||||||
| * @param {Buffer} buf - Buffer containing WebSocket frame data | ||||||
| * @returns {{ messages: string[], consumed: number }} | ||||||
| */ | ||||||
| function parseWebSocketFrames(buf) { | ||||||
| const messages = []; | ||||||
| let pos = 0; | ||||||
|
|
||||||
| while (pos + 2 <= buf.length) { | ||||||
| const firstByte = buf[pos]; | ||||||
| const secondByte = buf[pos + 1]; | ||||||
| const fin = (firstByte & 0x80) !== 0; | ||||||
| const opcode = firstByte & 0x0F; | ||||||
| const masked = (secondByte & 0x80) !== 0; | ||||||
| let payloadLength = secondByte & 0x7F; | ||||||
| let headerSize = 2; | ||||||
|
|
||||||
| if (payloadLength === 126) { | ||||||
| if (pos + 4 > buf.length) break; | ||||||
| payloadLength = buf.readUInt16BE(pos + 2); | ||||||
| headerSize = 4; | ||||||
| } else if (payloadLength === 127) { | ||||||
| if (pos + 10 > buf.length) break; | ||||||
| payloadLength = Number(buf.readBigUInt64BE(pos + 2)); | ||||||
| headerSize = 10; | ||||||
| } | ||||||
|
|
||||||
| if (masked) headerSize += 4; // skip masking key | ||||||
|
|
||||||
| const frameEnd = pos + headerSize + payloadLength; | ||||||
| if (frameEnd > buf.length) break; | ||||||
|
|
||||||
| // Extract text frames (opcode 1) with FIN set | ||||||
| if (opcode === 1 && fin) { | ||||||
| messages.push(buf.slice(pos + headerSize, frameEnd).toString('utf8')); | ||||||
| } | ||||||
|
|
||||||
| pos = frameEnd; | ||||||
| } | ||||||
|
|
||||||
| return { messages, consumed: pos }; | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Attach token usage tracking to a WebSocket upstream connection. | ||||||
| * | ||||||
| * Claude Code CLI uses WebSocket streaming to the Anthropic API. The | ||||||
| * api-proxy relays this as a raw socket pipe (tlsSocket ↔ clientSocket). | ||||||
| * This function adds a non-blocking 'data' listener on the upstream socket | ||||||
| * to parse WebSocket frames and extract token usage from JSON text messages. | ||||||
| * | ||||||
| * The upstream stream starts with an HTTP 101 response header, followed by | ||||||
| * WebSocket frames. This function skips the HTTP header before parsing frames. | ||||||
| * | ||||||
| * @param {import('tls').TLSSocket} upstreamSocket - Upstream TLS socket | ||||||
| * @param {object} opts | ||||||
| * @param {string} opts.requestId - Request ID for correlation | ||||||
| * @param {string} opts.provider - Provider name (anthropic, copilot, etc.) | ||||||
| * @param {string} opts.path - Request path | ||||||
| * @param {number} opts.startTime - Request start time (Date.now()) | ||||||
| * @param {object} opts.metrics - Metrics module reference | ||||||
| */ | ||||||
| function trackWebSocketTokenUsage(upstreamSocket, opts) { | ||||||
| const { requestId, provider, path: reqPath, startTime, metrics: metricsRef } = opts; | ||||||
|
|
||||||
| let httpHeaderParsed = false; | ||||||
| let buffer = Buffer.alloc(0); | ||||||
| let totalBytes = 0; | ||||||
| let streamingUsage = {}; | ||||||
| let streamingModel = null; | ||||||
| let finalized = false; | ||||||
|
|
||||||
| // Max buffer to prevent unbounded memory growth (1 MB) | ||||||
| const MAX_WS_BUFFER = 1 * 1024 * 1024; | ||||||
|
|
||||||
| upstreamSocket.on('data', (chunk) => { | ||||||
| totalBytes += chunk.length; | ||||||
| buffer = Buffer.concat([buffer, chunk]); | ||||||
|
|
||||||
| // Safety: drop buffer if it grows too large (malformed frames) | ||||||
| if (buffer.length > MAX_WS_BUFFER) { | ||||||
| buffer = Buffer.alloc(0); | ||||||
| httpHeaderParsed = true; // skip header parsing | ||||||
| return; | ||||||
| } | ||||||
|
|
||||||
| // Skip the HTTP 101 Switching Protocols response header | ||||||
| if (!httpHeaderParsed) { | ||||||
| const headerEnd = buffer.indexOf('\r\n\r\n'); | ||||||
| if (headerEnd === -1) return; // need more data for full header | ||||||
| buffer = buffer.slice(headerEnd + 4); | ||||||
| httpHeaderParsed = true; | ||||||
|
Comment on lines
+614
to
+631
|
||||||
| } | ||||||
|
|
||||||
| // Parse any complete WebSocket frames | ||||||
| const { messages, consumed } = parseWebSocketFrames(buffer); | ||||||
| if (consumed > 0) { | ||||||
| buffer = buffer.slice(consumed); | ||||||
| } | ||||||
|
|
||||||
| for (const text of messages) { | ||||||
| const { usage, model } = extractUsageFromSseLine(text); | ||||||
| if (model && !streamingModel) streamingModel = model; | ||||||
| if (usage) { | ||||||
| for (const [k, v] of Object.entries(usage)) { | ||||||
| streamingUsage[k] = v; | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| }); | ||||||
|
|
||||||
| function doFinalize() { | ||||||
| if (finalized) return; | ||||||
| finalized = true; | ||||||
|
|
||||||
| if (Object.keys(streamingUsage).length === 0) return; | ||||||
|
|
||||||
| const duration = Date.now() - startTime; | ||||||
| const normalized = normalizeUsage(streamingUsage); | ||||||
| if (!normalized) return; | ||||||
|
|
||||||
| if (metricsRef) { | ||||||
| metricsRef.increment('input_tokens_total', { provider }, normalized.input_tokens); | ||||||
| metricsRef.increment('output_tokens_total', { provider }, normalized.output_tokens); | ||||||
| } | ||||||
|
|
||||||
| const record = { | ||||||
| timestamp: new Date().toISOString(), | ||||||
| request_id: requestId, | ||||||
| provider, | ||||||
| model: streamingModel || 'unknown', | ||||||
| path: reqPath, | ||||||
| status: 200, | ||||||
|
||||||
| status: 200, | |
| status: 101, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parseWebSocketFramesadvances past the masking key whenmaskedis set, but it does not unmask the payload before decoding it as UTF-8. That means masked text frames would produce corrupted messages. Since this parser is exported and already branches onmasked, either properly unmask the payload or explicitly treat masked frames as unsupported and skip returning them.