Skip to content

Commit b19c11e

Browse files
committed
Remove join pre project metrics and fix ut
1 parent 65b321d commit b19c11e

File tree

9 files changed

+3
-97
lines changed

9 files changed

+3
-97
lines changed

backends-clickhouse/src/main/scala/org/apache/gluten/metrics/HashJoinMetricsUpdater.scala

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,6 @@ class HashJoinMetricsUpdater(val metrics: Map[String, SQLMetric])
3131
var currentIdx = operatorMetrics.metricsList.size() - 1
3232
var totalTime = 0L
3333

34-
// build side pre projection
35-
if (joinParams.buildPreProjectionNeeded) {
36-
metrics("buildPreProjectionTime") +=
37-
(operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
38-
metrics("outputVectors") += operatorMetrics.metricsList.get(currentIdx).outputVectors
39-
totalTime += operatorMetrics.metricsList.get(currentIdx).time
40-
currentIdx -= 1
41-
}
42-
43-
// stream side pre projection
44-
if (joinParams.streamPreProjectionNeeded) {
45-
metrics("streamPreProjectionTime") +=
46-
(operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
47-
metrics("outputVectors") += operatorMetrics.metricsList.get(currentIdx).outputVectors
48-
totalTime += operatorMetrics.metricsList.get(currentIdx).time
49-
currentIdx -= 1
50-
}
51-
5234
// update fillingRightJoinSideTime
5335
MetricsUtil
5436
.getAllProcessorList(operatorMetrics.metricsList.get(currentIdx))

backends-clickhouse/src/main/scala/org/apache/gluten/metrics/SortMergeJoinMetricsUpdater.scala

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,6 @@ class SortMergeJoinMetricsUpdater(val metrics: Map[String, SQLMetric])
3232
var currentIdx = operatorMetrics.metricsList.size() - 1
3333
var totalTime = 0L
3434

35-
// build side pre projection
36-
if (joinParams.buildPreProjectionNeeded) {
37-
metrics("buildPreProjectionTime") +=
38-
(operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
39-
metrics("outputVectors") += operatorMetrics.metricsList.get(currentIdx).outputVectors
40-
totalTime += operatorMetrics.metricsList.get(currentIdx).time
41-
currentIdx -= 1
42-
}
43-
44-
// stream side pre projection
45-
if (joinParams.streamPreProjectionNeeded) {
46-
metrics("streamPreProjectionTime") +=
47-
(operatorMetrics.metricsList.get(currentIdx).time / 1000L).toLong
48-
metrics("outputVectors") += operatorMetrics.metricsList.get(currentIdx).outputVectors
49-
totalTime += operatorMetrics.metricsList.get(currentIdx).time
50-
currentIdx -= 1
51-
}
52-
5335
// update fillingRightJoinSideTime
5436
MetricsUtil
5537
.getAllProcessorList(operatorMetrics.metricsList.get(currentIdx))

backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class GlutenClickHouseTPCHBucketSuite
7575
plans(3)
7676
.asInstanceOf[HashJoinLikeExecTransformer]
7777
.right
78-
.isInstanceOf[InputIteratorTransformer])
78+
.isInstanceOf[ProjectExecTransformer])
7979

8080
// Check the bucket join
8181
assert(

backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class GlutenClickHouseTPCDSMetricsSuite extends GlutenClickHouseTPCDSAbstractSui
9090
case g: GlutenPlan if !g.isInstanceOf[InputIteratorTransformer] => g
9191
}
9292

93-
assert(allGlutenPlans.size == 30)
93+
assert(allGlutenPlans.size == 34)
9494

9595
val windowPlan0 = allGlutenPlans(3)
9696
assert(windowPlan0.metrics("totalTime").value == 2)

backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,6 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
101101

102102
val bloomFilterBlocksByteSize: SQLMetric = metrics("bloomFilterBlocksByteSize")
103103

104-
val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount")
105-
val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos")
106-
107-
val buildPreProjectionCpuCount: SQLMetric = metrics("buildPreProjectionCpuCount")
108-
val buildPreProjectionWallNanos: SQLMetric = metrics("buildPreProjectionWallNanos")
109-
110104
val loadLazyVectorTime: SQLMetric = metrics("loadLazyVectorTime")
111105

112106
override protected def updateJoinMetricsInternal(
@@ -148,17 +142,6 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
148142
hashBuildSpilledFiles += hashBuildMetrics.spilledFiles
149143
idx += 1
150144

151-
if (joinParams.buildPreProjectionNeeded) {
152-
buildPreProjectionCpuCount += joinMetrics.get(idx).cpuCount
153-
buildPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
154-
idx += 1
155-
}
156-
157-
if (joinParams.streamPreProjectionNeeded) {
158-
streamPreProjectionCpuCount += joinMetrics.get(idx).cpuCount
159-
streamPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
160-
idx += 1
161-
}
162145
if (TaskResources.inSparkTask()) {
163146
SparkMetricsUtil.incMemoryBytesSpilled(
164147
TaskResources.getLocalTaskContext().taskMetrics(),
@@ -185,11 +168,6 @@ class SortMergeJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
185168
val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes")
186169
val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations")
187170

188-
val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount")
189-
val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos")
190-
val bufferPreProjectionCpuCount: SQLMetric = metrics("bufferPreProjectionCpuCount")
191-
val bufferPreProjectionWallNanos: SQLMetric = metrics("bufferPreProjectionWallNanos")
192-
193171
override protected def updateJoinMetricsInternal(
194172
joinMetrics: util.ArrayList[OperatorMetrics],
195173
joinParams: JoinParams): Unit = {
@@ -200,17 +178,5 @@ class SortMergeJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
200178
peakMemoryBytes += smjMetrics.peakMemoryBytes
201179
numMemoryAllocations += smjMetrics.numMemoryAllocations
202180
idx += 1
203-
204-
if (joinParams.buildPreProjectionNeeded) {
205-
bufferPreProjectionCpuCount += joinMetrics.get(idx).cpuCount
206-
bufferPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
207-
idx += 1
208-
}
209-
210-
if (joinParams.streamPreProjectionNeeded) {
211-
streamPreProjectionCpuCount += joinMetrics.get(idx).cpuCount
212-
streamPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
213-
idx += 1
214-
}
215181
}
216182
}

gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,6 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport {
231231
val operatorId = context.nextOperatorId(this.nodeName)
232232

233233
val joinParams = new JoinParams
234-
if (JoinUtils.preProjectionNeeded(streamedKeyExprs)) {
235-
joinParams.streamPreProjectionNeeded = true
236-
}
237-
if (JoinUtils.preProjectionNeeded(buildKeyExprs)) {
238-
joinParams.buildPreProjectionNeeded = true
239-
}
240-
241234
if (condition.isDefined) {
242235
joinParams.isWithCondition = true
243236
}

gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionB
2424
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
2525
import org.apache.gluten.utils.SubstraitUtil
2626

27-
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
27+
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
2828
import org.apache.spark.sql.catalyst.plans._
2929

3030
import com.google.protobuf.Any
@@ -43,10 +43,6 @@ object JoinUtils {
4343
}
4444
}
4545

46-
def preProjectionNeeded(keyExprs: Seq[Expression]): Boolean = {
47-
!keyExprs.forall(_.isInstanceOf[AttributeReference])
48-
}
49-
5046
private def createJoinExtensionNode(
5147
joinParameters: Any,
5248
output: Seq[Attribute]): AdvancedExtensionNode = {

gluten-substrait/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -198,13 +198,6 @@ abstract class SortMergeJoinExecTransformerBase(
198198
val operatorId = context.nextOperatorId(this.nodeName)
199199

200200
val joinParams = new JoinParams
201-
if (JoinUtils.preProjectionNeeded(leftKeys)) {
202-
joinParams.streamPreProjectionNeeded = true
203-
}
204-
if (JoinUtils.preProjectionNeeded(rightKeys)) {
205-
joinParams.buildPreProjectionNeeded = true
206-
}
207-
208201
val joinRel = JoinUtils.createJoinRel(
209202
streamedKeys,
210203
bufferedKeys,

gluten-substrait/src/main/scala/org/apache/gluten/substrait/SubstraitContext.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,6 @@ import java.security.InvalidParameterException
2121
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
2222

2323
case class JoinParams() {
24-
// Whether preProjection is needed in streamed side.
25-
var streamPreProjectionNeeded = false
26-
27-
// Whether preProjection is needed in build side.
28-
var buildPreProjectionNeeded = false
29-
3024
// Whether postProjection is needed after Join.
3125
var postProjectionNeeded = true
3226

0 commit comments

Comments
 (0)