Skip to content

Commit d183c81

Browse files
authored
Merge pull request #344 from s22s/fix/343
Fix for SparkML memory pressure issues
2 parents 5367d2e + dba27d1 commit d183c81

File tree

18 files changed

+335
-52
lines changed

18 files changed

+335
-52
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
[
2+
{
3+
"jmhVersion" : "1.21",
4+
"benchmark" : "org.locationtech.rasterframes.bench.CellTypeBench.fromRow",
5+
"mode" : "avgt",
6+
"threads" : 1,
7+
"forks" : 1,
8+
"jvm" : "/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/jre/bin/java",
9+
"jvmArgs" : [
10+
"-Xmx4g"
11+
],
12+
"jdkVersion" : "1.8.0_171",
13+
"vmName" : "Java HotSpot(TM) 64-Bit Server VM",
14+
"vmVersion" : "25.171-b11",
15+
"warmupIterations" : 8,
16+
"warmupTime" : "10 s",
17+
"warmupBatchSize" : 1,
18+
"measurementIterations" : 5,
19+
"measurementTime" : "10 s",
20+
"measurementBatchSize" : 1,
21+
"primaryMetric" : {
22+
"score" : 2.835836726694286,
23+
"scoreError" : 0.19270422040266105,
24+
"scoreConfidence" : [
25+
2.643132506291625,
26+
3.028540947096947
27+
],
28+
"scorePercentiles" : {
29+
"0.0" : 2.795428349144724,
30+
"50.0" : 2.833201108807787,
31+
"90.0" : 2.918007291796378,
32+
"95.0" : 2.918007291796378,
33+
"99.0" : 2.918007291796378,
34+
"99.9" : 2.918007291796378,
35+
"99.99" : 2.918007291796378,
36+
"99.999" : 2.918007291796378,
37+
"99.9999" : 2.918007291796378,
38+
"100.0" : 2.918007291796378
39+
},
40+
"scoreUnit" : "us/op",
41+
"rawData" : [
42+
[
43+
2.795428349144724,
44+
2.79552918770274,
45+
2.837017696019802,
46+
2.918007291796378,
47+
2.833201108807787
48+
]
49+
]
50+
},
51+
"secondaryMetrics" : {
52+
}
53+
},
54+
{
55+
"jmhVersion" : "1.21",
56+
"benchmark" : "org.locationtech.rasterframes.bench.CellTypeBench.intoRow",
57+
"mode" : "avgt",
58+
"threads" : 1,
59+
"forks" : 1,
60+
"jvm" : "/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/jre/bin/java",
61+
"jvmArgs" : [
62+
"-Xmx4g"
63+
],
64+
"jdkVersion" : "1.8.0_171",
65+
"vmName" : "Java HotSpot(TM) 64-Bit Server VM",
66+
"vmVersion" : "25.171-b11",
67+
"warmupIterations" : 8,
68+
"warmupTime" : "10 s",
69+
"warmupBatchSize" : 1,
70+
"measurementIterations" : 5,
71+
"measurementTime" : "10 s",
72+
"measurementBatchSize" : 1,
73+
"primaryMetric" : {
74+
"score" : 0.35239272354175755,
75+
"scoreError" : 0.007379508933042929,
76+
"scoreConfidence" : [
77+
0.3450132146087146,
78+
0.3597722324748005
79+
],
80+
"scorePercentiles" : {
81+
"0.0" : 0.34961161284617265,
82+
"50.0" : 0.3525031701325926,
83+
"90.0" : 0.3550145438728285,
84+
"95.0" : 0.3550145438728285,
85+
"99.0" : 0.3550145438728285,
86+
"99.9" : 0.3550145438728285,
87+
"99.99" : 0.3550145438728285,
88+
"99.999" : 0.3550145438728285,
89+
"99.9999" : 0.3550145438728285,
90+
"100.0" : 0.3550145438728285
91+
},
92+
"scoreUnit" : "us/op",
93+
"rawData" : [
94+
[
95+
0.3550145438728285,
96+
0.3522314263912252,
97+
0.3525031701325926,
98+
0.34961161284617265,
99+
0.3526028644659687
100+
]
101+
]
102+
},
103+
"secondaryMetrics" : {
104+
}
105+
}
106+
]
107+
108+
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
[
2+
{
3+
"jmhVersion" : "1.21",
4+
"benchmark" : "org.locationtech.rasterframes.bench.CellTypeBench.fromRow",
5+
"mode" : "avgt",
6+
"threads" : 1,
7+
"forks" : 1,
8+
"jvm" : "/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/jre/bin/java",
9+
"jvmArgs" : [
10+
"-Xmx4g"
11+
],
12+
"jdkVersion" : "1.8.0_171",
13+
"vmName" : "Java HotSpot(TM) 64-Bit Server VM",
14+
"vmVersion" : "25.171-b11",
15+
"warmupIterations" : 8,
16+
"warmupTime" : "10 s",
17+
"warmupBatchSize" : 1,
18+
"measurementIterations" : 5,
19+
"measurementTime" : "10 s",
20+
"measurementBatchSize" : 1,
21+
"primaryMetric" : {
22+
"score" : 0.09014145135858256,
23+
"scoreError" : 0.002394012003996672,
24+
"scoreConfidence" : [
25+
0.0877474393545859,
26+
0.09253546336257923
27+
],
28+
"scorePercentiles" : {
29+
"0.0" : 0.08936698388038906,
30+
"50.0" : 0.08989743516282808,
31+
"90.0" : 0.09083590280173164,
32+
"95.0" : 0.09083590280173164,
33+
"99.0" : 0.09083590280173164,
34+
"99.9" : 0.09083590280173164,
35+
"99.99" : 0.09083590280173164,
36+
"99.999" : 0.09083590280173164,
37+
"99.9999" : 0.09083590280173164,
38+
"100.0" : 0.09083590280173164
39+
},
40+
"scoreUnit" : "us/op",
41+
"rawData" : [
42+
[
43+
0.08989743516282808,
44+
0.09072300570225225,
45+
0.08988392924571173,
46+
0.09083590280173164,
47+
0.08936698388038906
48+
]
49+
]
50+
},
51+
"secondaryMetrics" : {
52+
}
53+
},
54+
{
55+
"jmhVersion" : "1.21",
56+
"benchmark" : "org.locationtech.rasterframes.bench.CellTypeBench.intoRow",
57+
"mode" : "avgt",
58+
"threads" : 1,
59+
"forks" : 1,
60+
"jvm" : "/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/jre/bin/java",
61+
"jvmArgs" : [
62+
"-Xmx4g"
63+
],
64+
"jdkVersion" : "1.8.0_171",
65+
"vmName" : "Java HotSpot(TM) 64-Bit Server VM",
66+
"vmVersion" : "25.171-b11",
67+
"warmupIterations" : 8,
68+
"warmupTime" : "10 s",
69+
"warmupBatchSize" : 1,
70+
"measurementIterations" : 5,
71+
"measurementTime" : "10 s",
72+
"measurementBatchSize" : 1,
73+
"primaryMetric" : {
74+
"score" : 0.0804223243178353,
75+
"scoreError" : 0.003232629425106684,
76+
"scoreConfidence" : [
77+
0.07718969489272862,
78+
0.08365495374294199
79+
],
80+
"scorePercentiles" : {
81+
"0.0" : 0.07979611499160112,
82+
"50.0" : 0.08015109899040546,
83+
"90.0" : 0.08189719727590557,
84+
"95.0" : 0.08189719727590557,
85+
"99.0" : 0.08189719727590557,
86+
"99.9" : 0.08189719727590557,
87+
"99.99" : 0.08189719727590557,
88+
"99.999" : 0.08189719727590557,
89+
"99.9999" : 0.08189719727590557,
90+
"100.0" : 0.08189719727590557
91+
},
92+
"scoreUnit" : "us/op",
93+
"rawData" : [
94+
[
95+
0.08189719727590557,
96+
0.08020909063591224,
97+
0.07979611499160112,
98+
0.08005811969535213,
99+
0.08015109899040546
100+
]
101+
]
102+
},
103+
"secondaryMetrics" : {
104+
}
105+
}
106+
]
107+
108+
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2019 Astraea, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
* SPDX-License-Identifier: Apache-2.0
19+
*
20+
*/
21+
22+
package org.locationtech.rasterframes.bench
23+
import java.util.concurrent.TimeUnit
24+
25+
import geotrellis.raster.{CellType, DoubleUserDefinedNoDataCellType, IntUserDefinedNoDataCellType}
26+
import org.apache.spark.sql.catalyst.InternalRow
27+
import org.locationtech.rasterframes.encoders.CatalystSerializer._
28+
import org.openjdk.jmh.annotations._
29+
30+
@BenchmarkMode(Array(Mode.AverageTime))
31+
@State(Scope.Benchmark)
32+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
33+
class CellTypeBench {
34+
var row: InternalRow = _
35+
var ct: CellType = _
36+
@Setup(Level.Trial)
37+
def setupData(): Unit = {
38+
ct = IntUserDefinedNoDataCellType(scala.util.Random.nextInt())
39+
val o: CellType = DoubleUserDefinedNoDataCellType(scala.util.Random.nextDouble())
40+
row = o.toInternalRow
41+
}
42+
43+
@Benchmark
44+
def fromRow(): CellType = {
45+
row.to[CellType]
46+
}
47+
48+
@Benchmark
49+
def intoRow(): InternalRow = {
50+
ct.toInternalRow
51+
}
52+
}

bench/src/main/scala/org/locationtech/rasterframes/bench/TileExplodeBench.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ package org.locationtech.rasterframes.bench
2222

2323
import java.util.concurrent.TimeUnit
2424

25+
import org.apache.spark.sql.catalyst.InternalRow
26+
import org.apache.spark.sql.catalyst.expressions.BoundReference
27+
import org.apache.spark.sql.rf.TileUDT
2528
import org.locationtech.rasterframes._
26-
import org.apache.spark.sql._
27-
import org.apache.spark.sql.functions._
29+
import org.locationtech.rasterframes.expressions.generators.ExplodeTiles
2830
import org.openjdk.jmh.annotations._
29-
3031
/**
3132
*
3233
* @author sfitch
@@ -36,33 +37,32 @@ import org.openjdk.jmh.annotations._
3637
@State(Scope.Benchmark)
3738
@OutputTimeUnit(TimeUnit.MILLISECONDS)
3839
class TileExplodeBench extends SparkEnv {
39-
import spark.implicits._
4040

41-
@Param(Array("uint8", "uint16ud255", "float32", "float64"))
41+
//@Param(Array("uint8", "uint16ud255", "float32", "float64"))
42+
@Param(Array("uint16ud255"))
4243
var cellTypeName: String = _
4344

4445
@Param(Array("256"))
4546
var tileSize: Int = _
4647

47-
@Param(Array("100"))
48+
@Param(Array("2000"))
4849
var numTiles: Int = _
4950

5051
@transient
51-
var tiles: DataFrame = _
52+
var tiles: Array[InternalRow] = _
53+
54+
var exploder: ExplodeTiles = _
5255

5356
@Setup(Level.Trial)
5457
def setupData(): Unit = {
55-
tiles = Seq.fill(numTiles)(randomTile(tileSize, tileSize, cellTypeName))
56-
.toDF("tile").repartition(10)
57-
}
58-
59-
@Benchmark
60-
def arrayExplode() = {
61-
tiles.select(posexplode(rf_tile_to_array_double($"tile"))).count()
58+
tiles = Array.fill(numTiles)(randomTile(tileSize, tileSize, cellTypeName))
59+
.map(t => InternalRow(TileUDT.tileSerializer.toInternalRow(t)))
60+
val expr = BoundReference(0, TileType, true)
61+
exploder = new ExplodeTiles(1.0, None, Seq(expr))
6262
}
63-
6463
@Benchmark
6564
def tileExplode() = {
66-
tiles.select(rf_explode_tiles($"tile")).count()
65+
for(t <- tiles)
66+
exploder.eval(t)
6767
}
6868
}

core/src/main/scala/org/apache/spark/sql/rf/RasterSourceUDT.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ object RasterSourceUDT {
7373

7474
implicit val rasterSourceSerializer: CatalystSerializer[RasterSource] = new CatalystSerializer[RasterSource] {
7575

76-
override def schema: StructType = StructType(Seq(
76+
override val schema: StructType = StructType(Seq(
7777
StructField("raster_source_kryo", BinaryType, false)
7878
))
7979

core/src/main/scala/org/apache/spark/sql/rf/TileUDT.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ case object TileUDT {
7474

7575
implicit def tileSerializer: CatalystSerializer[Tile] = new CatalystSerializer[Tile] {
7676

77-
override def schema: StructType = StructType(Seq(
77+
override val schema: StructType = StructType(Seq(
7878
StructField("cell_context", schemaOf[TileDataContext], false),
7979
StructField("cell_data", schemaOf[Cells], false)
8080
))

0 commit comments

Comments
 (0)