@@ -641,7 +641,7 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
641
641
case head
642
642
case redirected( HTTPResponseHead , URL )
643
643
case body
644
- case end
644
+ case endOrError
645
645
}
646
646
647
647
let task : HTTPClient . Task < Delegate . Response >
@@ -782,7 +782,7 @@ extension TaskHandler: ChannelDuplexHandler {
782
782
} catch {
783
783
promise? . fail ( error)
784
784
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
785
- self . state = . end
785
+ self . state = . endOrError
786
786
return
787
787
}
788
788
@@ -796,20 +796,24 @@ extension TaskHandler: ChannelDuplexHandler {
796
796
assert ( head. version == HTTPVersion ( major: 1 , minor: 1 ) ,
797
797
" Sending a request in HTTP version \( head. version) which is unsupported by the above `if` " )
798
798
799
- self . expectedBodyLength = head. headers [ canonicalForm: " content-length " ] . first. flatMap { Int ( $0) }
799
+
800
+ let contentLengths = head. headers [ canonicalForm: " content-length " ]
801
+ assert ( contentLengths. count <= 1 )
802
+
803
+ self . expectedBodyLength = contentLengths. first. flatMap { Int ( $0) }
800
804
801
805
context. write ( wrapOutboundOut ( . head( head) ) ) . map {
802
806
self . callOutToDelegateFireAndForget ( value: head, self . delegate. didSendRequestHead)
803
807
} . flatMap {
804
808
self . writeBody ( request: request, context: context)
805
809
} . flatMap {
810
+ context. eventLoop. assertInEventLoop ( )
806
811
if let expectedBodyLength = self . expectedBodyLength, expectedBodyLength != self . actualBodyLength {
807
- self . state = . end
812
+ self . state = . endOrError
808
813
let error = HTTPClientError . bodyLengthMismatch
809
814
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
810
815
return context. eventLoop. makeFailedFuture ( error)
811
816
}
812
- context. eventLoop. assertInEventLoop ( )
813
817
return context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) )
814
818
} . map {
815
819
context. eventLoop. assertInEventLoop ( )
@@ -818,10 +822,10 @@ extension TaskHandler: ChannelDuplexHandler {
818
822
} . flatMapErrorThrowing { error in
819
823
context. eventLoop. assertInEventLoop ( )
820
824
switch self . state {
821
- case . end :
825
+ case . endOrError :
822
826
break
823
827
default :
824
- self . state = . end
828
+ self . state = . endOrError
825
829
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
826
830
}
827
831
throw error
@@ -839,14 +843,15 @@ extension TaskHandler: ChannelDuplexHandler {
839
843
// All writes have to be switched to the channel EL if channel and task ELs differ
840
844
if context. eventLoop. inEventLoop {
841
845
context. writeAndFlush ( self . wrapOutboundOut ( . body( part) ) , promise: promise)
846
+ self . actualBodyLength += part. readableBytes
842
847
} else {
843
848
context. eventLoop. execute {
844
849
context. writeAndFlush ( self . wrapOutboundOut ( . body( part) ) , promise: promise)
850
+ self . actualBodyLength += part. readableBytes
845
851
}
846
852
}
847
853
848
854
return promise. futureResult. map {
849
- self . actualBodyLength += part. readableBytes
850
855
self . callOutToDelegateFireAndForget ( value: part, self . delegate. didSendRequestPart)
851
856
}
852
857
} )
@@ -904,12 +909,12 @@ extension TaskHandler: ChannelDuplexHandler {
904
909
case . end:
905
910
switch self . state {
906
911
case . redirected( let head, let redirectURL) :
907
- self . state = . end
912
+ self . state = . endOrError
908
913
self . task. releaseAssociatedConnection ( delegateType: Delegate . self, closing: self . closing) . whenSuccess {
909
914
self . redirectHandler? . redirect ( status: head. status, to: redirectURL, promise: self . task. promise)
910
915
}
911
916
default :
912
- self . state = . end
917
+ self . state = . endOrError
913
918
self . callOutToDelegate ( promise: self . task. promise, self . delegate. didFinishRequest)
914
919
}
915
920
}
@@ -924,14 +929,14 @@ extension TaskHandler: ChannelDuplexHandler {
924
929
context. read ( )
925
930
}
926
931
case . failure( let error) :
927
- self . state = . end
932
+ self . state = . endOrError
928
933
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
929
934
}
930
935
}
931
936
932
937
func userInboundEventTriggered( context: ChannelHandlerContext , event: Any ) {
933
938
if ( event as? IdleStateHandler . IdleStateEvent) == . read {
934
- self . state = . end
939
+ self . state = . endOrError
935
940
let error = HTTPClientError . readTimeout
936
941
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
937
942
} else {
@@ -941,7 +946,7 @@ extension TaskHandler: ChannelDuplexHandler {
941
946
942
947
func triggerUserOutboundEvent( context: ChannelHandlerContext , event: Any , promise: EventLoopPromise < Void > ? ) {
943
948
if ( event as? TaskCancelEvent ) != nil {
944
- self . state = . end
949
+ self . state = . endOrError
945
950
let error = HTTPClientError . cancelled
946
951
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
947
952
promise? . succeed ( ( ) )
@@ -952,10 +957,10 @@ extension TaskHandler: ChannelDuplexHandler {
952
957
953
958
func channelInactive( context: ChannelHandlerContext ) {
954
959
switch self . state {
955
- case . end :
960
+ case . endOrError :
956
961
break
957
962
case . body, . head, . idle, . redirected, . sent:
958
- self . state = . end
963
+ self . state = . endOrError
959
964
let error = HTTPClientError . remoteConnectionClosed
960
965
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
961
966
}
@@ -966,7 +971,7 @@ extension TaskHandler: ChannelDuplexHandler {
966
971
switch error {
967
972
case NIOSSLError . uncleanShutdown:
968
973
switch self . state {
969
- case . end :
974
+ case . endOrError :
970
975
/// Some HTTP Servers can 'forget' to respond with CloseNotify when client is closing connection,
971
976
/// this could lead to incomplete SSL shutdown. But since request is already processed, we can ignore this error.
972
977
break
@@ -975,11 +980,11 @@ extension TaskHandler: ChannelDuplexHandler {
975
980
/// We can also ignore this error like `.end`.
976
981
break
977
982
default :
978
- self . state = . end
983
+ self . state = . endOrError
979
984
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
980
985
}
981
986
default :
982
- self . state = . end
987
+ self . state = . endOrError
983
988
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
984
989
}
985
990
}
0 commit comments