1
1
/*
2
- * Copyright 2002-2020 the original author or authors.
2
+ * Copyright 2002-2023 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
@@ -106,14 +106,12 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
106
106
Function <? super ClientHttpRequest , Mono <Void >> requestCallback ) {
107
107
108
108
HttpClientContext context = this .contextProvider .apply (method , uri );
109
-
110
109
if (context .getCookieStore () == null ) {
111
110
context .setCookieStore (new BasicCookieStore ());
112
111
}
113
112
114
- HttpComponentsClientHttpRequest request = new HttpComponentsClientHttpRequest (method , uri ,
115
- context , this .dataBufferFactory );
116
-
113
+ HttpComponentsClientHttpRequest request =
114
+ new HttpComponentsClientHttpRequest (method , uri , context , this .dataBufferFactory );
117
115
return requestCallback .apply (request ).then (Mono .defer (() -> execute (request , context )));
118
116
}
119
117
@@ -123,7 +121,6 @@ private Mono<ClientHttpResponse> execute(HttpComponentsClientHttpRequest request
123
121
return Mono .create (sink -> {
124
122
ReactiveResponseConsumer reactiveResponseConsumer =
125
123
new ReactiveResponseConsumer (new MonoFutureCallbackAdapter (sink , this .dataBufferFactory , context ));
126
-
127
124
this .client .execute (requestProducer , reactiveResponseConsumer , context , null );
128
125
});
129
126
}
@@ -133,6 +130,7 @@ public void close() throws IOException {
133
130
this .client .close ();
134
131
}
135
132
133
+
136
134
private static class MonoFutureCallbackAdapter
137
135
implements FutureCallback <Message <HttpResponse , Publisher <ByteBuffer >>> {
138
136
@@ -144,26 +142,20 @@ private static class MonoFutureCallbackAdapter
144
142
145
143
public MonoFutureCallbackAdapter (MonoSink <ClientHttpResponse > sink ,
146
144
DataBufferFactory dataBufferFactory , HttpClientContext context ) {
145
+
147
146
this .sink = sink ;
148
147
this .dataBufferFactory = dataBufferFactory ;
149
148
this .context = context ;
150
149
}
151
150
152
151
@ Override
153
152
public void completed (Message <HttpResponse , Publisher <ByteBuffer >> result ) {
154
- HttpComponentsClientHttpResponse response =
155
- new HttpComponentsClientHttpResponse (this .dataBufferFactory , result , this .context );
156
- this .sink .success (response );
153
+ this .sink .success (new HttpComponentsClientHttpResponse (this .dataBufferFactory , result , this .context ));
157
154
}
158
155
159
156
@ Override
160
157
public void failed (Exception ex ) {
161
- Throwable t = ex ;
162
- if (t instanceof HttpStreamResetException ) {
163
- HttpStreamResetException httpStreamResetException = (HttpStreamResetException ) ex ;
164
- t = httpStreamResetException .getCause ();
165
- }
166
- this .sink .error (t );
158
+ this .sink .error (ex instanceof HttpStreamResetException && ex .getCause () != null ? ex .getCause () : ex );
167
159
}
168
160
169
161
@ Override
0 commit comments