@@ -20,9 +20,6 @@ import Synchronization
20
20
21
21
@usableFromInline
22
22
final actor LambdaRuntimeClient : LambdaRuntimeClientProtocol {
23
- @usableFromInline
24
- var _hasStreamingCustomHeaders = false
25
-
26
23
@usableFromInline
27
24
nonisolated let unownedExecutor : UnownedSerialExecutor
28
25
@@ -47,13 +44,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
47
44
}
48
45
49
46
@usableFromInline
50
- func writeCustomHeader( _ buffer: NIOCore . ByteBuffer ) async throws {
51
- try await self . runtimeClient. writeCustomHeader ( buffer)
52
- }
53
-
54
- @usableFromInline
55
- func write( _ buffer: NIOCore . ByteBuffer ) async throws {
56
- try await self . runtimeClient. write ( buffer)
47
+ func write( _ buffer: NIOCore . ByteBuffer , hasCustomHeaders: Bool = false ) async throws {
48
+ try await self . runtimeClient. write ( buffer, hasCustomHeaders: hasCustomHeaders)
57
49
}
58
50
59
51
@usableFromInline
@@ -197,11 +189,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
197
189
}
198
190
}
199
191
200
- private func writeCustomHeader( _ buffer: NIOCore . ByteBuffer ) async throws {
201
- _hasStreamingCustomHeaders = true
202
- try await self . write ( buffer)
203
- }
204
- private func write( _ buffer: NIOCore . ByteBuffer ) async throws {
192
+ private func write( _ buffer: NIOCore . ByteBuffer , hasCustomHeaders: Bool = false ) async throws {
205
193
switch self . lambdaState {
206
194
case . idle, . sentResponse:
207
195
throw LambdaRuntimeError ( code: . writeAfterFinishHasBeenSent)
@@ -218,12 +206,15 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
218
206
guard case . sendingResponse( requestID) = self . lambdaState else {
219
207
fatalError ( " Invalid state: \( self . lambdaState) " )
220
208
}
221
- return try await handler. writeResponseBodyPart ( buffer, requestID: requestID)
209
+ return try await handler. writeResponseBodyPart (
210
+ buffer,
211
+ requestID: requestID,
212
+ hasCustomHeaders: hasCustomHeaders
213
+ )
222
214
}
223
215
}
224
216
225
217
private func writeAndFinish( _ buffer: NIOCore . ByteBuffer ? ) async throws {
226
- _hasStreamingCustomHeaders = false
227
218
switch self . lambdaState {
228
219
case . idle, . sentResponse:
229
220
throw LambdaRuntimeError ( code: . finishAfterFinishHasBeenSent)
@@ -444,16 +435,11 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate {
444
435
}
445
436
}
446
437
}
447
-
448
- func hasStreamingCustomHeaders( isolation: isolated ( any Actor ) ? = #isolation) async -> Bool {
449
- await self . _hasStreamingCustomHeaders
450
- }
451
438
}
452
439
453
440
private protocol LambdaChannelHandlerDelegate {
454
441
func connectionWillClose( channel: any Channel )
455
442
func connectionErrorHappened( _ error: any Error , channel: any Channel )
456
- func hasStreamingCustomHeaders( isolation: isolated ( any Actor ) ? ) async -> Bool
457
443
}
458
444
459
445
private final class LambdaChannelHandler < Delegate: LambdaChannelHandlerDelegate > {
@@ -596,18 +582,29 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
596
582
func writeResponseBodyPart(
597
583
isolation: isolated ( any Actor ) ? = #isolation,
598
584
_ byteBuffer: ByteBuffer ,
599
- requestID: String
585
+ requestID: String ,
586
+ hasCustomHeaders: Bool
600
587
) async throws {
601
588
switch self . state {
602
589
case . connected( _, . waitingForNextInvocation) :
603
590
fatalError ( " Invalid state: \( self . state) " )
604
591
605
592
case . connected( let context, . waitingForResponse) :
606
593
self . state = . connected( context, . sendingResponse)
607
- try await self . sendResponseBodyPart ( byteBuffer, sendHeadWithRequestID: requestID, context: context)
594
+ try await self . sendResponseBodyPart (
595
+ byteBuffer,
596
+ sendHeadWithRequestID: requestID,
597
+ context: context,
598
+ hasCustomHeaders: hasCustomHeaders
599
+ )
608
600
609
601
case . connected( let context, . sendingResponse) :
610
- try await self . sendResponseBodyPart ( byteBuffer, sendHeadWithRequestID: nil , context: context)
602
+ try await self . sendResponseBodyPart (
603
+ byteBuffer,
604
+ sendHeadWithRequestID: nil ,
605
+ context: context,
606
+ hasCustomHeaders: hasCustomHeaders
607
+ )
611
608
612
609
case . connected( _, . idle) ,
613
610
. connected( _, . sentResponse) :
@@ -658,15 +655,16 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
658
655
isolation: isolated ( any Actor ) ? = #isolation,
659
656
_ byteBuffer: ByteBuffer ,
660
657
sendHeadWithRequestID: String ? ,
661
- context: ChannelHandlerContext
658
+ context: ChannelHandlerContext ,
659
+ hasCustomHeaders: Bool
662
660
) async throws {
663
661
664
662
if let requestID = sendHeadWithRequestID {
665
663
// TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length.
666
664
let url = Consts . invocationURLPrefix + " / " + requestID + Consts. postResponseURLSuffix
667
665
668
666
var headers = self . streamingHeaders
669
- if await self . delegate . hasStreamingCustomHeaders ( isolation : #isolation ) {
667
+ if hasCustomHeaders {
670
668
// this header is required by Function URL when the user sends custom status code or headers
671
669
headers. add ( name: " Content-Type " , value: " application/vnd.awslambda.http-integration-response " )
672
670
}
@@ -764,7 +762,6 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
764
762
}
765
763
766
764
private func sendResponseStreamingFailure( error: any Error , context: ChannelHandlerContext ) {
767
- // TODO: Use base64 here
768
765
let trailers : HTTPHeaders = [
769
766
" Lambda-Runtime-Function-Error-Type " : " Unhandled " ,
770
767
" Lambda-Runtime-Function-Error-Body " : " Requires base64 " ,
0 commit comments