Skip to content

Commit 876c83d

Browse files
committed
Propagate WebApplicationException from SseEventSource to provided error handler
Signed-off-by: jansupol <[email protected]>
1 parent ff92b7e commit 876c83d

File tree

3 files changed

+166
-2
lines changed

3 files changed

+166
-2
lines changed

media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/EventProcessor.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2017, 2024 Oracle and/or its affiliates. All rights reserved.
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.TimeUnit;
2626
import java.util.concurrent.atomic.AtomicReference;
27+
import java.util.function.Consumer;
2728
import java.util.logging.Level;
2829
import java.util.logging.Logger;
2930

@@ -87,6 +88,10 @@ public class EventProcessor implements Runnable, EventListener {
8788
* A map of listeners bound to receive only events of a particular name.
8889
*/
8990
private final Map<String, List<EventListener>> boundListeners;
91+
/**
92+
* A list of Error Consumers.
93+
*/
94+
private final List<Consumer<Throwable>> throwableConsumers;
9095

9196
/**
9297
* Shutdown handler is invoked when Event processor reaches terminal stage.
@@ -111,6 +116,7 @@ private EventProcessor(final EventProcessor that) {
111116
this.unboundListeners = that.unboundListeners;
112117
this.eventListener = that.eventListener;
113118
this.shutdownHandler = that.shutdownHandler;
119+
this.throwableConsumers = that.throwableConsumers;
114120
}
115121

116122
private EventProcessor(Builder builder) {
@@ -128,6 +134,7 @@ private EventProcessor(Builder builder) {
128134
this.unboundListeners = builder.unboundListeners == null ? Collections.EMPTY_LIST : builder.unboundListeners;
129135
this.eventListener = builder.eventListener;
130136
this.shutdownHandler = builder.shutdownHandler;
137+
this.throwableConsumers = builder.throwableConsumers;
131138
}
132139

133140
/**
@@ -199,6 +206,16 @@ public void run() {
199206
}
200207
// if we're here, an unrecoverable error has occurred - just turn off the lights...
201208
shutdownHandler.shutdown();
209+
// and notify error handlers
210+
if (throwableConsumers != null) {
211+
for (Consumer<Throwable> consumer : throwableConsumers) {
212+
try {
213+
consumer.accept(ex);
214+
} catch (Throwable throwable) {
215+
LOGGER.fine(String.format("User throwable ignored: %s", throwable.getMessage()));
216+
}
217+
}
218+
}
202219
} finally {
203220
if (eventInput != null && !eventInput.isClosed()) {
204221
eventInput.close();
@@ -357,6 +374,7 @@ public static class Builder {
357374
private boolean disableKeepAlive;
358375
private List<EventListener> unboundListeners;
359376
private Map<String, List<EventListener>> boundListeners;
377+
private List<Consumer<Throwable>> throwableConsumers = null;
360378

361379
private Builder(WebTarget target,
362380
AtomicReference<State> state,
@@ -420,6 +438,17 @@ public Builder disableKeepAlive() {
420438
return this;
421439
}
422440

441+
/**
442+
* Set the consumers of {@link Throwable} occurring during connection.
443+
*
444+
* @param throwableConsumers a list of consumers of throwable.
445+
* @return updated builder instance.
446+
*/
447+
public Builder throwableConsumers(List<Consumer<Throwable>> throwableConsumers) {
448+
this.throwableConsumers = throwableConsumers;
449+
return this;
450+
}
451+
423452
/**
424453
* Build the {@link EventProcessor}.
425454
*

media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/JerseySseEventSource.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2017, 2024 Oracle and/or its affiliates. All rights reserved.
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -16,6 +16,8 @@
1616

1717
package org.glassfish.jersey.media.sse.internal;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
1921
import java.util.Objects;
2022
import java.util.concurrent.TimeUnit;
2123
import java.util.concurrent.atomic.AtomicReference;
@@ -72,6 +74,10 @@ public class JerseySseEventSource implements SseEventSource {
7274
* Client provided executor facade.
7375
*/
7476
private final ClientExecutor clientExecutor;
77+
/**
78+
* List of Throwable consumers passed to EventProcessor.Builder.
79+
*/
80+
private final List<Consumer<Throwable>> throwableConsumers = new ArrayList<>();
7581

7682
/**
7783
* Private constructor.
@@ -110,11 +116,13 @@ public void register(final Consumer<InboundSseEvent> onEvent) {
110116
public void register(final Consumer<InboundSseEvent> onEvent, final Consumer<Throwable> onError) {
111117
this.subscribe(DEFAULT_SUBSCRIPTION_HANDLER, onEvent, onError, () -> {
112118
});
119+
throwableConsumers.add(onError);
113120
}
114121

115122
@Override
116123
public void register(final Consumer<InboundSseEvent> onEvent, final Consumer<Throwable> onError, final Runnable onComplete) {
117124
this.subscribe(DEFAULT_SUBSCRIPTION_HANDLER, onEvent, onError, onComplete);
125+
throwableConsumers.add(onError);
118126
}
119127

120128
private void subscribe(final Consumer<Flow.Subscription> onSubscribe,
@@ -173,6 +181,7 @@ public void open() {
173181
EventProcessor processor = EventProcessor
174182
.builder(endpoint, state, clientExecutor, this::onEvent, this::close)
175183
.reconnectDelay(reconnectDelay, reconnectTimeUnit)
184+
.throwableConsumers(throwableConsumers)
176185
.build();
177186
clientExecutor.submit(processor);
178187

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* This Source Code may also be made available under the following Secondary
9+
* Licenses when the conditions for such availability set forth in the
10+
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11+
* version 2 with the GNU Classpath Exception, which is available at
12+
* https://www.gnu.org/software/classpath/license.html.
13+
*
14+
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15+
*/
16+
17+
package org.glassfish.jersey.media.sse;
18+
19+
import org.glassfish.jersey.server.ResourceConfig;
20+
import org.glassfish.jersey.test.JerseyTest;
21+
import org.hamcrest.MatcherAssert;
22+
import org.hamcrest.Matchers;
23+
import org.junit.jupiter.api.Test;
24+
25+
import javax.ws.rs.BadRequestException;
26+
import javax.ws.rs.GET;
27+
import javax.ws.rs.InternalServerErrorException;
28+
import javax.ws.rs.NotFoundException;
29+
import javax.ws.rs.Path;
30+
import javax.ws.rs.Produces;
31+
import javax.ws.rs.WebApplicationException;
32+
import javax.ws.rs.client.WebTarget;
33+
import javax.ws.rs.core.Application;
34+
import javax.ws.rs.core.Context;
35+
import javax.ws.rs.sse.InboundSseEvent;
36+
import javax.ws.rs.sse.Sse;
37+
import javax.ws.rs.sse.SseEventSink;
38+
import javax.ws.rs.sse.SseEventSource;
39+
import java.util.concurrent.CountDownLatch;
40+
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicReference;
42+
import java.util.function.Consumer;
43+
44+
public class SseEventSourceRegisterErrorHandlerTest extends JerseyTest {
45+
@Path("sse")
46+
public static class SseEventSourceRegisterTestSseEndpoint {
47+
48+
@Path("hello")
49+
@GET
50+
@Produces(SseFeature.SERVER_SENT_EVENTS)
51+
public void hello(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
52+
output.send(sse.newEvent("HELLO"));
53+
}
54+
55+
@Path("close")
56+
@GET
57+
@Produces(SseFeature.SERVER_SENT_EVENTS)
58+
public void close(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
59+
output.close();
60+
}
61+
62+
@Path("500")
63+
@GET
64+
@Produces(SseFeature.SERVER_SENT_EVENTS)
65+
public void throw500(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
66+
throw new WebApplicationException();
67+
}
68+
69+
@Path("400")
70+
@GET
71+
@Produces(SseFeature.SERVER_SENT_EVENTS)
72+
public void throw400(@Context SseEventSink output, @Context Sse sse) throws InterruptedException {
73+
throw new BadRequestException();
74+
}
75+
}
76+
77+
@Override
78+
protected Application configure() {
79+
return new ResourceConfig(SseEventSourceRegisterTestSseEndpoint.class);
80+
}
81+
82+
private static final Consumer<InboundSseEvent> EMPTY = event -> {
83+
};
84+
85+
@Test
86+
public void testConnection404() throws InterruptedException {
87+
WebTarget sseTarget = target("sse");
88+
AtomicReference<Throwable> throwable = new AtomicReference<>();
89+
CountDownLatch completeLatch = new CountDownLatch(1);
90+
91+
SseEventSource eventSource = SseEventSource.target(sseTarget).build();
92+
eventSource.register(EMPTY, throwable::set, completeLatch::countDown);
93+
eventSource.open();
94+
completeLatch.await(10_000, TimeUnit.MILLISECONDS);
95+
MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue());
96+
MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(NotFoundException.class));
97+
}
98+
99+
@Test
100+
public void testError500() throws InterruptedException {
101+
WebTarget sseTarget = target("sse/500");
102+
AtomicReference<Throwable> throwable = new AtomicReference<>();
103+
CountDownLatch completeLatch = new CountDownLatch(1);
104+
105+
SseEventSource eventSource = SseEventSource.target(sseTarget).build();
106+
eventSource.register(EMPTY, throwable::set, completeLatch::countDown);
107+
eventSource.open();
108+
completeLatch.await(10_000, TimeUnit.MILLISECONDS);
109+
MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue());
110+
MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(InternalServerErrorException.class));
111+
}
112+
113+
@Test
114+
public void testError400() throws InterruptedException {
115+
WebTarget sseTarget = target("sse/400");
116+
AtomicReference<Throwable> throwable = new AtomicReference<>();
117+
CountDownLatch completeLatch = new CountDownLatch(1);
118+
119+
SseEventSource eventSource = SseEventSource.target(sseTarget).build();
120+
eventSource.register(EMPTY, throwable::set, completeLatch::countDown);
121+
eventSource.open();
122+
completeLatch.await(10_000, TimeUnit.MILLISECONDS);
123+
MatcherAssert.assertThat(throwable.get(), Matchers.notNullValue());
124+
MatcherAssert.assertThat(throwable.get().getClass(), Matchers.is(BadRequestException.class));
125+
}
126+
}

0 commit comments

Comments
 (0)