Skip to content

Commit 1017bca

Browse files
authored
Batch rows for consumption (#180)
### Motivation To allow faster processing of incoming `DataRow`s, we should batch them up in `channelRead` events and forward them as a batch for consumption in `PSQLRowStream`. This work is the foundation for AsyncSequence support in the future. ### Modifications - Extends `ExtendedQueryStateMachine` to use `RowStreamStateMachine` internally - Refactor `PSQLRowStream` to work with batches of rows.
1 parent 419c20b commit 1017bca

File tree

9 files changed

+590
-353
lines changed

9 files changed

+590
-353
lines changed

Sources/PostgresNIO/Connection/PostgresConnection+Database.swift

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,25 @@ extension PostgresConnection: PostgresDatabase {
1515

1616
switch command {
1717
case .query(let query, let binds, let onMetadata, let onRow):
18+
resultFuture = self.underlying.query(query, binds, logger: logger).flatMap { stream in
19+
let fields = stream.rowDescription.map { column in
20+
PostgresMessage.RowDescription.Field(
21+
name: column.name,
22+
tableOID: UInt32(column.tableOID),
23+
columnAttributeNumber: column.columnAttributeNumber,
24+
dataType: PostgresDataType(UInt32(column.dataType.rawValue)),
25+
dataTypeSize: column.dataTypeSize,
26+
dataTypeModifier: column.dataTypeModifier,
27+
formatCode: .init(psqlFormatCode: column.format)
28+
)
29+
}
30+
31+
let lookupTable = PostgresRow.LookupTable(rowDescription: .init(fields: fields), resultFormat: [.binary])
32+
return stream.iterateRowsWithoutBackpressureOption(lookupTable: lookupTable, onRow: onRow).map { _ in
33+
onMetadata(PostgresQueryMetadata(string: stream.commandTag)!)
34+
}
35+
}
36+
case .queryAll(let query, let binds, let onResult):
1837
resultFuture = self.underlying.query(query, binds, logger: logger).flatMap { rows in
1938
let fields = rows.rowDescription.map { column in
2039
PostgresMessage.RowDescription.Field(
@@ -29,10 +48,18 @@ extension PostgresConnection: PostgresDatabase {
2948
}
3049

3150
let lookupTable = PostgresRow.LookupTable(rowDescription: .init(fields: fields), resultFormat: [.binary])
32-
return rows.iterateRowsWithoutBackpressureOption(lookupTable: lookupTable, onRow: onRow).map { _ in
33-
onMetadata(PostgresQueryMetadata(string: rows.commandTag)!)
51+
return rows.all().map { allrows in
52+
let r = allrows.map { psqlRow -> PostgresRow in
53+
let columns = psqlRow.data.columns.map {
54+
PostgresMessage.DataRow.Column(value: $0)
55+
}
56+
return PostgresRow(dataRow: .init(columns: columns), lookupTable: lookupTable)
57+
}
58+
59+
onResult(.init(metadata: PostgresQueryMetadata(string: rows.commandTag)!, rows: r))
3460
}
3561
}
62+
3663
case .prepareQuery(let request):
3764
resultFuture = self.underlying.prepareStatement(request.query, with: request.name, logger: self.logger).map {
3865
request.prepared = PreparedQuery(underlying: $0, database: self)
@@ -62,6 +89,9 @@ internal enum PostgresCommands: PostgresRequest {
6289
binds: [PostgresData],
6390
onMetadata: (PostgresQueryMetadata) -> () = { _ in },
6491
onRow: (PostgresRow) throws -> ())
92+
case queryAll(query: String,
93+
binds: [PostgresData],
94+
onResult: (PostgresQueryResult) -> ())
6595
case prepareQuery(request: PrepareQueryRequest)
6696
case executePreparedStatement(query: PreparedQuery, binds: [PostgresData], onRow: (PostgresRow) throws -> ())
6797

@@ -82,18 +112,12 @@ extension PSQLRowStream {
82112

83113
func iterateRowsWithoutBackpressureOption(lookupTable: PostgresRow.LookupTable, onRow: @escaping (PostgresRow) throws -> ()) -> EventLoopFuture<Void> {
84114
self.onRow { psqlRow in
85-
let columns = psqlRow.data.columns.map { bytes in
86-
PostgresMessage.DataRow.Column(value: bytes)
115+
let columns = psqlRow.data.columns.map {
116+
PostgresMessage.DataRow.Column(value: $0)
87117
}
88118

89119
let row = PostgresRow(dataRow: .init(columns: columns), lookupTable: lookupTable)
90-
91-
do {
92-
try onRow(row)
93-
return self.eventLoop.makeSucceededFuture(Void())
94-
} catch {
95-
return self.eventLoop.makeFailedFuture(error)
96-
}
120+
try onRow(row)
97121
}
98122
}
99123

Sources/PostgresNIO/New/Connection State Machine/ConnectionStateMachine.swift

Lines changed: 126 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,9 @@ struct ConnectionStateMachine {
9292

9393
// --- streaming actions
9494
// actions if query has requested next row but we are waiting for backend
95-
case forwardRow(PSQLBackendMessage.DataRow, to: EventLoopPromise<StateMachineStreamNextResult>)
96-
case forwardCommandComplete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String, to: EventLoopPromise<StateMachineStreamNextResult>)
97-
case forwardStreamError(PSQLError, to: EventLoopPromise<StateMachineStreamNextResult>, cleanupContext: CleanUpContext?)
98-
// actions if query has not asked for next row but are pushing the final bytes to it
99-
case forwardStreamErrorToCurrentQuery(PSQLError, read: Bool, cleanupContext: CleanUpContext?)
100-
case forwardStreamCompletedToCurrentQuery(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String, read: Bool)
95+
case forwardRows(CircularBuffer<PSQLBackendMessage.DataRow>)
96+
case forwardStreamComplete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String)
97+
case forwardStreamError(PSQLError, read: Bool, cleanupContext: CleanUpContext?)
10198

10299
// Prepare statement actions
103100
case sendParseDescribeSync(name: String, query: String)
@@ -172,8 +169,10 @@ struct ConnectionStateMachine {
172169
switch self.state {
173170
case .initialized:
174171
preconditionFailure("How can a connection be closed, if it was never connected.")
172+
175173
case .closed:
176174
preconditionFailure("How can a connection be closed, if it is already closed.")
175+
177176
case .authenticated,
178177
.sslRequestSent,
179178
.sslNegotiated,
@@ -185,10 +184,12 @@ struct ConnectionStateMachine {
185184
.prepareStatement,
186185
.closeCommand:
187186
return self.errorHappened(.uncleanShutdown)
187+
188188
case .error, .closing:
189189
self.state = .closed
190190
self.quiescingState = .notQuiescing
191191
return .fireChannelInactive
192+
192193
case .modifying:
193194
preconditionFailure("Invalid state")
194195
}
@@ -199,36 +200,102 @@ struct ConnectionStateMachine {
199200
case .sslRequestSent:
200201
self.state = .sslNegotiated
201202
return .establishSSLConnection
202-
default:
203+
204+
case .initialized,
205+
.sslNegotiated,
206+
.sslHandlerAdded,
207+
.waitingToStartAuthentication,
208+
.authenticating,
209+
.authenticated,
210+
.readyForQuery,
211+
.extendedQuery,
212+
.prepareStatement,
213+
.closeCommand,
214+
.error,
215+
.closing,
216+
.closed:
203217
return self.closeConnectionAndCleanup(.unexpectedBackendMessage(.sslSupported))
218+
219+
case .modifying:
220+
preconditionFailure("Invalid state: \(self.state)")
204221
}
205222
}
206223

207224
mutating func sslUnsupportedReceived() -> ConnectionAction {
208225
switch self.state {
209226
case .sslRequestSent:
210227
return self.closeConnectionAndCleanup(.sslUnsupported)
211-
default:
228+
229+
case .initialized,
230+
.sslNegotiated,
231+
.sslHandlerAdded,
232+
.waitingToStartAuthentication,
233+
.authenticating,
234+
.authenticated,
235+
.readyForQuery,
236+
.extendedQuery,
237+
.prepareStatement,
238+
.closeCommand,
239+
.error,
240+
.closing,
241+
.closed:
212242
return self.closeConnectionAndCleanup(.unexpectedBackendMessage(.sslSupported))
243+
244+
case .modifying:
245+
preconditionFailure("Invalid state: \(self.state)")
213246
}
214247
}
215248

216249
mutating func sslHandlerAdded() -> ConnectionAction {
217-
guard case .sslNegotiated = self.state else {
218-
preconditionFailure("Can only add a ssl handler after negotiation")
250+
switch self.state {
251+
case .initialized,
252+
.sslRequestSent,
253+
.sslHandlerAdded,
254+
.waitingToStartAuthentication,
255+
.authenticating,
256+
.authenticated,
257+
.readyForQuery,
258+
.extendedQuery,
259+
.prepareStatement,
260+
.closeCommand,
261+
.error,
262+
.closing,
263+
.closed:
264+
preconditionFailure("Can only add a ssl handler after negotiation: \(self.state)")
265+
266+
case .sslNegotiated:
267+
self.state = .sslHandlerAdded
268+
return .wait
269+
270+
case .modifying:
271+
preconditionFailure("Invalid state: \(self.state)")
219272
}
220-
221-
self.state = .sslHandlerAdded
222-
return .wait
223273
}
224274

225275
mutating func sslEstablished() -> ConnectionAction {
226-
guard case .sslHandlerAdded = self.state else {
227-
preconditionFailure("Can only establish a ssl connection after adding a ssl handler")
276+
switch self.state {
277+
case .initialized,
278+
.sslRequestSent,
279+
.sslNegotiated,
280+
.waitingToStartAuthentication,
281+
.authenticating,
282+
.authenticated,
283+
.readyForQuery,
284+
.extendedQuery,
285+
.prepareStatement,
286+
.closeCommand,
287+
.error,
288+
.closing,
289+
.closed:
290+
preconditionFailure("Can only establish a ssl connection after adding a ssl handler: \(self.state)")
291+
292+
case .sslHandlerAdded:
293+
self.state = .waitingToStartAuthentication
294+
return .provideAuthenticationContext
295+
296+
case .modifying:
297+
preconditionFailure("Invalid state: \(self.state)")
228298
}
229-
230-
self.state = .waitingToStartAuthentication
231-
return .provideAuthenticationContext
232299
}
233300

234301
mutating func authenticationMessageReceived(_ message: PSQLBackendMessage.Authentication) -> ConnectionAction {
@@ -518,6 +585,35 @@ struct ConnectionStateMachine {
518585
}
519586
}
520587

588+
mutating func channelReadComplete() -> ConnectionAction {
589+
switch self.state {
590+
case .initialized,
591+
.sslRequestSent,
592+
.sslNegotiated,
593+
.sslHandlerAdded,
594+
.waitingToStartAuthentication,
595+
.authenticating,
596+
.authenticated,
597+
.readyForQuery,
598+
.prepareStatement,
599+
.closeCommand,
600+
.error,
601+
.closing,
602+
.closed:
603+
return .wait
604+
605+
case .extendedQuery(var extendedQuery, let connectionContext):
606+
return self.avoidingStateMachineCoW { machine in
607+
let action = extendedQuery.channelReadComplete()
608+
machine.state = .extendedQuery(extendedQuery, connectionContext)
609+
return machine.modify(with: action)
610+
}
611+
612+
case .modifying:
613+
preconditionFailure("Invalid state")
614+
}
615+
}
616+
521617
mutating func readEventCaught() -> ConnectionAction {
522618
switch self.state {
523619
case .initialized:
@@ -562,7 +658,6 @@ struct ConnectionStateMachine {
562658
preconditionFailure("How can we receive a read, if the connection is closed")
563659
case .modifying:
564660
preconditionFailure("Invalid state")
565-
566661
}
567662
}
568663

@@ -714,13 +809,13 @@ struct ConnectionStateMachine {
714809
preconditionFailure("Unimplemented")
715810
}
716811

717-
mutating func consumeNextQueryRow(promise: EventLoopPromise<StateMachineStreamNextResult>) -> ConnectionAction {
812+
mutating func requestQueryRows() -> ConnectionAction {
718813
guard case .extendedQuery(var queryState, let connectionContext) = self.state, !queryState.isComplete else {
719814
preconditionFailure("Tried to consume next row, without active query")
720815
}
721816

722817
return self.avoidingStateMachineCoW { machine -> ConnectionAction in
723-
let action = queryState.consumeNextRow(promise: promise)
818+
let action = queryState.requestQueryRows()
724819
machine.state = .extendedQuery(queryState, connectionContext)
725820
return machine.modify(with: action)
726821
}
@@ -783,18 +878,15 @@ struct ConnectionStateMachine {
783878
.sendBindExecuteSync,
784879
.succeedQuery,
785880
.succeedQueryNoRowsComming,
786-
.forwardRow,
787-
.forwardCommandComplete,
788-
.forwardStreamCompletedToCurrentQuery,
881+
.forwardRows,
882+
.forwardStreamComplete,
789883
.wait,
790884
.read:
791885
preconditionFailure("Expecting only failure actions if an error happened")
792886
case .failQuery(let queryContext, with: let error):
793887
return .failQuery(queryContext, with: error, cleanupContext: cleanupContext)
794-
case .forwardStreamError(let error, to: let promise):
795-
return .forwardStreamError(error, to: promise, cleanupContext: cleanupContext)
796-
case .forwardStreamErrorToCurrentQuery(let error, read: let read):
797-
return .forwardStreamErrorToCurrentQuery(error, read: read, cleanupContext: cleanupContext)
888+
case .forwardStreamError(let error, let read):
889+
return .forwardStreamError(error, read: read, cleanupContext: cleanupContext)
798890
}
799891
case .prepareStatement(var prepareStateMachine, _):
800892
let cleanupContext = self.setErrorAndCreateCleanupContext(error)
@@ -1025,18 +1117,13 @@ extension ConnectionStateMachine {
10251117
return .succeedQuery(requestContext, columns: columns)
10261118
case .succeedQueryNoRowsComming(let requestContext, let commandTag):
10271119
return .succeedQueryNoRowsComming(requestContext, commandTag: commandTag)
1028-
case .forwardRow(let data, to: let promise):
1029-
return .forwardRow(data, to: promise)
1030-
case .forwardCommandComplete(let buffer, let commandTag, to: let promise):
1031-
return .forwardCommandComplete(buffer, commandTag: commandTag, to: promise)
1032-
case .forwardStreamError(let error, to: let promise):
1033-
let cleanupContext = self.setErrorAndCreateCleanupContextIfNeeded(error)
1034-
return .forwardStreamError(error, to: promise, cleanupContext: cleanupContext)
1035-
case .forwardStreamErrorToCurrentQuery(let error, let read):
1120+
case .forwardRows(let buffer):
1121+
return .forwardRows(buffer)
1122+
case .forwardStreamComplete(let buffer, let commandTag):
1123+
return .forwardStreamComplete(buffer, commandTag: commandTag)
1124+
case .forwardStreamError(let error, let read):
10361125
let cleanupContext = self.setErrorAndCreateCleanupContextIfNeeded(error)
1037-
return .forwardStreamErrorToCurrentQuery(error, read: read, cleanupContext: cleanupContext)
1038-
case .forwardStreamCompletedToCurrentQuery(let buffer, let commandTag, let read):
1039-
return .forwardStreamCompletedToCurrentQuery(buffer, commandTag: commandTag, read: read)
1126+
return .forwardStreamError(error, read: read, cleanupContext: cleanupContext)
10401127
case .read:
10411128
return .read
10421129
case .wait:
@@ -1104,14 +1191,6 @@ extension ConnectionStateMachine {
11041191
}
11051192
}
11061193

1107-
enum StateMachineStreamNextResult {
1108-
/// the next row
1109-
case row(PSQLBackendMessage.DataRow)
1110-
1111-
/// the query has completed, all remaining rows and the command completion tag
1112-
case complete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String)
1113-
}
1114-
11151194
struct SendPrepareStatement {
11161195
let name: String
11171196
let query: String

0 commit comments

Comments
 (0)