Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Commit 02d3b09

Browse files
cppwfsilayaperumalg
authored andcommitted
CTR now does not have to be registered to schedule composed tasks. (#4143)
resolves #4122
1 parent fce336e commit 02d3b09

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerService.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,17 +141,27 @@ public void schedule(String scheduleName, String taskDefinitionName, Map<String,
141141
.orElseThrow(() -> new NoSuchTaskDefinitionException(taskDefinitionName));
142142
TaskParser taskParser = new TaskParser(taskDefinition.getName(), taskDefinition.getDslText(), true, true);
143143
TaskNode taskNode = taskParser.parse();
144+
AppRegistration appRegistration;
144145
// if composed task definition replace definition with one composed task
145146
// runner and executable graph.
146147
if (taskNode.isComposed()) {
147148
taskDefinition = new TaskDefinition(taskDefinition.getName(),
148149
TaskServiceUtils.createComposedTaskDefinition(
149150
taskNode.toExecutableDSL()));
150151
taskDeploymentProperties = TaskServiceUtils.establishComposedTaskProperties(taskDeploymentProperties, taskNode);
152+
try {
153+
appRegistration = new AppRegistration(TaskConfigurationProperties.COMPOSED_TASK_RUNNER_NAME,
154+
ApplicationType.task,
155+
new URI(this.taskConfigurationProperties.getComposedTaskRunnerUri()));
156+
}
157+
catch (URISyntaxException e) {
158+
throw new IllegalStateException("Invalid Compose Task Runner Resource", e);
159+
}
160+
}
161+
else {
162+
appRegistration = this.registry.find(taskDefinition.getRegisteredAppName(),
163+
ApplicationType.task);
151164
}
152-
153-
AppRegistration appRegistration = this.registry.find(taskDefinition.getRegisteredAppName(),
154-
ApplicationType.task);
155165
Assert.notNull(appRegistration, "Unknown task app: " + taskDefinition.getRegisteredAppName());
156166
Resource metadataResource = this.registry.getAppMetadataResource(appRegistration);
157167

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerServiceTests.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.springframework.cloud.dataflow.server.service.SchedulerService;
5757
import org.springframework.cloud.dataflow.server.service.SchedulerServiceProperties;
5858
import org.springframework.cloud.deployer.resource.docker.DockerResource;
59+
import org.springframework.cloud.deployer.spi.core.AppDefinition;
5960
import org.springframework.cloud.deployer.spi.scheduler.CreateScheduleException;
6061
import org.springframework.cloud.deployer.spi.scheduler.ScheduleInfo;
6162
import org.springframework.cloud.deployer.spi.scheduler.ScheduleRequest;
@@ -371,7 +372,18 @@ public void testScheduleWithoutCommandLineArguments() throws Exception {
371372
assertEquals("Invalid number of command line arguments", 0, commandLineArguments.size());
372373
}
373374

375+
@Test
376+
public void testGetDefaultCTR() {
377+
ScheduleRequest request = getScheduleRequest(new ArrayList<>(), "springcloudtask/composed-task-runner:latest", "1: timestamp && 2: timestamp");
378+
AppDefinition definition = request.getDefinition();
379+
assertEquals("Docker Resource [docker:springcloudtask/composed-task-runner:latest]", request.getResource().toString());
380+
}
381+
374382
private List<String> getCommandLineArguments(List<String> commandLineArguments) {
383+
return getScheduleRequest(commandLineArguments,"springcloudtask/timestamp-task:latest", "timestamp").getCommandlineArguments();
384+
}
385+
386+
private ScheduleRequest getScheduleRequest(List<String> commandLineArguments, String resourceToReturn, String definition) {
375387
Scheduler mockScheduler = mock(SimpleTestScheduler.class);
376388
TaskDefinitionRepository mockTaskDefinitionRepository = mock(TaskDefinitionRepository.class);
377389
AppRegistryService mockAppRegistryService = mock(AppRegistryService.class);
@@ -386,19 +398,18 @@ this.taskConfigurationProperties, mock(DataSourceProperties.class), "uri",
386398
mock(ApplicationConfigurationMetadataResolver.class), mock(SchedulerServiceProperties.class),
387399
mock(AuditRecordService.class));
388400

389-
TaskDefinition taskDefinition = new TaskDefinition(BASE_DEFINITION_NAME, "timestamp");
401+
TaskDefinition taskDefinition = new TaskDefinition(BASE_DEFINITION_NAME, definition);
390402

391403
when(mockTaskDefinitionRepository.findById(BASE_DEFINITION_NAME)).thenReturn(Optional.of(taskDefinition));
392-
when(mockAppRegistryService.getAppResource(any())).thenReturn(new DockerResource("springcloudtask/timestamp-task:latest"));
404+
when(mockAppRegistryService.getAppResource(any())).thenReturn(new DockerResource(resourceToReturn));
393405
when(mockAppRegistryService.find(taskDefinition.getRegisteredAppName(), ApplicationType.task))
394406
.thenReturn(new AppRegistration());
395407
mockSchedulerService.schedule(BASE_SCHEDULE_NAME, BASE_DEFINITION_NAME, this.testProperties,
396408
commandLineArguments, null);
397409

398410
ArgumentCaptor<ScheduleRequest> scheduleRequestArgumentCaptor = ArgumentCaptor.forClass(ScheduleRequest.class);
399411
verify(mockScheduler).schedule(scheduleRequestArgumentCaptor.capture());
400-
401-
return scheduleRequestArgumentCaptor.getValue().getCommandlineArguments();
412+
return scheduleRequestArgumentCaptor.getValue();
402413
}
403414

404415
private void verifyScheduleExistsInScheduler(ScheduleInfo scheduleInfo) {

0 commit comments

Comments
 (0)