Skip to content

Commit 7f3c1a9

Browse files
Authored — Merge pull request #4832 from apache/master
2 parents a119d2e + 3dd9b0c · commit 7f3c1a9

File tree

7 files changed: +76 additions, −1 deletion

linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,10 @@ object ComputationExecutorConf {
118118
val TASK_SUBMIT_WAIT_TIME_MS =
119119
CommonVars("linkis.ec.task.submit.wait.time.ms", 2L, "Task submit wait time(ms)").getValue
120120

121+
val ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED =
122+
CommonVars("linkis.ec.send.log.entrance.limit.enabled", true)
123+
124+
val ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH =
125+
CommonVars("linkis.ec.send.log.entrance.limit.length", 2000)
126+
121127
}

linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,16 @@ class EngineExecutionContext(executor: ComputationExecutor, executorUser: String
193193
def appendStdout(log: String): Unit = if (executor.isInternalExecute) {
194194
logger.info(log)
195195
} else {
196+
var taskLog = log
197+
if (
198+
ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.getValue &&
199+
log.length > ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue
200+
) {
201+
taskLog =
202+
s"${log.substring(0, ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue)}..."
203+
}
196204
val listenerBus = getEngineSyncListenerBus
197-
getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, log)))
205+
getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, taskLog)))
198206
}
199207

200208
override def close(): Unit = {

linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class SparkConfig {
4747
private String k8sSparkVersion;
4848

4949
private String k8sNamespace;
50+
private String k8sFileUploadPath;
5051
private String deployMode = "client"; // ("client") // todo cluster
5152
private String appResource; // ("")
5253
private String appName; // ("")
@@ -73,6 +74,14 @@ public class SparkConfig {
7374
private String keytab; // ("--keytab", "")
7475
private String queue; // ("--queue", "")
7576

77+
public String getK8sFileUploadPath() {
78+
return k8sFileUploadPath;
79+
}
80+
81+
public void setK8sFileUploadPath(String k8sFileUploadPath) {
82+
this.k8sFileUploadPath = k8sFileUploadPath;
83+
}
84+
7685
public String getK8sImagePullPolicy() {
7786
return k8sImagePullPolicy;
7887
}
@@ -421,6 +430,9 @@ public String toString() {
421430
+ ", k8sSparkVersion='"
422431
+ k8sSparkVersion
423432
+ '\''
433+
+ ", k8sFileUploadPath='"
434+
+ k8sFileUploadPath
435+
+ '\''
424436
+ ", k8sNamespace='"
425437
+ k8sNamespace
426438
+ '\''

linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
2222
import org.apache.linkis.engineplugin.spark.client.deployment.crds.*;
2323
import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
24+
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration;
2425

2526
import org.apache.commons.collections.CollectionUtils;
2627
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.spark.launcher.SparkAppHandle;
2829

30+
import java.util.HashMap;
2931
import java.util.List;
3032
import java.util.Map;
3133
import java.util.Objects;
@@ -79,6 +81,7 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
7981

8082
NonNamespaceOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>>
8183
sparkApplicationClient = getSparkApplicationClient(client);
84+
8285
SparkApplication sparkApplication =
8386
getSparkApplication(sparkConfig.getAppName(), sparkConfig.getK8sNamespace());
8487

@@ -88,12 +91,19 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
8891
.memory(sparkConfig.getDriverMemory())
8992
.serviceAccount(sparkConfig.getK8sServiceAccount())
9093
.build();
94+
9195
SparkPodSpec executor =
9296
SparkPodSpec.Builder()
9397
.cores(sparkConfig.getExecutorCores())
9498
.instances(sparkConfig.getNumExecutors())
9599
.memory(sparkConfig.getExecutorMemory())
96100
.build();
101+
102+
Map<String, String> sparkConfMap = new HashMap<>();
103+
sparkConfMap.put(
104+
SparkConfiguration.SPARK_KUBERNETES_FILE_UPLOAD_PATH().key(),
105+
sparkConfig.getK8sFileUploadPath());
106+
97107
SparkApplicationSpec sparkApplicationSpec =
98108
SparkApplicationSpec.Builder()
99109
.type(sparkConfig.getK8sLanguageType())
@@ -107,10 +117,12 @@ public void deployCluster(String mainClass, String args, Map<String, String> con
107117
.restartPolicy(new RestartPolicy(sparkConfig.getK8sRestartPolicy()))
108118
.driver(driver)
109119
.executor(executor)
120+
.sparkConf(sparkConfMap)
110121
.build();
111122

112123
logger.info("Spark k8s operator task parameters: {}", sparkApplicationSpec);
113124
sparkApplication.setSpec(sparkApplicationSpec);
125+
114126
SparkApplication created = sparkApplicationClient.createOrReplace(sparkApplication);
115127
logger.info("Preparing to submit the Spark k8s operator Task: {}", created);
116128

linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.linkis.engineplugin.spark.client.deployment.crds;
1919

20+
import java.util.HashMap;
2021
import java.util.List;
22+
import java.util.Map;
2123

2224
import io.fabric8.kubernetes.api.model.KubernetesResource;
2325

@@ -45,6 +47,16 @@ public class SparkApplicationSpec implements KubernetesResource {
4547

4648
private SparkPodSpec executor;
4749

50+
private Map<String, String> sparkConf;
51+
52+
public Map<String, String> getSparkConf() {
53+
return sparkConf;
54+
}
55+
56+
public void setSparkConf(Map<String, String> sparkConf) {
57+
this.sparkConf = sparkConf;
58+
}
59+
4860
public String getType() {
4961
return type;
5062
}
@@ -165,6 +177,8 @@ public String toString() {
165177
+ driver
166178
+ ", executor="
167179
+ executor
180+
+ ", sparkConf="
181+
+ sparkConf
168182
+ '}';
169183
}
170184

@@ -185,6 +199,8 @@ public static class SparkApplicationSpecBuilder {
185199
private SparkPodSpec driver;
186200
private SparkPodSpec executor;
187201

202+
private Map<String, String> sparkConf;
203+
188204
private SparkApplicationSpecBuilder() {}
189205

190206
public SparkApplicationSpecBuilder type(String type) {
@@ -242,6 +258,22 @@ public SparkApplicationSpecBuilder executor(SparkPodSpec executor) {
242258
return this;
243259
}
244260

261+
public SparkApplicationSpecBuilder sparkConf(Map<String, String> sparkConf) {
262+
if (sparkConf == null || sparkConf.size() == 0) {
263+
return this;
264+
}
265+
266+
if (this.sparkConf == null) {
267+
this.sparkConf = new HashMap<>();
268+
}
269+
270+
for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
271+
this.sparkConf.put(entry.getKey(), entry.getValue());
272+
}
273+
274+
return this;
275+
}
276+
245277
public SparkApplicationSpec build() {
246278
SparkApplicationSpec sparkApplicationSpec = new SparkApplicationSpec();
247279
sparkApplicationSpec.type = this.type;
@@ -255,6 +287,7 @@ public SparkApplicationSpec build() {
255287
sparkApplicationSpec.executor = this.executor;
256288
sparkApplicationSpec.image = this.image;
257289
sparkApplicationSpec.restartPolicy = this.restartPolicy;
290+
sparkApplicationSpec.sparkConf = this.sparkConf;
258291
return sparkApplicationSpec;
259292
}
260293
}

linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ object SparkConfiguration extends Logging {
6262
val SPARK_K8S_SPARK_VERSION = CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1")
6363
val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace", "default")
6464

65+
val SPARK_KUBERNETES_FILE_UPLOAD_PATH =
66+
CommonVars[String]("spark.kubernetes.file.upload.path", "local:///opt/spark/tmp")
67+
6568
val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", "python")
6669

6770
val SPARK_PYTHON_TEST_MODE_ENABLE =

linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
104104
sparkConfig.setK8sPassword(SPARK_K8S_PASSWORD.getValue(options))
105105
sparkConfig.setK8sImage(SPARK_K8S_IMAGE.getValue(options))
106106
sparkConfig.setK8sNamespace(SPARK_K8S_NAMESPACE.getValue(options))
107+
sparkConfig.setK8sFileUploadPath(SPARK_KUBERNETES_FILE_UPLOAD_PATH.getValue(options))
107108
sparkConfig.setK8sSparkVersion(SPARK_K8S_SPARK_VERSION.getValue(options))
108109
sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options))
109110
sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options))

0 commit comments

Comments (0)