@@ -630,7 +630,7 @@ public void testResubscribeExistingTaskSuccess() throws Exception {
630630 // attempting to send a streaming message instead of explicitly calling queueManager#createOrTap
631631 // does not work because after the message is sent, the queue becomes null but task resubscription
632632 // requires the queue to still be active
633- getQueueManager (). createOrTap (MINIMAL_TASK .getId ());
633+ ensureQueueForTask (MINIMAL_TASK .getId ());
634634
635635 CountDownLatch taskResubscriptionRequestSent = new CountDownLatch (1 );
636636 CountDownLatch taskResubscriptionResponseReceived = new CountDownLatch (2 );
@@ -701,7 +701,7 @@ public void testResubscribeExistingTaskSuccess() throws Exception {
701701 .build ());
702702
703703 for (Event event : events ) {
704- getQueueManager (). get ( MINIMAL_TASK . getId ()). enqueueEvent (event );
704+ enqueueEventOnServer (event );
705705 }
706706 } catch (InterruptedException e ) {
707707 Thread .currentThread ().interrupt ();
@@ -941,7 +941,43 @@ protected void deleteTaskInTaskStore(String taskId) throws Exception {
941941 }
942942 }
943943
944- protected abstract InMemoryQueueManager getQueueManager ();
944+ protected void ensureQueueForTask (String taskId ) throws Exception {
945+ HttpClient client = HttpClient .newBuilder ()
946+ .version (HttpClient .Version .HTTP_2 )
947+ .build ();
948+ HttpRequest request = HttpRequest .newBuilder ()
949+ .uri (URI .create ("http://localhost:" + serverPort + "/test/queue/ensure/" + taskId ))
950+ .POST (HttpRequest .BodyPublishers .noBody ())
951+ .build ();
952+ HttpResponse <String > response = client .send (request , HttpResponse .BodyHandlers .ofString (StandardCharsets .UTF_8 ));
953+ if (response .statusCode () != 200 ) {
954+ throw new RuntimeException (response .statusCode () + ": Deleting task failed!" + response .body ());
955+ }
956+ }
957+
958+ protected void enqueueEventOnServer (Event event ) throws Exception {
959+ String path = null ;
960+ if (event instanceof TaskArtifactUpdateEvent e ) {
961+ path = "test/queue/enqueueTaskArtifactUpdateEvent/" + e .getTaskId ();
962+ } else if (event instanceof TaskStatusUpdateEvent e ) {
963+ path = "test/queue/enqueueTaskStatusUpdateEvent/" + e .getTaskId ();
964+ } else {
965+ throw new RuntimeException ("Unknown event type " + event .getClass () + ". If you need the ability to" +
966+ " handle more types, please add the REST endpoints." );
967+ }
968+ HttpClient client = HttpClient .newBuilder ()
969+ .version (HttpClient .Version .HTTP_2 )
970+ .build ();
971+ HttpRequest request = HttpRequest .newBuilder ()
972+ .uri (URI .create ("http://localhost:" + serverPort + "/" + path ))
973+ .POST (HttpRequest .BodyPublishers .ofString (Utils .OBJECT_MAPPER .writeValueAsString (event )))
974+ .build ();
975+
976+ HttpResponse <String > response = client .send (request , HttpResponse .BodyHandlers .ofString (StandardCharsets .UTF_8 ));
977+ if (response .statusCode () != 200 ) {
978+ throw new RuntimeException (response .statusCode () + ": Deleting task failed!" + response .body ());
979+ }
980+ }
945981
946982 protected abstract void setStreamingSubscribedRunnable (Runnable runnable );
947983
0 commit comments