Skip to content

Commit 927f930

Browse files
authored
[Multipart] Add a frames -> raw parts parsing sequence (#74)
1 parent 94c1b30 commit 927f930

12 files changed

+1339
-849
lines changed

Sources/OpenAPIRuntime/Multipart/MultipartBytesToFramesSequence.swift

Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import HTTPTypes
16+
import Foundation
1617

1718
/// A sequence that parses multipart frames from bytes.
1819
struct MultipartBytesToFramesSequence<Upstream: AsyncSequence & Sendable>: Sendable
@@ -65,3 +66,338 @@ extension MultipartBytesToFramesSequence: AsyncSequence {
6566
mutating func next() async throws -> MultipartFrame? { try await parser.next { try await upstream.next() } }
6667
}
6768
}
69+
70+
/// Turns a stream of raw byte chunks into multipart frames.
///
/// The parser is a thin asynchronous driver around `StateMachine`: it asks the
/// state machine for the next action and pulls more bytes from the caller
/// whenever the state machine runs out of buffered data.
struct MultipartParser {

    /// The state machine that performs the actual parsing.
    private var stateMachine: StateMachine

    /// Creates a new parser.
    /// - Parameter boundary: The boundary that separates parts.
    init(boundary: String) {
        self.stateMachine = .init(boundary: boundary)
    }

    /// Parses the next frame.
    /// - Parameter fetchChunk: A closure invoked whenever the buffered bytes
    ///   are not sufficient to produce the next frame. Returning nil signals
    ///   that no more bytes will arrive.
    /// - Returns: A parsed frame, or nil at the end of the message.
    /// - Throws: A `ParserError` when the state machine reports an error, or
    ///   any error thrown by `fetchChunk`.
    mutating func next(_ fetchChunk: () async throws -> ArraySlice<UInt8>?) async throws -> MultipartFrame? {
        while true {
            let readAction = stateMachine.readNextPart()
            switch readAction {
            case .emitHeaderFields(let fields): return .headerFields(fields)
            case .emitBodyChunk(let bytes): return .bodyChunk(bytes)
            case .returnNil: return nil
            case .emitError(let failure): throw ParserError(error: failure)
            case .none:
                // The state machine made progress without producing a frame, ask again.
                continue
            case .needsMore:
                let incoming = try await fetchChunk()
                let receiveAction = stateMachine.receivedChunk(incoming)
                switch receiveAction {
                case .emitError(let failure): throw ParserError(error: failure)
                case .returnNil: return nil
                case .none: continue
                }
            }
        }
    }
}
104+
105+
extension MultipartParser {

    /// The error type thrown by `MultipartParser` when parsing fails.
    struct ParserError: Swift.Error, CustomStringConvertible, LocalizedError {

        /// The state machine error that caused the failure.
        let error: MultipartParser.StateMachine.ActionError

        /// A human-readable explanation of the failure.
        var description: String {
            let message: String
            switch error {
            case .invalidInitialBoundary:
                message = "Invalid initial boundary."
            case .invalidCRLFAtStartOfHeaderField:
                message = "Invalid CRLF at the start of a header field."
            case .invalidCharactersInHeaderFieldName:
                message = "Invalid characters in a header field name."
            case .missingColonAfterHeaderName:
                message = "Missing colon after header field name."
            case .receivedChunkWhenFinished:
                message = "Received a chunk after being finished."
            case .incompleteMultipartMessage:
                message = "Incomplete multipart message."
            }
            return message
        }

        /// The localized description, identical to `description`.
        var errorDescription: String? { description }
    }
}
127+
128+
extension MultipartParser {
129+
130+
/// A state machine representing the byte to multipart frame parser.
131+
struct StateMachine {
132+
133+
/// The states the parser moves through while consuming a multipart body.
enum State: Hashable {

    /// The phase of parsing within a single part.
    enum PartState: Hashable {

        /// Header field lines are being collected. Carries the fields
        /// accumulated so far.
        case parsingHeaderFields(HTTPFields)

        /// Body bytes are being forwarded as chunks.
        case parsingBody
    }

    /// The initial boundary has not been fully parsed yet. Carries the
    /// bytes buffered so far.
    case parsingInitialBoundary([UInt8])

    /// A part is being parsed. Carries the buffered bytes and the phase
    /// of the current part.
    case parsingPart([UInt8], PartState)

    /// The terminal state, no more frames will be produced.
    case finished

    /// A placeholder assigned while the buffer taken out of another state
    /// is being modified, used to avoid copy-on-write copies.
    case mutating
}
158+
159+
/// The state the machine is currently in. Readable from outside, but only
/// the state machine itself may change it.
private(set) var state: State

/// The raw boundary bytes, without any prefix.
private let boundary: ArraySlice<UInt8>

/// The boundary bytes prefixed with the double dash (`--`).
private let dashDashBoundary: ArraySlice<UInt8>

/// The boundary bytes prefixed with CRLF and the double dash.
private let crlfDashDashBoundary: ArraySlice<UInt8>

/// Creates a new state machine that starts out expecting the initial
/// boundary, with an empty buffer.
/// - Parameter boundary: The boundary used to separate parts.
init(boundary: String) {
    // Derive all delimiter variants once up front.
    let boundaryBytes = ArraySlice(boundary.utf8)
    let dashedBoundary: ArraySlice<UInt8> = ASCII.dashes + boundaryBytes
    self.state = .parsingInitialBoundary([])
    self.boundary = boundaryBytes
    self.dashDashBoundary = dashedBoundary
    self.crlfDashDashBoundary = ASCII.crlf + dashedBoundary
}
179+
180+
/// The failures the state machine can report to its caller.
enum ActionError: Hashable {

    /// The message does not start with the double dash followed by the
    /// boundary.
    case invalidInitialBoundary

    /// The bytes at the start of a header field line are not a CRLF.
    case invalidCRLFAtStartOfHeaderField

    /// The bytes of a header field name could not be turned into a valid
    /// header field name.
    case invalidCharactersInHeaderFieldName

    /// The byte following a header field name is not a colon.
    case missingColonAfterHeaderName

    /// A chunk of bytes arrived after the state machine had already
    /// finished.
    case receivedChunkWhenFinished

    /// The source of bytes ended before the multipart message was
    /// complete.
    case incompleteMultipartMessage
}
201+
202+
/// What the caller of `readNextPart` should do next.
enum ReadNextPartAction: Hashable {

    /// Nothing to emit yet, call `readNextPart` again.
    case none

    /// Parsing failed, throw the provided error.
    case emitError(ActionError)

    /// There are no more frames, return nil to the caller.
    case returnNil

    /// Emit a header fields frame with the provided fields.
    case emitHeaderFields(HTTPFields)

    /// Emit a body chunk frame with the provided bytes.
    case emitBodyChunk(ArraySlice<UInt8>)

    /// The buffered bytes are not enough to produce the next frame,
    /// provide more through `receivedChunk`.
    case needsMore
}
223+
224+
/// Read the next frame from the accumulated bytes.
///
/// Dispatches on the current state and hands the buffered bytes to the
/// helper responsible for that state. Each helper stores the follow-up
/// state before returning.
/// - Returns: An action to perform.
mutating func readNextPart() -> ReadNextPartAction {
    switch state {
    case .mutating: preconditionFailure("Invalid state: \(state)")
    case .finished: return .returnNil
    case .parsingInitialBoundary(var buffer):
        // Park the state in the helper case while the buffer is being modified,
        // to avoid copy-on-write copies.
        state = .mutating
        return readInitialBoundary(from: &buffer)
    case .parsingPart(var buffer, let partState):
        state = .mutating
        switch partState {
        case .parsingHeaderFields(var headerFields):
            return readHeaderFields(from: &buffer, appendingTo: &headerFields)
        case .parsingBody:
            return readBody(from: &buffer)
        }
    }
}

/// Consumes the double dash followed by the boundary from the start of the buffer.
/// - Parameter buffer: The bytes accumulated so far.
/// - Returns: An action to perform.
private mutating func readInitialBoundary(from buffer: inout [UInt8]) -> ReadNextPartAction {
    // These first bytes must be the boundary already, otherwise this is a malformed multipart body.
    switch buffer.firstIndexAfterPrefix(dashDashBoundary) {
    case .index(let index):
        buffer.removeSubrange(buffer.startIndex..<index)
        state = .parsingPart(buffer, .parsingHeaderFields(.init()))
        return .none
    case .reachedEndOfSelf:
        state = .parsingInitialBoundary(buffer)
        return .needsMore
    case .unexpectedPrefix:
        state = .finished
        return .emitError(.invalidInitialBoundary)
    }
}

/// Performs one step of parsing a part's header section: detects the closing double dash,
/// detects the empty line that ends the header fields, or parses a single header field line.
/// - Parameters:
///   - buffer: The bytes accumulated so far, starting right after a boundary or at the CRLF
///     that ended the previously parsed header field line.
///   - headerFields: The header fields parsed so far for the current part.
/// - Returns: An action to perform.
private mutating func readHeaderFields(
    from buffer: inout [UInt8],
    appendingTo headerFields: inout HTTPFields
) -> ReadNextPartAction {
    // Either we find `--` in which case there are no more parts and we're finished, or something else
    // and we start parsing headers.
    switch buffer.firstIndexAfterPrefix(ASCII.dashes) {
    case .index:
        // The terminal state does not keep the buffer, so there is nothing to trim here.
        state = .finished
        return .returnNil
    case .reachedEndOfSelf:
        state = .parsingPart(buffer, .parsingHeaderFields(headerFields))
        return .needsMore
    case .unexpectedPrefix: break
    }

    // Consume CRLF
    let indexAfterFirstCRLF: [UInt8].Index
    switch buffer.firstIndexAfterPrefix(ASCII.crlf) {
    case .index(let index): indexAfterFirstCRLF = index
    case .reachedEndOfSelf:
        state = .parsingPart(buffer, .parsingHeaderFields(headerFields))
        return .needsMore
    case .unexpectedPrefix:
        state = .finished
        return .emitError(.invalidCRLFAtStartOfHeaderField)
    }

    // If CRLF is here, this is the end of header fields section.
    switch buffer[indexAfterFirstCRLF...].firstIndexAfterPrefix(ASCII.crlf) {
    case .index(let index):
        buffer.removeSubrange(buffer.startIndex..<index)
        state = .parsingPart(buffer, .parsingBody)
        return .emitHeaderFields(headerFields)
    case .reachedEndOfSelf:
        state = .parsingPart(buffer, .parsingHeaderFields(headerFields))
        return .needsMore
    case .unexpectedPrefix: break
    }

    // The header field name runs up to the first byte that is not valid in a name.
    let startHeaderNameIndex = indexAfterFirstCRLF
    guard
        let endHeaderNameIndex = buffer[startHeaderNameIndex...]
            .firstIndex(where: { !ASCII.isValidHeaderFieldNameByte($0) })
    else {
        // No index matched yet, we need more data.
        state = .parsingPart(buffer, .parsingHeaderFields(headerFields))
        return .needsMore
    }

    // Check that what follows is a colon, otherwise this is a malformed header field line.
    // Source: RFC 7230, section 3.2.4.
    let startHeaderValueWithWhitespaceIndex: [UInt8].Index
    switch buffer[endHeaderNameIndex...].firstIndexAfterPrefix([ASCII.colon]) {
    case .index(let index): startHeaderValueWithWhitespaceIndex = index
    case .reachedEndOfSelf:
        state = .parsingPart(buffer, .parsingHeaderFields(headerFields))
        return .needsMore
    case .unexpectedPrefix:
        state = .finished
        return .emitError(.missingColonAfterHeaderName)
    }

    // Skip the optional whitespace between the colon and the value.
    guard
        let startHeaderValueIndex = buffer[startHeaderValueWithWhitespaceIndex...]
            .firstIndex(where: { !ASCII.optionalWhitespace.contains($0) })
    else {
        state = .parsingPart(buffer, .parsingHeaderFields(headerFields))
        return .needsMore
    }

    // Find the CRLF first, then remove any trailing whitespace.
    guard
        let endHeaderValueWithWhitespaceRange = buffer[startHeaderValueIndex...]
            .firstRange(of: ASCII.crlf)
    else {
        state = .parsingPart(buffer, .parsingHeaderFields(headerFields))
        return .needsMore
    }
    let headerFieldValueBytes = buffer[
        startHeaderValueIndex..<endHeaderValueWithWhitespaceRange.lowerBound
    ]
    .reversed().drop(while: { ASCII.optionalWhitespace.contains($0) }).reversed()

    guard
        let headerFieldName = HTTPField.Name(
            String(decoding: buffer[startHeaderNameIndex..<endHeaderNameIndex], as: UTF8.self)
        )
    else {
        state = .finished
        return .emitError(.invalidCharactersInHeaderFieldName)
    }
    let headerFieldValue = String(decoding: headerFieldValueBytes, as: UTF8.self)
    headerFields.append(HTTPField(name: headerFieldName, value: headerFieldValue))

    // Keep the CRLF that ended this line in the buffer, the next step expects to start with it.
    buffer.removeSubrange(buffer.startIndex..<endHeaderValueWithWhitespaceRange.lowerBound)
    state = .parsingPart(buffer, .parsingHeaderFields(headerFields))
    return .none
}

/// Forwards buffered body bytes until the CRLF + double dash + boundary delimiter is found.
/// - Parameter buffer: The bytes accumulated so far.
/// - Returns: An action to perform.
private mutating func readBody(from buffer: inout [UInt8]) -> ReadNextPartAction {
    switch buffer.longestMatch(crlfDashDashBoundary) {
    case .noMatch:
        // Nothing in the buffer can belong to the delimiter, forward all of it.
        let bodyChunk = buffer[...]
        buffer.removeAll(keepingCapacity: true)
        state = .parsingPart(buffer, .parsingBody)
        return bodyChunk.isEmpty ? .needsMore : .emitBodyChunk(bodyChunk)
    case .prefixMatch(fromIndex: let fromIndex):
        // The tail of the buffer might be the start of the delimiter, hold it back and
        // forward only the bytes in front of it.
        let bodyChunk = buffer[..<fromIndex]
        buffer.removeSubrange(..<fromIndex)
        state = .parsingPart(buffer, .parsingBody)
        return bodyChunk.isEmpty ? .needsMore : .emitBodyChunk(bodyChunk)
    case .fullMatch(let range):
        // The delimiter was found: forward what precedes it and move on to the next part.
        let bodyChunkBeforeBoundary = buffer[..<range.lowerBound]
        buffer.removeSubrange(..<range.upperBound)
        state = .parsingPart(buffer, .parsingHeaderFields(.init()))
        return bodyChunkBeforeBoundary.isEmpty ? .none : .emitBodyChunk(bodyChunkBeforeBoundary)
    }
}
364+
365+
/// What the caller of `receivedChunk` should do next.
enum ReceivedChunkAction: Hashable {

    /// The chunk was buffered, call `readNextPart` again.
    case none

    /// There are no more frames, return nil to the caller.
    case returnNil

    /// The chunk could not be accepted, throw the provided error.
    case emitError(ActionError)
}
377+
378+
/// Ingest the provided byte chunk.
///
/// While a boundary or a part is being parsed, the chunk is appended to the
/// buffered bytes. If the source of bytes ends in one of those states, the
/// message is incomplete: the state machine moves to the terminal state and
/// reports the error, so that any later call to `readNextPart` returns
/// `.returnNil` instead of asking for more bytes again.
/// - Parameter chunk: A new byte chunk. If `nil`, then the source of
///   bytes is finished and no more chunks will come.
/// - Returns: An action to perform.
mutating func receivedChunk(_ chunk: ArraySlice<UInt8>?) -> ReceivedChunkAction {
    switch state {
    case .parsingInitialBoundary(var buffer):
        guard let chunk else {
            // Match the error paths of `readNextPart`, which all finish the state machine.
            state = .finished
            return .emitError(.incompleteMultipartMessage)
        }
        // Park the state in the helper case while appending, to avoid copy-on-write copies.
        state = .mutating
        buffer.append(contentsOf: chunk)
        state = .parsingInitialBoundary(buffer)
        return .none
    case .parsingPart(var buffer, let part):
        guard let chunk else {
            // Match the error paths of `readNextPart`, which all finish the state machine.
            state = .finished
            return .emitError(.incompleteMultipartMessage)
        }
        state = .mutating
        buffer.append(contentsOf: chunk)
        state = .parsingPart(buffer, part)
        return .none
    case .finished:
        // The end of the byte source is expected here, actual bytes are not.
        guard chunk == nil else { return .emitError(.receivedChunkWhenFinished) }
        return .returnNil
    case .mutating: preconditionFailure("Invalid state: \(state)")
    }
}
402+
}
403+
}

0 commit comments

Comments
 (0)