1616
1717package org .glassfish .jersey .tests .e2e .sse ;
1818
19- import java .util .ArrayList ;
20- import java .util .List ;
21- import java .util .concurrent .CountDownLatch ;
22- import java .util .concurrent .TimeUnit ;
19+ import org .glassfish .jersey .server .ResourceConfig ;
20+ import org .glassfish .jersey .server .ServerProperties ;
21+ import org .glassfish .jersey .test .JerseyTest ;
22+ import org .junit .Assert ;
23+ import org .junit .Test ;
2324
25+ import javax .inject .Singleton ;
2426import javax .ws .rs .GET ;
2527import javax .ws .rs .Path ;
2628import javax .ws .rs .PathParam ;
2729import javax .ws .rs .Produces ;
2830import javax .ws .rs .core .Application ;
2931import javax .ws .rs .core .Context ;
3032import javax .ws .rs .core .MediaType ;
33+ import javax .ws .rs .sse .OutboundSseEvent ;
3134import javax .ws .rs .sse .Sse ;
3235import javax .ws .rs .sse .SseBroadcaster ;
3336import javax .ws .rs .sse .SseEventSink ;
3437import javax .ws .rs .sse .SseEventSource ;
35-
36- import javax .inject .Singleton ;
37-
38- import org .glassfish .jersey .server .ResourceConfig ;
39- import org .glassfish .jersey .server .ServerProperties ;
40- import org .glassfish .jersey .test .JerseyTest ;
41-
42- import org .junit .Assert ;
43- import org .junit .Test ;
38+ import java .util .ArrayList ;
39+ import java .util .List ;
40+ import java .util .concurrent .CountDownLatch ;
41+ import java .util .concurrent .TimeUnit ;
4442
4543/**
4644 * JAX-RS {@link javax.ws.rs.sse.SseBroadcaster} test.
@@ -53,13 +51,14 @@ public class BroadcasterTest extends JerseyTest {
5351 static final CountDownLatch txLatch = new CountDownLatch (4 );
5452 private static boolean isSingleton = false ;
5553
56- private static int LATCH_WAIT_TIMEOUT = 16000 ; //timeout to be waited for asynchronous events to be completed
54+ private static int ASYNC_WAIT_TIMEOUT = 1000 ; //timeout for asynchronous events to complete activities
5755
5856 @ Path ("sse" )
5957 @ Singleton
6058 public static class SseResource {
6159 private final Sse sse ;
6260 private SseBroadcaster broadcaster ;
61+ private OutboundSseEvent .Builder builder ;
6362
6463 public SseResource (@ Context final Sse sse ) {
6564 this .sse = sse ;
@@ -71,9 +70,10 @@ public SseResource(@Context final Sse sse) {
7170 @ Path ("events" )
7271 public void getServerSentEvents (@ Context final SseEventSink eventSink , @ Context final Sse sse ) {
7372 isSingleton = this .sse == sse ;
74- eventSink .send (sse .newEventBuilder ().data ("Event1" ).build ());
75- eventSink .send (sse .newEventBuilder ().data ("Event2" ).build ());
76- eventSink .send (sse .newEventBuilder ().data ("Event3" ).build ());
73+ builder = sse .newEventBuilder ();
74+ eventSink .send (builder .data ("Event1" ).build ());
75+ eventSink .send (builder .data ("Event2" ).build ());
76+ eventSink .send (builder .data ("Event3" ).build ());
7777 broadcaster .register (eventSink );
7878 broadcaster .onClose ((subscriber ) -> {
7979 if (subscriber == eventSink ) {
@@ -84,9 +84,10 @@ public void getServerSentEvents(@Context final SseEventSink eventSink, @Context
8484 }
8585
8686 @ Path ("push/{msg}" )
87+ @ Produces (MediaType .SERVER_SENT_EVENTS )
8788 @ GET
8889 public String pushMessage (@ PathParam ("msg" ) final String msg ) {
89- broadcaster .broadcast (sse . newEventBuilder () .data (msg ).build ());
90+ broadcaster .broadcast (builder .data (msg ).build ());
9091 txLatch .countDown ();
9192 return "Broadcasting message: " + msg ;
9293 }
@@ -99,6 +100,55 @@ public String close() {
99100 }
100101 }
101102
103+ /**
104+ * Wrapper to hold results coming from events (including broadcast)
105+ *
106+ * @param <T> type of expected results
107+ */
108+ public static class EventListWrapper <T > {
109+ private final List <T > data ; //event results
110+ private final CountDownLatch eventCountDown ; //count down delay for expected results
111+ private final CountDownLatch broadcastLag = new CountDownLatch (1 ); //broadcast lag
112+ // which shall be hold until thread is ready to process events from broadcast
113+ private static final int LAG_INTERVAL = 1000 ; //broadcast lag timeout - in milliseconds (1s)
114+ private static final int EXPECTED_REGULAR_EVENTS_COUNT = 3 ; //expected regular outbound events
115+
116+ public EventListWrapper (List <T > data , CountDownLatch eventCountDown ) {
117+ this .data = data ;
118+ this .eventCountDown = eventCountDown ;
119+ }
120+
121+ public void add (T msg ) {
122+ data .add (msg );
123+ eventCountDown .countDown ();
124+ if (eventCountDown .getCount () == EXPECTED_REGULAR_EVENTS_COUNT ) { //all regular events are received,
125+ //ready for broadcast
126+ broadcastLag .countDown ();
127+ }
128+ }
129+
130+ public CountDownLatch getEventCountDown () {
131+ return eventCountDown ;
132+ }
133+
134+ public T get (int pos ) {
135+ return data .get (pos );
136+ }
137+
138+ public int size () {
139+ return data .size ();
140+ }
141+
142+ /**
143+ * makes current thread to wait for predefined interval until broadcast is ready
144+ *
145+ * @throws InterruptedException in case of something went wrong
146+ */
147+ public boolean waitBroadcast () throws InterruptedException {
148+ return broadcastLag .await (LAG_INTERVAL , TimeUnit .MILLISECONDS );
149+ }
150+ }
151+
102152 @ Override
103153 protected Application configure () {
104154 final ResourceConfig rc = new ResourceConfig (SseResource .class );
@@ -108,50 +158,42 @@ protected Application configure() {
108158
109159 @ Test
110160 public void test () throws InterruptedException {
111- SseEventSource eventSourceA = SseEventSource .target (target ().path ("sse/events" )).build ();
112- List <String > resultsA1 = new ArrayList <>();
113- List <String > resultsA2 = new ArrayList <>();
114- CountDownLatch a1Latch = new CountDownLatch (5 );
115- CountDownLatch a2Latch = new CountDownLatch (5 );
116- eventSourceA .register ((event ) -> {
117- resultsA1 .add (event .readData ());
118- a1Latch .countDown ();
119- });
120- eventSourceA .register ((event ) -> {
121- resultsA2 .add (event .readData ());
122- a2Latch .countDown ();
123- });
161+ final SseEventSource eventSourceA = SseEventSource .target (target ().path ("sse/events" )).build ();
162+ final EventListWrapper <String > resultsA1 = new EventListWrapper (new ArrayList (), new CountDownLatch (5 ));
163+ final EventListWrapper <String > resultsA2 = new EventListWrapper (new ArrayList (), new CountDownLatch (5 ));
164+
165+ eventSourceA .register (event -> resultsA1 .add (event .readData ()));
166+ eventSourceA .register (event -> resultsA2 .add (event .readData ()));
124167 eventSourceA .open ();
125168
169+ Assert .assertTrue (resultsA1 .waitBroadcast ()); //some delay is required to process consumer and producer
170+ Assert .assertTrue (resultsA2 .waitBroadcast ()); //some delay is required to process consumer and producer
171+
126172 target ().path ("sse/push/firstBroadcast" ).request ().get (String .class );
127173
128174
129- SseEventSource eventSourceB = SseEventSource .target (target ().path ("sse/events" )).build ();
130- List <String > resultsB1 = new ArrayList <>();
131- List <String > resultsB2 = new ArrayList <>();
132- CountDownLatch b1Latch = new CountDownLatch (4 );
133- CountDownLatch b2Latch = new CountDownLatch (4 );
134- eventSourceB .register ((event ) -> {
135- resultsB1 .add (event .readData ());
136- b1Latch .countDown ();
137- });
138- eventSourceB .register ((event ) -> {
139- resultsB2 .add (event .readData ());
140- b2Latch .countDown ();
141- });
175+ final SseEventSource eventSourceB = SseEventSource .target (target ().path ("sse/events" )).build ();
176+ final EventListWrapper <String > resultsB1 = new EventListWrapper (new ArrayList (), new CountDownLatch (4 ));
177+ final EventListWrapper <String > resultsB2 = new EventListWrapper (new ArrayList (), new CountDownLatch (4 ));
178+
179+ eventSourceB .register (event -> resultsB1 .add (event .readData ()));
180+ eventSourceB .register (event -> resultsB2 .add (event .readData ()));
142181 eventSourceB .open ();
143182
183+ Assert .assertTrue (resultsB1 .waitBroadcast ()); //some delay is required to process consumer and producer
184+ Assert .assertTrue (resultsB2 .waitBroadcast ()); //some delay is required to process consumer and producer
185+
144186 target ().path ("sse/push/secondBroadcast" ).request ().get (String .class );
145187
146188 Assert .assertTrue ("Waiting for resultsA1 to be complete failed." ,
147- a1Latch . await (LATCH_WAIT_TIMEOUT , TimeUnit .MILLISECONDS ));
189+ resultsA1 . getEventCountDown (). await (ASYNC_WAIT_TIMEOUT , TimeUnit .MILLISECONDS ));
148190 Assert .assertTrue ("Waiting for resultsA2 to be complete failed." ,
149- a2Latch . await (LATCH_WAIT_TIMEOUT , TimeUnit .MILLISECONDS ));
191+ resultsA2 . getEventCountDown (). await (ASYNC_WAIT_TIMEOUT , TimeUnit .MILLISECONDS ));
150192
151193 Assert .assertTrue ("Waiting for resultsB1 to be complete failed." ,
152- b1Latch . await (LATCH_WAIT_TIMEOUT , TimeUnit .MILLISECONDS ));
194+ resultsB1 . getEventCountDown (). await (ASYNC_WAIT_TIMEOUT , TimeUnit .MILLISECONDS ));
153195 Assert .assertTrue ("Waiting for resultsB2 to be complete failed." ,
154- b2Latch . await (LATCH_WAIT_TIMEOUT , TimeUnit .MILLISECONDS ));
196+ resultsB2 . getEventCountDown (). await (ASYNC_WAIT_TIMEOUT , TimeUnit .MILLISECONDS ));
155197
156198 Assert .assertTrue (txLatch .await (5000 , TimeUnit .MILLISECONDS ));
157199
@@ -186,7 +228,7 @@ public void test() throws InterruptedException {
186228 && resultsB2 .get (2 ).equals ("Event3" )
187229 && resultsB2 .get (3 ).equals ("secondBroadcast" ));
188230 target ().path ("sse/close" ).request ().get ();
189- Assert . assertTrue ( closeLatch .await (LATCH_WAIT_TIMEOUT , TimeUnit . MILLISECONDS ) );
231+ closeLatch .await ();
190232 Assert .assertTrue ("Sse instances injected into resource and constructor differ. Sse should have been injected"
191233 + "as a singleton" , isSingleton );
192234 }
0 commit comments