Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Commit eccc8e2

Browse files
markpollackjvalkeal
authored andcommitted
Prevent un-registration of Apps used in deployed streams
Resolves #2013
1 parent a16f42b commit eccc8e2

File tree

11 files changed

+263
-23
lines changed

11 files changed

+263
-23
lines changed

spring-cloud-dataflow-registry/src/main/java/org/springframework/cloud/dataflow/registry/service/DefaultAppRegistryService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ public AppRegistration save(AppRegistration app) {
166166
*/
167167
public void delete(String name, ApplicationType type, String version) {
168168
this.appRegistrationRepository.deleteAppRegistrationByNameAndTypeAndVersion(name, type, version);
169-
// TODO select new default
170169
}
171170

172171
@Override

spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/SkipperStream.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,8 @@ public abstract class SkipperStream {
3636
public static final String SKIPPER_DEFAULT_KIND = "SpringCloudDeployerApplication";
3737

3838
public static final String SKIPPER_DEFAULT_MAINTAINER = "dataflow";
39+
40+
public static final String SKIPPER_SPEC_RESOURCE = "resource";
41+
42+
public static final String SKIPPER_SPEC_VERSION = "version";
3943
}

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/DataFlowControllerAutoConfiguration.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,14 @@
7575
import org.springframework.cloud.dataflow.server.controller.RootController;
7676
import org.springframework.cloud.dataflow.server.controller.RuntimeAppsController;
7777
import org.springframework.cloud.dataflow.server.controller.RuntimeAppsController.AppInstanceController;
78+
import org.springframework.cloud.dataflow.server.controller.SkipperAppRegistryController;
7879
import org.springframework.cloud.dataflow.server.controller.SkipperStreamDeploymentController;
7980
import org.springframework.cloud.dataflow.server.controller.StreamDefinitionController;
8081
import org.springframework.cloud.dataflow.server.controller.StreamDeploymentController;
8182
import org.springframework.cloud.dataflow.server.controller.TaskDefinitionController;
8283
import org.springframework.cloud.dataflow.server.controller.TaskExecutionController;
8384
import org.springframework.cloud.dataflow.server.controller.ToolsController;
8485
import org.springframework.cloud.dataflow.server.controller.UiController;
85-
import org.springframework.cloud.dataflow.server.controller.VersionedAppRegistryController;
8686
import org.springframework.cloud.dataflow.server.controller.security.LoginController;
8787
import org.springframework.cloud.dataflow.server.controller.security.SecurityController;
8888
import org.springframework.cloud.dataflow.server.controller.support.MetricStore;
@@ -406,10 +406,15 @@ public AppRegistryService appRegistryService(AppRegistrationRepository appRegist
406406
}
407407

408408
@Bean
409-
public VersionedAppRegistryController appRegistryController2(AppRegistryService appRegistry,
410-
ApplicationConfigurationMetadataResolver metadataResolver, ForkJoinPool appRegistryFJPFB,
411-
MavenProperties mavenProperties) {
412-
return new VersionedAppRegistryController(appRegistry, metadataResolver, appRegistryFJPFB, mavenProperties);
409+
public SkipperAppRegistryController skipperAppRegistryController(
410+
StreamDefinitionRepository streamDefinitionRepository,
411+
StreamService streamService,
412+
AppRegistryService appRegistry, ApplicationConfigurationMetadataResolver metadataResolver,
413+
ForkJoinPool appRegistryFJPFB, MavenProperties mavenProperties) {
414+
return new SkipperAppRegistryController(streamDefinitionRepository,
415+
streamService,
416+
appRegistry,
417+
metadataResolver, appRegistryFJPFB, mavenProperties);
413418
}
414419
}
415420

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/RestControllerAdvice.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ public VndErrors onException(Exception e) {
9191
* application/vnd.error+json
9292
*/
9393
@ExceptionHandler({ AppAlreadyRegisteredException.class, DuplicateStreamDefinitionException.class,
94-
DuplicateTaskException.class, StreamAlreadyDeployedException.class, StreamAlreadyDeployingException.class })
94+
DuplicateTaskException.class, StreamAlreadyDeployedException.class, StreamAlreadyDeployingException.class,
95+
UnregisterAppException.class})
9596
@ResponseStatus(HttpStatus.CONFLICT)
9697
@ResponseBody
9798
public VndErrors onConflictException(Exception e) {
Lines changed: 86 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,33 @@
2424
import java.util.Arrays;
2525
import java.util.Collections;
2626
import java.util.List;
27+
import java.util.Map;
2728
import java.util.Properties;
2829
import java.util.concurrent.ForkJoinPool;
2930

31+
import com.fasterxml.jackson.core.type.TypeReference;
32+
import com.fasterxml.jackson.databind.ObjectMapper;
3033
import org.slf4j.Logger;
3134
import org.slf4j.LoggerFactory;
3235

3336
import org.springframework.boot.configurationmetadata.ConfigurationMetadataProperty;
3437
import org.springframework.cloud.dataflow.configuration.metadata.ApplicationConfigurationMetadataResolver;
3538
import org.springframework.cloud.dataflow.core.ApplicationType;
39+
import org.springframework.cloud.dataflow.core.StreamAppDefinition;
40+
import org.springframework.cloud.dataflow.core.StreamDefinition;
41+
import org.springframework.cloud.dataflow.core.StreamDeployment;
3642
import org.springframework.cloud.dataflow.registry.domain.AppRegistration;
3743
import org.springframework.cloud.dataflow.registry.service.AppRegistryService;
3844
import org.springframework.cloud.dataflow.registry.service.DefaultAppRegistryService;
3945
import org.springframework.cloud.dataflow.registry.support.NoSuchAppRegistrationException;
4046
import org.springframework.cloud.dataflow.registry.support.ResourceUtils;
47+
import org.springframework.cloud.dataflow.rest.SkipperStream;
4148
import org.springframework.cloud.dataflow.rest.resource.AppRegistrationResource;
4249
import org.springframework.cloud.dataflow.rest.resource.DetailedAppRegistrationResource;
50+
import org.springframework.cloud.dataflow.server.DataFlowServerUtil;
51+
import org.springframework.cloud.dataflow.server.repository.StreamDefinitionRepository;
52+
import org.springframework.cloud.dataflow.server.service.StreamService;
53+
import org.springframework.cloud.dataflow.server.support.CannotDetermineApplicationTypeException;
4354
import org.springframework.cloud.deployer.resource.maven.MavenProperties;
4455
import org.springframework.core.io.ByteArrayResource;
4556
import org.springframework.core.io.DefaultResourceLoader;
@@ -75,25 +86,33 @@
7586
@RestController
7687
@RequestMapping("/apps")
7788
@ExposesResourceFor(AppRegistrationResource.class)
78-
public class VersionedAppRegistryController {
89+
public class SkipperAppRegistryController {
7990

80-
private static final Logger logger = LoggerFactory.getLogger(VersionedAppRegistryController.class);
91+
private static final Logger logger = LoggerFactory.getLogger(SkipperAppRegistryController.class);
8192

8293
private final Assembler assembler = new Assembler();
8394

95+
private final StreamDefinitionRepository streamDefinitionRepository;
96+
8497
private final AppRegistryService appRegistryService;
8598

99+
private final MavenProperties mavenProperties;
100+
101+
private final StreamService streamService;
102+
86103
private ApplicationConfigurationMetadataResolver metadataResolver;
87104

88105
private ForkJoinPool forkJoinPool;
89106

90-
private final MavenProperties mavenProperties;
91-
92107
private ResourceLoader resourceLoader = new DefaultResourceLoader();
93108

94-
public VersionedAppRegistryController(AppRegistryService appRegistryService,
109+
public SkipperAppRegistryController(StreamDefinitionRepository streamDefinitionRepository,
110+
StreamService streamService,
111+
AppRegistryService appRegistryService,
95112
ApplicationConfigurationMetadataResolver metadataResolver,
96113
ForkJoinPool forkJoinPool, MavenProperties mavenProperties) {
114+
this.streamDefinitionRepository = streamDefinitionRepository;
115+
this.streamService = streamService;
97116
this.appRegistryService = appRegistryService;
98117
this.metadataResolver = metadataResolver;
99118
this.forkJoinPool = forkJoinPool;
@@ -230,9 +249,70 @@ public void makeDefault(@PathVariable("type") ApplicationType type, @PathVariabl
230249
@ResponseStatus(HttpStatus.OK)
231250
public void unregister(@PathVariable("type") ApplicationType type, @PathVariable("name") String name,
232251
@PathVariable("version") String version) {
252+
253+
if (type != ApplicationType.task) {
254+
String streamWithApp = findStreamContainingAppOf(type, name, version);
255+
if (streamWithApp != null) {
256+
throw new UnregisterAppException(String.format("The app [%s:%s:%s] you're trying to unregister is " +
257+
"currently used in stream '%s'.", name, type, version, streamWithApp));
258+
}
259+
}
260+
233261
appRegistryService.delete(name, type, version);
234262
}
235263

264+
/**
265+
* Given the application type, name, and version, determine if it is being used in a deployed stream definition.
266+
*
267+
* @param appType the application type
268+
* @param appName the application name
269+
* @param appVersion application version
270+
* @return the name of the deployed stream where the app is being used. If the app is not deployed in a stream,
271+
* return {@code null}.
272+
*/
273+
private String findStreamContainingAppOf(ApplicationType appType, String appName, String appVersion) {
274+
Iterable<StreamDefinition> streamDefinitions = streamDefinitionRepository.findAll();
275+
for (StreamDefinition streamDefinition : streamDefinitions) {
276+
StreamDeployment streamDeployment = this.streamService.info(streamDefinition.getName());
277+
for (StreamAppDefinition streamAppDefinition : streamDefinition.getAppDefinitions()) {
278+
final String streamAppName = streamAppDefinition.getRegisteredAppName();
279+
final ApplicationType streamAppType;
280+
try {
281+
streamAppType = DataFlowServerUtil.determineApplicationType(streamAppDefinition);
282+
}
283+
catch (CannotDetermineApplicationTypeException e) {
284+
logger.warn("Can not determine ApplicationType for " + streamAppDefinition);
285+
continue;
286+
}
287+
if (appType != streamAppType) {
288+
continue;
289+
}
290+
Map<String, Map<String, String>> streamDeploymentPropertiesMap;
291+
String streamDeploymentPropertiesString = streamDeployment.getDeploymentProperties();
292+
ObjectMapper objectMapper = new ObjectMapper();
293+
try {
294+
streamDeploymentPropertiesMap = objectMapper.readValue(streamDeploymentPropertiesString,
295+
new TypeReference<Map<String, Map<String, String>>>() {
296+
});
297+
}
298+
catch (IOException e) {
299+
throw new RuntimeException("Can not deserialize Stream Deployment Properties JSON '"
300+
+ streamDeploymentPropertiesString + "'");
301+
}
302+
if (streamDeploymentPropertiesMap.containsKey(appName)) {
303+
Map<String, String> appDeploymentProperties = streamDeploymentPropertiesMap.get(streamAppName);
304+
if (appDeploymentProperties.containsKey(SkipperStream.SKIPPER_SPEC_VERSION)) {
305+
String version = appDeploymentProperties.get(SkipperStream.SKIPPER_SPEC_VERSION);
306+
if (version != null && version.equals(appVersion)) {
307+
return streamDefinition.getName();
308+
}
309+
}
310+
}
311+
}
312+
}
313+
return null;
314+
}
315+
236316
@Deprecated
237317
@RequestMapping(value = "/{type}/{name}", method = RequestMethod.DELETE)
238318
@ResponseStatus(HttpStatus.OK)
@@ -306,7 +386,7 @@ private void prefetchMetadata(List<AppRegistration> appRegistrations) {
306386
class Assembler extends ResourceAssemblerSupport<AppRegistration, AppRegistrationResource> {
307387

308388
public Assembler() {
309-
super(VersionedAppRegistryController.class, AppRegistrationResource.class);
389+
super(SkipperAppRegistryController.class, AppRegistrationResource.class);
310390
}
311391

312392
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.cloud.dataflow.server.controller;
17+
18+
/**
19+
* Exception thrown when an application can not be unregistered.
20+
*
21+
* @author Christian Tzolov
22+
*/
23+
public class UnregisterAppException extends RuntimeException {
24+
25+
private static final long serialVersionUID = 1L;
26+
27+
public UnregisterAppException(String message) {
28+
super(message);
29+
}
30+
}

spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/stream/SkipperStreamDeployer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,8 @@ public StreamDeployment getStreamInfo(String streamName) {
431431
SpringCloudDeployerApplicationSpec spec = applicationManifest.getSpec();
432432
String applicationName = applicationManifest.getApplicationName();
433433
versionAndDeploymentProperties.putAll(spec.getDeploymentProperties());
434-
versionAndDeploymentProperties.put(spec.getResource(), spec.getVersion());
434+
versionAndDeploymentProperties.put(SkipperStream.SKIPPER_SPEC_RESOURCE, spec.getResource());
435+
versionAndDeploymentProperties.put(SkipperStream.SKIPPER_SPEC_VERSION, spec.getVersion());
435436
streamPropertiesMap.put(applicationName, versionAndDeploymentProperties);
436437
}
437438
return new StreamDeployment(streamName, new JSONObject(streamPropertiesMap).toString());

spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/TestDependencies.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@
5454
import org.springframework.cloud.dataflow.server.controller.MetricsController;
5555
import org.springframework.cloud.dataflow.server.controller.RestControllerAdvice;
5656
import org.springframework.cloud.dataflow.server.controller.RuntimeAppsController;
57+
import org.springframework.cloud.dataflow.server.controller.SkipperAppRegistryController;
5758
import org.springframework.cloud.dataflow.server.controller.SkipperStreamDeploymentController;
5859
import org.springframework.cloud.dataflow.server.controller.StreamDefinitionController;
5960
import org.springframework.cloud.dataflow.server.controller.StreamDeploymentController;
6061
import org.springframework.cloud.dataflow.server.controller.TaskDefinitionController;
6162
import org.springframework.cloud.dataflow.server.controller.TaskExecutionController;
6263
import org.springframework.cloud.dataflow.server.controller.ToolsController;
63-
import org.springframework.cloud.dataflow.server.controller.VersionedAppRegistryController;
6464
import org.springframework.cloud.dataflow.server.controller.support.ApplicationsMetrics;
6565
import org.springframework.cloud.dataflow.server.controller.support.ApplicationsMetrics.Application;
6666
import org.springframework.cloud.dataflow.server.controller.support.ApplicationsMetrics.Instance;
@@ -274,9 +274,13 @@ public AppRegistryService appRegistryService(AppRegistrationRepository appRegist
274274

275275
@Bean
276276
@ConditionalOnSkipperEnabled
277-
public VersionedAppRegistryController versionedAppRegistryController(AppRegistryService appRegistry,
277+
public SkipperAppRegistryController versionedAppRegistryController(
278+
StreamDefinitionRepository streamDefinitionRepository,
279+
StreamService streamService,
280+
AppRegistryService appRegistry,
278281
ApplicationConfigurationMetadataResolver metadataResolver, MavenProperties mavenProperties) {
279-
return new VersionedAppRegistryController(appRegistry, metadataResolver, new ForkJoinPool(2), mavenProperties);
282+
return new SkipperAppRegistryController(streamDefinitionRepository, streamService, appRegistry, metadataResolver,
283+
new ForkJoinPool(2), mavenProperties);
280284
}
281285

282286
@Bean

0 commit comments

Comments
 (0)