1010import java .util .concurrent .Flow .Publisher ;
1111import java .util .concurrent .Flow .Subscriber ;
1212import java .util .concurrent .Flow .Subscription ;
13+ import java .util .function .Consumer ;
1314
1415import jakarta .ws .rs .core .MediaType ;
1516import jakarta .ws .rs .sse .OutboundSseEvent ;
@@ -46,32 +47,104 @@ public void setStreamingResponseCustomizers(List<StreamingResponseCustomizer> st
4647 this .streamingResponseCustomizers = streamingResponseCustomizers ;
4748 }
4849
50+ @ SuppressWarnings ("rawtypes" )
4951 private static class SseMultiSubscriber extends AbstractMultiSubscriber {
5052
53+ private final Publisher publisher ;
54+ // 0: no items have been pushed by the stream
55+ // 1: the first item has been pushed by the stream, and we have yet to send the empty buffer (with the headers)
56+ // 2: the empty buffer (with the headers) was sent, and we have received a response
57+ // 3: all items pulled from upstream and successfully sent downstream
58+ // 4: we got an error sending an item
59+ private volatile int state = 0 ;
60+
5161 SseMultiSubscriber (ResteasyReactiveRequestContext requestContext , List <StreamingResponseCustomizer > staticCustomizers ,
52- long demand ) {
62+ Publisher publisher , long demand ) {
5363 super (requestContext , staticCustomizers , demand );
64+ this .publisher = publisher ;
65+ }
66+
67+ @ Override
68+ public void onSubscribe (Subscription s ) {
69+ this .subscription = s ;
70+ // we only request one item initially because we need to use that item to create the headers
71+ // that will be sent in the first empty response
72+ s .request (1 );
73+ }
74+
75+ @ Override
76+ public void onComplete () {
77+ // make sure we don't trigger cancel with our onCloseHandler
78+ weClosed = true ;
79+ if (state == 1 ) { // stream only had one item that we have yet to send (we are waiting for the empty buffer to be sent)
80+ // do nothing as we still need to send the first item
81+ // the connection will be closed by doSend when the item is sent
82+ } else if (state < 3 ) {
83+ doClose ();
84+ } else {
85+ handleException (requestContext , new IllegalStateException ("Unexpected state: " + state ));
86+ }
5487 }
5588
5689 @ Override
5790 public void onNext (Object item ) {
58- OutboundSseEvent event ;
59- if (item instanceof OutboundSseEvent ) {
60- event = (OutboundSseEvent ) item ;
91+ if (state == 0 ) { // first item
92+ state = 1 ;
93+ SseUtil .setHeaders (requestContext , requestContext .serverResponse (),
94+ determineCustomizers (publisher , true , staticCustomizers ));
95+
96+ requestContext .serverResponse ().write (EMPTY_BUFFER , new Consumer <>() {
97+ @ Override
98+ public void accept (Throwable throwable ) {
99+ if (throwable == null ) {
100+ state = 2 ;
101+ // now we can actually send the first item
102+ doSend (item );
103+ } else {
104+ state = 4 ;
105+ requestContext .resume (throwable );
106+ }
107+ }
108+ });
109+ } else if (state == 2 ) { // the only should have got here is when the empty buffer was sent
110+ doSend (item );
61111 } else {
62- event = new OutboundSseEventImpl . BuilderImpl (). data ( item ). build ( );
112+ handleException ( requestContext , new IllegalStateException ( "Unexpected state: " + state ) );
63113 }
64- SseUtil .send (requestContext , event , staticCustomizers ).whenComplete ((v , t ) -> {
114+ }
115+
116+ private void doSend (Object item ) {
117+ SseUtil .send (requestContext , fromItem (item ), staticCustomizers ).whenComplete ((v , t ) -> {
65118 if (t != null ) {
119+ state = 4 ;
66120 // need to cancel because the exception didn't come from the Multi
67121 subscription .cancel ();
68122 handleException (requestContext , t );
123+ } else if (weClosed && !requestContext .serverResponse ().closed ()) {
124+ // this is the case where the stream only had one item so we need to close the connection as onComplete could not do it at the time it was called
125+ doClose ();
69126 } else {
70127 // send in the next item
71128 subscription .request (demand );
72129 }
73130 });
74131 }
132+
133+ private void doClose () {
134+ state = 3 ;
135+ requestContext .serverResponse ().end ();
136+ requestContext .close ();
137+ }
138+
139+ private OutboundSseEvent fromItem (Object item ) {
140+ OutboundSseEvent event ;
141+ if (item instanceof OutboundSseEvent ) {
142+ event = (OutboundSseEvent ) item ;
143+ } else {
144+ event = new OutboundSseEventImpl .BuilderImpl ().data (item ).build ();
145+ }
146+ return event ;
147+ }
75148 }
76149
77150 @ SuppressWarnings ("rawtypes" )
@@ -103,7 +176,7 @@ private static class StreamingMultiSubscriber extends AbstractMultiSubscriber {
103176
104177 @ Override
105178 public void onNext (Object item ) {
106- List <StreamingResponseCustomizer > customizers = determineCustomizers (!hadItem );
179+ List <StreamingResponseCustomizer > customizers = determineCustomizers (publisher , !hadItem , staticCustomizers );
107180 hadItem = true ;
108181 StreamingUtil .send (requestContext , customizers , item , messagePrefix (), messageSuffix ())
109182 .handle ((v , t ) -> {
@@ -125,33 +198,12 @@ public void onNext(Object item) {
125198 });
126199 }
127200
128- private List <StreamingResponseCustomizer > determineCustomizers (boolean isFirst ) {
129- // we only need to obtain the customizers from the Publisher if it's the first time we are sending data and the Publisher has customizable data
130- // at this point no matter the type of RestMulti we can safely obtain the headers and status
131- if (isFirst && (publisher instanceof RestMulti <?> restMulti )) {
132- Map <String , List <String >> headers = restMulti .getHeaders ();
133- Integer status = restMulti .getStatus ();
134- if (headers .isEmpty () && (status == null )) {
135- return staticCustomizers ;
136- }
137- List <StreamingResponseCustomizer > result = new ArrayList <>(staticCustomizers .size () + 2 );
138- result .addAll (staticCustomizers ); // these are added first so that the result specific values will take precedence if there are conflicts
139- if (!headers .isEmpty ()) {
140- result .add (new StreamingResponseCustomizer .AddHeadersCustomizer (headers ));
141- }
142- if (status != null ) {
143- result .add (new StreamingResponseCustomizer .StatusCustomizer (status ));
144- }
145- return result ;
146- }
147-
148- return staticCustomizers ;
149- }
150-
151201 @ Override
152202 public void onComplete () {
153203 if (!hadItem ) {
154- StreamingUtil .setHeaders (requestContext , requestContext .serverResponse (), this .determineCustomizers (true ));
204+ StreamingUtil .setHeaders (requestContext , requestContext .serverResponse (), determineCustomizers (
205+ this .publisher , true ,
206+ this .staticCustomizers ));
155207 }
156208 if (json ) {
157209 String postfix = onCompleteText ();
@@ -202,7 +254,7 @@ static abstract class AbstractMultiSubscriber implements Subscriber<Object> {
202254 protected final long demand ;
203255
204256 protected volatile Subscription subscription ;
205- private volatile boolean weClosed = false ;
257+ protected volatile boolean weClosed = false ;
206258
207259 AbstractMultiSubscriber (ResteasyReactiveRequestContext requestContext ,
208260 List <StreamingResponseCustomizer > staticCustomizers , long demand ) {
@@ -218,6 +270,31 @@ static abstract class AbstractMultiSubscriber implements Subscriber<Object> {
218270 });
219271 }
220272
273+ @ SuppressWarnings ("rawtypes" )
274+ protected static List <StreamingResponseCustomizer > determineCustomizers (Publisher publisher , boolean isFirst ,
275+ List <StreamingResponseCustomizer > staticCustomizers ) {
276+ // we only need to obtain the customizers from the Publisher if it's the first time we are sending data and the Publisher has customizable data
277+ // at this point no matter the type of RestMulti we can safely obtain the headers and status
278+ if (isFirst && (publisher instanceof RestMulti <?> restMulti )) {
279+ Map <String , List <String >> headers = restMulti .getHeaders ();
280+ Integer status = restMulti .getStatus ();
281+ if (headers .isEmpty () && (status == null )) {
282+ return staticCustomizers ;
283+ }
284+ List <StreamingResponseCustomizer > result = new ArrayList <>(staticCustomizers .size () + 2 );
285+ result .addAll (staticCustomizers ); // these are added first so that the result specific values will take precedence if there are conflicts
286+ if (!headers .isEmpty ()) {
287+ result .add (new StreamingResponseCustomizer .AddHeadersCustomizer (headers ));
288+ }
289+ if (status != null ) {
290+ result .add (new StreamingResponseCustomizer .StatusCustomizer (status ));
291+ }
292+ return result ;
293+ }
294+
295+ return staticCustomizers ;
296+ }
297+
221298 @ Override
222299 public void onSubscribe (Subscription s ) {
223300 this .subscription = s ;
@@ -343,15 +420,8 @@ private void handleSse(ResteasyReactiveRequestContext requestContext, Publisher<
343420 demand = 1L ;
344421 }
345422
346- SseUtil .setHeaders (requestContext , requestContext .serverResponse (), streamingResponseCustomizers );
347423 requestContext .suspend ();
348- requestContext .serverResponse ().write (EMPTY_BUFFER , throwable -> {
349- if (throwable == null ) {
350- result .subscribe (new SseMultiSubscriber (requestContext , streamingResponseCustomizers , demand ));
351- } else {
352- requestContext .resume (throwable );
353- }
354- });
424+ result .subscribe (new SseMultiSubscriber (requestContext , streamingResponseCustomizers , result , demand ));
355425 }
356426
357427 public interface StreamingResponseCustomizer {
0 commit comments