Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Commit d04e59e

Browse files
David Turanskimarkpollack
authored andcommitted
Make TaskPlatform where SCDF is deployed primary and use with Scheduler
Fixes #3040
1 parent 6d4bbc3 commit d04e59e

File tree

15 files changed

+506
-93
lines changed

15 files changed

+506
-93
lines changed

spring-cloud-dataflow-core/src/main/java/org/springframework/cloud/dataflow/core/TaskPlatform.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,17 @@
1919
import java.util.List;
2020

2121
/**
22+
* For a given platform, Cloud Foundry, Kubernetes, etc, associate a list of Task Launchers.
23+
*
2224
* @author Donovan Muller
25+
* @author David Turanski
2326
*/
2427
public class TaskPlatform {
2528

2629
private String name;
2730

31+
private boolean primary;
32+
2833
private List<Launcher> launchers;
2934

3035
public TaskPlatform(String name, List<Launcher> launchers) {
@@ -47,5 +52,18 @@ public List<Launcher> getLaunchers() {
4752
public void setLaunchers(List<Launcher> launchers) {
4853
this.launchers = launchers;
4954
}
55+
56+
/**
57+
* If true, identifies which Platform the Data Flow Server is running on. This information is used to select
58+
* the TaskPlatform used by Scheduler. @see SchedulerConfiguration.
59+
* @return true if the TaskPlatform is the same as where the Data Flow Server is running.
60+
*/
61+
public boolean isPrimary() {
62+
return primary;
63+
}
64+
65+
public void setPrimary(boolean primary) {
66+
this.primary = primary;
67+
}
5068
}
5169

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.dataflow.server.config.cloudfoundry;
18+
19+
import java.util.Optional;
20+
21+
import io.pivotal.reactor.scheduler.ReactorSchedulerClient;
22+
import io.pivotal.scheduler.SchedulerClient;
23+
import reactor.core.publisher.Mono;
24+
25+
import org.springframework.cloud.scheduler.spi.cloudfoundry.CloudFoundrySchedulerProperties;
26+
27+
/**
28+
* @author David Turanski
29+
**/
30+
public class CloudFoundrySchedulerClientProvider {
31+
32+
private final CloudFoundryPlatformConnectionContextProvider connectionContextProvider;
33+
34+
private final CloudFoundryPlatformTokenProvider platformTokenProvider;
35+
36+
private final Optional<CloudFoundrySchedulerProperties> schedulerProperties;
37+
38+
public CloudFoundrySchedulerClientProvider(
39+
CloudFoundryPlatformConnectionContextProvider connectionContextProvider,
40+
CloudFoundryPlatformTokenProvider platformTokenProvider,
41+
Optional<CloudFoundrySchedulerProperties> schedulerProperties) {
42+
43+
44+
this.connectionContextProvider = connectionContextProvider;
45+
this.platformTokenProvider = platformTokenProvider;
46+
this.schedulerProperties = schedulerProperties;
47+
}
48+
49+
public SchedulerClient cloudFoundrySchedulerClient(String account) {
50+
return ReactorSchedulerClient.builder()
51+
.connectionContext(connectionContextProvider.connectionContext(account))
52+
.tokenProvider(platformTokenProvider.tokenProvider(account))
53+
.root(Mono.just(schedulerProperties().getSchedulerUrl()))
54+
.build();
55+
}
56+
57+
public CloudFoundrySchedulerProperties schedulerProperties() {
58+
return this.schedulerProperties.orElseGet(CloudFoundrySchedulerProperties::new);
59+
}
60+
61+
}

spring-cloud-dataflow-platform-cloudfoundry/src/main/java/org/springframework/cloud/dataflow/server/config/cloudfoundry/CloudFoundryTaskPlatformAutoConfiguration.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,38 @@
1717

1818
import java.util.Optional;
1919

20-
import org.springframework.beans.factory.annotation.Value;
20+
import org.springframework.boot.autoconfigure.condition.ConditionalOnCloudPlatform;
21+
import org.springframework.boot.cloud.CloudPlatform;
2122
import org.springframework.boot.context.properties.EnableConfigurationProperties;
2223
import org.springframework.cloud.dataflow.core.TaskPlatform;
24+
import org.springframework.cloud.dataflow.server.config.CloudProfileProvider;
25+
import org.springframework.cloud.dataflow.server.config.features.ConditionalOnTasksEnabled;
26+
import org.springframework.cloud.dataflow.server.config.features.SchedulerConfiguration.SchedulerConfigurationPropertyChecker;
2327
import org.springframework.cloud.scheduler.spi.cloudfoundry.CloudFoundrySchedulerProperties;
2428
import org.springframework.context.annotation.Bean;
29+
import org.springframework.context.annotation.Conditional;
2530
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.core.env.Environment;
2632

2733
/**
2834
* Creates TaskPlatform implementations to launch/schedule tasks on Cloud Foundry.
2935
* @author Mark Pollack
3036
* @author David Turanski
3137
*/
3238
@Configuration
39+
@ConditionalOnTasksEnabled
3340
@EnableConfigurationProperties(CloudFoundryPlatformProperties.class)
3441
public class CloudFoundryTaskPlatformAutoConfiguration {
3542

3643
@Bean
37-
public TaskPlatform cloudFoundryPlatform(CloudFoundryTaskPlatformFactory cloudFoundryTaskPlatformFactory) {
38-
return cloudFoundryTaskPlatformFactory.createTaskPlatform();
44+
public TaskPlatform cloudFoundryPlatform(CloudFoundryTaskPlatformFactory cloudFoundryTaskPlatformFactory,
45+
Environment environment) {
46+
TaskPlatform taskPlatform = cloudFoundryTaskPlatformFactory.createTaskPlatform();
47+
CloudProfileProvider cloudProfileProvider = new CloudFoundryCloudProfileProvider();
48+
if (cloudProfileProvider.isCloudPlatform(environment)) {
49+
taskPlatform.setPrimary(true);
50+
}
51+
return taskPlatform;
3952
}
4053

4154
@Bean
@@ -59,21 +72,30 @@ public CloudFoundryPlatformClientProvider cloudFoundryClientProvider(
5972
cloudFoundryPlatformProperties, connectionContextProvider, platformTokenProvider);
6073
}
6174

75+
@Bean
76+
@ConditionalOnCloudPlatform(CloudPlatform.CLOUD_FOUNDRY)
77+
@Conditional(SchedulerConfigurationPropertyChecker.class)
78+
public CloudFoundrySchedulerClientProvider schedulerClientProvider(
79+
CloudFoundryPlatformConnectionContextProvider connectionContextProvider,
80+
CloudFoundryPlatformTokenProvider platformTokenProvider,
81+
Optional<CloudFoundrySchedulerProperties> schedulerProperties) {
82+
return new CloudFoundrySchedulerClientProvider(
83+
connectionContextProvider, platformTokenProvider, schedulerProperties);
84+
}
85+
6286
@Bean
6387
public CloudFoundryTaskPlatformFactory cloudFoundryTaskPlatformFactory(
6488
CloudFoundryPlatformProperties cloudFoundryPlatformProperties,
6589
CloudFoundryPlatformTokenProvider platformTokenProvider,
6690
CloudFoundryPlatformConnectionContextProvider connectionContextProvider,
6791
CloudFoundryPlatformClientProvider cloudFoundryClientProvider,
68-
Optional<CloudFoundrySchedulerProperties> schedulerProperties,
69-
@Value("${spring.cloud.dataflow.features.schedules-enabled:false}") boolean schedulesEnabled) {
92+
Optional<CloudFoundrySchedulerClientProvider> cloudFoundrySchedulerClientProvider) {
7093
return CloudFoundryTaskPlatformFactory.builder()
7194
.platformProperties(cloudFoundryPlatformProperties)
7295
.platformTokenProvider(platformTokenProvider)
7396
.connectionContextProvider(connectionContextProvider)
7497
.cloudFoundryClientProvider(cloudFoundryClientProvider)
75-
.schedulesEnabled(schedulesEnabled)
76-
.schedulerProperties(schedulerProperties)
98+
.cloudFoundrySchedulerClientProvider(cloudFoundrySchedulerClientProvider)
7799
.build();
78100
}
79101
}

spring-cloud-dataflow-platform-cloudfoundry/src/main/java/org/springframework/cloud/dataflow/server/config/cloudfoundry/CloudFoundryTaskPlatformFactory.java

Lines changed: 35 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import com.github.zafarkhaja.semver.Version;
2222
import io.jsonwebtoken.lang.Assert;
23-
import io.pivotal.reactor.scheduler.ReactorSchedulerClient;
23+
import io.pivotal.scheduler.SchedulerClient;
2424
import org.cloudfoundry.client.CloudFoundryClient;
2525
import org.cloudfoundry.client.v2.info.GetInfoRequest;
2626
import org.cloudfoundry.operations.CloudFoundryOperations;
@@ -29,7 +29,6 @@
2929
import org.cloudfoundry.reactor.TokenProvider;
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
32-
import reactor.core.publisher.Mono;
3332

3433
import org.springframework.cloud.dataflow.core.AbstractTaskPlatformFactory;
3534
import org.springframework.cloud.dataflow.core.Launcher;
@@ -49,33 +48,30 @@
4948
**/
5049
public class CloudFoundryTaskPlatformFactory extends AbstractTaskPlatformFactory<CloudFoundryPlatformProperties> {
5150

52-
private final boolean schedulerEnabled;
53-
5451
private final static String PLATFORM_TYPE = "Cloud Foundry";
5552

5653
private final static Logger logger = LoggerFactory.getLogger(CloudFoundryTaskPlatformFactory.class);
5754

58-
private final CloudFoundrySchedulerProperties schedulerProperties;
59-
6055
private final CloudFoundryPlatformTokenProvider platformTokenProvider;
6156

6257
private final CloudFoundryPlatformConnectionContextProvider connectionContextProvider;
6358

6459
private final CloudFoundryPlatformClientProvider cloudFoundryClientProvider;
6560

61+
private final Optional<CloudFoundrySchedulerClientProvider> cloudFoundrySchedulerClientProvider;
62+
63+
6664
private CloudFoundryTaskPlatformFactory(CloudFoundryPlatformProperties cloudFoundryPlatformProperties,
6765
CloudFoundryPlatformTokenProvider platformTokenProvider,
6866
CloudFoundryPlatformConnectionContextProvider connectionContextProvider,
6967
CloudFoundryPlatformClientProvider cloudFoundryClientProvider,
70-
Optional<CloudFoundrySchedulerProperties> schedulerProperties,
71-
boolean schedulerEnabled) {
72-
super(cloudFoundryPlatformProperties, PLATFORM_TYPE);
68+
Optional<CloudFoundrySchedulerClientProvider> cloudFoundrySchedulerClientProvider) {
7369

74-
this.schedulerEnabled = schedulerEnabled;
75-
this.schedulerProperties = schedulerProperties.orElseGet(CloudFoundrySchedulerProperties::new);
70+
super(cloudFoundryPlatformProperties, PLATFORM_TYPE);
7671
this.platformTokenProvider = platformTokenProvider;
7772
this.connectionContextProvider = connectionContextProvider;
7873
this.cloudFoundryClientProvider = cloudFoundryClientProvider;
74+
this.cloudFoundrySchedulerClientProvider = cloudFoundrySchedulerClientProvider;
7975
}
8076

8177
@Override
@@ -89,36 +85,27 @@ public Launcher createLauncher(String account) {
8985
deploymentProperties(account),
9086
cloudFoundryOperations,
9187
runtimeEnvironmentInfo(cloudFoundryClient, account));
92-
Launcher launcher = new Launcher(account, PLATFORM_TYPE, taskLauncher,
93-
scheduler(
94-
account,
95-
taskLauncher,
96-
cloudFoundryOperations,
97-
connectionContext,
98-
tokenProvider));
88+
Launcher launcher = new Launcher(account, PLATFORM_TYPE, taskLauncher,
89+
scheduler(account, taskLauncher, cloudFoundryOperations));
9990
CloudFoundryConnectionProperties connectionProperties = connectionProperties(account);
10091
launcher.setDescription(String.format("org = [%s], space = [%s], url = [%s]",
101-
connectionProperties.getOrg(), connectionProperties.getSpace(),
102-
connectionProperties.getUrl()));
92+
connectionProperties.getOrg(), connectionProperties.getSpace(),
93+
connectionProperties.getUrl()));
10394
return launcher;
10495
}
10596

106-
private Scheduler scheduler(String key, CloudFoundry2630AndLaterTaskLauncher taskLauncher,
107-
CloudFoundryOperations cloudFoundryOperations, ConnectionContext connectionContext,
108-
TokenProvider tokenProvider) {
97+
private Scheduler scheduler(String account, CloudFoundry2630AndLaterTaskLauncher taskLauncher,
98+
CloudFoundryOperations cloudFoundryOperations) {
10999
Scheduler scheduler = null;
110-
if (schedulerEnabled) {
111-
ReactorSchedulerClient reactorSchedulerClient = ReactorSchedulerClient.builder()
112-
.connectionContext(connectionContext)
113-
.tokenProvider(tokenProvider)
114-
.root(Mono.just(schedulerProperties.getSchedulerUrl()))
115-
.build();
116-
117-
scheduler = new CloudFoundryAppScheduler(reactorSchedulerClient,
100+
if (cloudFoundrySchedulerClientProvider.isPresent()) {
101+
SchedulerClient schedulerClient =
102+
cloudFoundrySchedulerClientProvider.get().cloudFoundrySchedulerClient(account);
103+
scheduler = new CloudFoundryAppScheduler(
104+
schedulerClient,
118105
cloudFoundryOperations,
119-
connectionProperties(key),
106+
connectionProperties(account),
120107
taskLauncher,
121-
schedulerProperties);
108+
cloudFoundrySchedulerClientProvider.get().schedulerProperties());
122109
}
123110
return scheduler;
124111
}
@@ -196,6 +183,8 @@ public static class Builder {
196183

197184
private CloudFoundryPlatformClientProvider cloudFoundryClientProvider;
198185

186+
private Optional<CloudFoundrySchedulerClientProvider> cloudFoundrySchedulerClientProvider = Optional.empty();
187+
199188
public Builder platformProperties(CloudFoundryPlatformProperties platformProperties) {
200189
this.platformProperties = platformProperties;
201190
return this;
@@ -211,19 +200,25 @@ public Builder schedulerProperties(Optional<CloudFoundrySchedulerProperties> sch
211200
return this;
212201
}
213202

203+
public Builder cloudFoundrySchedulerClientProvider(Optional<CloudFoundrySchedulerClientProvider>
204+
cloudFoundrySchedulerClientProvider) {
205+
this.cloudFoundrySchedulerClientProvider = cloudFoundrySchedulerClientProvider;
206+
return this;
207+
}
208+
214209
public Builder platformTokenProvider(CloudFoundryPlatformTokenProvider platformTokenProvider) {
215210
this.platformTokenProvider = platformTokenProvider;
216211
return this;
217212
}
218213

219214
public Builder connectionContextProvider(
220-
CloudFoundryPlatformConnectionContextProvider connectionContextProvider) {
215+
CloudFoundryPlatformConnectionContextProvider connectionContextProvider) {
221216
this.connectionContextProvider = connectionContextProvider;
222217
return this;
223218
}
224219

225220
public Builder cloudFoundryClientProvider(
226-
CloudFoundryPlatformClientProvider cloudFoundryClientProvider) {
221+
CloudFoundryPlatformClientProvider cloudFoundryClientProvider) {
227222
this.cloudFoundryClientProvider = cloudFoundryClientProvider;
228223
return this;
229224
}
@@ -235,12 +230,11 @@ public CloudFoundryTaskPlatformFactory build() {
235230
Assert.notNull(cloudFoundryClientProvider, "'cloudFoundryClientProvider' is required.");
236231

237232
return new CloudFoundryTaskPlatformFactory(
238-
platformProperties,
239-
platformTokenProvider,
240-
connectionContextProvider,
241-
cloudFoundryClientProvider,
242-
schedulerProperties,
243-
schedulesEnabled);
233+
platformProperties,
234+
platformTokenProvider,
235+
connectionContextProvider,
236+
cloudFoundryClientProvider,
237+
cloudFoundrySchedulerClientProvider);
244238
}
245239
}
246240
}

spring-cloud-dataflow-platform-cloudfoundry/src/test/java/org/springframework/cloud/dataflow/server/config/cloudfoundry/CloudFoundryTaskPlatformFactoryTests.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Collections;
2020
import java.util.Optional;
2121

22+
import io.pivotal.scheduler.SchedulerClient;
2223
import org.cloudfoundry.client.CloudFoundryClient;
2324
import org.cloudfoundry.client.v2.info.GetInfoResponse;
2425
import org.cloudfoundry.reactor.TokenProvider;
@@ -51,6 +52,9 @@ public class CloudFoundryTaskPlatformFactoryTests {
5152
private CloudFoundryPlatformClientProvider cloudFoundryClientProvider = mock(
5253
CloudFoundryPlatformClientProvider.class);
5354

55+
private CloudFoundrySchedulerClientProvider cloudFoundrySchedulerClientProvider = mock(
56+
CloudFoundrySchedulerClientProvider.class);
57+
5458
private CloudFoundryClient cloudFoundryClient = mock(CloudFoundryClient.class);
5559

5660
private CloudFoundryPlatformProperties cloudFoundryPlatformProperties;
@@ -102,13 +106,18 @@ public void cloudFoundryTaskPlatformNoScheduler() {
102106
assertThat(taskPlatform.getLaunchers().get(0).getTaskLauncher()).isInstanceOf(
103107
CloudFoundry2630AndLaterTaskLauncher.class);
104108
assertThat(taskPlatform.getLaunchers().get(0).getDescription()).isEqualTo(
105-
"org = [org], space = [space], url = [https://localhost:9999]");
109+
"org = [org], space = [space], url = [https://localhost:9999]");
106110
assertThat(taskPlatform.getLaunchers().get(0).getScheduler()).isNull();
107111
}
108112

109113
@Test
110114
public void cloudFoundryTaskPlatformWithScheduler() {
111115

116+
when(cloudFoundrySchedulerClientProvider.cloudFoundrySchedulerClient(anyString())).thenReturn(
117+
mock(SchedulerClient.class));
118+
when(cloudFoundrySchedulerClientProvider.schedulerProperties())
119+
.thenReturn(new CloudFoundrySchedulerProperties());
120+
112121
CloudFoundrySchedulerProperties schedulerProperties = new CloudFoundrySchedulerProperties();
113122
schedulerProperties.setSchedulerUrl("https://localhost:9999");
114123

@@ -118,6 +127,7 @@ public void cloudFoundryTaskPlatformWithScheduler() {
118127
.platformTokenProvider(platformTokenProvider)
119128
.connectionContextProvider(connectionContextProvider)
120129
.cloudFoundryClientProvider(cloudFoundryClientProvider)
130+
.cloudFoundrySchedulerClientProvider(Optional.of(cloudFoundrySchedulerClientProvider))
121131
.schedulesEnabled(true)
122132
.schedulerProperties(Optional.of(schedulerProperties))
123133
.build();
@@ -130,7 +140,7 @@ public void cloudFoundryTaskPlatformWithScheduler() {
130140
assertThat(taskPlatform.getLaunchers().get(0).getTaskLauncher()).isInstanceOf(
131141
CloudFoundry2630AndLaterTaskLauncher.class);
132142
assertThat(taskPlatform.getLaunchers().get(0).getDescription()).isEqualTo(
133-
"org = [org], space = [space], url = [https://localhost:9999]");
143+
"org = [org], space = [space], url = [https://localhost:9999]");
134144
assertThat(taskPlatform.getLaunchers().get(0).getScheduler()).isNotNull();
135145
}
136146
}

0 commit comments

Comments
 (0)