Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.

Commit 16704c8

Browse files
author
Michal Gajdos
committed
JERSEY-2558: ChunkedOutput.close() broken since Jersey 2.8
- Fix in ServerRuntime and AsyncContextDelegateProviderImpl - Problem seems to be only in servlet container when ChunkedOutput is closed before response from resource method is processed by Jersey - request is indefinitely suspended after it's completed (and never completed after that). - Added test for 2558 and also for combination of ChunkedOutput with AsyncResponse Change-Id: I705f139118d6a1a236a28aac0f78e02847d9b24a Signed-off-by: Michal Gajdos <[email protected]>
1 parent bf7345e commit 16704c8

File tree

11 files changed

+312
-177
lines changed

11 files changed

+312
-177
lines changed

containers/jersey-servlet-core/src/main/java/org/glassfish/jersey/servlet/internal/ResponseWriter.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2012-2014 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -119,7 +119,7 @@ public boolean suspend(final long timeOut, final TimeUnit timeUnit, final Timeou
119119
try {
120120
// Suspend the servlet.
121121
asyncExt.suspend();
122-
} catch (IllegalStateException ex) {
122+
} catch (final IllegalStateException ex) {
123123
LOGGER.log(Level.WARNING, LocalizationMessages.SERVLET_REQUEST_SUSPEND_FAILED(), ex);
124124
return false;
125125
}
@@ -128,12 +128,12 @@ public boolean suspend(final long timeOut, final TimeUnit timeUnit, final Timeou
128128
}
129129

130130
@Override
131-
public void setSuspendTimeout(long timeOut, TimeUnit timeUnit) throws IllegalStateException {
131+
public void setSuspendTimeout(final long timeOut, final TimeUnit timeUnit) throws IllegalStateException {
132132
requestTimeoutHandler.setSuspendTimeout(timeOut, timeUnit);
133133
}
134134

135135
@Override
136-
public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse responseContext)
136+
public OutputStream writeResponseStatusAndHeaders(final long contentLength, final ContainerResponse responseContext)
137137
throws ContainerException {
138138
this.responseContext.set(responseContext);
139139

@@ -145,8 +145,8 @@ public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerR
145145
// the invocation of sendError as on some Servlet implementations
146146
// modification of the response headers will have no effect
147147
// after the invocation of sendError.
148-
MultivaluedMap<String, String> headers = getResponseContext().getStringHeaders();
149-
for (Map.Entry<String, List<String>> e : headers.entrySet()) {
148+
final MultivaluedMap<String, String> headers = getResponseContext().getStringHeaders();
149+
for (final Map.Entry<String, List<String>> e : headers.entrySet()) {
150150
final Iterator<String> it = e.getValue().iterator();
151151
if (!it.hasNext()) {
152152
continue;
@@ -178,7 +178,7 @@ public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerR
178178
// delegating output stream prevents closing the underlying servlet output stream,
179179
// so that any Servlet filters in the chain can still write to the response after us.
180180
return new NonCloseableOutputStreamWrapper(outputStream);
181-
} catch (IOException e) {
181+
} catch (final IOException e) {
182182
throw new ContainerException(e);
183183
}
184184
}
@@ -198,7 +198,7 @@ public void commit() {
198198
} else {
199199
response.sendError(status, reason);
200200
}
201-
} catch (IOException ex) {
201+
} catch (final IOException ex) {
202202
throw new ContainerException(
203203
LocalizationMessages.EXCEPTION_SENDING_ERROR_RESPONSE(status, reason != null ? reason : "--"),
204204
ex);
@@ -212,7 +212,7 @@ public void commit() {
212212
}
213213

214214
@Override
215-
public void failure(Throwable error) {
215+
public void failure(final Throwable error) {
216216
try {
217217
if (!response.isCommitted()) {
218218
try {
@@ -223,10 +223,10 @@ public void failure(Throwable error) {
223223
} else {
224224
response.sendError(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request failed.");
225225
}
226-
} catch (IllegalStateException ex) {
226+
} catch (final IllegalStateException ex) {
227227
// a race condition externally committing the response can still occur...
228228
LOGGER.log(Level.FINER, "Unable to reset failed response.", ex);
229-
} catch (IOException ex) {
229+
} catch (final IOException ex) {
230230
throw new ContainerException(LocalizationMessages.EXCEPTION_SENDING_ERROR_RESPONSE(
231231
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request failed."), ex);
232232
} finally {
@@ -249,7 +249,7 @@ public boolean enableResponseBuffering() {
249249
*
250250
* @param error throwable to be re-thrown
251251
*/
252-
private void rethrow(Throwable error) {
252+
private void rethrow(final Throwable error) {
253253
if (error instanceof RuntimeException) {
254254
throw (RuntimeException) error;
255255
} else {
@@ -279,22 +279,22 @@ private ContainerResponse getResponseContext() {
279279
private static class NonCloseableOutputStreamWrapper extends OutputStream {
280280
private final OutputStream delegate;
281281

282-
public NonCloseableOutputStreamWrapper(OutputStream delegate) {
282+
public NonCloseableOutputStreamWrapper(final OutputStream delegate) {
283283
this.delegate = delegate;
284284
}
285285

286286
@Override
287-
public void write(int b) throws IOException {
287+
public void write(final int b) throws IOException {
288288
delegate.write(b);
289289
}
290290

291291
@Override
292-
public void write(byte[] b) throws IOException {
292+
public void write(final byte[] b) throws IOException {
293293
delegate.write(b);
294294
}
295295

296296
@Override
297-
public void write(byte[] b, int off, int len) throws IOException {
297+
public void write(final byte[] b, final int off, final int len) throws IOException {
298298
delegate.write(b, off, len);
299299
}
300300

containers/jersey-servlet/src/main/java/org/glassfish/jersey/servlet/async/AsyncContextDelegateProviderImpl.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2012-2013 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -39,6 +39,7 @@
3939
*/
4040
package org.glassfish.jersey.servlet.async;
4141

42+
import java.util.concurrent.atomic.AtomicBoolean;
4243
import java.util.concurrent.atomic.AtomicReference;
4344

4445
import javax.servlet.AsyncContext;
@@ -69,6 +70,7 @@ private static final class ExtensionImpl implements AsyncContextDelegate {
6970
private final HttpServletRequest request;
7071
private final HttpServletResponse response;
7172
private final AtomicReference<AsyncContext> asyncContextRef;
73+
private final AtomicBoolean completed;
7274

7375
/**
7476
* Create a Servlet 3.x {@link AsyncContextDelegate} with given {@code request} and {@code response}.
@@ -79,21 +81,27 @@ private static final class ExtensionImpl implements AsyncContextDelegate {
7981
private ExtensionImpl(final HttpServletRequest request, final HttpServletResponse response) {
8082
this.request = request;
8183
this.response = response;
82-
this.asyncContextRef = new AtomicReference<AsyncContext>();
84+
this.asyncContextRef = new AtomicReference<>();
85+
this.completed = new AtomicBoolean(false);
8386
}
8487

8588
@Override
8689
public void suspend() throws IllegalStateException {
87-
final AsyncContext asyncContext = request.startAsync(request, response);
90+
// Suspend only if not completed and not suspended before.
91+
if (!completed.get() && asyncContextRef.get() == null) {
92+
final AsyncContext asyncContext = request.startAsync(request, response);
8893

89-
// Tell underlying asyncContext to never time out.
90-
asyncContext.setTimeout(NEVER_TIMEOUT_VALUE);
94+
// Tell underlying asyncContext to never time out.
95+
asyncContext.setTimeout(NEVER_TIMEOUT_VALUE);
9196

92-
asyncContextRef.set(asyncContext);
97+
asyncContextRef.set(asyncContext);
98+
}
9399
}
94100

95101
@Override
96102
public void complete() {
103+
completed.set(true);
104+
97105
final AsyncContext asyncContext = asyncContextRef.getAndSet(null);
98106
if (asyncContext != null) {
99107
asyncContext.complete();

core-client/src/main/java/org/glassfish/jersey/client/ChunkedInput.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2012-2013 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -70,7 +70,9 @@
7070
* @param <T> chunk type.
7171
* @author Marek Potociar (marek.potociar at oracle.com)
7272
*/
73+
@SuppressWarnings("UnusedDeclaration")
7374
public class ChunkedInput<T> extends GenericType<T> implements Closeable {
75+
7476
private static final Logger LOGGER = Logger.getLogger(ChunkedInput.class.getName());
7577

7678
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -108,12 +110,12 @@ public static ChunkParser createParser(final byte[] boundary) {
108110
private static class FixedBoundaryParser implements ChunkParser {
109111
private final byte[] delimiter;
110112

111-
public FixedBoundaryParser(byte[] boundary) {
113+
public FixedBoundaryParser(final byte[] boundary) {
112114
delimiter = Arrays.copyOf(boundary, boundary.length);
113115
}
114116

115117
@Override
116-
public byte[] readChunk(InputStream in) throws IOException {
118+
public byte[] readChunk(final InputStream in) throws IOException {
117119
final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
118120
final byte[] delimiterBuffer = new byte[delimiter.length];
119121

@@ -157,13 +159,13 @@ public byte[] readChunk(InputStream in) throws IOException {
157159
* @param propertiesDelegate properties delegate for this request/response.
158160
*/
159161
protected ChunkedInput(
160-
Type chunkType,
161-
InputStream inputStream,
162-
Annotation[] annotations,
163-
MediaType mediaType,
164-
MultivaluedMap<String, String> headers,
165-
MessageBodyWorkers messageBodyWorkers,
166-
PropertiesDelegate propertiesDelegate) {
162+
final Type chunkType,
163+
final InputStream inputStream,
164+
final Annotation[] annotations,
165+
final MediaType mediaType,
166+
final MultivaluedMap<String, String> headers,
167+
final MessageBodyWorkers messageBodyWorkers,
168+
final PropertiesDelegate propertiesDelegate) {
167169
super(chunkType);
168170

169171
this.inputStream = inputStream;
@@ -196,7 +198,7 @@ public ChunkParser getParser() {
196198
*
197199
* @param parser new chunk parser.
198200
*/
199-
public void setParser(ChunkParser parser) {
201+
public void setParser(final ChunkParser parser) {
200202
this.parser = parser;
201203
}
202204

@@ -236,7 +238,7 @@ public MediaType getChunkType() {
236238
* @param mediaType custom chunk data media type. Must not be {@code null}.
237239
* @throws IllegalArgumentException in case the {@code mediaType} is {@code null}.
238240
*/
239-
public void setChunkType(MediaType mediaType) throws IllegalArgumentException {
241+
public void setChunkType(final MediaType mediaType) throws IllegalArgumentException {
240242
if (mediaType == null) {
241243
throw new IllegalArgumentException(LocalizationMessages.CHUNKED_INPUT_MEDIA_TYPE_NULL());
242244
}
@@ -255,7 +257,7 @@ public void setChunkType(MediaType mediaType) throws IllegalArgumentException {
255257
* a valid {@link MediaType} instance or is {@code null}.
256258
* @see #setChunkType(javax.ws.rs.core.MediaType)
257259
*/
258-
public void setChunkType(String mediaType) throws IllegalArgumentException {
260+
public void setChunkType(final String mediaType) throws IllegalArgumentException {
259261
this.mediaType = MediaType.valueOf(mediaType);
260262
}
261263

@@ -265,7 +267,7 @@ public void close() {
265267
if (inputStream != null) {
266268
try {
267269
inputStream.close();
268-
} catch (IOException e) {
270+
} catch (final IOException e) {
269271
LOGGER.log(Level.FINE, LocalizationMessages.CHUNKED_INPUT_STREAM_CLOSING_ERROR(), e);
270272
}
271273
}
@@ -306,7 +308,7 @@ public T read() throws IllegalStateException {
306308
if (chunk == null) {
307309
close();
308310
} else {
309-
ByteArrayInputStream chunkStream = new ByteArrayInputStream(chunk);
311+
final ByteArrayInputStream chunkStream = new ByteArrayInputStream(chunk);
310312
// TODO: add interceptors: interceptors are used in ChunkedOutput, so the stream should
311313
// be intercepted in the ChunkedInput too. Interceptors cannot be easily added to the readFrom
312314
// method as they should wrap the stream before it is processed by ChunkParser. Also please check todo
@@ -323,7 +325,7 @@ public T read() throws IllegalStateException {
323325
Collections.<ReaderInterceptor>emptyList(),
324326
false);
325327
}
326-
} catch (IOException e) {
328+
} catch (final IOException e) {
327329
Logger.getLogger(this.getClass().getName()).log(Level.FINE, e.getMessage(), e);
328330
close();
329331
}

core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2012-2014 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -106,7 +106,7 @@ public ChunkedOutput(final Type chunkType) {
106106
* @param chunkDelimiter custom chunk delimiter bytes. Must not be {code null}.
107107
* @since 2.4.1
108108
*/
109-
protected ChunkedOutput(byte[] chunkDelimiter) {
109+
protected ChunkedOutput(final byte[] chunkDelimiter) {
110110
if (chunkDelimiter.length > 0) {
111111
this.chunkDelimiter = new byte[chunkDelimiter.length];
112112
System.arraycopy(chunkDelimiter, 0, this.chunkDelimiter, 0, chunkDelimiter.length);
@@ -122,7 +122,7 @@ protected ChunkedOutput(byte[] chunkDelimiter) {
122122
* @param chunkDelimiter custom chunk delimiter bytes. Must not be {code null}.
123123
* @since 2.4.1
124124
*/
125-
public ChunkedOutput(final Type chunkType, byte[] chunkDelimiter) {
125+
public ChunkedOutput(final Type chunkType, final byte[] chunkDelimiter) {
126126
super(chunkType);
127127
if (chunkDelimiter.length > 0) {
128128
this.chunkDelimiter = new byte[chunkDelimiter.length];
@@ -138,7 +138,7 @@ public ChunkedOutput(final Type chunkType, byte[] chunkDelimiter) {
138138
* @param chunkDelimiter custom chunk delimiter string. Must not be {code null}.
139139
* @since 2.4.1
140140
*/
141-
protected ChunkedOutput(String chunkDelimiter) {
141+
protected ChunkedOutput(final String chunkDelimiter) {
142142
if (chunkDelimiter.isEmpty()) {
143143
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
144144
} else {
@@ -153,7 +153,7 @@ protected ChunkedOutput(String chunkDelimiter) {
153153
* @param chunkDelimiter custom chunk delimiter string. Must not be {code null}.
154154
* @since 2.4.1
155155
*/
156-
public ChunkedOutput(final Type chunkType, String chunkDelimiter) {
156+
public ChunkedOutput(final Type chunkType, final String chunkDelimiter) {
157157
super(chunkType);
158158
if (chunkDelimiter.isEmpty()) {
159159
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
@@ -243,10 +243,10 @@ public Void call() throws IOException {
243243
// if MBW replaced the stream, let's make sure to set it in the response context.
244244
responseContext.setEntityStream(writtenStream);
245245
}
246-
} catch (IOException ioe) {
246+
} catch (final IOException ioe) {
247247
connectionCallback.onDisconnect(asyncContext.get());
248248
throw ioe;
249-
} catch (MappableException mpe) {
249+
} catch (final MappableException mpe) {
250250
if (mpe.getCause() instanceof IOException) {
251251
connectionCallback.onDisconnect(asyncContext.get());
252252
}
@@ -278,15 +278,15 @@ public Void call() throws IOException {
278278
return null;
279279
}
280280
});
281-
} catch (Exception e) {
281+
} catch (final Exception e) {
282282
closed = true;
283283
// remember the exception (it will get rethrown from finally clause, once it does it's work)
284284
ex = e;
285285
} finally {
286286
if (closed) {
287287
try {
288288
responseContext.close();
289-
} catch (Exception e) {
289+
} catch (final Exception e) {
290290
// if no exception remembered before, remember this one
291291
// otherwise the previously remembered exception (from catch clause) takes precedence
292292
ex = ex == null ? e : ex;

core-server/src/main/java/org/glassfish/jersey/server/ServerRuntime.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2012-2014 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -665,8 +665,9 @@ public OutputStream getOutputStream(final int contentLength) throws IOException
665665
close = true;
666666
}
667667

668+
final ChunkedOutput chunked = (ChunkedOutput) entity;
668669
try {
669-
((ChunkedOutput) entity).setContext(
670+
chunked.setContext(
670671
runtime.requestScope,
671672
runtime.requestScope.referenceCurrent(),
672673
request,
@@ -677,9 +678,11 @@ public OutputStream getOutputStream(final int contentLength) throws IOException
677678
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_WRITING_RESPONSE_ENTITY_CHUNK(), ex);
678679
close = true;
679680
}
680-
// suspend the writer indefinitely (passing null timeout handler is ok in such case).
681+
// suspend the writer indefinitely (passing null timeout handler is ok in such case) if the output is not
682+
// already closed.
681683
// TODO what to do if we detect that the writer has already been suspended? override the timeout value?
682-
if (!writer.suspend(0, TimeUnit.SECONDS, null)) {
684+
if (!chunked.isClosed()
685+
&& !writer.suspend(AsyncResponder.NO_TIMEOUT, TimeUnit.SECONDS, null)) {
683686
LOGGER.fine(LocalizationMessages.ERROR_SUSPENDING_CHUNKED_OUTPUT_RESPONSE());
684687
}
685688
}

0 commit comments

Comments
 (0)