Skip to content

Commit 3b46997

Browse files
authored
Support Environment chains (#42)
1 parent d86c627 commit 3b46997

File tree

5 files changed

+74
-36
lines changed

5 files changed

+74
-36
lines changed

hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,11 @@ public interface Environment {
152152
Environment EMPTY = new SimpleEnvironment();
153153
Environment PROCESS = new ProcessEnvironment();
154154

155-
String getOrDefault(String key, String defaultValue);
155+
String getOrDefault(String key, Supplier<String> f);
156+
157+
default Environment orElse(Environment other) {
158+
return (k, f) -> getOrDefault(k, () -> other.getOrDefault(k, f));
159+
}
156160
}
157161

158162
/** Basic Environment implementation */
@@ -162,6 +166,14 @@ public static class SimpleEnvironment implements Environment {
162166
public SimpleEnvironment() {
163167
}
164168

169+
public SimpleEnvironment(Properties properties) {
170+
properties.forEach((k, v) -> vars.put(k.toString(), v.toString()));
171+
}
172+
173+
public SimpleEnvironment(Map<String, String> vars) {
174+
exportAll(vars);
175+
}
176+
165177
protected void export(String property, String value) {
166178
vars.put(property, value);
167179
}
@@ -180,20 +192,24 @@ public SimpleEnvironment with(String key, String value) {
180192
}
181193

182194
@Override
183-
public String getOrDefault(String key, String defaultValue) {
184-
if (defaultValue == null && !vars.containsKey(key)) {
185-
throw new IllegalArgumentException("No variable '" + key + "' found in the environment");
195+
public String getOrDefault(String key, Supplier<String> f) {
196+
if (!vars.containsKey(key)) {
197+
if (f == null || f.get() == null) {
198+
throw new IllegalArgumentException("No variable '" + key + "' found in the environment");
199+
} else {
200+
return f.get();
201+
}
186202
}
187-
return vars.getOrDefault(key, defaultValue);
203+
return vars.get(key);
188204
}
189205
}
190206

191207
/** Returns "{{key}}" for any key without a default */
192208
public static class DummyEnvironment implements Environment {
193209
@Override
194-
public String getOrDefault(String key, String defaultValue) {
195-
if (defaultValue != null) {
196-
return defaultValue;
210+
public String getOrDefault(String key, Supplier<String> f) {
211+
if (f != null && f.get() != null) {
212+
return f.get();
197213
} else {
198214
return "{{" + key + "}}";
199215
}
@@ -204,13 +220,13 @@ public String getOrDefault(String key, String defaultValue) {
204220
public static class ProcessEnvironment implements Environment {
205221

206222
@Override
207-
public String getOrDefault(String key, String defaultValue) {
223+
public String getOrDefault(String key, Supplier<String> f) {
208224
String value = System.getenv(key);
209225
if (value == null) {
210226
value = System.getProperty(key);
211227
}
212-
if (value == null) {
213-
value = defaultValue;
228+
if (value == null && f != null) {
229+
value = f.get();
214230
}
215231
if (value == null) {
216232
throw new IllegalArgumentException("Missing system property `" + key + "`");
@@ -270,7 +286,7 @@ public String render(Resource resource) {
270286
}
271287
String key = m.group(2);
272288
String defaultValue = m.group(4);
273-
String value = resource.getOrDefault(key, () -> env.getOrDefault(key, defaultValue));
289+
String value = resource.getOrDefault(key, () -> env.getOrDefault(key, () -> defaultValue));
274290
if (value == null) {
275291
throw new IllegalArgumentException(template + " has no value for key " + key + ".");
276292
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.linkedin.hoptimator.catalog;
2+
3+
import static org.junit.Assert.assertTrue;
4+
import static org.junit.Assert.assertFalse;
5+
import static org.junit.Assert.assertEquals;
6+
import org.junit.Test;
7+
8+
public class ResourceTest {
9+
10+
@Test
11+
public void handlesChainedEnvironments() {
12+
Resource.Environment env = new Resource.SimpleEnvironment() {{
13+
export("one", "1");
14+
export("foo", "bar");
15+
}}.orElse(new Resource.SimpleEnvironment() {{
16+
export("two", "2");
17+
export("foo", "car");
18+
}}.orElse(new Resource.SimpleEnvironment() {{
19+
export("three", "3");
20+
export("foo", "dar");
21+
}}));
22+
assertEquals("1", env.getOrDefault("one", () -> "x"));
23+
assertEquals("2", env.getOrDefault("two", () -> "x"));
24+
assertEquals("3", env.getOrDefault("three", () -> "x"));
25+
assertEquals("bar", env.getOrDefault("foo", () -> "x"));
26+
assertEquals("x", env.getOrDefault("oof", () -> "x"));
27+
}
28+
}

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.kubernetes.client.extended.controller.Controller;
77
import io.kubernetes.client.extended.controller.ControllerManager;
88

9+
import com.linkedin.hoptimator.catalog.Resource;
910
import com.linkedin.hoptimator.models.V1alpha1Subscription;
1011
import com.linkedin.hoptimator.models.V1alpha1SubscriptionList;
1112
import com.linkedin.hoptimator.operator.subscription.SubscriptionReconciler;
@@ -34,12 +35,14 @@ public class HoptimatorOperatorApp {
3435
final String modelPath;
3536
final String namespace;
3637
final Properties properties;
38+
final Resource.Environment environment;
3739

3840
/** This constructor is likely to evolve and break. */
3941
public HoptimatorOperatorApp(String modelPath, String namespace, Properties properties) {
4042
this.modelPath = modelPath;
4143
this.namespace = namespace;
4244
this.properties = properties;
45+
this.environment = new Resource.SimpleEnvironment(properties);
4346
}
4447

4548
public static void main(String[] args) throws Exception {
@@ -91,7 +94,7 @@ public void run() throws Exception {
9194

9295
List<Controller> controllers = new ArrayList<>();
9396
controllers.addAll(ControllerService.controllers(operator));
94-
controllers.add(SubscriptionReconciler.controller(operator, plannerFactory));
97+
controllers.add(SubscriptionReconciler.controller(operator, plannerFactory, environment));
9598

9699
ControllerManager controllerManager = new ControllerManager(operator.informerFactory(),
97100
controllers.toArray(new Controller[0]));

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,27 +26,13 @@
2626
* as a basis for deriving K8s object names, Kafka topic names, etc. The
2727
* name is guaranteed to be a valid K8s object name, e.g. `my-subscription`.
2828
* - `pipeline.avroSchema`, an Avro schema for the pipeline's output type.
29-
*
30-
* In addition, any "hints" in the Subscription object (`.spec.hints`) are
31-
* exported as-is. These can be used to provide optional properties to
32-
* templates. When using such hints in a template, ensure that you provide a
33-
* default value, e.g. `{{numPartitions:null}``, since they will usually be
34-
* missing.
3529
*/
3630
public class SubscriptionEnvironment extends Resource.SimpleEnvironment {
3731

38-
public SubscriptionEnvironment(String namespace, String name, Pipeline pipeline,
39-
Map<String, String> hints) {
40-
if (hints != null) {
41-
exportAll(hints);
42-
}
32+
public SubscriptionEnvironment(String namespace, String name, Pipeline pipeline) {
4333
export("pipeline.namespace", namespace);
4434
export("pipeline.name", name);
4535
export("pipeline.avroSchema", AvroConverter.avro("com.linkedin.hoptimator", "OutputRecord",
4636
pipeline.outputType()).toString(false));
4737
}
48-
49-
public SubscriptionEnvironment(String namespace, String name, Pipeline pipeline) {
50-
this(namespace, name, pipeline, Collections.emptyMap());
51-
}
5238
}

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,13 @@ public class SubscriptionReconciler implements Reconciler {
4343

4444
private final Operator operator;
4545
private final HoptimatorPlanner.Factory plannerFactory;
46+
private final Resource.Environment environment;
4647

47-
private SubscriptionReconciler(Operator operator, HoptimatorPlanner.Factory plannerFactory) {
48+
private SubscriptionReconciler(Operator operator, HoptimatorPlanner.Factory plannerFactory,
49+
Resource.Environment environment) {
4850
this.operator = operator;
4951
this.plannerFactory = plannerFactory;
52+
this.environment = environment;
5053
}
5154

5255
@Override
@@ -84,11 +87,13 @@ public Result reconcile(Request request) {
8487
log.info("Planning a new pipeline for {}/{} with SQL `{}`...", kind, name, object.getSpec().getSql());
8588

8689
Pipeline pipeline = pipeline(object);
87-
SubscriptionEnvironment env = new SubscriptionEnvironment(namespace, name, pipeline);
88-
SubscriptionEnvironment sinkEnv = new SubscriptionEnvironment(namespace, name, pipeline,
89-
object.getSpec().getHints());
90-
Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(env);
91-
Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(sinkEnv);
90+
Resource.Environment subEnv = new SubscriptionEnvironment(namespace, name, pipeline)
91+
.orElse(environment);
92+
Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(subEnv);
93+
94+
// For sink resources, also expose hints.
95+
Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(subEnv
96+
.orElse(new Resource.SimpleEnvironment(object.getSpec().getHints())));
9297

9398
// Render resources related to all source tables.
9499
List<String> upstreamResources = pipeline.upstreamResources().stream()
@@ -266,8 +271,8 @@ private static boolean isReady(DynamicKubernetesObject obj) {
266271
return true;
267272
}
268273

269-
public static Controller controller(Operator operator, HoptimatorPlanner.Factory plannerFactory) {
270-
Reconciler reconciler = new SubscriptionReconciler(operator, plannerFactory);
274+
public static Controller controller(Operator operator, HoptimatorPlanner.Factory plannerFactory, Resource.Environment environment) {
275+
Reconciler reconciler = new SubscriptionReconciler(operator, plannerFactory, environment);
271276
return ControllerBuilder.defaultBuilder(operator.informerFactory())
272277
.withReconciler(reconciler)
273278
.withName("subscription-controller")

0 commit comments

Comments
 (0)