Skip to content

Commit 3c29758

Browse files
authored
Extract PSQLRow (#177)
### Motivation In #135, I did some naming things just wrong. `PSQLRows` is a stupid name. We should name it `PSQLRowStream`. `PSQLRows.Row` is a stupid name for a single table row. We should name it `PSQLRow`. Transforming an incoming data row packet to a `[PSQLData]` array early is expensive and stupid. Let's not do this anymore. ### Changes - Extract `PSQLRows.Row` to `PSQLRows` (Got its own file). Stop the early `[PSQLData]` madness. - Rename `PSQLRows` to `PSQLRowStream` - Fix naming in integration tests to match `PSQLRowStream`
1 parent 8b13752 commit 3c29758

File tree

11 files changed

+165
-154
lines changed

11 files changed

+165
-154
lines changed

Sources/PostgresNIO/Connection/PostgresConnection+Database.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ internal enum PostgresCommands: PostgresRequest {
7878
}
7979
}
8080

81-
extension PSQLRows {
81+
extension PSQLRowStream {
8282

8383
func iterateRowsWithoutBackpressureOption(lookupTable: PostgresRow.LookupTable, onRow: @escaping (PostgresRow) throws -> ()) -> EventLoopFuture<Void> {
8484
self.onRow { psqlRow in
85-
let columns = psqlRow.data.map { psqlData in
86-
PostgresMessage.DataRow.Column(value: psqlData.bytes)
85+
let columns = psqlRow.data.columns.map { bytes in
86+
PostgresMessage.DataRow.Column(value: bytes)
8787
}
8888

8989
let row = PostgresRow(dataRow: .init(columns: columns), lookupTable: lookupTable)

Sources/PostgresNIO/New/Connection State Machine/ConnectionStateMachine.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,12 @@ struct ConnectionStateMachine {
9292

9393
// --- streaming actions
9494
// actions if query has requested next row but we are waiting for backend
95-
case forwardRow([PSQLData], to: EventLoopPromise<StateMachineStreamNextResult>)
96-
case forwardCommandComplete(CircularBuffer<[PSQLData]>, commandTag: String, to: EventLoopPromise<StateMachineStreamNextResult>)
95+
case forwardRow(PSQLBackendMessage.DataRow, to: EventLoopPromise<StateMachineStreamNextResult>)
96+
case forwardCommandComplete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String, to: EventLoopPromise<StateMachineStreamNextResult>)
9797
case forwardStreamError(PSQLError, to: EventLoopPromise<StateMachineStreamNextResult>, cleanupContext: CleanUpContext?)
9898
// actions if query has not asked for next row but are pushing the final bytes to it
9999
case forwardStreamErrorToCurrentQuery(PSQLError, read: Bool, cleanupContext: CleanUpContext?)
100-
case forwardStreamCompletedToCurrentQuery(CircularBuffer<[PSQLData]>, commandTag: String, read: Bool)
100+
case forwardStreamCompletedToCurrentQuery(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String, read: Bool)
101101

102102
// Prepare statement actions
103103
case sendParseDescribeSync(name: String, query: String)
@@ -1106,10 +1106,10 @@ extension ConnectionStateMachine {
11061106

11071107
enum StateMachineStreamNextResult {
11081108
/// the next row
1109-
case row([PSQLData])
1109+
case row(PSQLBackendMessage.DataRow)
11101110

11111111
/// the query has completed, all remaining rows and the command completion tag
1112-
case complete(CircularBuffer<[PSQLData]>, commandTag: String)
1112+
case complete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String)
11131113
}
11141114

11151115
struct SendPrepareStatement {

Sources/PostgresNIO/New/Connection State Machine/ExtendedQueryStateMachine.swift

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ struct ExtendedQueryStateMachine {
1414
/// A state that is used if a noData message was received before. If a row description was received `bufferingRows` is
1515
/// used after receiving a `bindComplete` message
1616
case bindCompleteReceived(ExtendedQueryContext)
17-
case bufferingRows([PSQLBackendMessage.RowDescription.Column], CircularBuffer<[PSQLData]>, readOnEmpty: Bool)
18-
case waitingForNextRow([PSQLBackendMessage.RowDescription.Column], CircularBuffer<[PSQLData]>, EventLoopPromise<StateMachineStreamNextResult>)
17+
case bufferingRows([PSQLBackendMessage.RowDescription.Column], CircularBuffer<PSQLBackendMessage.DataRow>, readOnEmpty: Bool)
18+
case waitingForNextRow([PSQLBackendMessage.RowDescription.Column], CircularBuffer<PSQLBackendMessage.DataRow>, EventLoopPromise<StateMachineStreamNextResult>)
1919

2020
case commandComplete(commandTag: String)
2121
case error(PSQLError)
@@ -34,12 +34,12 @@ struct ExtendedQueryStateMachine {
3434

3535
// --- streaming actions
3636
// actions if query has requested next row but we are waiting for backend
37-
case forwardRow([PSQLData], to: EventLoopPromise<StateMachineStreamNextResult>)
38-
case forwardCommandComplete(CircularBuffer<[PSQLData]>, commandTag: String, to: EventLoopPromise<StateMachineStreamNextResult>)
37+
case forwardRow(PSQLBackendMessage.DataRow, to: EventLoopPromise<StateMachineStreamNextResult>)
38+
case forwardCommandComplete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String, to: EventLoopPromise<StateMachineStreamNextResult>)
3939
case forwardStreamError(PSQLError, to: EventLoopPromise<StateMachineStreamNextResult>)
4040
// actions if query has not asked for next row but are pushing the final bytes to it
4141
case forwardStreamErrorToCurrentQuery(PSQLError, read: Bool)
42-
case forwardStreamCompletedToCurrentQuery(CircularBuffer<[PSQLData]>, commandTag: String, read: Bool)
42+
case forwardStreamCompletedToCurrentQuery(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String, read: Bool)
4343

4444
case read
4545
case wait
@@ -170,10 +170,7 @@ struct ExtendedQueryStateMachine {
170170
}
171171

172172
return self.avoidingStateMachineCoW { state -> Action in
173-
let row = dataRow.columns.enumerated().map { (index, buffer) in
174-
PSQLData(bytes: buffer, dataType: columns[index].dataType, format: columns[index].format)
175-
}
176-
buffer.append(row)
173+
buffer.append(dataRow)
177174
state = .bufferingRows(columns, buffer, readOnEmpty: readOnEmpty)
178175
return .wait
179176
}
@@ -187,12 +184,8 @@ struct ExtendedQueryStateMachine {
187184

188185
return self.avoidingStateMachineCoW { state -> Action in
189186
precondition(buffer.isEmpty, "Expected the buffer to be empty")
190-
let row = dataRow.columns.enumerated().map { (index, buffer) in
191-
PSQLData(bytes: buffer, dataType: columns[index].dataType, format: columns[index].format)
192-
}
193-
194187
state = .bufferingRows(columns, buffer, readOnEmpty: false)
195-
return .forwardRow(row, to: promise)
188+
return .forwardRow(dataRow, to: promise)
196189
}
197190

198191
case .initialized,

Sources/PostgresNIO/New/PSQLChannelHandler.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ final class PSQLChannelHandler: ChannelDuplexHandler {
1818
self.logger.trace("Connection state changed", metadata: [.connectionState: "\(self.state)"])
1919
}
2020
}
21-
private var currentQuery: PSQLRows?
21+
private var currentQuery: PSQLRowStream?
2222
private let authentificationConfiguration: PSQLConnection.Configuration.Authentication?
2323
private let configureSSLCallback: ((Channel) throws -> Void)?
2424

@@ -426,7 +426,7 @@ final class PSQLChannelHandler: ChannelDuplexHandler {
426426
self.run(action, with: context)
427427
return promise.futureResult
428428
}
429-
let rows = PSQLRows(
429+
let rows = PSQLRowStream(
430430
rowDescription: columns,
431431
queryContext: queryContext,
432432
eventLoop: context.channel.eventLoop,
@@ -451,14 +451,14 @@ final class PSQLChannelHandler: ChannelDuplexHandler {
451451
context: ChannelHandlerContext)
452452
{
453453
let eventLoop = context.channel.eventLoop
454-
let rows = PSQLRows(
454+
let rows = PSQLRowStream(
455455
rowDescription: [],
456456
queryContext: queryContext,
457457
eventLoop: context.channel.eventLoop,
458458
cancel: {
459459
// ignore...
460460
}, next: {
461-
let emptyBuffer = CircularBuffer<[PSQLData]>(initialCapacity: 0)
461+
let emptyBuffer = CircularBuffer<PSQLBackendMessage.DataRow>(initialCapacity: 0)
462462
return eventLoop.makeSucceededFuture(.complete(emptyBuffer, commandTag: commandTag))
463463
})
464464
queryContext.promise.succeed(rows)

Sources/PostgresNIO/New/PSQLConnection.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,17 +121,17 @@ final class PSQLConnection {
121121

122122
// MARK: Query
123123

124-
func query(_ query: String, logger: Logger) -> EventLoopFuture<PSQLRows> {
124+
func query(_ query: String, logger: Logger) -> EventLoopFuture<PSQLRowStream> {
125125
self.query(query, [], logger: logger)
126126
}
127127

128-
func query(_ query: String, _ bind: [PSQLEncodable], logger: Logger) -> EventLoopFuture<PSQLRows> {
128+
func query(_ query: String, _ bind: [PSQLEncodable], logger: Logger) -> EventLoopFuture<PSQLRowStream> {
129129
var logger = logger
130130
logger[postgresMetadataKey: .connectionID] = "\(self.connectionID)"
131131
guard bind.count <= Int(Int16.max) else {
132132
return self.channel.eventLoop.makeFailedFuture(PSQLError.tooManyParameters)
133133
}
134-
let promise = self.channel.eventLoop.makePromise(of: PSQLRows.self)
134+
let promise = self.channel.eventLoop.makePromise(of: PSQLRowStream.self)
135135
let context = ExtendedQueryContext(
136136
query: query,
137137
bind: bind,
@@ -161,12 +161,12 @@ final class PSQLConnection {
161161
}
162162

163163
func execute(_ preparedStatement: PSQLPreparedStatement,
164-
_ bind: [PSQLEncodable], logger: Logger) -> EventLoopFuture<PSQLRows>
164+
_ bind: [PSQLEncodable], logger: Logger) -> EventLoopFuture<PSQLRowStream>
165165
{
166166
guard bind.count <= Int(Int16.max) else {
167167
return self.channel.eventLoop.makeFailedFuture(PSQLError.tooManyParameters)
168168
}
169-
let promise = self.channel.eventLoop.makePromise(of: PSQLRows.self)
169+
let promise = self.channel.eventLoop.makePromise(of: PSQLRowStream.self)
170170
let context = ExtendedQueryContext(
171171
preparedStatement: preparedStatement,
172172
bind: bind,
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
2+
/// `PSQLRow` represents a single row that was received from the Postgres Server.
3+
struct PSQLRow {
4+
internal let lookupTable: [String: Int]
5+
internal let data: PSQLBackendMessage.DataRow
6+
7+
internal let columns: [PSQLBackendMessage.RowDescription.Column]
8+
internal let jsonDecoder: PSQLJSONDecoder
9+
10+
internal init(data: PSQLBackendMessage.DataRow, lookupTable: [String: Int], columns: [PSQLBackendMessage.RowDescription.Column], jsonDecoder: PSQLJSONDecoder) {
11+
self.data = data
12+
self.lookupTable = lookupTable
13+
self.columns = columns
14+
self.jsonDecoder = jsonDecoder
15+
}
16+
17+
/// Access the raw Postgres data in the n-th column
18+
subscript(index: Int) -> PSQLData {
19+
PSQLData(bytes: self.data.columns[index], dataType: self.columns[index].dataType, format: self.columns[index].format)
20+
}
21+
22+
// TBD: Should this be optional?
23+
/// Access the raw Postgres data in the column indentified by name
24+
subscript(column columnName: String) -> PSQLData? {
25+
guard let index = self.lookupTable[columnName] else {
26+
return nil
27+
}
28+
29+
return self[index]
30+
}
31+
32+
/// Access the data in the provided column and decode it into the target type.
33+
///
34+
/// - Parameters:
35+
/// - column: The column name to read the data from
36+
/// - type: The type to decode the data into
37+
/// - Throws: The error of the decoding implementation. See also `PSQLDecodable` protocol for this.
38+
/// - Returns: The decoded value of Type T.
39+
func decode<T: PSQLDecodable>(column: String, as type: T.Type, file: String = #file, line: Int = #line) throws -> T {
40+
guard let index = self.lookupTable[column] else {
41+
preconditionFailure("A column '\(column)' does not exist.")
42+
}
43+
44+
return try self.decode(column: index, as: type, file: file, line: line)
45+
}
46+
47+
/// Access the data in the provided column and decode it into the target type.
48+
///
49+
/// - Parameters:
50+
/// - column: The column index to read the data from
51+
/// - type: The type to decode the data into
52+
/// - Throws: The error of the decoding implementation. See also `PSQLDecodable` protocol for this.
53+
/// - Returns: The decoded value of Type T.
54+
func decode<T: PSQLDecodable>(column index: Int, as type: T.Type, file: String = #file, line: Int = #line) throws -> T {
55+
let column = self.columns[index]
56+
57+
let decodingContext = PSQLDecodingContext(
58+
jsonDecoder: jsonDecoder,
59+
columnName: column.name,
60+
columnIndex: index,
61+
file: file,
62+
line: line)
63+
64+
return try self[index].decode(as: T.self, context: decodingContext)
65+
}
66+
}

Sources/PostgresNIO/New/PSQLRows.swift renamed to Sources/PostgresNIO/New/PSQLRowStream.swift

Lines changed: 9 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import NIOCore
22
import Logging
33

4-
final class PSQLRows {
4+
final class PSQLRowStream {
55

66
let eventLoop: EventLoop
77
let logger: Logger
88

99
private enum UpstreamState {
1010
case streaming(next: () -> EventLoopFuture<StateMachineStreamNextResult>, cancel: () -> ())
11-
case finished(remaining: CircularBuffer<[PSQLData]>, commandTag: String)
11+
case finished(remaining: CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String)
1212
case failure(Error)
1313
case consumed(Result<String, Error>)
1414
}
@@ -46,7 +46,7 @@ final class PSQLRows {
4646
self.lookupTable = lookup
4747
}
4848

49-
func next() -> EventLoopFuture<Row?> {
49+
func next() -> EventLoopFuture<PSQLRow?> {
5050
guard self.eventLoop.inEventLoop else {
5151
return self.eventLoop.flatSubmit {
5252
self.next()
@@ -57,15 +57,15 @@ final class PSQLRows {
5757

5858
switch self.upstreamState {
5959
case .streaming(let upstreamNext, _):
60-
return upstreamNext().map { payload -> Row? in
60+
return upstreamNext().map { payload -> PSQLRow? in
6161
self.downstreamState = .consuming
6262
switch payload {
6363
case .row(let data):
64-
return Row(data: data, lookupTable: self.lookupTable, columns: self.rowDescription, jsonDecoder: self.jsonDecoder)
64+
return PSQLRow(data: data, lookupTable: self.lookupTable, columns: self.rowDescription, jsonDecoder: self.jsonDecoder)
6565
case .complete(var buffer, let commandTag):
6666
if let data = buffer.popFirst() {
6767
self.upstreamState = .finished(remaining: buffer, commandTag: commandTag)
68-
return Row(data: data, lookupTable: self.lookupTable, columns: self.rowDescription, jsonDecoder: self.jsonDecoder)
68+
return PSQLRow(data: data, lookupTable: self.lookupTable, columns: self.rowDescription, jsonDecoder: self.jsonDecoder)
6969
}
7070

7171
self.upstreamState = .consumed(.success(commandTag))
@@ -82,7 +82,7 @@ final class PSQLRows {
8282
self.downstreamState = .consuming
8383
if let data = buffer.popFirst() {
8484
self.upstreamState = .finished(remaining: buffer, commandTag: commandTag)
85-
let row = Row(data: data, lookupTable: self.lookupTable, columns: self.rowDescription, jsonDecoder: self.jsonDecoder)
85+
let row = PSQLRow(data: data, lookupTable: self.lookupTable, columns: self.rowDescription, jsonDecoder: self.jsonDecoder)
8686
return self.eventLoop.makeSucceededFuture(row)
8787
}
8888

@@ -104,7 +104,7 @@ final class PSQLRows {
104104
])
105105
}
106106

107-
internal func finalForward(_ finalForward: Result<(CircularBuffer<[PSQLData]>, commandTag: String), PSQLError>?) {
107+
internal func finalForward(_ finalForward: Result<(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String), PSQLError>?) {
108108
switch finalForward {
109109
case .some(.success((let buffer, commandTag: let commandTag))):
110110
guard case .streaming = self.upstreamState else {
@@ -146,56 +146,8 @@ final class PSQLRows {
146146
}
147147
return commandTag
148148
}
149-
150-
struct Row {
151-
let lookupTable: [String: Int]
152-
let data: [PSQLData]
153-
let columns: [PSQLBackendMessage.RowDescription.Column]
154-
let jsonDecoder: PSQLJSONDecoder
155-
156-
init(data: [PSQLData], lookupTable: [String: Int], columns: [PSQLBackendMessage.RowDescription.Column], jsonDecoder: PSQLJSONDecoder) {
157-
self.data = data
158-
self.lookupTable = lookupTable
159-
self.columns = columns
160-
self.jsonDecoder = jsonDecoder
161-
}
162-
163-
subscript(index: Int) -> PSQLData {
164-
self.data[index]
165-
}
166-
167-
// TBD: Should this be optional?
168-
subscript(column columnName: String) -> PSQLData? {
169-
guard let index = self.lookupTable[columnName] else {
170-
return nil
171-
}
172-
173-
return self[index]
174-
}
175149

176-
func decode<T: PSQLDecodable>(column: String, as type: T.Type, file: String = #file, line: Int = #line) throws -> T {
177-
guard let index = self.lookupTable[column] else {
178-
preconditionFailure("A column '\(column)' does not exist.")
179-
}
180-
181-
return try self.decode(column: index, as: type, file: file, line: line)
182-
}
183-
184-
func decode<T: PSQLDecodable>(column index: Int, as type: T.Type, file: String = #file, line: Int = #line) throws -> T {
185-
let column = self.columns[index]
186-
187-
let decodingContext = PSQLDecodingContext(
188-
jsonDecoder: jsonDecoder,
189-
columnName: column.name,
190-
columnIndex: index,
191-
file: file,
192-
line: line)
193-
194-
return try self[index].decode(as: T.self, context: decodingContext)
195-
}
196-
}
197-
198-
func onRow(_ onRow: @escaping (Row) -> EventLoopFuture<Void>) -> EventLoopFuture<Void> {
150+
func onRow(_ onRow: @escaping (PSQLRow) -> EventLoopFuture<Void>) -> EventLoopFuture<Void> {
199151
let promise = self.eventLoop.makePromise(of: Void.self)
200152

201153
func consumeNext(promise: EventLoopPromise<Void>) {

Sources/PostgresNIO/New/PSQLTask.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ final class ExtendedQueryContext {
2929
let logger: Logger
3030

3131
let jsonDecoder: PSQLJSONDecoder
32-
let promise: EventLoopPromise<PSQLRows>
32+
let promise: EventLoopPromise<PSQLRowStream>
3333

3434
init(query: String,
3535
bind: [PSQLEncodable],
3636
logger: Logger,
3737
jsonDecoder: PSQLJSONDecoder,
38-
promise: EventLoopPromise<PSQLRows>)
38+
promise: EventLoopPromise<PSQLRowStream>)
3939
{
4040
self.query = .unnamed(query)
4141
self.bind = bind
@@ -48,7 +48,7 @@ final class ExtendedQueryContext {
4848
bind: [PSQLEncodable],
4949
logger: Logger,
5050
jsonDecoder: PSQLJSONDecoder,
51-
promise: EventLoopPromise<PSQLRows>)
51+
promise: EventLoopPromise<PSQLRowStream>)
5252
{
5353
self.query = .preparedStatement(
5454
name: preparedStatement.name,

0 commit comments

Comments
 (0)