@@ -242,7 +242,7 @@ public void applicationMetadataMavenTests() {
242242
243243 // Maven app without metadata
244244 dataFlowOperations .appRegistryOperations ().register ("maven-app-without-metadata" , ApplicationType .sink ,
245- "maven://org.springframework.cloud.stream.app:file-sink-kafka:2.1.1.RELEASE " , null , true );
245+ "maven://org.springframework.cloud.stream.app:file-sink-kafka:3.0.1 " , null , true );
246246 DetailedAppRegistrationResource mavenAppWithoutMetadata = dataFlowOperations .appRegistryOperations ()
247247 .info ("maven-app-without-metadata" , ApplicationType .sink , false );
248248 assertThat (mavenAppWithoutMetadata .getOptions ()).hasSize (8 );
@@ -293,21 +293,21 @@ public void applicationMetadataDockerTests() {
293293 }
294294
295295 @ Test
296- @ EnabledIfEnvironmentVariable (named = "SCDF_CR_TEST" , matches = "true" )
296+ @ EnabledIfEnvironmentVariable (named = "SCDF_CR_TEST" , matches = "true" )
297297 public void githubContainerRegistryTests () {
298298 containerRegistryTests ("github-log-sink" ,
299299 "docker:ghcr.io/tzolov/log-sink-rabbit:3.1.0-SNAPSHOT" );
300300 }
301301
302302 @ Test
303- @ EnabledIfEnvironmentVariable (named = "SCDF_CR_TEST" , matches = "true" )
303+ @ EnabledIfEnvironmentVariable (named = "SCDF_CR_TEST" , matches = "true" )
304304 public void azureContainerRegistryTests () {
305305 containerRegistryTests ("azure-log-sink" ,
306306 "docker:scdftest.azurecr.io/springcloudstream/log-sink-rabbit:3.1.0-SNAPSHOT" );
307307 }
308308
309309 @ Test
310- @ EnabledIfEnvironmentVariable (named = "SCDF_CR_TEST" , matches = "true" )
310+ @ EnabledIfEnvironmentVariable (named = "SCDF_CR_TEST" , matches = "true" )
311311 public void harborContainerRegistryTests () {
312312 containerRegistryTests ("harbor-log-sink" ,
313313 "docker:projects.registry.vmware.com/scdf/scdftest/log-sink-rabbit:3.1.0-SNAPSHOT" );
@@ -317,7 +317,7 @@ private void containerRegistryTests(String appName, String appUrl) {
317317 logger .info ("application-metadata-" + appName + "-container-registry-test" );
318318
319319 // Docker app with container image metadata
320- dataFlowOperations .appRegistryOperations ().register ( appName , ApplicationType .sink ,
320+ dataFlowOperations .appRegistryOperations ().register (appName , ApplicationType .sink ,
321321 appUrl , null , true );
322322 DetailedAppRegistrationResource dockerAppWithContainerMetadata = dataFlowOperations .appRegistryOperations ()
323323 .info (appName , ApplicationType .sink , false );
@@ -758,12 +758,20 @@ public void namedChannelDirectedGraph() {
758758
759759 @ Test
760760 public void dataflowTaskLauncherSink () {
761- logger .info ("dataflow-tasklauncher-sink-test" );
762- String uri = String .format ("docker:springcloud/spring-cloud-dataflow-tasklauncher-sink-kafka:%s" ,
763- testProperties .getDatabase ().getDataflowVersion ());
764- dataFlowOperations .appRegistryOperations ()
765- .register ("dataflowTaskLauncher" , ApplicationType .sink , uri , null , true );
761+ String dataflowTaskLauncherAppName = "dataflow-tasklauncher" ;
762+ String skipLogMessage = dataflowTaskLauncherAppName + "-sink-test: SKIP - no " + dataflowTaskLauncherAppName + " app registered!" ;
763+ boolean isDataflowTaskLauncherAppRegistered = runtimeApps .isAppRegistered (dataflowTaskLauncherAppName , ApplicationType .sink );
764+ if (!isDataflowTaskLauncherAppRegistered ) {
765+ logger .info (skipLogMessage );
766+ }
767+ Assumptions .assumeTrue (isDataflowTaskLauncherAppRegistered , skipLogMessage );
768+
769+ DetailedAppRegistrationResource dataflowTaskLauncherRegistration =
770+ dataFlowOperations .appRegistryOperations ().info (dataflowTaskLauncherAppName , ApplicationType .sink , false );
766771
772+ logger .info (dataflowTaskLauncherAppName + "-sink-test: "
773+ + dataflowTaskLauncherAppName + " [" + dataflowTaskLauncherRegistration .getVersion ()
774+ + "], DataFlow [" + runtimeApps .getDataflowServerVersion () + "]" );
767775
768776 String taskName = randomTaskName ();
769777 try (Task task = Task .builder (dataFlowOperations )
@@ -772,7 +780,7 @@ public void dataflowTaskLauncherSink() {
772780 .description ("Test timestamp task" )
773781 .build ()) {
774782 try (Stream stream = Stream .builder (dataFlowOperations ).name ("tasklauncher-test" )
775- .definition ("http | dataflowTaskLauncher --trigger.initialDelay=100 --trigger.maxPeriod=1000 " +
783+ .definition ("http | " + dataflowTaskLauncherAppName + " --trigger.initialDelay=100 --trigger.maxPeriod=1000 " +
776784 "--spring.cloud.dataflow.client.serverUri=http://dataflow-server:9393" )
777785 .create ()
778786 .deploy (testDeploymentProperties ())) {
@@ -787,7 +795,7 @@ public void dataflowTaskLauncherSink() {
787795
788796 AtomicLong launchId = new AtomicLong ();
789797 Awaitility .await ().until (() -> task .executions ().stream ().filter (t ->
790- t .getTaskName ().equals (taskName ) && t .getTaskExecutionStatus () == TaskExecutionStatus .COMPLETE )
798+ t .getTaskName ().equals (taskName ) && t .getTaskExecutionStatus () == TaskExecutionStatus .COMPLETE )
791799 .findFirst ()
792800 .map (t -> launchId .getAndSet (t .getExecutionId ())).isPresent ()
793801 );
@@ -799,7 +807,6 @@ public void dataflowTaskLauncherSink() {
799807 }
800808 }
801809
802-
803810 // -----------------------------------------------------------------------
804811 // STREAM METRICS TESTS
805812 // -----------------------------------------------------------------------
@@ -900,7 +907,7 @@ public void analyticsCounterPrometheus() throws IOException {
900907
901908 // Wait for ~1 min for Micrometer to send first metrics to Prometheus.
902909 Awaitility .await ().until (() -> (int ) JsonPath .parse (
903- runtimeApps .httpGet (testProperties .getPlatform ().getConnection ().getPrometheusUrl () + "/api/v1/query?query=my_http_analytics_total" ))
910+ runtimeApps .httpGet (testProperties .getPlatform ().getConnection ().getPrometheusUrl () + "/api/v1/query?query=my_http_analytics_total" ))
904911 .read ("$.data.result.length()" ) > 0 );
905912
906913 JsonAssertions .assertThatJson (runtimeApps .httpGet (testProperties .getPlatform ().getConnection ().getPrometheusUrl () + "/api/v1/query?query=my_http_analytics_total" ))
@@ -1528,6 +1535,7 @@ private List<String> createNewJobandStepScenario(String jobName, String stepName
15281535 result .add ("--io.spring.stepName=" + stepName );
15291536 return result ;
15301537 }
1538+
15311539 private void validateSuccessfulTaskLaunch (Task task , long launchId ) {
15321540 Awaitility .await ().until (() -> task .executionStatus (launchId ) == TaskExecutionStatus .COMPLETE );
15331541 assertThat (task .executions ().size ()).isEqualTo (1 );
@@ -1541,7 +1549,7 @@ private void verifySuccessfulJobAndStepScenario(Task task, String stepName) {
15411549 assertThat (jobExecutionIds .size ()).isEqualTo (1 );
15421550 //Verify that steps can be retrieved
15431551 task .jobExecutionResources ().stream ().filter (
1544- jobExecution -> jobExecution .getName ().equals (task .getTaskName ())).
1552+ jobExecution -> jobExecution .getName ().equals (task .getTaskName ())).
15451553 forEach (jobExecutionResource -> {
15461554 assertThat (jobExecutionResource .getStepExecutionCount ()).isEqualTo (1 );
15471555 task .jobStepExecutions (jobExecutionResource .getExecutionId ()).forEach (stepExecutionResource -> {
@@ -1588,43 +1596,43 @@ public void basicBatchSuccessRestartTest() {
15881596 @ Test
15891597 public void basicBatchFailRestartTest () {
15901598 // Verify Batch runs successfully
1591- logger .info ("basic-batch-fail-restart-test" );
1592- try (Task task = Task .builder (dataFlowOperations )
1593- .name (randomTaskName ())
1594- .definition ("scenario" )
1595- .description ("Test scenario batch app that will fail on first pass" )
1596- .build ()) {
1597-
1598- String stepName = randomStepName ();
1599- List <String > args = createNewJobandStepScenario (task .getTaskName (), stepName );
1600- args .add ("--io.spring.failBatch=true" );
1601- // task first launch
1602- long launchId = task .launch (args );
1603- //Verify task
1604- validateSuccessfulTaskLaunch (task , launchId );
1605-
1606- //Verify that batch app that fails can be restarted
1607-
1608- // Attempt a job restart
1609- List <Long > jobExecutionIds = task .executions ().stream ().findFirst ().get ().getJobExecutionIds ();
1610- //There is an Error deserialization issue related to backward compatibility with SCDF 2.6.x
1611- //The Exception thrown by the 2.6.x servers can not be deserialized by the VndErrorResponseErrorHandler in 2.8+ clients.
1612- Assumptions .assumingThat (runtimeApps .dataflowServerVersionEqualOrGreaterThan ("2.7.0" ), () -> {
1613- dataFlowOperations .jobOperations ().executionRestart (jobExecutionIds .get (0 ));
1614- // Wait for job to start
1615- Awaitility .await ().until (() -> task .jobExecutionResources ().size () == 2 );
1616- // Wait for task for the job to complete
1617- Awaitility .await ().until (() -> task .executions ().stream ().findFirst ().get ().getTaskExecutionStatus () == TaskExecutionStatus .COMPLETE );
1618- assertThat (task .jobExecutionResources ().size ()).isEqualTo (2 );
1619- List <JobExecutionResource > jobExecutionResources = task .jobInstanceResources ().stream ().
1620- findFirst ().get ().getJobExecutions ().stream ().collect (Collectors .toList ());
1621- List <BatchStatus > batchStatuses = new ArrayList <>();
1622- jobExecutionResources .stream ().forEach (jobExecutionResource ->
1623- batchStatuses .add (jobExecutionResource .getJobExecution ().getStatus ()));
1624- assertThat (batchStatuses ).contains (BatchStatus .FAILED );
1625- assertThat (batchStatuses ).contains (BatchStatus .COMPLETED );
1626- });
1627- }
1599+ logger .info ("basic-batch-fail-restart-test" );
1600+ try (Task task = Task .builder (dataFlowOperations )
1601+ .name (randomTaskName ())
1602+ .definition ("scenario" )
1603+ .description ("Test scenario batch app that will fail on first pass" )
1604+ .build ()) {
1605+
1606+ String stepName = randomStepName ();
1607+ List <String > args = createNewJobandStepScenario (task .getTaskName (), stepName );
1608+ args .add ("--io.spring.failBatch=true" );
1609+ // task first launch
1610+ long launchId = task .launch (args );
1611+ //Verify task
1612+ validateSuccessfulTaskLaunch (task , launchId );
1613+
1614+ //Verify that batch app that fails can be restarted
1615+
1616+ // Attempt a job restart
1617+ List <Long > jobExecutionIds = task .executions ().stream ().findFirst ().get ().getJobExecutionIds ();
1618+ //There is an Error deserialization issue related to backward compatibility with SCDF 2.6.x
1619+ //The Exception thrown by the 2.6.x servers can not be deserialized by the VndErrorResponseErrorHandler in 2.8+ clients.
1620+ Assumptions .assumingThat (runtimeApps .dataflowServerVersionEqualOrGreaterThan ("2.7.0" ), () -> {
1621+ dataFlowOperations .jobOperations ().executionRestart (jobExecutionIds .get (0 ));
1622+ // Wait for job to start
1623+ Awaitility .await ().until (() -> task .jobExecutionResources ().size () == 2 );
1624+ // Wait for task for the job to complete
1625+ Awaitility .await ().until (() -> task .executions ().stream ().findFirst ().get ().getTaskExecutionStatus () == TaskExecutionStatus .COMPLETE );
1626+ assertThat (task .jobExecutionResources ().size ()).isEqualTo (2 );
1627+ List <JobExecutionResource > jobExecutionResources = task .jobInstanceResources ().stream ().
1628+ findFirst ().get ().getJobExecutions ().stream ().collect (Collectors .toList ());
1629+ List <BatchStatus > batchStatuses = new ArrayList <>();
1630+ jobExecutionResources .stream ().forEach (jobExecutionResource ->
1631+ batchStatuses .add (jobExecutionResource .getJobExecution ().getStatus ()));
1632+ assertThat (batchStatuses ).contains (BatchStatus .FAILED );
1633+ assertThat (batchStatuses ).contains (BatchStatus .COMPLETED );
1634+ });
1635+ }
16281636 }
16291637
16301638 @ Test
@@ -1645,8 +1653,8 @@ public void multipleTaskAppVersionTest() {
16451653 //Verify task
16461654 validateSuccessfulTaskLaunch (task , launchId );
16471655 AppRegistryOperations appRegistryOperations = this .dataFlowOperations .appRegistryOperations ();
1648- DetailedAppRegistrationResource taskResource = appRegistryOperations .info ("timestamp" , ApplicationType .task ,false );
1649- if (taskResource .getUri ().startsWith ("maven:" )) {
1656+ DetailedAppRegistrationResource taskResource = appRegistryOperations .info ("timestamp" , ApplicationType .task , false );
1657+ if (taskResource .getUri ().startsWith ("maven:" )) {
16501658 try {
16511659 appRegistryOperations .register ("timestamp" , ApplicationType .task ,
16521660 "maven://org.springframework.cloud.task.app:timestamp-task:2.1.0.RELEASE" ,
@@ -1698,7 +1706,7 @@ public void basicTaskWithPropertiesTest() {
16981706 Awaitility .await ().until (() -> task .executionStatus (launchId1 ) == TaskExecutionStatus .COMPLETE );
16991707 assertThat (task .executions ().size ()).isEqualTo (2 );
17001708 assertThat (task .executions ().stream ().filter (taskExecutionResource ->
1701- taskExecutionResource .getDeploymentProperties ().containsKey (testPropertyKey )).
1709+ taskExecutionResource .getDeploymentProperties ().containsKey (testPropertyKey )).
17021710 collect (Collectors .toList ()).size ()).isEqualTo (2 );
17031711
17041712 }
@@ -1732,7 +1740,7 @@ public void taskLaunchWithArguments() {
17321740 String stepName = randomStepName ();
17331741 List <String > baseArgs = createNewJobandStepScenario (task .getTaskName (), stepName );
17341742 List <String > args = new ArrayList <>(baseArgs );
1735- args .add (argument );
1743+ args .add (argument );
17361744 // task first launch
17371745 long launchId = task .launch (args );
17381746 //Verify first launch
@@ -1742,7 +1750,7 @@ public void taskLaunchWithArguments() {
17421750 Awaitility .await ().until (() -> task .executionStatus (launchId1 ) == TaskExecutionStatus .COMPLETE );
17431751 assertThat (task .executions ().size ()).isEqualTo (2 );
17441752 assertThat (task .executions ().stream ().filter (execution ->
1745- execution .getArguments ().contains (argument )).
1753+ execution .getArguments ().contains (argument )).
17461754 collect (Collectors .toList ()).size ()).isEqualTo (1 );
17471755 }
17481756
0 commit comments