Skip to content

Commit 245e616

Browse files
authored
fix(backend): avoid header mutation after body stream (#1593)
* fix(backend): avoid header mutation after body stream * chore(backend): add docstrings and stabilize tests
1 parent c6238f6 commit 245e616

File tree

2 files changed

+117
-12
lines changed

2 files changed

+117
-12
lines changed

supabase/functions/_backend/utils/pg.ts

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,55 @@ async function queryReplicaLag(c: Context, pool: Pool): Promise<ReplicationLagSt
132132
*/
133133
export async function setReplicationLagHeader(c: Context, pool: Pool): Promise<void> {
134134
const status = await queryReplicaLag(c, pool)
135-
c.header('X-Replication-Lag', status.status)
135+
safeSetResponseHeader(c, 'X-Replication-Lag', status.status)
136136
if (status.max_lag_seconds !== null) {
137-
c.header('X-Replication-Lag-Seconds', String(Math.round(status.max_lag_seconds)))
137+
safeSetResponseHeader(c, 'X-Replication-Lag-Seconds', String(Math.round(status.max_lag_seconds)))
138138
}
139139
}
140140

141+
/**
142+
* Best-effort response header setter.
143+
*
144+
* In Cloudflare Workers, we sometimes run background tasks via `waitUntil()`
145+
* after the response has started streaming. Hono's `c.header()` clones the
146+
* Response and reuses the body stream; if the stream is already used/locked
147+
* this can throw (e.g. "ReadableStream is disturbed").
148+
*/
149+
function safeSetResponseHeader(c: Context, name: string, value: string): void {
150+
try {
151+
const res = c.res
152+
if (res?.bodyUsed)
153+
return
154+
const body = res?.body as unknown as { locked?: boolean } | null
155+
if (body?.locked)
156+
return
157+
}
158+
catch {
159+
return
160+
}
161+
162+
try {
163+
c.header(name, value)
164+
}
165+
catch {
166+
// Best-effort only: avoid crashing background tasks due to header mutation.
167+
}
168+
}
169+
170+
/**
171+
* Store the selected DB source in the context (for logging) and try to also
172+
* expose it via a response header when still safe to mutate headers.
173+
*/
174+
function setDatabaseSource(c: Context, source: string): void {
175+
try {
176+
c.set('databaseSource', source)
177+
}
178+
catch {
179+
// Ignore: mostly useful for logging in request-scoped context.
180+
}
181+
safeSetResponseHeader(c, 'X-Database-Source', source)
182+
}
183+
141184
export function getDatabaseURL(c: Context, readOnly = false): string {
142185
const dbRegion = getClientDbRegionSB(c)
143186

@@ -147,58 +190,58 @@ export function getDatabaseURL(c: Context, readOnly = false): string {
147190
// When using Hyperdrive we use session databases directly to avoid supabase pooler overhead and allow prepared statements
148191
// Asia region - Japan
149192
if (c.env.HYPERDRIVE_CAPGO_PS_AS_JAPAN && dbRegion === 'AS_JAPAN') {
150-
c.header('X-Database-Source', 'HYPERDRIVE_CAPGO_PLANETSCALE_AS_JAPAN')
193+
setDatabaseSource(c, 'HYPERDRIVE_CAPGO_PLANETSCALE_AS_JAPAN')
151194
cloudlog({ requestId: c.get('requestId'), message: 'Using HYPERDRIVE_CAPGO_PLANETSCALE_AS_JAPAN for read-only' })
152195
return c.env.HYPERDRIVE_CAPGO_PS_AS_JAPAN.connectionString
153196
}
154197
// Asia region - India
155198
if (c.env.HYPERDRIVE_CAPGO_PS_AS_INDIA && dbRegion === 'AS_INDIA') {
156-
c.header('X-Database-Source', 'HYPERDRIVE_CAPGO_PLANETSCALE_AS_INDIA')
199+
setDatabaseSource(c, 'HYPERDRIVE_CAPGO_PLANETSCALE_AS_INDIA')
157200
cloudlog({ requestId: c.get('requestId'), message: 'Using HYPERDRIVE_CAPGO_PLANETSCALE_AS_INDIA for read-only' })
158201
return c.env.HYPERDRIVE_CAPGO_PS_AS_INDIA.connectionString
159202
}
160203
// // US region
161204
if (c.env.HYPERDRIVE_CAPGO_PS_NA && dbRegion === 'NA') {
162-
c.header('X-Database-Source', 'HYPERDRIVE_CAPGO_PLANETSCALE_NA')
205+
setDatabaseSource(c, 'HYPERDRIVE_CAPGO_PLANETSCALE_NA')
163206
cloudlog({ requestId: c.get('requestId'), message: 'Using HYPERDRIVE_CAPGO_PLANETSCALE_NA for read-only' })
164207
return c.env.HYPERDRIVE_CAPGO_PS_NA.connectionString
165208
}
166209
// // EU region
167210
if (c.env.HYPERDRIVE_CAPGO_PS_EU && dbRegion === 'EU') {
168-
c.header('X-Database-Source', 'HYPERDRIVE_CAPGO_PLANETSCALE_EU')
211+
setDatabaseSource(c, 'HYPERDRIVE_CAPGO_PLANETSCALE_EU')
169212
cloudlog({ requestId: c.get('requestId'), message: 'Using HYPERDRIVE_CAPGO_PLANETSCALE_EU for read-only' })
170213
return c.env.HYPERDRIVE_CAPGO_PS_EU.connectionString
171214
}
172215
// // OC region
173216
if (c.env.HYPERDRIVE_CAPGO_PS_OC && dbRegion === 'OC') {
174-
c.header('X-Database-Source', 'HYPERDRIVE_CAPGO_PLANETSCALE_OC')
217+
setDatabaseSource(c, 'HYPERDRIVE_CAPGO_PLANETSCALE_OC')
175218
cloudlog({ requestId: c.get('requestId'), message: 'Using HYPERDRIVE_CAPGO_PLANETSCALE_OC for read-only' })
176219
return c.env.HYPERDRIVE_CAPGO_PS_OC.connectionString
177220
}
178221
// // SA region
179222
if (c.env.HYPERDRIVE_CAPGO_PS_SA && dbRegion === 'SA') {
180-
c.header('X-Database-Source', 'HYPERDRIVE_CAPGO_PLANETSCALE_SA')
223+
setDatabaseSource(c, 'HYPERDRIVE_CAPGO_PLANETSCALE_SA')
181224
cloudlog({ requestId: c.get('requestId'), message: 'Using HYPERDRIVE_CAPGO_PLANETSCALE_SA for read-only' })
182225
return c.env.HYPERDRIVE_CAPGO_PS_SA.connectionString
183226
}
184227
}
185228

186229
// Fallback to single Hyperdrive if available
187230
if (c.env.HYPERDRIVE_CAPGO_DIRECT_EU) {
188-
c.header('X-Database-Source', 'HYPERDRIVE_CAPGO_DIRECT_EU')
231+
setDatabaseSource(c, 'HYPERDRIVE_CAPGO_DIRECT_EU')
189232
cloudlog({ requestId: c.get('requestId'), message: `Using HYPERDRIVE_CAPGO_DIRECT_EU for ${readOnly ? 'read-only' : 'read-write'}` })
190233
return c.env.HYPERDRIVE_CAPGO_DIRECT_EU.connectionString
191234
}
192235

193236
// Main DB write poller EU region in supabase
194237
if (existInEnv(c, 'MAIN_SUPABASE_DB_URL')) {
195-
c.header('X-Database-Source', 'sb_pooler_main')
238+
setDatabaseSource(c, 'sb_pooler_main')
196239
cloudlog({ requestId: c.get('requestId'), message: 'Using MAIN_SUPABASE_DB_URL for read-write' })
197240
return getEnv(c, 'MAIN_SUPABASE_DB_URL')
198241
}
199242

200243
// Default Supabase direct connection used for testing or if no other option is available
201-
c.header('X-Database-Source', 'direct')
244+
setDatabaseSource(c, 'direct')
202245
cloudlog({ requestId: c.get('requestId'), message: 'Using Direct Supabase for read-write' })
203246
return fixSupabaseHost(getEnv(c, 'SUPABASE_DB_URL'))
204247
}
@@ -207,7 +250,7 @@ export function getPgClient(c: Context, readOnly = false) {
207250
const dbUrl = getDatabaseURL(c, readOnly)
208251
const requestId = c.get('requestId')
209252
const appName = c.res.headers.get('X-Worker-Source') ?? 'unknown source'
210-
const dbName = c.res.headers.get('X-Database-Source') ?? 'unknown source'
253+
const dbName = String(c.get('databaseSource') ?? c.res.headers.get('X-Database-Source') ?? 'unknown source')
211254
cloudlog({ requestId, message: 'SUPABASE_DB_URL selected', dbName, appName, readOnly })
212255

213256
const isPooler = dbName.startsWith('sb_pooler')

tests/pg-header-safety.test.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { describe, expect, it } from 'vitest'
2+
3+
import { getDatabaseURL } from '../supabase/functions/_backend/utils/pg.ts'
4+
5+
describe('getDatabaseURL header safety', () => {
6+
it.concurrent('does not try to mutate headers when response body was already consumed', async () => {
7+
const res = new Response('ok')
8+
await res.text() // mark bodyUsed=true
9+
10+
let headerCalls = 0
11+
const ctx = {
12+
res,
13+
env: {
14+
// Provide a deterministic code path that doesn't depend on host env vars.
15+
HYPERDRIVE_CAPGO_DIRECT_EU: { connectionString: 'postgres://postgres:postgres@localhost:5432/postgres' },
16+
// Included to satisfy environments where `getEnv()` might read from `c.env`.
17+
SUPABASE_DB_URL: 'postgres://postgres:postgres@localhost:5432/postgres',
18+
},
19+
header: () => {
20+
headerCalls++
21+
throw new TypeError('This ReadableStream is disturbed (has already been read from), and cannot be used as a body.')
22+
},
23+
get: (key: string) => (key === 'requestId' ? 'test-request' : undefined),
24+
set: () => {},
25+
} as any
26+
27+
expect(() => getDatabaseURL(ctx)).not.toThrow()
28+
expect(headerCalls).toBe(0)
29+
})
30+
31+
it.concurrent('does not try to mutate headers when response body stream is locked', () => {
32+
const stream = new ReadableStream<Uint8Array>({
33+
start(controller) {
34+
controller.enqueue(new TextEncoder().encode('ok'))
35+
controller.close()
36+
},
37+
})
38+
const res = new Response(stream)
39+
// Lock the response body as if the runtime started streaming it already.
40+
res.body?.getReader()
41+
42+
let headerCalls = 0
43+
const ctx = {
44+
res,
45+
env: {
46+
// Provide a deterministic code path that doesn't depend on host env vars.
47+
HYPERDRIVE_CAPGO_DIRECT_EU: { connectionString: 'postgres://postgres:postgres@localhost:5432/postgres' },
48+
// Included to satisfy environments where `getEnv()` might read from `c.env`.
49+
SUPABASE_DB_URL: 'postgres://postgres:postgres@localhost:5432/postgres',
50+
},
51+
header: () => {
52+
headerCalls++
53+
throw new TypeError('This ReadableStream is disturbed (has already been read from), and cannot be used as a body.')
54+
},
55+
get: (key: string) => (key === 'requestId' ? 'test-request' : undefined),
56+
set: () => {},
57+
} as any
58+
59+
expect(() => getDatabaseURL(ctx)).not.toThrow()
60+
expect(headerCalls).toBe(0)
61+
})
62+
})

0 commit comments

Comments
 (0)