Skip to content

Commit 5cfe807

Browse files
authored
#192 add defaultSchemaType and defaultSerdeClassName to FunctionMeshConnectorDefinition (#195)
* add defaultSchemaType and defaultSerdeClassName to FunctionMeshConnectorDefinition * check if update
1 parent 3ab6378 commit 5cfe807

File tree

3 files changed

+44
-4
lines changed

3 files changed

+44
-4
lines changed

mesh-worker-service/src/main/java/io/functionmesh/compute/models/FunctionMeshConnectorDefinition.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ public class FunctionMeshConnectorDefinition extends ConnectorDefinition {
6363
*/
6464
private String typeClassName;
6565

66+
/**
67+
* Default schema type of the connector's topic, optional.
68+
*/
69+
private String defaultSchemaType;
70+
71+
/**
72+
* Default serde class name of the connector's topic, optional.
73+
*/
74+
private String defaultSerdeClassName;
75+
6676
public String toFullImageURL() {
6777
return String.format("%s%s:%s", imageRegistry != null ? imageRegistry : DEFAULT_REGISTRY,
6878
imageRepository, imageTag != null ? imageTag : version);

mesh-worker-service/src/main/java/io/functionmesh/compute/util/SinksUtil.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,31 @@ public static V1alpha1Sink createV1alpha1SkinFromSinkConfig(String kind, String
160160
} else {
161161
v1alpha1SinkSpecInput.setTypeClassName(functionMeshConnectorDefinition.getTypeClassName());
162162
}
163+
// we only handle user provide --inputs but also with defaultSchemaType defined
164+
if (sinkConfig.getInputs() != null && sinkConfig.getInputs().size() > 0) {
165+
if (v1alpha1SinkSpecInput.getSourceSpecs() == null) {
166+
v1alpha1SinkSpecInput.setSourceSpecs(new HashMap<>());
167+
}
168+
for (String input : sinkConfig.getInputs()) {
169+
V1alpha1SinkSpecInputSourceSpecs inputSourceSpecsItem =
170+
v1alpha1SinkSpecInput.getSourceSpecs().getOrDefault(input,
171+
new V1alpha1SinkSpecInputSourceSpecs());
172+
boolean updated = false;
173+
if (StringUtils.isNotEmpty(functionMeshConnectorDefinition.getDefaultSchemaType())
174+
&& StringUtils.isEmpty(inputSourceSpecsItem.getSchemaType())) {
175+
inputSourceSpecsItem.setSchemaType(functionMeshConnectorDefinition.getDefaultSchemaType());
176+
updated = true;
177+
}
178+
if (StringUtils.isNotEmpty(functionMeshConnectorDefinition.getDefaultSerdeClassName())
179+
&& StringUtils.isEmpty(inputSourceSpecsItem.getSerdeClassname())) {
180+
inputSourceSpecsItem.setSerdeClassname(functionMeshConnectorDefinition.getDefaultSerdeClassName());
181+
updated = true;
182+
}
183+
if (updated) {
184+
v1alpha1SinkSpecInput.putSourceSpecsItem(input, inputSourceSpecsItem);
185+
}
186+
}
187+
}
163188
}
164189
}
165190
}

mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,8 @@
3232
import io.functionmesh.compute.worker.MeshConnectorsManager;
3333
import io.kubernetes.client.custom.Quantity;
3434
import lombok.extern.slf4j.Slf4j;
35-
import io.functionmesh.compute.models.CustomRuntimeOptions;
3635
import org.apache.commons.lang.StringUtils;
3736
import org.apache.logging.log4j.util.Strings;
38-
import org.apache.pulsar.common.functions.FunctionDefinition;
3937
import org.apache.pulsar.common.functions.ProducerConfig;
4038
import org.apache.pulsar.common.functions.Resources;
4139
import org.apache.pulsar.common.io.SourceConfig;
@@ -44,9 +42,7 @@
4442
import org.apache.pulsar.functions.utils.SourceConfigUtils;
4543

4644
import javax.ws.rs.core.Response;
47-
import java.io.IOException;
4845
import java.io.InputStream;
49-
import java.net.URISyntaxException;
5046
import java.util.HashMap;
5147
import java.util.Map;
5248

@@ -165,6 +161,15 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S
165161
} else {
166162
v1alpha1SourceSpecOutput.setTypeClassName(functionMeshConnectorDefinition.getTypeClassName());
167163
}
164+
// use default schema type if user not provided
165+
if (StringUtils.isNotEmpty(functionMeshConnectorDefinition.getDefaultSchemaType())
166+
&& StringUtils.isEmpty(v1alpha1SourceSpecOutput.getSinkSchemaType())) {
167+
v1alpha1SourceSpecOutput.setSinkSchemaType(functionMeshConnectorDefinition.getDefaultSchemaType());
168+
}
169+
if (StringUtils.isNotEmpty(functionMeshConnectorDefinition.getDefaultSerdeClassName())
170+
&& StringUtils.isEmpty(v1alpha1SourceSpecOutput.getSinkSerdeClassName())) {
171+
v1alpha1SourceSpecOutput.setSinkSerdeClassName(functionMeshConnectorDefinition.getDefaultSerdeClassName());
172+
}
168173
}
169174
}
170175
}

0 commit comments

Comments
 (0)