Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.

Commit edc716d

Browse files
author
Marek Potociar
committed
Fixed JERSEY-2910 & JERSEY-2912.
JERSEY-2910 - SSE EventSource thread factory updated to produce daemon threads that can be terminated when JVM quits. JERSEY-2912 - Added support for producing / consuming “comments only” SSE events. Change-Id: Ic9bcc44997b05cc7dabf1c0c3ff3c5bee641296c Signed-off-by: Marek Potociar <[email protected]>
1 parent f874edc commit edc716d

File tree

7 files changed

+148
-46
lines changed

7 files changed

+148
-46
lines changed

media/sse/src/main/java/org/glassfish/jersey/media/sse/EventOutput.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2012-2014 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -54,7 +54,7 @@
5454
*/
5555
public class EventOutput extends ChunkedOutput<OutboundEvent> {
5656
// encoding does not matter for lower ASCII characters
57-
private static final byte[] SSE_EVENT_DELIMITER = "\n\n".getBytes(Charset.forName("UTF-8"));
57+
private static final byte[] SSE_EVENT_DELIMITER = "\n".getBytes(Charset.forName("UTF-8"));
5858

5959
/**
6060
* Create new outbound Server-Sent Events channel.

media/sse/src/main/java/org/glassfish/jersey/media/sse/EventSource.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import java.util.concurrent.CountDownLatch;
5050
import java.util.concurrent.Executors;
5151
import java.util.concurrent.ScheduledExecutorService;
52-
import java.util.concurrent.ThreadFactory;
5352
import java.util.concurrent.TimeUnit;
5453
import java.util.concurrent.atomic.AtomicReference;
5554
import java.util.logging.Level;
@@ -61,6 +60,8 @@
6160

6261
import org.glassfish.jersey.internal.util.ExtendedLogger;
6362

63+
import jersey.repackaged.com.google.common.util.concurrent.ThreadFactoryBuilder;
64+
6465
/**
6566
* Client for reading and processing {@link InboundEvent incoming Server-Sent Events}.
6667
* <p>
@@ -139,7 +140,7 @@ public class EventSource implements EventListener {
139140
*/
140141
public static final long RECONNECT_DEFAULT = 500;
141142

142-
private static enum State {
143+
private enum State {
143144
READY, OPEN, CLOSED
144145
}
145146

@@ -232,6 +233,7 @@ public Builder named(String name) {
232233
*
233234
* @return updated event source builder instance.
234235
*/
236+
@SuppressWarnings("unused")
235237
public Builder usePersistentConnections() {
236238
disableKeepAlive = false;
237239
return this;
@@ -249,6 +251,7 @@ public Builder usePersistentConnections() {
249251
* @param unit time unit of the reconnect delay parameter.
250252
* @return updated event source builder instance.
251253
*/
254+
@SuppressWarnings("unused")
252255
public Builder reconnectingEvery(final long delay, TimeUnit unit) {
253256
reconnect = unit.toMillis(delay);
254257
return this;
@@ -380,12 +383,8 @@ private EventSource(final WebTarget target,
380383
this.disableKeepAlive = disableKeepAlive;
381384

382385
final String esName = (name == null) ? createDefaultName(target) : name;
383-
this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
384-
@Override
385-
public Thread newThread(Runnable r) {
386-
return new Thread(r, esName);
387-
}
388-
});
386+
this.executor = Executors.newSingleThreadScheduledExecutor(
387+
new ThreadFactoryBuilder().setNameFormat(esName + "-%d").setDaemon(true).build());
389388

390389
if (open) {
391390
open();

media/sse/src/main/java/org/glassfish/jersey/media/sse/InboundEvent.java

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,12 @@
6060
* @author Marek Potociar (marek.potociar at oracle.com)
6161
*/
6262
public class InboundEvent {
63-
private static final GenericType<String> STRING_AS_GENERIC_TYPE = new GenericType<String>(String.class);
63+
64+
private static final GenericType<String> STRING_AS_GENERIC_TYPE = new GenericType<>(String.class);
6465

6566
private final String name;
6667
private final String id;
68+
private final String comment;
6769
private final byte[] data;
6870
private final long reconnectDelay;
6971

@@ -76,6 +78,7 @@ public class InboundEvent {
7678
* Inbound event builder. This implementation is not thread-safe.
7779
*/
7880
static class Builder {
81+
7982
private String name;
8083
private String id;
8184
private long reconnectDelay = SseFeature.RECONNECT_NOT_SET;
@@ -85,6 +88,7 @@ static class Builder {
8588
private final Annotation[] annotations;
8689
private final MediaType mediaType;
8790
private final MultivaluedMap<String, String> headers;
91+
private final StringBuilder commentBuilder;
8892

8993
/**
9094
* Create new inbound event builder.
@@ -106,12 +110,13 @@ public Builder(MessageBodyWorkers workers,
106110
this.mediaType = mediaType;
107111
this.headers = headers;
108112

113+
this.commentBuilder = new StringBuilder();
109114
this.dataStream = new ByteArrayOutputStream();
110115
}
111116

112117
/**
113118
* Set inbound event name.
114-
*
119+
* <p/>
115120
* Value of the received SSE {@code "event"} field.
116121
*
117122
* @param name {@code "event"} field value.
@@ -124,7 +129,7 @@ public Builder name(String name) {
124129

125130
/**
126131
* Set inbound event identifier.
127-
*
132+
* <p/>
128133
* Value of the received SSE {@code "id"} field.
129134
*
130135
* @param id {@code "id"} field value.
@@ -135,12 +140,32 @@ public Builder id(String id) {
135140
return this;
136141
}
137142

143+
/**
144+
* Add a comment line to the event.
145+
* <p>
146+
* The comment line will be added to the received SSE event comment as a new line in the comment field.
147+
* If the comment line parameter is {@code null}, the call will be ignored.
148+
* </p>
149+
*
150+
* @param commentLine comment line to be added to the event comment.
151+
* @return updated builder instance.
152+
* @since 2.21
153+
*/
154+
public Builder commentLine(final CharSequence commentLine) {
155+
if (commentLine != null) {
156+
commentBuilder.append(commentLine).append('\n');
157+
}
158+
159+
return this;
160+
}
161+
138162
/**
139163
* Set reconnection delay (in milliseconds) that indicates how long the event receiver should wait
140164
* before attempting to reconnect in case a connection to SSE event source is lost.
141165
* <p>
142166
* Value of the received SSE {@code "retry"} field.
143167
* </p>
168+
*
144169
* @param milliseconds reconnection delay in milliseconds. Negative values un-set the reconnection delay.
145170
* @return updated builder instance.
146171
* @since 2.3
@@ -181,6 +206,7 @@ public InboundEvent build() {
181206
return new InboundEvent(
182207
name,
183208
id,
209+
commentBuilder.length() > 0 ? commentBuilder.substring(0, commentBuilder.length() - 1) : null,
184210
reconnectDelay,
185211
dataStream.toByteArray(),
186212
workers,
@@ -190,16 +216,18 @@ public InboundEvent build() {
190216
}
191217
}
192218

193-
private InboundEvent(String name,
194-
String id,
195-
long reconnectDelay,
196-
byte[] data,
197-
MessageBodyWorkers messageBodyWorkers,
198-
Annotation[] annotations,
199-
MediaType mediaType,
200-
MultivaluedMap<String, String> headers) {
219+
private InboundEvent(final String name,
220+
final String id,
221+
final String comment,
222+
final long reconnectDelay,
223+
final byte[] data,
224+
final MessageBodyWorkers messageBodyWorkers,
225+
final Annotation[] annotations,
226+
final MediaType mediaType,
227+
final MultivaluedMap<String, String> headers) {
201228
this.name = name;
202229
this.id = id;
230+
this.comment = comment;
203231
this.reconnectDelay = reconnectDelay;
204232
this.data = data;
205233
this.messageBodyWorkers = messageBodyWorkers;
@@ -235,6 +263,20 @@ public String getId() {
235263
return id;
236264
}
237265

266+
/**
267+
* Get a comment string that accompanies the event.
268+
* <p>
269+
* Contains value of the comment associated with SSE event. This field is optional. Method may return {@code null},
270+
* if the event comment is not specified.
271+
* </p>
272+
*
273+
* @return comment associated with the event.
274+
* @since 2.21
275+
*/
276+
public String getComment() {
277+
return comment;
278+
}
279+
238280
/**
239281
* Get new connection retry time in milliseconds the event receiver should wait before attempting to
240282
* reconnect after a connection to the SSE event source is lost.
@@ -273,8 +315,7 @@ public boolean isEmpty() {
273315
* Get the original event data string {@link String}.
274316
*
275317
* @return event data de-serialized into a string.
276-
* @throws javax.ws.rs.ProcessingException
277-
* when provided type can't be read. The thrown exception wraps the original cause.
318+
* @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
278319
* @since 2.3
279320
*/
280321
public String readData() {
@@ -286,8 +327,7 @@ public String readData() {
286327
*
287328
* @param type Java type to be used for event data de-serialization.
288329
* @return event data de-serialized as an instance of a given type.
289-
* @throws javax.ws.rs.ProcessingException
290-
* when provided type can't be read. The thrown exception wraps the original cause.
330+
* @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
291331
* @since 2.3
292332
*/
293333
public <T> T readData(Class<T> type) {
@@ -299,10 +339,10 @@ public <T> T readData(Class<T> type) {
299339
*
300340
* @param type generic type to be used for event data de-serialization.
301341
* @return event data de-serialized as an instance of a given type.
302-
* @throws javax.ws.rs.ProcessingException
303-
* when provided type can't be read. The thrown exception wraps the original cause.
342+
* @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
304343
* @since 2.3
305344
*/
345+
@SuppressWarnings("unused")
306346
public <T> T readData(GenericType<T> type) {
307347
return readData(type, null);
308348
}
@@ -313,22 +353,21 @@ public <T> T readData(GenericType<T> type) {
313353
* @param messageType Java type to be used for event data de-serialization.
314354
* @param mediaType {@link MediaType media type} to be used for event data de-serialization.
315355
* @return event data de-serialized as an instance of a given type.
316-
* @throws javax.ws.rs.ProcessingException
317-
* when provided type can't be read. The thrown exception wraps the original cause.
356+
* @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
318357
* @since 2.3
319358
*/
359+
@SuppressWarnings("unused")
320360
public <T> T readData(Class<T> messageType, MediaType mediaType) {
321361
return readData(new GenericType<T>(messageType), mediaType);
322362
}
323363

324364
/**
325365
* Read event data as a given generic type.
326366
*
327-
* @param type generic type to be used for event data de-serialization.
328-
* @param mediaType {@link MediaType media type} to be used for event data de-serialization.
367+
* @param type generic type to be used for event data de-serialization.
368+
* @param mediaType {@link MediaType media type} to be used for event data de-serialization.
329369
* @return event data de-serialized as an instance of a given type.
330-
* @throws javax.ws.rs.ProcessingException
331-
* when provided type can't be read. The thrown exception wraps the original cause.
370+
* @throws javax.ws.rs.ProcessingException when provided type can't be read. The thrown exception wraps the original cause.
332371
* @since 2.3
333372
*/
334373
public <T> T readData(GenericType<T> type, MediaType mediaType) {
@@ -360,8 +399,9 @@ private <T> T readAndCast(GenericType<T> type, MediaType effectiveMediaType, Mes
360399
* Get the raw event data bytes.
361400
*
362401
* @return raw event data bytes. The returned byte array may be empty if the event does not
363-
* contain any data.
402+
* contain any data.
364403
*/
404+
@SuppressWarnings("unused")
365405
public byte[] getRawData() {
366406
if (isEmpty()) {
367407
return data;
@@ -383,6 +423,7 @@ public String toString() {
383423
return "InboundEvent{"
384424
+ "name='" + name + '\''
385425
+ ", id='" + id + '\''
426+
+ ", comment=" + (comment == null ? "[no comments]" : '\'' + comment + '\'')
386427
+ ", data=" + s
387428
+ '}';
388429
}

media/sse/src/main/java/org/glassfish/jersey/media/sse/InboundEventReader.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2012-2014 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -74,7 +74,7 @@ class InboundEventReader implements MessageBodyReader<InboundEvent> {
7474
@Inject
7575
private Provider<MessageBodyWorkers> messageBodyWorkers;
7676

77-
private static enum State {
77+
private enum State {
7878
NEW_LINE,
7979
COMMENT,
8080
FIELD,
@@ -100,6 +100,7 @@ public InboundEvent readFrom(final Class<InboundEvent> type,
100100
* last editors draft from 13 March 2012
101101
*/
102102
final ByteArrayOutputStream tokenData = new ByteArrayOutputStream();
103+
final String charsetName = MessageUtils.getCharset(mediaType).name();
103104
final InboundEvent.Builder eventBuilder =
104105
new InboundEvent.Builder(messageBodyWorkers.get(), annotations, mediaType, headers);
105106

@@ -123,13 +124,16 @@ public InboundEvent readFrom(final Class<InboundEvent> type,
123124
break;
124125
case COMMENT:
125126
// skipping comment data
126-
b = readLineUntil(entityStream, '\n', null);
127+
b = readLineUntil(entityStream, '\n', tokenData);
128+
final String commentLine = tokenData.toString(charsetName);
129+
tokenData.reset();
130+
eventBuilder.commentLine(commentLine.trim());
127131
currentState = State.NEW_LINE;
128132
break;
129133
case FIELD:
130134
// read field name
131135
b = readLineUntil(entityStream, ':', tokenData);
132-
final String fieldName = tokenData.toString(MessageUtils.getCharset(mediaType).name());
136+
final String fieldName = tokenData.toString(charsetName);
133137
tokenData.reset();
134138

135139
if (b == ':') {

media/sse/src/main/java/org/glassfish/jersey/media/sse/OutboundEvent.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2012-2013 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2012-2015 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -349,10 +349,10 @@ public boolean isReconnectDelaySet() {
349349
* serializing the {@link #getData() event data}.
350350
* </p>
351351
*
352-
* @return data type.
352+
* @return data type. May return {@code null}, if the event does not contain any data.
353353
*/
354354
public Class<?> getType() {
355-
return type.getRawType();
355+
return type == null ? null : type.getRawType();
356356
}
357357

358358
/**
@@ -362,11 +362,11 @@ public Class<?> getType() {
362362
* serializing the {@link #getData() event data}.
363363
* </p>
364364
*
365-
* @return generic data type.
365+
* @return generic data type. May return {@code null}, if the event does not contain any data.
366366
* @since 2.3
367367
*/
368368
public Type getGenericType() {
369-
return type.getType();
369+
return type == null ? null : type.getType();
370370
}
371371

372372
/**
@@ -406,7 +406,7 @@ public String getComment() {
406406
* {@link #getType() type}, {@link #getGenericType()} generic type} and {@link #getMediaType()} media type}.
407407
* </p>
408408
*
409-
* @return event data.
409+
* @return event data. May return {@code null}, if the event does not contain any data.
410410
*/
411411
public Object getData() {
412412
return data;

media/sse/src/main/java/org/glassfish/jersey/media/sse/OutboundEventWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ public void write(final int i) throws IOException {
155155
}
156156
}
157157
});
158+
entityStream.write(EOL);
158159
}
159160
}
160161
}

0 commit comments

Comments
 (0)