Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.

Commit 0fa96c1

Browse files
author
Petr Janouch
committed
JERSEY-2967: Jersey Server-Sent Event Client does not close the connection
Change-Id: I7397538d97d5c356dfa17d34187a1d59a90adb18
1 parent 5217803 commit 0fa96c1

File tree

4 files changed

+165
-6
lines changed

4 files changed

+165
-6
lines changed

core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderInterceptorExecutor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -341,9 +341,9 @@ private InputStream unwrap() {
341341
* @param inputStream Potential {@link ReaderInterceptorExecutor.UnCloseableInputStream} to undo its effect
342342
* @return Input stream that is possible to close
343343
*/
344-
static InputStream closeableInputStream(InputStream inputStream) {
345-
if (inputStream instanceof ReaderInterceptorExecutor.UnCloseableInputStream) {
346-
return ((ReaderInterceptorExecutor.UnCloseableInputStream) inputStream).unwrap();
344+
public static InputStream closeableInputStream(InputStream inputStream) {
345+
if (inputStream instanceof UnCloseableInputStream) {
346+
return ((UnCloseableInputStream) inputStream).unwrap();
347347
} else {
348348
return inputStream;
349349
}

media/sse/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@
7272
<artifactId>junit</artifactId>
7373
<scope>test</scope>
7474
</dependency>
75+
<dependency>
76+
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
77+
<artifactId>jersey-test-framework-provider-bundle</artifactId>
78+
<version>${project.version}</version>
79+
<type>pom</type>
80+
<scope>test</scope>
81+
</dependency>
7582
</dependencies>
7683

7784
<build>

media/sse/src/main/java/org/glassfish/jersey/media/sse/EventInputReader.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2012-2013 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -54,6 +54,8 @@
5454

5555
import org.glassfish.jersey.internal.PropertiesDelegate;
5656
import org.glassfish.jersey.message.MessageBodyWorkers;
57+
import org.glassfish.jersey.message.MessageUtils;
58+
import org.glassfish.jersey.message.internal.ReaderInterceptorExecutor;
5759

5860
/**
5961
* SSE {@link EventInput event input} message body reader.
@@ -79,9 +81,9 @@ public EventInput readFrom(Class<EventInput> chunkedInputClass,
7981
MediaType mediaType,
8082
MultivaluedMap<String, String> headers,
8183
InputStream inputStream) throws IOException, WebApplicationException {
82-
84+
InputStream closeableInputStream = ReaderInterceptorExecutor.closeableInputStream(inputStream);
8385
return new EventInput(
84-
inputStream,
86+
closeableInputStream,
8587
annotations,
8688
mediaType,
8789
headers,
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3+
*
4+
* Copyright (c) 2015 Oracle and/or its affiliates. All rights reserved.
5+
*
6+
* The contents of this file are subject to the terms of either the GNU
7+
* General Public License Version 2 only ("GPL") or the Common Development
8+
* and Distribution License("CDDL") (collectively, the "License"). You
9+
* may not use this file except in compliance with the License. You can
10+
* obtain a copy of the License at
11+
* http://glassfish.java.net/public/CDDL+GPL_1_1.html
12+
* or packager/legal/LICENSE.txt. See the License for the specific
13+
* language governing permissions and limitations under the License.
14+
*
15+
* When distributing the software, include this License Header Notice in each
16+
* file and include the License file at packager/legal/LICENSE.txt.
17+
*
18+
* GPL Classpath Exception:
19+
* Oracle designates this particular file as subject to the "Classpath"
20+
* exception as provided by Oracle in the GPL Version 2 section of the License
21+
* file that accompanied this code.
22+
*
23+
* Modifications:
24+
* If applicable, add the following below the License Header, with the fields
25+
* enclosed by brackets [] replaced by your own identifying information:
26+
* "Portions Copyright [year] [name of copyright owner]"
27+
*
28+
* Contributor(s):
29+
* If you wish your version of this file to be governed by only the CDDL or
30+
* only the GPL Version 2, indicate your decision by adding "[Contributor]
31+
* elects to include this software in this distribution under the [CDDL or GPL
32+
* Version 2] license." If you don't indicate a single choice of license, a
33+
* recipient has the option to distribute your version of this file under
34+
* either the CDDL, the GPL Version 2 or to extend the choice of license to
35+
* its licensees as provided above. However, if you add GPL Version 2 code
36+
* and therefore, elected the GPL Version 2 license, then the option applies
37+
* only if the new code is made subject to such option by the copyright
38+
* holder.
39+
*/
40+
41+
package org.glassfish.jersey.media.sse;
42+
43+
import java.io.IOException;
44+
import java.util.concurrent.CountDownLatch;
45+
import java.util.concurrent.TimeUnit;
46+
import java.util.stream.IntStream;
47+
48+
import javax.ws.rs.GET;
49+
import javax.ws.rs.Path;
50+
import javax.ws.rs.Produces;
51+
import javax.ws.rs.client.WebTarget;
52+
import javax.ws.rs.core.Application;
53+
54+
import javax.inject.Singleton;
55+
56+
import org.glassfish.jersey.client.ClientConfig;
57+
import org.glassfish.jersey.server.ResourceConfig;
58+
import org.glassfish.jersey.test.JerseyTest;
59+
60+
import org.junit.Test;
61+
62+
import static junit.framework.Assert.assertEquals;
63+
import static junit.framework.TestCase.assertTrue;
64+
65+
/**
66+
* @author Petr Janouch (petr.janouch at oracle.com)
67+
*/
68+
public class ClientCloseTest extends JerseyTest {
69+
70+
/**
71+
* The test test that SSE connection is really closed when EventSource.close() is called.
72+
* <p/>
73+
* This test is very HttpURLConnection and Grizzly server specific, so it will probably fail, if other client and server
74+
* transport are used.
75+
*/
76+
@Test
77+
public void testClose() throws InterruptedException {
78+
WebTarget sseTarget = target("sse");
79+
80+
CountDownLatch eventLatch = new CountDownLatch(3);
81+
CountDownLatch eventLatch2 = new CountDownLatch(4);
82+
EventSource eventSource = new EventSource(sseTarget) {
83+
@Override
84+
public void onEvent(final InboundEvent inboundEvent) {
85+
eventLatch.countDown();
86+
eventLatch2.countDown();
87+
}
88+
};
89+
90+
// Server sends 3 events we are interested in.
91+
IntStream.range(0, 3).forEach((i) -> assertEquals("OK",
92+
target("sse/send").request().get().readEntity(String.class)));
93+
assertTrue(eventLatch.await(5, TimeUnit.SECONDS));
94+
95+
// After receiving the 3 events, we try to close.
96+
eventSource.close();
97+
98+
// Unfortunately the HTTPURLConnection is blocked in read() method, so it will close only after receiving the next event.
99+
assertEquals("OK", target("sse/send").request().get().readEntity(String.class));
100+
// Wait for the event that will unblock the HTTPURLConnection and result in sending FIN.
101+
assertTrue(eventLatch2.await(5, TimeUnit.SECONDS));
102+
// Now it is interesting. Client has sent FIN, but Grizzly does not listen for client input (selector READ key is
103+
// disabled), while streaming the response. For some reason we need to send 2 more events for Grizzly to notice
104+
// that the client is gone.
105+
assertEquals("OK", target("sse/send").request().get().readEntity(String.class));
106+
assertEquals("OK", target("sse/send").request().get().readEntity(String.class));
107+
// Now the grizzly should notice that the SSE connection is finally dead and sending events from the server will fail.
108+
assertEquals("NOK", target("sse/send").request().get().readEntity(String.class));
109+
}
110+
111+
@Override
112+
protected Application configure() {
113+
return new ResourceConfig(SseEndpoint.class);
114+
}
115+
116+
@Override
117+
protected void configureClient(ClientConfig config) {
118+
config.register(SseFeature.class);
119+
}
120+
121+
@Singleton
122+
@Path("sse")
123+
public static class SseEndpoint {
124+
125+
private final EventOutput eventOutput = new EventOutput();
126+
127+
@GET
128+
@Path("send")
129+
public String sendEvent() throws InterruptedException {
130+
OutboundEvent event = new OutboundEvent.Builder().data("An event").build();
131+
try {
132+
if (eventOutput.isClosed()) {
133+
return "NOK";
134+
}
135+
136+
eventOutput.write(event);
137+
} catch (IOException e) {
138+
return "NOK";
139+
}
140+
141+
return "OK";
142+
}
143+
144+
@GET
145+
@Produces(SseFeature.SERVER_SENT_EVENTS)
146+
public EventOutput get() {
147+
return eventOutput;
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)