|
30 | 30 | import java.util.Optional; |
31 | 31 | import java.util.Random; |
32 | 32 | import java.util.UUID; |
| 33 | +import java.util.concurrent.atomic.AtomicLong; |
33 | 34 | import java.util.function.Consumer; |
34 | 35 | import java.util.function.Predicate; |
35 | 36 | import java.util.function.Supplier; |
|
83 | 84 | import org.springframework.context.annotation.Import; |
84 | 85 | import org.springframework.core.io.DefaultResourceLoader; |
85 | 86 | import org.springframework.hateoas.PagedModel; |
| 87 | +import org.springframework.http.HttpHeaders; |
86 | 88 | import org.springframework.http.HttpMethod; |
| 89 | +import org.springframework.http.MediaType; |
87 | 90 | import org.springframework.test.context.junit.jupiter.SpringExtension; |
88 | 91 | import org.springframework.util.StreamUtils; |
89 | 92 | import org.springframework.web.util.UriComponentsBuilder; |
@@ -753,6 +756,50 @@ public void namedChannelDirectedGraph() { |
753 | 756 | } |
754 | 757 | } |
755 | 758 |
|
| 759 | + @Test |
| 760 | + public void dataflowTaskLauncherSink() { |
| 761 | + logger.info("dataflow-tasklauncher-sink-test"); |
| 762 | + String uri = String.format("docker:springcloud/spring-cloud-dataflow-tasklauncher-sink-kafka:%s", |
| 763 | + testProperties.getDatabase().getDataflowVersion()); |
| 764 | + dataFlowOperations.appRegistryOperations() |
| 765 | + .register("dataflowTaskLauncher", ApplicationType.sink, uri, null, true); |
| 766 | + |
| 767 | + |
| 768 | + String taskName = randomTaskName(); |
| 769 | + try (Task task = Task.builder(dataFlowOperations) |
| 770 | + .name(taskName) |
| 771 | + .definition("timestamp") |
| 772 | + .description("Test timestamp task") |
| 773 | + .build()) { |
| 774 | + try (Stream stream = Stream.builder(dataFlowOperations).name("tasklauncher-test") |
| 775 | + .definition("http | dataflowTaskLauncher --trigger.initialDelay=100 --trigger.maxPeriod=1000 " + |
| 776 | + "--spring.cloud.dataflow.client.serverUri=http://dataflow-server:9393") |
| 777 | + .create() |
| 778 | + .deploy(testDeploymentProperties())) { |
| 779 | + |
| 780 | + Awaitility.await().until(() -> stream.getStatus().equals(DEPLOYED)); |
| 781 | + |
| 782 | + HttpHeaders headers = new HttpHeaders(); |
| 783 | + headers.setContentType(MediaType.APPLICATION_JSON); |
| 784 | + headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); |
| 785 | + |
| 786 | + runtimeApps.httpPost(stream.getName(), "http", "{\"name\" : \"" + taskName + "\"}", headers); |
| 787 | + |
| 788 | + AtomicLong launchId = new AtomicLong(); |
| 789 | + Awaitility.await().until(() -> task.executions().stream().filter(t -> |
| 790 | + t.getTaskName().equals(taskName) && t.getTaskExecutionStatus() == TaskExecutionStatus.COMPLETE) |
| 791 | + .findFirst() |
| 792 | + .map(t -> launchId.getAndSet(t.getExecutionId())).isPresent() |
| 793 | + ); |
| 794 | + long id = launchId.get(); |
| 795 | + assertThat(task.executions().size()).isEqualTo(1); |
| 796 | + assertThat(task.execution(id).isPresent()).isTrue(); |
| 797 | + assertThat(task.execution(id).get().getExitCode()).isEqualTo(EXIT_CODE_SUCCESS); |
| 798 | + } |
| 799 | + } |
| 800 | + } |
| 801 | + |
| 802 | + |
756 | 803 | // ----------------------------------------------------------------------- |
757 | 804 | // STREAM METRICS TESTS |
758 | 805 | // ----------------------------------------------------------------------- |
|
0 commit comments