Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.

Commit 5fbc282

Browse files
committed
Making SseEventSink#send asynchronous.
First send will trigger sending a response headers and then sending of an event. This alings the behavior to a case, when SseEventSink is accessed from outside of a resource method. Change-Id: I2ab8bda71d698ed8cb728295b488d45afc42494d
1 parent 396b5b8 commit 5fbc282

File tree

18 files changed

+269
-42
lines changed

18 files changed

+269
-42
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
* only if the new code is made subject to such option by the copyright
3838
* holder.
3939
*/
40-
package org.glassfish.jersey.server.internal.process;
40+
package org.glassfish.jersey.server;
4141

4242
import javax.ws.rs.container.AsyncResponse;
4343
import javax.ws.rs.core.Response;

core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@
5252
import javax.ws.rs.core.GenericType;
5353
import javax.ws.rs.ext.WriterInterceptor;
5454

55-
import org.glassfish.jersey.internal.util.collection.Value;
55+
import javax.inject.Provider;
56+
5657
import org.glassfish.jersey.process.internal.RequestContext;
5758
import org.glassfish.jersey.process.internal.RequestScope;
5859
import org.glassfish.jersey.server.internal.LocalizationMessages;
59-
import org.glassfish.jersey.server.internal.process.AsyncContext;
6060
import org.glassfish.jersey.server.internal.process.MappableException;
6161

6262
/**
@@ -75,14 +75,19 @@ public class ChunkedOutput<T> extends GenericType<T> implements Closeable {
7575
private final BlockingDeque<T> queue = new LinkedBlockingDeque<>();
7676
private final byte[] chunkDelimiter;
7777

78-
private volatile boolean closed = false;
7978
private boolean flushing = false;
79+
80+
private volatile boolean closed = false;
81+
private volatile boolean resumed = false;
82+
83+
private volatile AsyncContext asyncContext;
84+
8085
private volatile RequestScope requestScope;
8186
private volatile RequestContext requestScopeContext;
8287
private volatile ContainerRequest requestContext;
8388
private volatile ContainerResponse responseContext;
8489
private volatile ConnectionCallback connectionCallback;
85-
private volatile Value<AsyncContext> asyncContext;
90+
8691

8792
/**
8893
* Create new {@code ChunkedOutput}.
@@ -116,6 +121,23 @@ protected ChunkedOutput(final byte[] chunkDelimiter) {
116121
}
117122
}
118123

124+
/**
125+
* Create new {@code ChunkedOutput} with a custom chunk delimiter.
126+
*
127+
* @param chunkDelimiter custom chunk delimiter bytes. Must not be {code null}.
128+
* @since 2.4.1
129+
*/
130+
protected ChunkedOutput(final byte[] chunkDelimiter, Provider<AsyncContext> asyncContextProvider) {
131+
if (chunkDelimiter.length > 0) {
132+
this.chunkDelimiter = new byte[chunkDelimiter.length];
133+
System.arraycopy(chunkDelimiter, 0, this.chunkDelimiter, 0, chunkDelimiter.length);
134+
} else {
135+
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
136+
}
137+
138+
this.asyncContext = asyncContextProvider == null ? null : asyncContextProvider.get();
139+
}
140+
119141
/**
120142
* Create new {@code ChunkedOutput} with a custom chunk delimiter.
121143
*
@@ -181,7 +203,12 @@ public void write(final T chunk) throws IOException {
181203
flushQueue();
182204
}
183205

184-
private void flushQueue() throws IOException {
206+
protected void flushQueue() throws IOException {
207+
if (!resumed && asyncContext != null) {
208+
resumed = true;
209+
asyncContext.resume(this);
210+
}
211+
185212
if (requestScopeContext == null || requestContext == null || responseContext == null) {
186213
return;
187214
}
@@ -245,11 +272,11 @@ public Void call() throws IOException {
245272
responseContext.setEntityStream(writtenStream);
246273
}
247274
} catch (final IOException ioe) {
248-
connectionCallback.onDisconnect(asyncContext.get());
275+
connectionCallback.onDisconnect(asyncContext);
249276
throw ioe;
250277
} catch (final MappableException mpe) {
251278
if (mpe.getCause() instanceof IOException) {
252-
connectionCallback.onDisconnect(asyncContext.get());
279+
connectionCallback.onDisconnect(asyncContext);
253280
}
254281
throw mpe;
255282
}
@@ -355,21 +382,18 @@ public String toString() {
355382
* @param requestContext request context.
356383
* @param responseContext response context.
357384
* @param connectionCallbackRunner connection callback.
358-
* @param asyncContext async context value.
359385
* @throws IOException when encountered any problem during serializing or writing a chunk.
360386
*/
361387
void setContext(final RequestScope requestScope,
362388
final RequestContext requestScopeContext,
363389
final ContainerRequest requestContext,
364390
final ContainerResponse responseContext,
365-
final ConnectionCallback connectionCallbackRunner,
366-
final Value<AsyncContext> asyncContext) throws IOException {
391+
final ConnectionCallback connectionCallbackRunner) throws IOException {
367392
this.requestScope = requestScope;
368393
this.requestScopeContext = requestScopeContext;
369394
this.requestContext = requestContext;
370395
this.responseContext = responseContext;
371396
this.connectionCallback = connectionCallbackRunner;
372-
this.asyncContext = asyncContext;
373397
flushQueue();
374398
}
375399
}

core-server/src/main/java/org/glassfish/jersey/server/ServerRuntime.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
import org.glassfish.jersey.server.internal.monitoring.EmptyRequestEventBuilder;
102102
import org.glassfish.jersey.server.internal.monitoring.RequestEventBuilder;
103103
import org.glassfish.jersey.server.internal.monitoring.RequestEventImpl;
104-
import org.glassfish.jersey.server.internal.process.AsyncContext;
105104
import org.glassfish.jersey.server.internal.process.Endpoint;
106105
import org.glassfish.jersey.server.internal.process.MappableException;
107106
import org.glassfish.jersey.server.internal.process.RequestProcessingContext;
@@ -115,10 +114,10 @@
115114
import org.glassfish.jersey.server.spi.ResponseErrorMapper;
116115
import org.glassfish.jersey.spi.ExceptionMappers;
117116

118-
import static org.glassfish.jersey.server.internal.process.AsyncContext.State.COMPLETED;
119-
import static org.glassfish.jersey.server.internal.process.AsyncContext.State.RESUMED;
120-
import static org.glassfish.jersey.server.internal.process.AsyncContext.State.RUNNING;
121-
import static org.glassfish.jersey.server.internal.process.AsyncContext.State.SUSPENDED;
117+
import static org.glassfish.jersey.server.AsyncContext.State.COMPLETED;
118+
import static org.glassfish.jersey.server.AsyncContext.State.RESUMED;
119+
import static org.glassfish.jersey.server.AsyncContext.State.RUNNING;
120+
import static org.glassfish.jersey.server.AsyncContext.State.SUSPENDED;
122121

123122
/**
124123
* Server-side request processing runtime.
@@ -714,8 +713,7 @@ public OutputStream getOutputStream(final int contentLength) throws IOException
714713
runtime.requestScope.referenceCurrent(),
715714
request,
716715
response,
717-
connectionCallbackRunner,
718-
processingContext.asyncContextValue());
716+
connectionCallbackRunner);
719717
} catch (final IOException ex) {
720718
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_WRITING_RESPONSE_ENTITY_CHUNK(), ex);
721719
close = true;

core-server/src/main/java/org/glassfish/jersey/server/internal/inject/AsyncResponseValueParamProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import javax.inject.Provider;
4848

4949
import org.glassfish.jersey.server.ContainerRequest;
50-
import org.glassfish.jersey.server.internal.process.AsyncContext;
50+
import org.glassfish.jersey.server.AsyncContext;
5151
import org.glassfish.jersey.server.model.Parameter;
5252
import org.glassfish.jersey.server.spi.internal.ValueParamProvider;
5353

core-server/src/main/java/org/glassfish/jersey/server/internal/inject/ValueParamProviderConfigurator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
import org.glassfish.jersey.server.ContainerRequest;
7272
import org.glassfish.jersey.server.ServerBootstrapBag;
7373
import org.glassfish.jersey.server.Uri;
74-
import org.glassfish.jersey.server.internal.process.AsyncContext;
74+
import org.glassfish.jersey.server.AsyncContext;
7575
import org.glassfish.jersey.server.internal.process.RequestProcessingContextReference;
7676
import org.glassfish.jersey.server.spi.internal.ValueParamProvider;
7777

core-server/src/main/java/org/glassfish/jersey/server/internal/process/RequestProcessingConfigurator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.glassfish.jersey.internal.inject.AbstractBinder;
5858
import org.glassfish.jersey.internal.inject.InjectionManager;
5959
import org.glassfish.jersey.process.internal.RequestScoped;
60+
import org.glassfish.jersey.server.AsyncContext;
6061
import org.glassfish.jersey.server.CloseableService;
6162
import org.glassfish.jersey.server.ContainerRequest;
6263
import org.glassfish.jersey.server.ExtendedUriInfo;

core-server/src/main/java/org/glassfish/jersey/server/internal/process/RequestProcessingContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.glassfish.jersey.internal.util.collection.Values;
5050
import org.glassfish.jersey.process.internal.ChainableStage;
5151
import org.glassfish.jersey.process.internal.Stage;
52+
import org.glassfish.jersey.server.AsyncContext;
5253
import org.glassfish.jersey.server.CloseableService;
5354
import org.glassfish.jersey.server.ContainerRequest;
5455
import org.glassfish.jersey.server.ContainerResponse;
@@ -155,7 +156,7 @@ public CloseableService closeableService() {
155156

156157

157158
/**
158-
* Lazily initialize {@link org.glassfish.jersey.server.internal.process.AsyncContext} for this
159+
* Lazily initialize {@link AsyncContext} for this
159160
* request processing context.
160161
* <p>
161162
* The {@code lazyContextValue} will be only invoked once during the first call to {@link #asyncContext()}.

core-server/src/main/java/org/glassfish/jersey/server/model/IntrospectionModeller.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2010-2015 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2010-2017 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -63,6 +63,7 @@
6363
import javax.ws.rs.container.AsyncResponse;
6464
import javax.ws.rs.container.Suspended;
6565
import javax.ws.rs.core.MediaType;
66+
import javax.ws.rs.sse.SseEventSink;
6667

6768
import org.glassfish.jersey.internal.Errors;
6869
import org.glassfish.jersey.internal.util.Producer;
@@ -310,10 +311,15 @@ private static void introspectAsyncFeatures(AnnotatedMethod am, ResourceMethod.B
310311
for (Annotation annotation : annotations) {
311312
if (annotation.annotationType() == Suspended.class) {
312313
resourceMethodBuilder.suspended(AsyncResponse.NO_TIMEOUT, TimeUnit.MILLISECONDS);
313-
return;
314314
}
315315
}
316316
}
317+
318+
for (Class<?> paramType : am.getParameterTypes()) {
319+
if (SseEventSink.class.equals(paramType)) {
320+
resourceMethodBuilder.sse();
321+
}
322+
}
317323
}
318324

319325
private void addResourceMethods(

core-server/src/main/java/org/glassfish/jersey/server/model/ResourceMethod.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ public static final class Builder {
148148
private final Set<MediaType> producedTypes;
149149
// Suspendable
150150
private boolean managedAsync;
151+
private boolean sse;
151152
private boolean suspended;
152153
private long suspendTimeout;
153154
private TimeUnit suspendTimeoutUnit;
@@ -367,6 +368,17 @@ public Builder suspended(long timeout, TimeUnit unit) {
367368
return this;
368369
}
369370

371+
/**
372+
* Set the SSE flag on the method model to {@code true}.
373+
*
374+
* @return updated builder object.
375+
*/
376+
public Builder sse() {
377+
sse = true;
378+
379+
return this;
380+
}
381+
370382
/**
371383
* Set the managed async required flag on the method model to {@code true}.
372384
*
@@ -527,6 +539,7 @@ public ResourceMethod build() {
527539
producedTypes,
528540
managedAsync,
529541
suspended,
542+
sse,
530543
suspendTimeout,
531544
suspendTimeoutUnit,
532545
createInvocable(),
@@ -567,6 +580,7 @@ private Invocable createInvocable() {
567580
// SuspendableComponent
568581
private final boolean managedAsync;
569582
private final boolean suspended;
583+
private final boolean sse;
570584
private final long suspendTimeout;
571585
private final TimeUnit suspendTimeoutUnit;
572586
// Invocable
@@ -579,7 +593,7 @@ private Invocable createInvocable() {
579593
private Data(final String httpMethod,
580594
final Collection<MediaType> consumedTypes,
581595
final Collection<MediaType> producedTypes,
582-
boolean managedAsync, final boolean suspended,
596+
boolean managedAsync, final boolean suspended, boolean sse,
583597
final long suspendTimeout,
584598
final TimeUnit suspendTimeoutUnit,
585599
final Invocable invocable,
@@ -594,6 +608,7 @@ private Data(final String httpMethod,
594608
this.producedTypes = Collections.unmodifiableList(new ArrayList<>(producedTypes));
595609
this.invocable = invocable;
596610
this.suspended = suspended;
611+
this.sse = sse;
597612
this.suspendTimeout = suspendTimeout;
598613
this.suspendTimeoutUnit = suspendTimeoutUnit;
599614

@@ -660,6 +675,15 @@ private Data(final String httpMethod,
660675
return suspended;
661676
}
662677

678+
/**
679+
* Flag indicating whether the method requires injection of Sse Event Sink.
680+
*
681+
* @return {@code true} if the method requires injection of Sse Event Sink, {@code false} otherwise.
682+
*/
683+
/* package */ boolean isSse() {
684+
return sse;
685+
}
686+
663687
/**
664688
* Get the suspended timeout value for the method.
665689
*
@@ -850,6 +874,15 @@ public boolean isSuspendDeclared() {
850874
return data.isSuspended();
851875
}
852876

877+
/**
878+
* Check whether the resource method will be producing Server-sent event stream.
879+
*
880+
* @return {@code true} if the resource method produces Server-sent event stream, {@code false} otherwise.
881+
*/
882+
public boolean isSse() {
883+
return data.isSse();
884+
}
885+
853886
@Override
854887
public boolean isManagedAsyncDeclared() {
855888
return data.isManagedAsync();

core-server/src/main/java/org/glassfish/jersey/server/model/ResourceMethodInvoker.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ public ContainerResponse apply(final RequestProcessingContext processingContext)
394394
final ContainerRequest request = processingContext.request();
395395
final Object resource = processingContext.routingContext().peekMatchedResource();
396396

397-
if (method.isSuspendDeclared() || method.isManagedAsyncDeclared()) {
397+
if (method.isSuspendDeclared() || method.isManagedAsyncDeclared() || method.isSse()) {
398398
if (!processingContext.asyncContext().suspend()) {
399399
throw new ProcessingException(LocalizationMessages.ERROR_SUSPENDING_ASYNC_REQUEST());
400400
}
@@ -414,6 +414,11 @@ public ContainerResponse apply(final RequestProcessingContext processingContext)
414414
// TODO replace with processing context factory method.
415415
Response response = invoke(processingContext, resource);
416416

417+
// we don't care about the response when SseEventSink is injected - it will be sent asynchronously.
418+
if (method.isSse()) {
419+
return null;
420+
}
421+
417422
if (response.hasEntity()) {
418423
Object entityFuture = response.getEntity();
419424
if (entityFuture instanceof CompletionStage) {

0 commit comments

Comments
 (0)