Skip to content

Commit 7982f0a

Browse files
committed
build
1 parent 2cea1b5 commit 7982f0a

File tree

12 files changed

+226
-97
lines changed

12 files changed

+226
-97
lines changed

cf/src/connection.js

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ const errorFields = {
5353

5454
function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose = noop } = {}) {
5555
const {
56+
sslnegotiation,
5657
ssl,
5758
max,
5859
user,
@@ -79,14 +80,15 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
7980

8081
let socket = null
8182
, cancelMessage
83+
, errorResponse = null
8284
, result = new Result()
8385
, incoming = Buffer.alloc(0)
8486
, needsTypes = options.fetch_types
8587
, backendParameters = {}
8688
, statements = {}
8789
, statementId = Math.random().toString(36).slice(2)
8890
, statementCount = 1
89-
, closedDate = 0
91+
, closedTime = 0
9092
, remaining = 0
9193
, hostIndex = 0
9294
, retries = 0
@@ -157,6 +159,9 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
157159
if (terminated)
158160
return queryError(q, Errors.connection('CONNECTION_DESTROYED', options))
159161

162+
if (stream)
163+
return queryError(q, Errors.generic('COPY_IN_PROGRESS', 'You cannot execute queries during copy'))
164+
160165
if (q.cancelled)
161166
return
162167

@@ -261,25 +266,29 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
261266
}
262267

263268
async function secure() {
264-
write(SSLRequest)
265-
const canSSL = await new Promise(r => socket.once('data', x => r(x[0] === 83))) // S
269+
if (sslnegotiation !== 'direct') {
270+
write(SSLRequest)
271+
const canSSL = await new Promise(r => socket.once('data', x => r(x[0] === 83))) // S
266272

267-
if (!canSSL && ssl === 'prefer')
268-
return connected()
273+
if (!canSSL && ssl === 'prefer')
274+
return connected()
275+
}
269276

270-
socket.removeAllListeners()
271-
socket = tls.connect({
277+
const options = {
272278
socket,
273-
servername: net.isIP(socket.host) ? undefined : socket.host,
274-
...(ssl === 'require' || ssl === 'allow' || ssl === 'prefer'
275-
? { rejectUnauthorized: false }
276-
: ssl === 'verify-full'
277-
? {}
278-
: typeof ssl === 'object'
279-
? ssl
280-
: {}
281-
)
282-
})
279+
servername: net.isIP(socket.host) ? undefined : socket.host
280+
}
281+
282+
if (sslnegotiation === 'direct')
283+
options.ALPNProtocols = ['postgresql']
284+
285+
if (ssl === 'require' || ssl === 'allow' || ssl === 'prefer')
286+
options.rejectUnauthorized = false
287+
else if (typeof ssl === 'object')
288+
Object.assign(options, ssl)
289+
290+
socket.removeAllListeners()
291+
socket = tls.connect(options)
283292
socket.on('secureConnect', connected)
284293
socket.on('error', error)
285294
socket.on('close', closed)
@@ -352,7 +361,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
352361
}
353362

354363
function reconnect() {
355-
setTimeout(connect, closedDate ? closedDate + delay - performance.now() : 0)
364+
setTimeout(connect, closedTime ? Math.max(0, closedTime + delay - performance.now()) : 0)
356365
}
357366

358367
function connected() {
@@ -444,7 +453,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
444453
return reconnect()
445454

446455
!hadError && (query || sent.length) && error(Errors.connection('CONNECTION_CLOSED', options, socket))
447-
closedDate = performance.now()
456+
closedTime = performance.now()
448457
hadError && options.shared.retries++
449458
delay = (typeof backoff === 'function' ? backoff(options.shared.retries) : backoff) * 1000
450459
onclose(connection, Errors.connection('CONNECTION_CLOSED', options, socket))
@@ -526,8 +535,21 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
526535
}
527536

528537
function ReadyForQuery(x) {
529-
query && query.options.simple && query.resolve(results || result)
530-
query = results = null
538+
if (query) {
539+
if (errorResponse) {
540+
query.retried
541+
? errored(query.retried)
542+
: query.prepared && retryRoutines.has(errorResponse.routine)
543+
? retry(query, errorResponse)
544+
: errored(errorResponse)
545+
} else {
546+
query.resolve(results || result)
547+
}
548+
} else if (errorResponse) {
549+
errored(errorResponse)
550+
}
551+
552+
query = results = errorResponse = null
531553
result = new Result()
532554
connectTimer.cancel()
533555

@@ -592,8 +614,6 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
592614
result.count && query.cursorFn(result)
593615
write(Sync)
594616
}
595-
596-
query.resolve(result)
597617
}
598618

599619
function ParseComplete() {
@@ -792,13 +812,12 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
792812
}
793813

794814
function ErrorResponse(x) {
795-
query && (query.cursorFn || query.describeFirst) && write(Sync)
796-
const error = Errors.postgres(parseError(x))
797-
query && query.retried
798-
? errored(query.retried)
799-
: query && query.prepared && retryRoutines.has(error.routine)
800-
? retry(query, error)
801-
: errored(error)
815+
if (query) {
816+
(query.cursorFn || query.describeFirst) && write(Sync)
817+
errorResponse = Errors.postgres(parseError(x))
818+
} else {
819+
errored(Errors.postgres(parseError(x)))
820+
}
802821
}
803822

804823
function retry(q, error) {
@@ -851,6 +870,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
851870
final(callback) {
852871
socket.write(b().c().end())
853872
final = callback
873+
stream = null
854874
}
855875
})
856876
query.resolve(stream)

cf/src/index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,9 @@ function parseOptions(a, b) {
447447

448448
const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive']
449449
const defaults = {
450-
max : 10,
450+
max : globalThis.Cloudflare ? 3 : 10,
451451
ssl : false,
452+
sslnegotiation : null,
452453
idle_timeout : null,
453454
connect_timeout : 30,
454455
max_lifetime : max_lifetime,

cjs/src/connection.js

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ const errorFields = {
5151

5252
function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose = noop } = {}) {
5353
const {
54+
sslnegotiation,
5455
ssl,
5556
max,
5657
user,
@@ -77,14 +78,15 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
7778

7879
let socket = null
7980
, cancelMessage
81+
, errorResponse = null
8082
, result = new Result()
8183
, incoming = Buffer.alloc(0)
8284
, needsTypes = options.fetch_types
8385
, backendParameters = {}
8486
, statements = {}
8587
, statementId = Math.random().toString(36).slice(2)
8688
, statementCount = 1
87-
, closedDate = 0
89+
, closedTime = 0
8890
, remaining = 0
8991
, hostIndex = 0
9092
, retries = 0
@@ -155,6 +157,9 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
155157
if (terminated)
156158
return queryError(q, Errors.connection('CONNECTION_DESTROYED', options))
157159

160+
if (stream)
161+
return queryError(q, Errors.generic('COPY_IN_PROGRESS', 'You cannot execute queries during copy'))
162+
158163
if (q.cancelled)
159164
return
160165

@@ -259,25 +264,29 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
259264
}
260265

261266
async function secure() {
262-
write(SSLRequest)
263-
const canSSL = await new Promise(r => socket.once('data', x => r(x[0] === 83))) // S
267+
if (sslnegotiation !== 'direct') {
268+
write(SSLRequest)
269+
const canSSL = await new Promise(r => socket.once('data', x => r(x[0] === 83))) // S
264270

265-
if (!canSSL && ssl === 'prefer')
266-
return connected()
271+
if (!canSSL && ssl === 'prefer')
272+
return connected()
273+
}
267274

268-
socket.removeAllListeners()
269-
socket = tls.connect({
275+
const options = {
270276
socket,
271-
servername: net.isIP(socket.host) ? undefined : socket.host,
272-
...(ssl === 'require' || ssl === 'allow' || ssl === 'prefer'
273-
? { rejectUnauthorized: false }
274-
: ssl === 'verify-full'
275-
? {}
276-
: typeof ssl === 'object'
277-
? ssl
278-
: {}
279-
)
280-
})
277+
servername: net.isIP(socket.host) ? undefined : socket.host
278+
}
279+
280+
if (sslnegotiation === 'direct')
281+
options.ALPNProtocols = ['postgresql']
282+
283+
if (ssl === 'require' || ssl === 'allow' || ssl === 'prefer')
284+
options.rejectUnauthorized = false
285+
else if (typeof ssl === 'object')
286+
Object.assign(options, ssl)
287+
288+
socket.removeAllListeners()
289+
socket = tls.connect(options)
281290
socket.on('secureConnect', connected)
282291
socket.on('error', error)
283292
socket.on('close', closed)
@@ -350,7 +359,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
350359
}
351360

352361
function reconnect() {
353-
setTimeout(connect, closedDate ? closedDate + delay - performance.now() : 0)
362+
setTimeout(connect, closedTime ? Math.max(0, closedTime + delay - performance.now()) : 0)
354363
}
355364

356365
function connected() {
@@ -442,7 +451,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
442451
return reconnect()
443452

444453
!hadError && (query || sent.length) && error(Errors.connection('CONNECTION_CLOSED', options, socket))
445-
closedDate = performance.now()
454+
closedTime = performance.now()
446455
hadError && options.shared.retries++
447456
delay = (typeof backoff === 'function' ? backoff(options.shared.retries) : backoff) * 1000
448457
onclose(connection, Errors.connection('CONNECTION_CLOSED', options, socket))
@@ -524,8 +533,21 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
524533
}
525534

526535
function ReadyForQuery(x) {
527-
query && query.options.simple && query.resolve(results || result)
528-
query = results = null
536+
if (query) {
537+
if (errorResponse) {
538+
query.retried
539+
? errored(query.retried)
540+
: query.prepared && retryRoutines.has(errorResponse.routine)
541+
? retry(query, errorResponse)
542+
: errored(errorResponse)
543+
} else {
544+
query.resolve(results || result)
545+
}
546+
} else if (errorResponse) {
547+
errored(errorResponse)
548+
}
549+
550+
query = results = errorResponse = null
529551
result = new Result()
530552
connectTimer.cancel()
531553

@@ -590,8 +612,6 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
590612
result.count && query.cursorFn(result)
591613
write(Sync)
592614
}
593-
594-
query.resolve(result)
595615
}
596616

597617
function ParseComplete() {
@@ -790,13 +810,12 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
790810
}
791811

792812
function ErrorResponse(x) {
793-
query && (query.cursorFn || query.describeFirst) && write(Sync)
794-
const error = Errors.postgres(parseError(x))
795-
query && query.retried
796-
? errored(query.retried)
797-
: query && query.prepared && retryRoutines.has(error.routine)
798-
? retry(query, error)
799-
: errored(error)
813+
if (query) {
814+
(query.cursorFn || query.describeFirst) && write(Sync)
815+
errorResponse = Errors.postgres(parseError(x))
816+
} else {
817+
errored(Errors.postgres(parseError(x)))
818+
}
800819
}
801820

802821
function retry(q, error) {
@@ -849,6 +868,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
849868
final(callback) {
850869
socket.write(b().c().end())
851870
final = callback
871+
stream = null
852872
}
853873
})
854874
query.resolve(stream)

cjs/src/index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,8 +446,9 @@ function parseOptions(a, b) {
446446

447447
const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive']
448448
const defaults = {
449-
max : 10,
449+
max : globalThis.Cloudflare ? 3 : 10,
450450
ssl : false,
451+
sslnegotiation : null,
451452
idle_timeout : null,
452453
connect_timeout : 30,
453454
max_lifetime : max_lifetime,

cjs/tests/bootstrap.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ exec('psql', ['-c', 'alter database postgres_js_test owner to postgres_js_test']
2020

2121
module.exports.exec = exec;function exec(cmd, args) {
2222
const { stderr } = spawnSync(cmd, args, { stdio: 'pipe', encoding: 'utf8' })
23-
if (stderr && !stderr.includes('already exists') && !stderr.includes('does not exist'))
23+
if (stderr && !stderr.includes('already exists') && !stderr.includes('does not exist') && !stderr.includes('WARNING:'))
2424
throw stderr
2525
}
2626

cjs/tests/index.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,17 @@ t('Connect using SSL require', async() =>
401401
}))]
402402
)
403403

404+
t('Connect using SSL direct', async() => {
405+
const [{ supported }] = await sql`select current_setting('server_version_num')::int >= 180000 as supported`
406+
return [true, !supported || (await new Promise((resolve, reject) => {
407+
postgres({
408+
ssl: 'require',
409+
sslnegotiation: 'direct',
410+
idle_timeout
411+
})`select 1`.then(() => resolve(true), reject)
412+
}))]
413+
})
414+
404415
t('Connect using SSL prefer', async() => {
405416
await exec('psql', ['-c', 'alter system set ssl=off'])
406417
await exec('psql', ['-c', 'select pg_reload_conf()'])
@@ -2614,3 +2625,16 @@ t('Ensure reserve on query throws proper error', async() => {
26142625
'wat', x, reserved.release()
26152626
]
26162627
})
2628+
2629+
t('query during copy error', async() => {
2630+
const sql = postgres(options) // eslint-disable-line
2631+
await sql`create table test (id serial primary key, name text)`
2632+
const copy = await sql`copy test from stdin`.writable()
2633+
const error = await sql`select 1`.catch(e => e)
2634+
await copy.end()
2635+
2636+
return [
2637+
'COPY_IN_PROGRESS', error.code,
2638+
await sql`drop table test`
2639+
]
2640+
})

0 commit comments

Comments
 (0)