55
66import java .net .URI ;
77import java .util .Arrays ;
8+ import java .util .Base64 ;
89import java .util .List ;
910import java .util .Objects ;
1011import java .util .Random ;
1112import java .util .concurrent .CopyOnWriteArrayList ;
1213import java .util .concurrent .CountDownLatch ;
1314import java .util .concurrent .TimeUnit ;
15+ import java .util .stream .Collectors ;
16+ import java .util .stream .IntStream ;
1417
15- import jakarta .inject .Inject ;
1618import jakarta .ws .rs .GET ;
1719import jakarta .ws .rs .Path ;
1820import jakarta .ws .rs .Produces ;
3436import io .smallrye .mutiny .Multi ;
3537import io .vertx .core .Vertx ;
3638import io .vertx .ext .web .RoutingContext ;
37- import wiremock .org .apache .hc .client5 .http .utils .Base64 ;
3839
3940public class MultiNdjsonTest {
4041 @ RegisterExtension
@@ -110,6 +111,34 @@ void shouldReadNdjsonFromSingleMessage() throws InterruptedException {
110111 assertThat (collected ).hasSize (4 ).containsAll (expected );
111112 }
112113
114+ @ Test
115+ void shouldReadNdjsonFromSingleMessageWithNoDelimiter () throws InterruptedException {
116+ var client = createClient (uri );
117+ var collected = new CopyOnWriteArrayList <Message >();
118+ var completionLatch = new CountDownLatch (1 );
119+ client .readSingleMessageNoDelimiter ().onCompletion ().invoke (completionLatch ::countDown )
120+ .subscribe ().with (collected ::add );
121+
122+ if (!completionLatch .await (5 , TimeUnit .SECONDS )) {
123+ fail ("Streaming did not complete in time" );
124+ }
125+ assertThat (collected ).singleElement ().satisfies (m -> assertThat (m ).isEqualTo (Message .of ("foo" , "bar" )));
126+ }
127+
128+ @ Test
129+ void shouldReadNdjsonFromMultipleMessagesWithNoEndingDelimiter () throws InterruptedException {
130+ var client = createClient (uri );
131+ var collected = new CopyOnWriteArrayList <Message >();
132+ var completionLatch = new CountDownLatch (1 );
133+ client .readMultipleMessagesNoEndingDelimiter ().onCompletion ().invoke (completionLatch ::countDown )
134+ .subscribe ().with (collected ::add );
135+
136+ if (!completionLatch .await (5 , TimeUnit .SECONDS )) {
137+ fail ("Streaming did not complete in time" );
138+ }
139+ assertThat (collected ).hasSize (100 );
140+ }
141+
113142 @ Test
114143 void shouldReadLargeNdjsonPojoAsMulti () throws InterruptedException {
115144 var client = createClient (uri );
@@ -151,6 +180,18 @@ public interface Client {
151180 @ RestStreamElementType (MediaType .APPLICATION_JSON )
152181 Multi <Message > readPojoSingle ();
153182
183+ @ GET
184+ @ Path ("/single-message-no-delimiter" )
185+ @ Produces (RestMediaType .APPLICATION_NDJSON )
186+ @ RestStreamElementType (MediaType .APPLICATION_JSON )
187+ Multi <Message > readSingleMessageNoDelimiter ();
188+
189+ @ GET
190+ @ Path ("multiple-messages-no-ending-delimiter" )
191+ @ Produces (RestMediaType .APPLICATION_NDJSON )
192+ @ RestStreamElementType (MediaType .APPLICATION_JSON )
193+ Multi <Message > readMultipleMessagesNoEndingDelimiter ();
194+
154195 @ GET
155196 @ Path ("/large-pojo" )
156197 @ Produces (RestMediaType .APPLICATION_NDJSON )
@@ -172,8 +213,14 @@ Multi<Message> people(RoutingContext context) {
172213
173214 @ Path ("/stream" )
174215 public static class StreamingResource {
175- @ Inject
176- Vertx vertx ;
216+ private final ObjectMapper mapper = new ObjectMapper ();
217+ private final ObjectWriter messageWriter = mapper .writerFor (Message .class );
218+
219+ private final Vertx vertx ;
220+
221+ public StreamingResource (Vertx vertx ) {
222+ this .vertx = vertx ;
223+ }
177224
178225 @ GET
179226 @ Path ("/string" )
@@ -212,19 +259,41 @@ public Multi<Message> readPojo() {
212259 @ Produces (RestMediaType .APPLICATION_NDJSON )
213260 @ RestStreamElementType (MediaType .APPLICATION_JSON )
214261 public String getPojosAsString () throws JsonProcessingException {
215- ObjectMapper mapper = new ObjectMapper ();
216262 StringBuilder result = new StringBuilder ();
217- ObjectWriter objectWriter = mapper .writerFor (Message .class );
218263 for (var msg : List .of (Message .of ("zero" , "0" ),
219264 Message .of ("one" , "1" ),
220265 Message .of ("two" , "2" ),
221266 Message .of ("three" , "3" ))) {
222- result .append (objectWriter .writeValueAsString (msg ));
267+ result .append (messageWriter .writeValueAsString (msg ));
223268 result .append ("\n " );
224269 }
225270 return result .toString ();
226271 }
227272
273+ @ GET
274+ @ Path ("/single-message-no-delimiter" )
275+ @ Produces (RestMediaType .APPLICATION_NDJSON )
276+ @ RestStreamElementType (MediaType .APPLICATION_JSON )
277+ public String singleMessageNoDelimiter () throws JsonProcessingException {
278+ return messageWriter .writeValueAsString (Message .of ("foo" , "bar" ));
279+ }
280+
281+ @ GET
282+ @ Path ("/multiple-messages-no-ending-delimiter" )
283+ @ Produces (RestMediaType .APPLICATION_NDJSON )
284+ @ RestStreamElementType (MediaType .APPLICATION_JSON )
285+ public String multipleMessagesNoEndingDelimiter () throws JsonProcessingException {
286+ return IntStream .range (0 , 100 )
287+ .mapToObj (i -> Message .of ("foo" , "bar" ))
288+ .map (m -> {
289+ try {
290+ return messageWriter .writeValueAsString (m );
291+ } catch (JsonProcessingException e ) {
292+ throw new RuntimeException (e );
293+ }
294+ }).collect (Collectors .joining ("\n " ));
295+ }
296+
228297 @ GET
229298 @ Path ("/large-pojo" )
230299 @ Produces (RestMediaType .APPLICATION_NDJSON )
@@ -235,7 +304,7 @@ public Multi<Message> readLargePojo() {
235304 byte [] bytes = new byte [4 * 1024 ];
236305 Random random = new Random ();
237306 random .nextBytes (bytes );
238- String value = Base64 .encodeBase64String (bytes );
307+ String value = Base64 .getEncoder (). encodeToString (bytes );
239308 em .emit (Message .of ("one" , value ));
240309 em .emit (Message .of ("two" , value ));
241310 em .emit (Message .of ("three" , value ));
0 commit comments