Skip to content

Commit a903fcb

Browse files
authored
Replace got.stream with native fetch streaming (Final Phase) (#57196)
1 parent 55dac43 commit a903fcb

File tree

5 files changed

+330
-81
lines changed

5 files changed

+330
-81
lines changed

src/frame/lib/fetch-utils.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,24 @@ export async function fetchWithRetry(
112112

113113
throw lastError || new Error('Maximum retries exceeded')
114114
}
115+
116+
/**
117+
* Create a streaming fetch request that returns a ReadableStream
118+
* This replaces got.stream functionality
119+
*/
120+
export async function fetchStream(
121+
url: string | URL,
122+
init?: RequestInit,
123+
options: FetchWithRetryOptions = {},
124+
): Promise<Response> {
125+
const { timeout, throwHttpErrors = true } = options
126+
127+
const response = await fetchWithTimeout(url, init, timeout)
128+
129+
// Check for HTTP errors if throwHttpErrors is enabled
130+
if (throwHttpErrors && !response.ok && response.status >= 400) {
131+
throw new Error(`HTTP ${response.status}: ${response.statusText}`)
132+
}
133+
134+
return response
135+
}

src/search/lib/ai-search-proxy.ts

Lines changed: 75 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Response } from 'express'
22
import statsd from '@/observability/lib/statsd'
3-
import got from 'got'
3+
import { fetchStream } from '@/frame/lib/fetch-utils'
44
import { getHmacWithEpoch } from '@/search/lib/helpers/get-cse-copilot-auth'
55
import { getCSECopilotSource } from '@/search/lib/helpers/cse-copilot-docs-versions'
66
import type { ExtendedRequest } from '@/types'
@@ -56,56 +56,76 @@ export const aiSearchProxy = async (req: ExtendedRequest, res: Response) => {
5656
stream: true,
5757
}
5858

59+
let reader: ReadableStreamDefaultReader<Uint8Array> | null = null
60+
5961
try {
6062
// TODO: We temporarily add ?ai_search=1 to use a new pattern in cgs-copilot production
61-
const stream = got.stream.post(`${process.env.CSE_COPILOT_ENDPOINT}/answers?ai_search=1`, {
62-
json: body,
63-
headers: {
64-
Authorization: getHmacWithEpoch(),
65-
'Content-Type': 'application/json',
63+
const response = await fetchStream(
64+
`${process.env.CSE_COPILOT_ENDPOINT}/answers?ai_search=1`,
65+
{
66+
method: 'POST',
67+
body: JSON.stringify(body),
68+
headers: {
69+
Authorization: getHmacWithEpoch(),
70+
'Content-Type': 'application/json',
71+
},
6672
},
67-
})
68-
69-
// Listen for data events to count characters
70-
stream.on('data', (chunk: Buffer | string) => {
71-
// Ensure we have a string for proper character count
72-
const dataStr = typeof chunk === 'string' ? chunk : chunk.toString()
73-
totalChars += dataStr.length
74-
})
75-
76-
// Handle the upstream response before piping
77-
stream.on('response', (upstreamResponse) => {
78-
if (upstreamResponse.statusCode !== 200) {
79-
const errorMessage = `Upstream server responded with status code ${upstreamResponse.statusCode}`
80-
console.error(errorMessage)
81-
statsd.increment('ai-search.stream_response_error', 1, diagnosticTags)
82-
res.status(upstreamResponse.statusCode).json({
83-
errors: [{ message: errorMessage }],
84-
upstreamStatus: upstreamResponse.statusCode,
85-
})
86-
stream.destroy()
87-
} else {
88-
// Set response headers
89-
res.setHeader('Content-Type', 'application/x-ndjson')
90-
res.flushHeaders()
91-
92-
// Pipe the got stream directly to the response
93-
stream.pipe(res)
73+
{
74+
throwHttpErrors: false,
75+
},
76+
)
77+
78+
if (!response.ok) {
79+
const errorMessage = `Upstream server responded with status code ${response.status}`
80+
console.error(errorMessage)
81+
statsd.increment('ai-search.stream_response_error', 1, diagnosticTags)
82+
res.status(response.status).json({
83+
errors: [{ message: errorMessage }],
84+
upstreamStatus: response.status,
85+
})
86+
return
87+
}
88+
89+
// Set response headers
90+
res.setHeader('Content-Type', 'application/x-ndjson')
91+
res.flushHeaders()
92+
93+
// Stream the response body
94+
if (!response.body) {
95+
res.status(500).json({ errors: [{ message: 'No response body' }] })
96+
return
97+
}
98+
99+
reader = response.body.getReader()
100+
const decoder = new TextDecoder()
101+
102+
try {
103+
while (true) {
104+
const { done, value } = await reader.read()
105+
106+
if (done) {
107+
break
108+
}
109+
110+
// Decode chunk and count characters
111+
const chunk = decoder.decode(value, { stream: true })
112+
totalChars += chunk.length
113+
114+
// Write chunk to response
115+
res.write(chunk)
94116
}
95-
})
96117

97-
// Handle stream errors
98-
stream.on('error', (error: any) => {
99-
console.error('Error streaming from cse-copilot:', error)
118+
// Calculate metrics on stream end
119+
const totalResponseTime = Date.now() - startTime // in ms
120+
const charPerMsRatio = totalResponseTime > 0 ? totalChars / totalResponseTime : 0 // chars per ms
100121

101-
if (error?.code === 'ERR_NON_2XX_3XX_RESPONSE') {
102-
const upstreamStatus = error?.response?.statusCode || 500
103-
return res.status(upstreamStatus).json({
104-
errors: [{ message: 'Upstream server error' }],
105-
upstreamStatus,
106-
})
107-
}
122+
statsd.gauge('ai-search.total_response_time', totalResponseTime, diagnosticTags)
123+
statsd.gauge('ai-search.response_chars_per_ms', charPerMsRatio, diagnosticTags)
108124

125+
statsd.increment('ai-search.success_stream_end', 1, diagnosticTags)
126+
res.end()
127+
} catch (streamError) {
128+
console.error('Error streaming from cse-copilot:', streamError)
109129
statsd.increment('ai-search.stream_error', 1, diagnosticTags)
110130

111131
if (!res.headersSent) {
@@ -117,22 +137,20 @@ export const aiSearchProxy = async (req: ExtendedRequest, res: Response) => {
117137
res.write(errorMessage)
118138
res.end()
119139
}
120-
})
121-
122-
// Calculate metrics on stream end
123-
stream.on('end', () => {
124-
const totalResponseTime = Date.now() - startTime // in ms
125-
const charPerMsRatio = totalResponseTime > 0 ? totalChars / totalResponseTime : 0 // chars per ms
126-
127-
statsd.gauge('ai-search.total_response_time', totalResponseTime, diagnosticTags)
128-
statsd.gauge('ai-search.response_chars_per_ms', charPerMsRatio, diagnosticTags)
129-
130-
statsd.increment('ai-search.success_stream_end', 1, diagnosticTags)
131-
res.end()
132-
})
140+
} finally {
141+
if (reader) {
142+
reader.releaseLock()
143+
reader = null
144+
}
145+
}
133146
} catch (error) {
134147
statsd.increment('ai-search.route_error', 1, diagnosticTags)
135148
console.error('Error posting /answers to cse-copilot:', error)
136149
res.status(500).json({ errors: [{ message: 'Internal server error' }] })
150+
} finally {
151+
// Ensure reader lock is always released
152+
if (reader) {
153+
reader.releaseLock()
154+
}
137155
}
138156
}

src/search/middleware/ai-search-local-proxy.ts

Lines changed: 60 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// When in local development we want to proxy to the ai-search route at docs.github.com
22

33
import { Router, Request, Response, NextFunction } from 'express'
4-
import got from 'got'
5-
import { pipeline } from 'node:stream'
4+
import { fetchStream } from '@/frame/lib/fetch-utils'
5+
import { pipeline, Readable } from 'node:stream'
66

77
const router = Router()
88

@@ -18,52 +18,88 @@ const hopByHop = new Set([
1818
])
1919

2020
function filterRequestHeaders(src: Request['headers']) {
21-
const out: Record<string, string | string[]> = {}
21+
const out: Record<string, string> = {}
2222
for (const [key, value] of Object.entries(src)) {
2323
if (!value) continue
2424
const k = key.toLowerCase()
2525
if (hopByHop.has(k) || k === 'cookie' || k === 'host') continue
26-
out[key] = value
26+
// Convert array values to string
27+
out[key] = Array.isArray(value) ? value[0] : value
2728
}
2829
out['accept'] = 'application/x-ndjson'
2930
out['content-type'] = 'application/json'
3031
return out
3132
}
3233

3334
router.post('/ai-search/v1', async (req: Request, res: Response, next: NextFunction) => {
35+
let reader: ReadableStreamDefaultReader<Uint8Array> | null = null
36+
3437
try {
35-
const upstream = got.stream.post('https://docs.github.com/api/ai-search/v1', {
36-
headers: filterRequestHeaders(req.headers),
37-
body: JSON.stringify(req.body ?? {}),
38-
decompress: false,
39-
throwHttpErrors: false,
40-
retry: { limit: 0 },
41-
})
38+
const response = await fetchStream(
39+
'https://docs.github.com/api/ai-search/v1',
40+
{
41+
method: 'POST',
42+
headers: filterRequestHeaders(req.headers),
43+
body: JSON.stringify(req.body ?? {}),
44+
},
45+
{
46+
throwHttpErrors: false,
47+
},
48+
)
4249

43-
upstream.on('response', (uRes) => {
44-
res.status(uRes.statusCode || 500)
50+
// Set status code
51+
res.status(response.status || 500)
4552

46-
for (const [k, v] of Object.entries(uRes.headers)) {
47-
if (!v) continue
48-
const key = k.toLowerCase()
49-
// Never forward hop-by-hop; got already handles chunked → strip content-length
50-
if (hopByHop.has(key) || key === 'content-length') continue
51-
res.setHeader(k, v as string)
52-
}
53-
res.flushHeaders?.()
53+
// Forward response headers
54+
for (const [k, v] of response.headers.entries()) {
55+
if (!v) continue
56+
const key = k.toLowerCase()
57+
// Never forward hop-by-hop; fetch already handles chunked → strip content-length
58+
if (hopByHop.has(key) || key === 'content-length') continue
59+
res.setHeader(k, v)
60+
}
61+
res.flushHeaders?.()
62+
63+
// Convert fetch ReadableStream to Node.js Readable stream for pipeline
64+
if (!response.body) {
65+
if (!res.headersSent) res.status(502).end('Bad Gateway')
66+
return
67+
}
68+
69+
reader = response.body.getReader()
70+
const nodeStream = new Readable({
71+
async read() {
72+
try {
73+
const { done, value } = await reader!.read()
74+
if (done) {
75+
this.push(null)
76+
} else {
77+
this.push(Buffer.from(value))
78+
}
79+
} catch (err) {
80+
this.destroy(err as Error)
81+
}
82+
},
5483
})
5584

56-
pipeline(upstream, res, (err) => {
85+
pipeline(nodeStream, res, (err) => {
5786
if (err) {
5887
console.error('[ai-search proxy] pipeline error:', err)
5988
if (!res.headersSent) res.status(502).end('Bad Gateway')
6089
}
90+
if (reader) {
91+
reader.releaseLock()
92+
reader = null
93+
}
6194
})
62-
63-
upstream.on('error', (err) => console.error('[ai-search proxy] upstream error:', err))
6495
} catch (err) {
6596
console.error('[ai-search proxy] request failed:', err)
6697
next(err)
98+
} finally {
99+
// Ensure reader lock is always released
100+
if (reader) {
101+
reader.releaseLock()
102+
}
67103
}
68104
})
69105

0 commit comments

Comments (0)