Skip to content

Commit 7d4383a

Browse files
authored
Allow configuring ownerReference on K8s resources created by worker service (#174)
1 parent 01e9928 commit 7d4383a

File tree

15 files changed

+183
-29
lines changed

15 files changed

+183
-29
lines changed

mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/FunctionsImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ public void registerFunction(final String tenant,
124124
version,
125125
functionName,
126126
functionPkgUrl,
127-
functionConfig
127+
functionConfig,
128+
worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs()
128129
);
129130
// override namespace by configuration file
130131
v1alpha1Function.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
@@ -188,7 +189,8 @@ public void updateFunction(final String tenant,
188189
version,
189190
functionName,
190191
functionPkgUrl,
191-
functionConfig
192+
functionConfig,
193+
worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs()
192194
);
193195
v1alpha1Function.getMetadata().setResourceVersion(oldFn.getMetadata().getResourceVersion());
194196
this.upsertFunction(tenant, namespace, functionName, functionConfig, v1alpha1Function, clientAuthenticationDataHttps);

mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SinksImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ public void registerSink(
132132
sinkPkgUrl,
133133
uploadedInputStream,
134134
sinkConfig,
135-
this.meshWorkerServiceSupplier.get().getConnectorsManager());
135+
this.meshWorkerServiceSupplier.get().getConnectorsManager(),
136+
worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs());
136137
// override namesapce by configuration
137138
v1alpha1Sink.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
138139
try {
@@ -207,7 +208,8 @@ public void updateSink(
207208
sinkName,
208209
sinkPkgUrl,
209210
uploadedInputStream,
210-
sinkConfig, this.meshWorkerServiceSupplier.get().getConnectorsManager());
211+
sinkConfig, this.meshWorkerServiceSupplier.get().getConnectorsManager(),
212+
worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs());
211213
this.upsertSink(tenant, namespace, sinkName, sinkConfig, v1alpha1Sink, clientAuthenticationDataHttps);
212214
v1alpha1Sink.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
213215
v1alpha1Sink

mesh-worker-service/src/main/java/io/functionmesh/compute/rest/api/SourcesImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ public void registerSource(final String tenant,
141141
sourcePkgUrl,
142142
uploadedInputStream,
143143
sourceConfig,
144-
this.meshWorkerServiceSupplier.get().getConnectorsManager());
144+
this.meshWorkerServiceSupplier.get().getConnectorsManager(),
145+
worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs());
145146
// override namesapce by configuration
146147
v1alpha1Source.getMetadata().setNamespace(KubernetesUtils.getNamespace(worker().getFactoryConfig()));
147148
Map<String, String> customLabels = Maps.newHashMap();
@@ -207,7 +208,8 @@ public void updateSource(final String tenant,
207208
sourcePkgUrl,
208209
uploadedInputStream,
209210
sourceConfig,
210-
this.meshWorkerServiceSupplier.get().getConnectorsManager()
211+
this.meshWorkerServiceSupplier.get().getConnectorsManager(),
212+
worker().getWorkerConfig().getFunctionsWorkerServiceCustomConfigs()
211213
);
212214
v1alpha1Source.getMetadata().setResourceVersion(oldRes.getMetadata().getResourceVersion());
213215
this.upsertSource(tenant, namespace, sourceName, sourceConfig, v1alpha1Source, clientAuthenticationDataHttps);

mesh-worker-service/src/main/java/io/functionmesh/compute/util/CommonUtil.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
package io.functionmesh.compute.util;
2020

2121
import io.kubernetes.client.openapi.models.V1ObjectMeta;
22+
import io.kubernetes.client.openapi.models.V1OwnerReference;
23+
import java.util.Collections;
2224
import org.apache.commons.codec.digest.DigestUtils;
2325
import org.apache.pulsar.common.functions.FunctionConfig;
2426
import java.util.Map;
2527
import java.util.stream.Collectors;
2628

27-
import static org.apache.commons.lang.StringUtils.left;
28-
2929
public class CommonUtil {
3030
private static final String CLUSTER_NAME_ENV = "clusterName";
3131

@@ -49,10 +49,29 @@ private static String toValidResourceName(String ori) {
4949
return ori.toLowerCase().replaceAll("[^a-z0-9-\\.]", "-");
5050
}
5151

52-
public static V1ObjectMeta makeV1ObjectMeta(String name, String k8sNamespace, String pulsarNamespace, String tenant, String cluster) {
52+
public static V1OwnerReference getOwnerReferenceFromCustomConfigs(Map<String, Object> customConfigs) {
53+
if (customConfigs == null) {
54+
return null;
55+
}
56+
Map<String, Object> ownerRef = (Map<String, Object>) customConfigs.get("ownerReference");
57+
if (ownerRef == null) {
58+
return null;
59+
}
60+
return new V1OwnerReference()
61+
.apiVersion(String.valueOf(ownerRef.get("apiVersion")))
62+
.kind(String.valueOf(ownerRef.get("kind")))
63+
.name(String.valueOf(ownerRef.get("name")))
64+
.uid(String.valueOf(ownerRef.get("uid")));
65+
}
66+
67+
public static V1ObjectMeta makeV1ObjectMeta(String name, String k8sNamespace, String pulsarNamespace, String tenant,
68+
String cluster, V1OwnerReference ownerReference) {
5369
V1ObjectMeta v1ObjectMeta = new V1ObjectMeta();
5470
v1ObjectMeta.setName(createObjectName(cluster, tenant, pulsarNamespace, name));
5571
v1ObjectMeta.setNamespace(k8sNamespace);
72+
if (ownerReference != null) {
73+
v1ObjectMeta.setOwnerReferences(Collections.singletonList(ownerReference));
74+
}
5675

5776
return v1ObjectMeta;
5877
}

mesh-worker-service/src/main/java/io/functionmesh/compute/util/FunctionsUtil.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ public class FunctionsUtil {
5858
public final static String sourceKey = "source";
5959

6060
public static V1alpha1Function createV1alpha1FunctionFromFunctionConfig(String kind, String group, String version
61-
, String functionName, String functionPkgUrl, FunctionConfig functionConfig) {
61+
, String functionName, String functionPkgUrl, FunctionConfig functionConfig
62+
, Map<String, Object> customConfigs) {
6263
String customRuntimeOptionsJSON = functionConfig.getCustomRuntimeOptions();
6364
CustomRuntimeOptions customRuntimeOptions = null;
6465
if (Strings.isEmpty(customRuntimeOptionsJSON)) {
@@ -98,7 +99,8 @@ public static V1alpha1Function createV1alpha1FunctionFromFunctionConfig(String k
9899
functionConfig.getNamespace(),
99100
functionDetails.getNamespace(),
100101
functionDetails.getTenant(),
101-
clusterName));
102+
clusterName,
103+
CommonUtil.getOwnerReferenceFromCustomConfigs(customConfigs)));
102104

103105
V1alpha1FunctionSpec v1alpha1FunctionSpec = new V1alpha1FunctionSpec();
104106
v1alpha1FunctionSpec.setClassName(functionConfig.getClassName());

mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public class SinksUtil {
5656

5757
public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String group, String version
5858
, String sinkName, String sinkPkgUrl, InputStream uploadedInputStream, SinkConfig sinkConfig,
59-
MeshConnectorsManager connectorsManager) {
59+
MeshConnectorsManager connectorsManager,
60+
Map<String, Object> customConfigs) {
6061
String customRuntimeOptionsJSON = sinkConfig.getCustomRuntimeOptions();
6162
CustomRuntimeOptions customRuntimeOptions = null;
6263
if (Strings.isEmpty(customRuntimeOptionsJSON)) {
@@ -107,7 +108,8 @@ public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String
107108
sinkConfig.getNamespace(),
108109
functionDetails.getNamespace(),
109110
functionDetails.getTenant(),
110-
clusterName));
111+
clusterName,
112+
CommonUtil.getOwnerReferenceFromCustomConfigs(customConfigs)));
111113

112114
V1alpha1SinkSpec v1alpha1SinkSpec = new V1alpha1SinkSpec();
113115
v1alpha1SinkSpec.setClassName(sinkConfig.getClassName());

mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S
6161
String sourceName, String sourcePkgUrl,
6262
InputStream uploadedInputStream,
6363
SourceConfig sourceConfig,
64-
MeshConnectorsManager connectorsManager) {
64+
MeshConnectorsManager connectorsManager,
65+
Map<String, Object> customConfigs) {
6566
String customRuntimeOptionsJSON = sourceConfig.getCustomRuntimeOptions();
6667
CustomRuntimeOptions customRuntimeOptions = null;
6768
if (Strings.isEmpty(customRuntimeOptionsJSON)) {
@@ -110,7 +111,8 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S
110111
sourceConfig.getNamespace(),
111112
functionDetails.getNamespace(),
112113
functionDetails.getTenant(),
113-
clusterName));
114+
clusterName,
115+
CommonUtil.getOwnerReferenceFromCustomConfigs(customConfigs)));
114116

115117
V1alpha1SourceSpec v1alpha1SourceSpec = new V1alpha1SourceSpec();
116118
v1alpha1SourceSpec.setClassName(sourceConfig.getClassName());

mesh-worker-service/src/test/java/io/functionmesh/compute/rest/api/FunctionsImplTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.kubernetes.client.openapi.JSON;
3131
import io.kubernetes.client.openapi.apis.CoreV1Api;
3232
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
33+
import java.util.Collections;
3334
import okhttp3.Call;
3435
import okhttp3.Response;
3536
import okhttp3.ResponseBody;
@@ -278,7 +279,7 @@ public void registerFunctionTest() throws ApiException, IOException, PulsarAdmin
278279
PowerMockito.when(tenants.getTenantInfo(tenant)).thenReturn(null);
279280

280281
V1alpha1Function v1alpha1Function = FunctionsUtil.createV1alpha1FunctionFromFunctionConfig(kind, group,
281-
version, functionName, null, functionConfig);
282+
version, functionName, null, functionConfig, Collections.emptyMap());
282283

283284
Map<String, String> customLabels = Maps.newHashMap();
284285
customLabels.put("pulsar-tenant", tenant);
@@ -460,7 +461,7 @@ public void updateFunctionTest() throws ApiException, IOException {
460461

461462
FunctionConfig functionConfig = Generate.CreateJavaFunctionConfig(tenant, namespace, functionName);
462463
V1alpha1Function v1alpha1Function = FunctionsUtil.createV1alpha1FunctionFromFunctionConfig(kind, group,
463-
version, functionName, null, functionConfig);
464+
version, functionName, null, functionConfig, Collections.emptyMap());
464465
v1alpha1Function.getMetadata().setResourceVersion("24794021");
465466

466467
PowerMockito.when(meshWorkerService.getCustomObjectsApi()

mesh-worker-service/src/test/java/io/functionmesh/compute/rest/api/SinksImpTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ public void testRegisterSink()
224224

225225
V1alpha1Sink v1alpha1Sink =
226226
SinksUtil.createV1alpha1SkinFromSinkConfig(
227-
kind, group, version, componentName, null, uploadedInputStream, sinkConfig, null);
227+
kind, group, version, componentName, null, uploadedInputStream, sinkConfig, null,
228+
Collections.emptyMap());
228229

229230
Map<String, String> customLabels = Maps.newHashMap();
230231
customLabels.put("pulsar-cluster", clusterName);
@@ -442,7 +443,8 @@ public void testUpdateSink()
442443

443444
V1alpha1Sink v1alpha1Sink =
444445
SinksUtil.createV1alpha1SkinFromSinkConfig(
445-
kind, group, version, componentName, null, uploadedInputStream, sinkConfig, null);
446+
kind, group, version, componentName, null, uploadedInputStream, sinkConfig, null,
447+
Collections.emptyMap());
446448
v1alpha1Sink.getMetadata().setResourceVersion("881033");
447449

448450
PowerMockito.when(

mesh-worker-service/src/test/java/io/functionmesh/compute/rest/api/SourcesImpTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.kubernetes.client.openapi.ApiException;
2929
import io.kubernetes.client.openapi.JSON;
3030
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
31+
import java.util.Collections;
3132
import okhttp3.Call;
3233
import okhttp3.Response;
3334
import okhttp3.ResponseBody;
@@ -227,7 +228,8 @@ public void testRegisterSource()
227228
null,
228229
uploadedInputStream,
229230
sourceConfig,
230-
null);
231+
null,
232+
Collections.emptyMap());
231233
Map<String, String> customLabels = Maps.newHashMap();
232234
customLabels.put("pulsar-cluster", clusterName);
233235
customLabels.put("pulsar-tenant", tenant);
@@ -459,7 +461,8 @@ public void testUpdateSource()
459461
componentName,
460462
null,
461463
uploadedInputStream,
462-
sourceConfig, null);
464+
sourceConfig, null,
465+
Collections.emptyMap());
463466
v1alpha1Source.getMetadata().setResourceVersion("881033");
464467

465468
PowerMockito.when(

0 commit comments

Comments
 (0)