@@ -20,9 +20,6 @@ import Synchronization
2020
2121@usableFromInline
2222final actor LambdaRuntimeClient : LambdaRuntimeClientProtocol {
23- @usableFromInline
24- var _hasStreamingCustomHeaders = false
25-
2623 @usableFromInline
2724 nonisolated let unownedExecutor : UnownedSerialExecutor
2825
@@ -47,13 +44,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
4744 }
4845
4946 @usableFromInline
50- func writeCustomHeader( _ buffer: NIOCore . ByteBuffer ) async throws {
51- try await self . runtimeClient. writeCustomHeader ( buffer)
52- }
53-
54- @usableFromInline
55- func write( _ buffer: NIOCore . ByteBuffer ) async throws {
56- try await self . runtimeClient. write ( buffer)
47+ func write( _ buffer: NIOCore . ByteBuffer , hasCustomHeaders: Bool = false ) async throws {
48+ try await self . runtimeClient. write ( buffer, hasCustomHeaders: hasCustomHeaders)
5749 }
5850
5951 @usableFromInline
@@ -197,11 +189,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
197189 }
198190 }
199191
200- private func writeCustomHeader( _ buffer: NIOCore . ByteBuffer ) async throws {
201- _hasStreamingCustomHeaders = true
202- try await self . write ( buffer)
203- }
204- private func write( _ buffer: NIOCore . ByteBuffer ) async throws {
192+ private func write( _ buffer: NIOCore . ByteBuffer , hasCustomHeaders: Bool = false ) async throws {
205193 switch self . lambdaState {
206194 case . idle, . sentResponse:
207195 throw LambdaRuntimeError ( code: . writeAfterFinishHasBeenSent)
@@ -218,12 +206,15 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
218206 guard case . sendingResponse( requestID) = self . lambdaState else {
219207 fatalError ( " Invalid state: \( self . lambdaState) " )
220208 }
221- return try await handler. writeResponseBodyPart ( buffer, requestID: requestID)
209+ return try await handler. writeResponseBodyPart (
210+ buffer,
211+ requestID: requestID,
212+ hasCustomHeaders: hasCustomHeaders
213+ )
222214 }
223215 }
224216
225217 private func writeAndFinish( _ buffer: NIOCore . ByteBuffer ? ) async throws {
226- _hasStreamingCustomHeaders = false
227218 switch self . lambdaState {
228219 case . idle, . sentResponse:
229220 throw LambdaRuntimeError ( code: . finishAfterFinishHasBeenSent)
@@ -444,16 +435,11 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate {
444435 }
445436 }
446437 }
447-
448- func hasStreamingCustomHeaders( isolation: isolated ( any Actor ) ? = #isolation) async -> Bool {
449- await self . _hasStreamingCustomHeaders
450- }
451438}
452439
453440private protocol LambdaChannelHandlerDelegate {
454441 func connectionWillClose( channel: any Channel )
455442 func connectionErrorHappened( _ error: any Error , channel: any Channel )
456- func hasStreamingCustomHeaders( isolation: isolated ( any Actor ) ? ) async -> Bool
457443}
458444
459445private final class LambdaChannelHandler < Delegate: LambdaChannelHandlerDelegate > {
@@ -596,18 +582,29 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
596582 func writeResponseBodyPart(
597583 isolation: isolated ( any Actor ) ? = #isolation,
598584 _ byteBuffer: ByteBuffer ,
599- requestID: String
585+ requestID: String ,
586+ hasCustomHeaders: Bool
600587 ) async throws {
601588 switch self . state {
602589 case . connected( _, . waitingForNextInvocation) :
603590 fatalError ( " Invalid state: \( self . state) " )
604591
605592 case . connected( let context, . waitingForResponse) :
606593 self . state = . connected( context, . sendingResponse)
607- try await self . sendResponseBodyPart ( byteBuffer, sendHeadWithRequestID: requestID, context: context)
594+ try await self . sendResponseBodyPart (
595+ byteBuffer,
596+ sendHeadWithRequestID: requestID,
597+ context: context,
598+ hasCustomHeaders: hasCustomHeaders
599+ )
608600
609601 case . connected( let context, . sendingResponse) :
610- try await self . sendResponseBodyPart ( byteBuffer, sendHeadWithRequestID: nil , context: context)
602+ try await self . sendResponseBodyPart (
603+ byteBuffer,
604+ sendHeadWithRequestID: nil ,
605+ context: context,
606+ hasCustomHeaders: hasCustomHeaders
607+ )
611608
612609 case . connected( _, . idle) ,
613610 . connected( _, . sentResponse) :
@@ -658,15 +655,16 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
658655 isolation: isolated ( any Actor ) ? = #isolation,
659656 _ byteBuffer: ByteBuffer ,
660657 sendHeadWithRequestID: String ? ,
661- context: ChannelHandlerContext
658+ context: ChannelHandlerContext ,
659+ hasCustomHeaders: Bool
662660 ) async throws {
663661
664662 if let requestID = sendHeadWithRequestID {
665663 // TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length.
666664 let url = Consts . invocationURLPrefix + " / " + requestID + Consts. postResponseURLSuffix
667665
668666 var headers = self . streamingHeaders
669- if await self . delegate . hasStreamingCustomHeaders ( isolation : #isolation ) {
667+ if hasCustomHeaders {
670668 // this header is required by Function URL when the user sends custom status code or headers
671669 headers. add ( name: " Content-Type " , value: " application/vnd.awslambda.http-integration-response " )
672670 }
@@ -764,7 +762,6 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
764762 }
765763
766764 private func sendResponseStreamingFailure( error: any Error , context: ChannelHandlerContext ) {
767- // TODO: Use base64 here
768765 let trailers : HTTPHeaders = [
769766 " Lambda-Runtime-Function-Error-Type " : " Unhandled " ,
770767 " Lambda-Runtime-Function-Error-Body " : " Requires base64 " ,
0 commit comments