Skip to content

Commit 367c32c

Browse files
committed
Connection now exposes an AsyncStream for recieving server notices. Improvements to connection state tracking.
1 parent 4bb253c commit 367c32c

17 files changed

+573
-220
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//
2+
// Connection+transaction.swift
3+
// SwiftPostgresClient
4+
//
5+
// Copyright 2025 Will Temperley and the SwiftPostgresClient contributors.
6+
//
7+
// Licensed under the Apache License, Version 2.0 (the "License");
8+
// you may not use this file except in compliance with the License.
9+
// You may obtain a copy of the License at
10+
//
11+
// http://www.apache.org/licenses/LICENSE-2.0
12+
//
13+
// Unless required by applicable law or agreed to in writing, software
14+
// distributed under the License is distributed on an "AS IS" BASIS,
15+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
// See the License for the specific language governing permissions and
17+
// limitations under the License.
18+
//
19+
20+
/// Execution of queries without using extended query protocol.
21+
extension Connection {
22+
23+
func execute(_ sql: String) async throws {
24+
if state != .ready {
25+
throw PostgresError.invalidState("Expected ready state, got \(state)")
26+
}
27+
let queryRequest = QueryRequest(query: sql)
28+
try await sendRequest(queryRequest)
29+
try await receiveResponse(type: CommandCompleteResponse.self)
30+
try await receiveResponse(type: ReadyForQueryResponse.self)
31+
}
32+
33+
func query(_ sql: String) async throws -> ResultCursor {
34+
if state != .ready {
35+
throw PostgresError.invalidState("Expected ready state, got \(state)")
36+
}
37+
let queryRequest = QueryRequest(query: sql)
38+
try await sendRequest(queryRequest)
39+
try await receiveResponse(type: RowDescriptionResponse.self)
40+
let response = await receiveResponse()
41+
return ResultCursor(connection: self, portalName: "", extendedProtocol: false, initialResponse: response)
42+
}
43+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
//
2+
// Connection+metadata.swift
3+
// SwiftPostgresClient
4+
//
5+
// Copyright 2025 Will Temperley and the SwiftPostgresClient contributors.
6+
//
7+
// Licensed under the Apache License, Version 2.0 (the "License");
8+
// you may not use this file except in compliance with the License.
9+
// You may obtain a copy of the License at
10+
//
11+
// http://www.apache.org/licenses/LICENSE-2.0
12+
//
13+
// Unless required by applicable law or agreed to in writing, software
14+
// distributed under the License is distributed on an "AS IS" BASIS,
15+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
// See the License for the specific language governing permissions and
17+
// limitations under the License.
18+
//
19+
20+
/// Methods to query connection status, using server state instead of attempting to track status client-side.
21+
extension Connection {
22+
23+
func listPreparedStatements() async throws -> [String] {
24+
25+
let text = "SELECT name FROM pg_prepared_statements"
26+
27+
let cursor = try await query(text)
28+
29+
var data: [String] = .init()
30+
for try await row in cursor {
31+
data.append(try row.columns[0].string())
32+
}
33+
try await receiveResponse(type: ReadyForQueryResponse.self)
34+
return data
35+
}
36+
37+
func listOpenPortals() async throws -> [String] {
38+
39+
let text = "SELECT name FROM pg_cursor"
40+
41+
let cursor = try await query(text)
42+
43+
var data: [String] = .init()
44+
for try await row in cursor {
45+
data.append(try row.columns[0].string())
46+
}
47+
try await receiveResponse(type: ReadyForQueryResponse.self)
48+
return data
49+
}
50+
51+
func statementClosed(_ name: String) async -> Bool {
52+
!openStatements.contains(name)
53+
}
54+
55+
}

Sources/SwiftPostgresClient/Connection+portal.swift

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ extension Connection {
2626
parameterValues: [PostgresValueConvertible?] = [],
2727
columnMetadata: Bool = false
2828
) async throws -> Portal {
29+
30+
if state == .closed {
31+
throw PostgresError.connectionClosed
32+
}
33+
2934
let portalName = UUID().uuidString
3035
let bindRequest = BindRequest(name: portalName, statement: statement, parameterValues: parameterValues)
3136
try await sendRequest(bindRequest)
@@ -39,27 +44,62 @@ extension Connection {
3944
if columnMetadata {
4045
metadata = try await retrieveColumnMetadata(portalName: portalName)
4146
}
42-
portalStatus[portalName] = .open
4347

4448
return Portal(name: portalName, metadata: metadata, statement: statement, connection: self)
4549
}
4650

51+
func query(
52+
portalName: String,
53+
statement: Statement,
54+
metadata: [ColumnMetadata]?
55+
) async throws -> ResultCursor {
56+
57+
state = .querySent
58+
59+
let executeRequest = ExecuteRequest(portalName: portalName, statement: statement)
60+
try await sendRequest(executeRequest)
61+
62+
let flushRequest = FlushRequest()
63+
try await sendRequest(flushRequest)
64+
65+
// The first response of the query is evaluated eagerly to check its type.
66+
let response = await receiveResponse()
67+
68+
// A zero-result query immediately gives a CommandCompleteResponse
69+
if response is CommandCompleteResponse {
70+
try await cleanupPortal(name: portalName)
71+
}
72+
73+
return ResultCursor(connection: self, portalName: portalName, metadata: metadata, initialResponse: response)
74+
}
75+
76+
func execute(
77+
portalName: String,
78+
statement: Statement
79+
) async throws -> CommandStatus {
80+
81+
state = .querySent
82+
83+
let executeRequest = ExecuteRequest(portalName: portalName, statement: statement)
84+
try await sendRequest(executeRequest)
85+
let flushRequest = FlushRequest()
86+
try await sendRequest(flushRequest)
87+
88+
let response = try await receiveResponse(type: CommandCompleteResponse.self)
89+
try await cleanupPortal(name: portalName)
90+
return response.status
91+
}
92+
4793
func cleanupPortal(name: String) async throws {
4894
// Close the portal
4995
try await sendRequest(ClosePortalRequest(name: name))
5096
try await sendRequest(FlushRequest())
5197
try await receiveResponse(type: CloseCompleteResponse.self)
5298

53-
let status = portalStatus.removeValue(forKey: name)
54-
if status == nil {
55-
throw PostgresError.protocolError("Missing portal status for \(name)")
56-
}
57-
5899
// Finalize the transaction (unless already in BEGIN/COMMIT)
59100
try await sendRequest(SyncRequest())
60101

61102
// transaction status always set on ReadyForQueryResponse
62103
try await receiveResponse(type: ReadyForQueryResponse.self)
63104
}
64-
65105
}

Sources/SwiftPostgresClient/Connection+query.swift renamed to Sources/SwiftPostgresClient/Connection+statement.swift

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ extension Connection {
3030
/// - Returns: a `PreparedStatement` ready for parameter binding.
3131
public func prepareStatement(text: String) async throws -> PreparedStatement {
3232

33+
if self.state != .ready {
34+
throw PostgresError.invalidState("Expected connection to be in state .ready, but was \(self.state)")
35+
}
36+
3337
let statement = Statement(text: text)
3438
let parseRequest = ParseRequest(statement: statement)
3539
try await sendRequest(parseRequest)
@@ -39,34 +43,12 @@ extension Connection {
3943

4044
try await receiveResponse(type: ParseCompleteResponse.self)
4145

46+
self.openStatements.insert(statement.name)
4247
return PreparedStatement(name: statement.name, statement: statement, connection: self)
4348
}
4449

45-
func query(
46-
portalName: String,
47-
statement: Statement,
48-
metadata: [ColumnMetadata]?
49-
) async throws -> ResultCursor {
50-
51-
state = .querySent
52-
53-
let executeRequest = ExecuteRequest(portalName: portalName, statement: statement)
54-
try await sendRequest(executeRequest)
55-
56-
let flushRequest = FlushRequest()
57-
try await sendRequest(flushRequest)
58-
59-
// The first response of the query is evaluated eagerly to check its
60-
// type.
61-
// An zero-result query could immediately give a CommandCompleteResponse
62-
// This also prevents some issues with undrained message queues.
63-
// TODO: check error response
64-
let response = await receiveResponse()
65-
66-
return ResultCursor(connection: self, portalName: portalName, metadata: metadata, initialResponse: response)
67-
}
68-
69-
public func closeStatement(name: String) async throws {
50+
/// Close a prepared statement
51+
func closeStatement(name: String) async throws {
7052

7153
let closeStatementRequest = CloseStatementRequest(name: name)
7254
try await sendRequest(closeStatementRequest)
@@ -75,9 +57,10 @@ extension Connection {
7557
try await sendRequest(flushRequest)
7658

7759
try await receiveResponse(type: CloseCompleteResponse.self)
60+
self.openStatements.remove(name)
7861
}
7962

80-
// used for decoding structs using column names
63+
/// Returns metadata used for decoding structs via column names
8164
func retrieveColumnMetadata(portalName: String) async throws -> [ColumnMetadata]? {
8265

8366
var columns: [ColumnMetadata]? = nil

Sources/SwiftPostgresClient/Connection+transaction.swift

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,21 @@
1717
// limitations under the License.
1818
//
1919

20+
/// Transaction convenience methods
2021
extension Connection {
2122

23+
public func beginTransaction() async throws {
24+
try await execute("BEGIN;")
25+
}
26+
27+
public func commitTransaction() async throws {
28+
try await execute("COMMIT;")
29+
}
30+
31+
func rollback() async throws {
32+
try await execute("ROLLBACK;")
33+
}
34+
2235
/// Runs a block within a transaction. Commits on success, rolls back on error.
2336
func withTransaction<T>(_ operation: @Sendable () async throws -> T) async throws -> T where T: Sendable {
2437
try await beginTransaction()
@@ -35,26 +48,4 @@ extension Connection {
3548
throw error
3649
}
3750
}
38-
39-
public func beginTransaction() async throws {
40-
try await executeSimpleQuery("BEGIN;")
41-
}
42-
43-
public func commitTransaction() async throws {
44-
try await executeSimpleQuery("COMMIT;")
45-
}
46-
47-
func rollback() async throws {
48-
try await executeSimpleQuery("ROLLBACK;")
49-
}
50-
51-
func executeSimpleQuery(_ sql: String) async throws {
52-
let queryRequest = QueryRequest(query: sql)
53-
try await sendRequest(queryRequest)
54-
try await receiveResponse(type: CommandCompleteResponse.self)
55-
try await receiveResponse(type: ReadyForQueryResponse.self)
56-
#if DEBUG
57-
print("Transaction status: \(transactionStatus)")
58-
#endif
59-
}
6051
}

0 commit comments

Comments
 (0)