42
42
import java .io .IOException ;
43
43
import java .io .InputStream ;
44
44
import java .io .PushbackInputStream ;
45
+ import java .util .concurrent .atomic .AtomicBoolean ;
46
+ import java .util .logging .Level ;
47
+ import java .util .logging .Logger ;
45
48
46
49
import javax .ws .rs .ProcessingException ;
47
50
57
60
* @author Marek Potociar (marek.potociar at oracle.com)
58
61
*/
59
62
public class EntityInputStream extends InputStream {
60
- private InputStream input ;
61
- private boolean closed ;
63
+ private static final Logger LOGGER = Logger .getLogger (EntityInputStream .class .getName ());
64
+
65
+ private volatile InputStream input ;
66
+ private final AtomicBoolean closed = new AtomicBoolean (false );
62
67
63
68
/**
64
69
* Create an entity input stream instance wrapping the original input stream.
@@ -144,21 +149,24 @@ public void reset() {
144
149
* {@inheritDoc}
145
150
* <p>
146
151
* The method is customized to not throw an {@link IOException} if the close operation fails. Instead,
147
- * a runtime {@link javax.ws.rs.ProcessingException} is thrown .
152
+ * a warning message is logged .
148
153
* </p>
149
- *
150
- * @throws javax.ws.rs.ProcessingException
151
- * in case the close operation on the underlying entity input stream failed.
152
154
*/
153
155
@ Override
154
156
public void close () throws ProcessingException {
155
- if (!closed && input != null ) {
157
+ final InputStream in = input ;
158
+ if (in == null ) {
159
+ return ;
160
+ }
161
+ if (closed .compareAndSet (false , true )) {
162
+ // Workaround for JRFCAF-1344: Underlying stream close() may be thread-unsafe
163
+ // and as such the close() may result in an IOException at the socket input stream level,
164
+ // if the close() gets called at once from multiple threads somehow.
156
165
try {
157
- input .close ();
166
+ in .close ();
158
167
} catch (IOException ex ) {
159
- throw new ProcessingException (LocalizationMessages .MESSAGE_CONTENT_INPUT_STREAM_CLOSE_FAILED (), ex );
160
- } finally {
161
- closed = true ;
168
+ // This means that the underlying socket stream got closed by other thread somehow
169
+ LOGGER .log (Level .FINE , LocalizationMessages .MESSAGE_CONTENT_INPUT_STREAM_CLOSE_FAILED (), ex );
162
170
}
163
171
}
164
172
}
@@ -174,36 +182,37 @@ public void close() throws ProcessingException {
174
182
public boolean isEmpty () {
175
183
ensureNotClosed ();
176
184
177
- if (input == null ) {
185
+ final InputStream in = input ;
186
+ if (in == null ) {
178
187
return true ;
179
188
}
180
189
181
190
try {
182
191
// Try #markSupported first - #available on WLS waits until socked timeout is reached when chunked encoding is used.
183
- if (input .markSupported ()) {
184
- input .mark (1 );
185
- int i = input .read ();
186
- input .reset ();
192
+ if (in .markSupported ()) {
193
+ in .mark (1 );
194
+ int i = in .read ();
195
+ in .reset ();
187
196
return i == -1 ;
188
197
} else {
189
198
try {
190
- if (input .available () > 0 ) {
199
+ if (in .available () > 0 ) {
191
200
return false ;
192
201
}
193
202
} catch (IOException ioe ) {
194
203
// NOOP. Try other approaches as this can fail on WLS.
195
204
}
196
205
197
- int b = input .read ();
206
+ int b = in .read ();
198
207
if (b == -1 ) {
199
208
return true ;
200
209
}
201
210
202
211
PushbackInputStream pbis ;
203
- if (input instanceof PushbackInputStream ) {
204
- pbis = (PushbackInputStream ) input ;
212
+ if (in instanceof PushbackInputStream ) {
213
+ pbis = (PushbackInputStream ) in ;
205
214
} else {
206
- pbis = new PushbackInputStream (input , 1 );
215
+ pbis = new PushbackInputStream (in , 1 );
207
216
input = pbis ;
208
217
}
209
218
pbis .unread (b );
@@ -221,7 +230,7 @@ public boolean isEmpty() {
221
230
* @throws IllegalStateException in case the entity input stream has been closed.
222
231
*/
223
232
public void ensureNotClosed () throws IllegalStateException {
224
- if (closed ) {
233
+ if (closed . get () ) {
225
234
throw new IllegalStateException (LocalizationMessages .ERROR_ENTITY_STREAM_CLOSED ());
226
235
}
227
236
}
@@ -232,7 +241,7 @@ public void ensureNotClosed() throws IllegalStateException {
232
241
* @return {@code true} if the stream has been closed, {@code false} otherwise.
233
242
*/
234
243
public boolean isClosed () {
235
- return closed ;
244
+ return closed . get () ;
236
245
}
237
246
238
247
/**
0 commit comments