Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Commit fe080c1

Browse files
committed
Validate K8s specific stream deployment/task launch
- Instead of restricting the stream creation for all the platforms, restrict the invalid stream/task name only after when they are about to be deployed/launched. Since SCDF is aware of the underlying platform only at the time of deployment, we go with this approach of validation - Also, to provide compatibility with the previous deployments on local and CF, we don't enforce this validation on non K8s environments Resolves #4640
1 parent 19e8d34 commit fe080c1

File tree

7 files changed

+112
-33
lines changed

7 files changed

+112
-33
lines changed

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerService.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.TreeMap;
26+
import java.util.regex.Pattern;
2627
import java.util.stream.Collectors;
28+
import java.util.stream.StreamSupport;
2729

2830
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
2931
import org.springframework.cloud.dataflow.audit.service.AuditRecordService;
@@ -50,6 +52,7 @@
5052
import org.springframework.cloud.deployer.spi.core.AppDefinition;
5153
import org.springframework.cloud.deployer.spi.scheduler.ScheduleInfo;
5254
import org.springframework.cloud.deployer.spi.scheduler.ScheduleRequest;
55+
import org.springframework.cloud.task.listener.TaskException;
5356
import org.springframework.core.io.Resource;
5457
import org.springframework.core.io.ResourceLoader;
5558
import org.springframework.data.domain.Page;
@@ -63,6 +66,7 @@
6366
*
6467
* @author Glenn Renfro
6568
* @author Chris Schaefer
69+
* @author Ilayaperumal Gopinathan
6670
*/
6771
public class DefaultSchedulerService implements SchedulerService {
6872

@@ -81,6 +85,11 @@ public class DefaultSchedulerService implements SchedulerService {
8185
private final DataSourceProperties dataSourceProperties;
8286
private final ComposedTaskRunnerConfigurationProperties composedTaskRunnerConfigurationProperties;
8387

88+
private static final Pattern TASK_NAME_PATTERN = Pattern.compile("[a-zA-Z]([-a-zA-Z0-9]*[a-zA-Z0-9])?");
89+
private static final String TASK_NAME_VALIDATION_MSG = "Task name must consist of alphanumeric characters " +
90+
"or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', " +
91+
"or 'abc-123')";
92+
8493
/**
8594
* Constructor for DefaultSchedulerService
8695
* @param commonApplicationProperties common properties for applications deployed via Spring Cloud Data Flow.
@@ -169,6 +178,14 @@ public void schedule(String scheduleName, String taskDefinitionName, Map<String,
169178
@Override
170179
public void schedule(String scheduleName, String taskDefinitionName, Map<String, String> taskDeploymentProperties,
171180
List<String> commandLineArgs, String platformName) {
181+
String platformType = StreamSupport.stream(getLaunchers().spliterator(), true)
182+
.filter(deployer -> deployer.getName().equalsIgnoreCase(platformName))
183+
.map(Launcher::getType)
184+
.findFirst()
185+
.orElse("unknown");
186+
if (platformType.equals(TaskPlatformFactory.KUBERNETES_PLATFORM_TYPE) && !TASK_NAME_PATTERN.matcher(taskDefinitionName).matches()) {
187+
throw new TaskException(String.format("Task name %s is invalid. %s", taskDefinitionName, TASK_NAME_VALIDATION_MSG));
188+
}
172189
Assert.hasText(taskDefinitionName, "The provided taskName must not be null or empty.");
173190
Assert.notNull(taskDeploymentProperties, "The provided taskDeploymentProperties must not be null.");
174191
TaskDefinition taskDefinition = this.taskDefinitionRepository.findById(taskDefinitionName)

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultStreamService.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,12 @@ private Release doDeployStream(StreamDefinition streamDefinition, Map<String, St
168168
.findFirst()
169169
.orElse("unknown");
170170

171+
172+
if (platformType.equals("kubernetes") && !STREAM_NAME_PATTERN.matcher(streamDefinition.getName()).matches()) {
173+
throw new InvalidStreamDefinitionException(String.format("Stream name %s is invalid. %s",
174+
streamDefinition.getName(), STREAM_NAME_VALIDATION_MSG));
175+
}
176+
171177
List<AppDeploymentRequest> appDeploymentRequests = this.appDeploymentRequestCreator
172178
.createRequests(streamDefinition, deploymentPropertiesToUse, platformType);
173179

@@ -409,10 +415,6 @@ public StreamDefinition createStream(String streamName, String dsl, String descr
409415
}
410416
}
411417

412-
if (!STREAM_NAME_PATTERN.matcher(streamName).matches()) {
413-
errorMessages.add(STREAM_NAME_VALIDATION_MSG);
414-
}
415-
416418
if (!errorMessages.isEmpty()) {
417419
throw new InvalidStreamDefinitionException(
418420
StringUtils.collectionToDelimitedString(errorMessages, "\n"));

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionService.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.SortedSet;
2828
import java.util.TreeSet;
2929
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.regex.Pattern;
3031
import java.util.stream.Collectors;
3132
import java.util.stream.StreamSupport;
3233

@@ -64,6 +65,7 @@
6465
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
6566
import org.springframework.cloud.deployer.spi.task.LaunchState;
6667
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
68+
import org.springframework.cloud.task.listener.TaskException;
6769
import org.springframework.cloud.task.repository.TaskExecution;
6870
import org.springframework.cloud.task.repository.TaskExplorer;
6971
import org.springframework.cloud.task.repository.TaskRepository;
@@ -145,6 +147,11 @@ public class DefaultTaskExecutionService implements TaskExecutionService {
145147

146148
private ComposedTaskRunnerConfigurationProperties composedTaskRunnerConfigurationProperties;
147149

150+
private static final Pattern TASK_NAME_PATTERN = Pattern.compile("[a-zA-Z]([-a-zA-Z0-9]*[a-zA-Z0-9])?");
151+
private static final String TASK_NAME_VALIDATION_MSG = "Task name must consist of alphanumeric characters " +
152+
"or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', " +
153+
"or 'abc-123')";
154+
148155
/**
149156
* Initializes the {@link DefaultTaskExecutionService}.
150157
*
@@ -256,7 +263,14 @@ public DefaultTaskExecutionService(LauncherRepository launcherRepository,
256263
public long executeTask(String taskName, Map<String, String> taskDeploymentProperties, List<String> commandLineArgs) {
257264
// Get platform name and fallback to 'default'
258265
String platformName = getPlatform(taskDeploymentProperties);
259-
266+
String platformType = StreamSupport.stream(launcherRepository.findAll().spliterator(), true)
267+
.filter(deployer -> deployer.getName().equalsIgnoreCase(platformName))
268+
.map(Launcher::getType)
269+
.findFirst()
270+
.orElse("unknown");
271+
if (platformType.equals(TaskPlatformFactory.KUBERNETES_PLATFORM_TYPE) && !TASK_NAME_PATTERN.matcher(taskName).matches()) {
272+
throw new TaskException(String.format("Task name %s is invalid. %s", taskName, TASK_NAME_VALIDATION_MSG));
273+
}
260274
// Naive local state to prevent parallel launches to break things up
261275
if(this.tasksBeingUpgraded.containsKey(taskName)) {
262276
List<String> platforms = this.tasksBeingUpgraded.get(taskName);

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskSaveService.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.cloud.dataflow.server.service.impl;
1818

19-
import java.util.regex.Pattern;
2019
import java.util.stream.Collectors;
2120

2221
import org.springframework.cloud.dataflow.audit.service.AuditRecordService;
@@ -33,7 +32,6 @@
3332
import org.springframework.cloud.dataflow.server.repository.DuplicateTaskException;
3433
import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository;
3534
import org.springframework.cloud.dataflow.server.service.TaskSaveService;
36-
import org.springframework.cloud.task.listener.TaskException;
3735
import org.springframework.transaction.annotation.Transactional;
3836
import org.springframework.util.Assert;
3937
import org.springframework.util.StringUtils;
@@ -56,9 +54,6 @@
5654
* @author Chris Schaefer
5755
*/
5856
public class DefaultTaskSaveService implements TaskSaveService {
59-
private static final Pattern TASK_NAME_PATTERN = Pattern.compile("[a-zA-Z]([-a-zA-Z0-9]*[a-zA-Z0-9])?");
60-
private static final String TASK_NAME_VALIDATION_MSG = "Task name must consist of alphanumeric characters or '-', " +
61-
"start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123')";
6257

6358
private final TaskDefinitionRepository taskDefinitionRepository;
6459

@@ -85,10 +80,6 @@ public DefaultTaskSaveService(TaskDefinitionRepository taskDefinitionRepository,
8580
@Override
8681
@Transactional
8782
public void saveTaskDefinition(TaskDefinition taskDefinition) {
88-
if (!TASK_NAME_PATTERN.matcher(taskDefinition.getTaskName()).matches()) {
89-
throw new TaskException(TASK_NAME_VALIDATION_MSG);
90-
}
91-
9283
TaskParser taskParser = new TaskParser(taskDefinition.getTaskName(), taskDefinition.getDslText(), true, true);
9384
TaskNode taskNode = taskParser.parse();
9485
if (taskDefinitionRepository.existsById(taskDefinition.getTaskName())) {

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerServiceTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.springframework.cloud.deployer.spi.scheduler.ScheduleRequest;
6363
import org.springframework.cloud.deployer.spi.scheduler.Scheduler;
6464
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
65+
import org.springframework.cloud.task.listener.TaskException;
6566
import org.springframework.core.io.Resource;
6667
import org.springframework.core.io.ResourceLoader;
6768
import org.springframework.data.domain.PageRequest;
@@ -179,6 +180,16 @@ public void testScheduleWithLongNameOnKuberenetesPlatform() {
179180
this.commandLineArgs, null);
180181
}
181182

183+
@Test(expected = TaskException.class)
184+
public void testScheduleWithInvalidTaskNameOnKuberenetesPlatform() {
185+
String taskName = "test_a1";
186+
taskDefinitionRepository.save(new TaskDefinition(taskName, "demo"));
187+
getMockedKubernetesSchedulerService().schedule(BASE_SCHEDULE_NAME +
188+
"test1", taskName, this.testProperties,
189+
this.commandLineArgs, "default");
190+
}
191+
192+
182193
@Test
183194
public void testScheduleWithCapitalizeNameOnKuberenetesPlatform() {
184195
SchedulerService testSchedulerService = getMockedKubernetesSchedulerService();

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultStreamServiceTests.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.springframework.test.context.junit4.SpringRunner;
6060
import org.springframework.util.StreamUtils;
6161

62+
import static org.junit.Assert.fail;
6263
import static org.mockito.ArgumentMatchers.any;
6364
import static org.mockito.ArgumentMatchers.anyInt;
6465
import static org.mockito.ArgumentMatchers.anyString;
@@ -286,25 +287,43 @@ public void testStreamDeployWithPreDefinedPackageVersion() {
286287
}
287288

288289
@Test
289-
public void testInvalidStreamName() {
290+
public void testInvalidStreamNameOnKubernetes() {
290291
when(this.streamValidationService.isRegistered("time", ApplicationType.source)).thenReturn(true);
291292
when(this.streamValidationService.isRegistered("log", ApplicationType.sink)).thenReturn(true);
292-
293+
Deployer k8sDeployer = new Deployer("k8s1", "kubernetes", null);
294+
Deployer cfDeployer = new Deployer("cf1", "cloudfoundry", null);
295+
when(this.skipperStreamDeployer.platformList()).thenReturn(Arrays.asList(k8sDeployer, cfDeployer));
293296
String[] streamNames = { "$stream", "stream$", "st_ream" };
294-
295297
for (String streamName : streamNames) {
296298
try {
297299
final StreamDefinition expectedStreamDefinition = new StreamDefinition(streamName, "time | log");
298-
when(streamDefinitionRepository.save(expectedStreamDefinition)).thenReturn(expectedStreamDefinition);
299-
300-
this.defaultStreamService.createStream(streamName, "time | log", "demo stream", false);
300+
when(this.streamDefinitionRepository.findById(streamName)).thenReturn(Optional.of(expectedStreamDefinition));
301+
Map<String, String> k8sProperties = new HashMap<>();
302+
k8sProperties.put(SkipperStream.SKIPPER_PLATFORM_NAME, k8sDeployer.getName());
303+
this.defaultStreamService.deployStream(streamName, k8sProperties);
304+
fail("Stream deployment should fail as the stream name is invalid");
301305
} catch (Exception e) {
302306
Assert.assertTrue(e instanceof InvalidStreamDefinitionException);
303-
Assert.assertEquals(e.getMessage(), "Stream name must consist of alphanumeric characters or '-', " +
307+
Assert.assertEquals(e.getMessage(), "Stream name "+ streamName +" is invalid. Stream name must consist of alphanumeric characters or '-', " +
304308
"start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', " +
305309
"or 'abc-123')");
306310
}
307311
}
312+
for (String streamName : streamNames) {
313+
try {
314+
final StreamDefinition expectedStreamDefinition = new StreamDefinition(streamName, "time | log");
315+
when(this.streamDefinitionRepository.findById(streamName)).thenReturn(Optional.of(expectedStreamDefinition));
316+
Map<String, String> cfProperties = new HashMap<>();
317+
cfProperties.put(SkipperStream.SKIPPER_PLATFORM_NAME, cfDeployer.getName());
318+
this.defaultStreamService.deployStream(streamName, cfProperties);
319+
}
320+
catch (InvalidStreamDefinitionException e) {
321+
fail("Stream deployment should not fail as the stream name is valid");
322+
}
323+
catch (IllegalArgumentException e) {
324+
//ignore for the deployment
325+
}
326+
}
308327
}
309328

310329
public ArgumentCaptor<StreamDeploymentRequest> testStreamDeploy(Map<String, String> deploymentProperties) {

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionServiceTests.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ public abstract class DefaultTaskExecutionServiceTests {
136136

137137
private final static String BASE_TASK_NAME = "myTask";
138138

139-
private final static String TASK_NAME_ORIG = BASE_TASK_NAME + "_ORIG";
139+
private final static String TASK_NAME_ORIG = BASE_TASK_NAME + "-ORIG";
140140

141-
private final static String TASK_NAME_ORIG2 = BASE_TASK_NAME + "_ORIG2";
141+
private final static String TASK_NAME_ORIG2 = BASE_TASK_NAME + "-ORIG2";
142142

143143
private final static String K8_PLATFORM = "k8platform";
144144

@@ -998,7 +998,7 @@ public void executeSameTaskDefinitionOnMultiplePlatforms() {
998998
this.taskExecutionService.executeTask(TASK_NAME_ORIG, deploymentProperties, new LinkedList<>());
999999
} catch (IllegalStateException ise) {
10001000
errorCaught = true;
1001-
assertEquals("Task definition [myTask_ORIG] has already been deployed on platform [default]. Requested to deploy on platform [anotherPlatform].", ise.getMessage());
1001+
assertEquals("Task definition ["+TASK_NAME_ORIG+"] has already been deployed on platform [default]. Requested to deploy on platform [anotherPlatform].", ise.getMessage());
10021002
}
10031003
if (!errorCaught) {
10041004
fail();
@@ -1013,9 +1013,9 @@ public void executeDeleteNoDeploymentWithMultiplePlatforms() {
10131013
this.launcherRepository.save(new Launcher("anotherPlatform", "local", taskLauncher));
10141014
taskDeleteService.deleteTaskDefinition(TASK_NAME_ORIG, true);
10151015
String logEntries = outputCapture.toString();
1016-
assertTrue(logEntries.contains("Deleted task app resources for myTask_ORIG in platform anotherPlatform"));
1017-
assertTrue(logEntries.contains("Deleted task app resources for myTask_ORIG in platform default"));
1018-
assertTrue(logEntries.contains("Deleted task app resources for myTask_ORIG in platform MyPlatform"));
1016+
assertTrue(logEntries.contains("Deleted task app resources for "+TASK_NAME_ORIG+" in platform anotherPlatform"));
1017+
assertTrue(logEntries.contains("Deleted task app resources for "+TASK_NAME_ORIG+" in platform default"));
1018+
assertTrue(logEntries.contains("Deleted task app resources for "+TASK_NAME_ORIG+" in platform MyPlatform"));
10191019
}
10201020

10211021
@Test
@@ -1029,7 +1029,7 @@ public void executeTaskWithNullIDReturnedTest() {
10291029
}
10301030
catch (IllegalStateException ise) {
10311031
errorCaught = true;
1032-
assertEquals("Deployment ID is null for the task:myTask_ORIG", ise.getMessage());
1032+
assertEquals("Deployment ID is null for the task:"+TASK_NAME_ORIG, ise.getMessage());
10331033
}
10341034
if (!errorCaught) {
10351035
fail();
@@ -1061,7 +1061,7 @@ taskExecutionInfoService, mock(TaskDeploymentRepository.class),
10611061
}
10621062
catch (NoSuchTaskDefinitionException ise) {
10631063
errorCaught = true;
1064-
assertEquals("Could not find task definition named myTask_ORIG", ise.getMessage());
1064+
assertEquals("Could not find task definition named "+TASK_NAME_ORIG, ise.getMessage());
10651065
}
10661066
if (!errorCaught) {
10671067
fail();
@@ -1103,14 +1103,39 @@ public void validateInvalidTaskNameTest() {
11031103
try {
11041104
initializeSuccessfulRegistry(appRegistry);
11051105
taskSaveService.saveTaskDefinition(new TaskDefinition(taskName, "AAA --foo=bar"));
1106-
1106+
this.launcherRepository.save(new Launcher("k8s1", TaskPlatformFactory.KUBERNETES_PLATFORM_TYPE, taskLauncher));
1107+
this.launcherRepository.save(new Launcher("cf1", TaskPlatformFactory.CLOUDFOUNDRY_PLATFORM_TYPE, taskLauncher));
1108+
initializeSuccessfulRegistry(appRegistry);
1109+
Map<String, String> taskDeploymentProperties = new HashMap<>();
1110+
taskDeploymentProperties.put("spring.cloud.dataflow.task.platformName", "k8s1");
1111+
taskExecutionService.executeTask(taskName, taskDeploymentProperties, Arrays.asList());
11071112
fail("Expected TaskException");
11081113
} catch (Exception e) {
11091114
assertTrue(e instanceof TaskException);
1110-
assertEquals(e.getMessage(), "Task name must consist of alphanumeric characters or '-', start " +
1111-
"with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', " +
1112-
" or 'abc-123')");
1115+
assertEquals(e.getMessage(), "Task name "+ taskName +" is invalid. Task name must consist of "
1116+
+ "alphanumeric characters or '-', start with an alphabetic character, and end with an "
1117+
+ "alphanumeric character (e.g. 'my-name', or 'abc-123')");
1118+
}
1119+
}
1120+
taskDeleteService.deleteAll();
1121+
for (String taskName : taskNames) {
1122+
try {
1123+
initializeSuccessfulRegistry(appRegistry);
1124+
taskSaveService.saveTaskDefinition(new TaskDefinition(taskName, "AAA --foo=bar"));
1125+
this.launcherRepository.save(new Launcher("k8s1", TaskPlatformFactory.KUBERNETES_PLATFORM_TYPE, taskLauncher));
1126+
this.launcherRepository.save(new Launcher("cf1", TaskPlatformFactory.CLOUDFOUNDRY_PLATFORM_TYPE, taskLauncher));
1127+
initializeSuccessfulRegistry(appRegistry);
1128+
Map<String, String> taskDeploymentProperties = new HashMap<>();
1129+
taskDeploymentProperties.put("spring.cloud.dataflow.task.platformName", "cf1");
1130+
taskExecutionService.executeTask(taskName, taskDeploymentProperties, Arrays.asList());
1131+
}
1132+
catch (TaskException e) {
1133+
fail("TaskException is not expected");
1134+
}
1135+
catch (IllegalStateException e) {
1136+
// Ignore for the tests
11131137
}
1138+
taskDeleteService.deleteAll();
11141139
}
11151140
}
11161141

0 commit comments

Comments
 (0)