Commit 1603bdf

Modify the SparkMeasure feature to be disabled by default. (#5274)
* Modify the SparkMeasure feature to be disabled by default.
* format code
1 parent 7215d4e commit 1603bdf

File tree: 1 file changed (+19, −17 lines)

  • linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor


linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkSqlExecutor.scala

Lines changed: 19 additions & 17 deletions
@@ -160,23 +160,25 @@ class SparkSqlExecutor(
       sparkEngineSession: SparkEngineSession,
       code: String
   ): Option[SparkSqlMeasure] = {
-    val sparkMeasureType = engineExecutionContext.getProperties
-      .getOrDefault(SparkConfiguration.SPARKMEASURE_AGGREGATE_TYPE, "")
-      .toString
-
-    if (sparkMeasureType.nonEmpty) {
-      val outputPrefix = SparkConfiguration.SPARKMEASURE_OUTPUT_PREFIX.getValue(options)
-      val outputPath = FsPath.getFsPath(
-        outputPrefix,
-        LabelUtil.getUserCreator(engineExecutionContext.getLabels.toList.asJava)._1,
-        sparkMeasureType,
-        JobUtils.getJobIdFromMap(engineExecutionContext.getProperties),
-        new Date().getTime.toString
-      )
-      Some(new SparkSqlMeasure(sparkEngineSession.sparkSession, code, sparkMeasureType, outputPath))
-    } else {
-      None
-    }
+    Option(engineExecutionContext.getProperties.get(SparkConfiguration.SPARKMEASURE_AGGREGATE_TYPE))
+      .map(_.toString)
+      .flatMap { sparkMeasureType =>
+        val userName = LabelUtil.getUserCreator(engineExecutionContext.getLabels.toList.asJava)._1
+        val outputPrefix = SparkConfiguration.SPARKMEASURE_OUTPUT_PREFIX.getValue(options)
+        val timestamp = System.currentTimeMillis().toString
+
+        val outputPath = FsPath.getFsPath(
+          outputPrefix,
+          userName,
+          sparkMeasureType,
+          JobUtils.getJobIdFromMap(engineExecutionContext.getProperties),
+          timestamp
+        )
+
+        Some(
+          new SparkSqlMeasure(sparkEngineSession.sparkSession, code, sparkMeasureType, outputPath)
+        )
+      }
   }
 
   override protected def getExecutorIdPreFix: String = "SparkSqlExecutor_"
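
For readers skimming the diff, the behavioral change is that a missing SPARKMEASURE_AGGREGATE_TYPE property now short-circuits to None instead of falling back to an empty string that then had to be filtered out with a nonEmpty check. Below is a minimal, self-contained Scala sketch of that lookup pattern; the object name, helper method, and property key string are illustrative stand-ins, not identifiers from the Linkis codebase.

import java.util

object SparkMeasureDefaultSketch {

  // Stand-in key for illustration; the real code reads SparkConfiguration.SPARKMEASURE_AGGREGATE_TYPE.
  val AggregateTypeKey = "linkis.spark.sparkmeasure.aggregate.type"

  // Mirrors the patched pattern: wrapping the raw map lookup in Option turns a
  // missing property (null) into None, so the SparkMeasure path is skipped by default.
  def resolveMeasureType(props: util.Map[String, AnyRef]): Option[String] =
    Option(props.get(AggregateTypeKey)).map(_.toString)

  def main(args: Array[String]): Unit = {
    val props = new util.HashMap[String, AnyRef]()
    println(resolveMeasureType(props)) // None -> feature disabled when the key is absent

    props.put(AggregateTypeKey, "stage")
    println(resolveMeasureType(props)) // Some(stage) -> enabled only when explicitly configured
  }
}

Compared with the old getOrDefault(..., "") plus nonEmpty check, this keeps the enable/disable decision in a single Option chain and avoids treating an empty string as a sentinel value.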
