Commit c858cfc

cleanup

committed · 1 parent 9914364 · commit c858cfc

10 files changed (+38, -47 lines)

src/main/scala/ldbc/snb/datagen/factors/FactorGenerationStage.scala

Lines changed: 11 additions & 12 deletions
@@ -3,9 +3,9 @@ package ldbc.snb.datagen.factors
 import ldbc.snb.datagen.factors.io.FactorTableSink
 import ldbc.snb.datagen.io.graphs.GraphSource
 import ldbc.snb.datagen.model.{EntityType, graphs}
-import ldbc.snb.datagen.{SparkApp, model}
+import ldbc.snb.datagen.model
 import ldbc.snb.datagen.syntax._
-import ldbc.snb.datagen.util.Logging
+import ldbc.snb.datagen.util.{DatagenStage, Logging}
 import org.apache.spark.sql.functions.{broadcast, count, date_trunc, sum}
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}

@@ -14,8 +14,7 @@ case class Factor(requiredEntities: EntityType*)(f: Seq[DataFrame] => DataFrame)
   override def apply(v1: Seq[DataFrame]): DataFrame = f(v1)
 }

-object FactorGenerationStage extends SparkApp with Logging {
-  override def appName: String = "LDBC SNB Datagen for Spark: Factor Generation Stage"
+object FactorGenerationStage extends DatagenStage with Logging {

   case class Args(outputDir: String = "out")

@@ -55,8 +54,8 @@ object FactorGenerationStage extends SparkApp with Logging {

   private val rawFactors = Map(
     "countryNumPersons" -> Factor(Place, Person) { case Seq(places, persons) =>
-      val cities = places.where($"type" === "City")
-      val countries = places.where($"type" === "Country")
+      val cities = places.where($"type" === "City").cache()
+      val countries = places.where($"type" === "Country").cache()

       frequency(
         persons.as("Person")
@@ -78,8 +77,8 @@ object FactorGenerationStage extends SparkApp with Logging {

       frequency(
         personKnowsPerson.alias("Knows")
-          .join(persons.as("Person1"), $"Person1.id" === $"Knows.Person1Id")
-          .join(cities.as("City1"), $"City1.id" === "Person1.LocationCityId")
+          .join(persons.cache().as("Person1"), $"Person1.id" === $"Knows.Person1Id")
+          .join(cities.cache().as("City1"), $"City1.id" === "Person1.LocationCityId")
           .join(persons.as("Person2"), $"Person2.id" === $"Knows.Person2Id")
           .join(cities.as("City2"), $"City2.id" === "Person2.LocationCityId")
           .where($"City1.id" < $"City2.id"),
@@ -99,9 +98,9 @@ object FactorGenerationStage extends SparkApp with Logging {

       frequency(
         personKnowsPerson.alias("Knows")
-          .join(persons.as("Person1"), $"Person1.id" === $"Knows.Person1Id")
-          .join(cities.as("City1"), $"City1.id" === "Person1.LocationCityId")
-          .join(countries.as("Country1"), $"Country1.id" === "City1.PartOfPlaceId")
+          .join(persons.cache().as("Person1"), $"Person1.id" === $"Knows.Person1Id")
+          .join(cities.cache().as("City1"), $"City1.id" === "Person1.LocationCityId")
+          .join(countries.cache().as("Country1"), $"Country1.id" === "City1.PartOfPlaceId")
           .join(persons.as("Person2"), $"Person2.id" === $"Knows.Person2Id")
           .join(cities.as("City2"), $"City2.id" === "Person2.LocationCityId")
           .join(countries.as("Country2"), $"Country2.id" === "City2.PartOfPlaceId")
@@ -155,7 +154,7 @@ object FactorGenerationStage extends SparkApp with Logging {
       )
     },
     "companiesNumEmployees" -> Factor(Organisation, PersonWorkAtCompany) { case Seq(organisation, workAt) =>
-      val company = organisation.where($"Type" === "Company")
+      val company = organisation.where($"Type" === "Company").cache()
       frequency(
         company.as("Company").join(workAt.as("WorkAt"), $"WorkAt.CompanyId" === $"Company.id"),
         value = $"WorkAt.PersonId",

src/main/scala/ldbc/snb/datagen/generation/GenerationStage.scala

Lines changed: 4 additions & 7 deletions
@@ -1,19 +1,16 @@
 package ldbc.snb.datagen.generation

-import ldbc.snb.datagen.{DatagenParams, SparkApp}
+import ldbc.snb.datagen.DatagenParams
 import ldbc.snb.datagen.generation.generator.{SparkKnowsGenerator, SparkKnowsMerger, SparkPersonGenerator, SparkRanker}
 import ldbc.snb.datagen.generation.serializer.{SparkActivitySerializer, SparkPersonSerializer, SparkStaticGraphSerializer}
 import ldbc.snb.datagen.syntax._
-import ldbc.snb.datagen.util.{ConfigParser, GeneratorConfiguration, Logging, SparkUI}
-import ldbc.snb.datagen.util.Utils.simpleNameOf
+import ldbc.snb.datagen.util.{ConfigParser, DatagenStage, GeneratorConfiguration, Logging, SparkUI, simpleNameOf}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.SparkSession

 import java.net.URI

-object GenerationStage extends SparkApp with Logging {
-  override def appName: String = "LDBC SNB Datagen for Spark: Generation Stage"
-
+object GenerationStage extends DatagenStage with Logging {
   val optimalPersonsPerFile = 500000

   case class Args(
@@ -51,7 +48,7 @@ object GenerationStage extends SparkApp with Logging {
       SparkActivitySerializer(merged, randomRanker, config, Some(numPartitions), oversizeFactor)
     }

-    SparkUI.job(simpleNameOf[SparkPersonSerializer.type ], "serialize persons") {
+    SparkUI.job(simpleNameOf[SparkPersonSerializer.type], "serialize persons") {
       SparkPersonSerializer(merged, config, Some(numPartitions), oversizeFactor)
     }


src/main/scala/ldbc/snb/datagen/generation/serializer/SparkActivitySerializer.scala

Lines changed: 0 additions & 2 deletions
@@ -6,14 +6,12 @@ import ldbc.snb.datagen.serializer.{DynamicActivitySerializer, PersonActivityExp
 import ldbc.snb.datagen.generation.generator.SparkRanker
 import ldbc.snb.datagen.util.{GeneratorConfiguration, SerializableConfiguration}
 import ldbc.snb.datagen.syntax._
-import ldbc.snb.datagen.util.formatter.DateFormatter
 import ldbc.snb.datagen.{DatagenContext, DatagenParams}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession

-import java.nio.charset.StandardCharsets
 import java.util
 import java.util.function.Consumer
 import scala.collection.JavaConverters._

src/main/scala/ldbc/snb/datagen/io/graphs.scala

Lines changed: 8 additions & 12 deletions
@@ -79,7 +79,7 @@ object graphs {


   private final class GraphWriter[M <: Mode](implicit
-    `==`: M#Layout =:= DataFrame
+    the: M#Layout =:= DataFrame
   ) extends Writer[GraphSink] with Logging with GraphWriterMixin {

     override type CoRet = Graph[M]
@@ -93,7 +93,7 @@ object graphs {
       val p = (sink.path / sink.format / PathComponent[GraphLike[M]].path(self) / tpe.entityPath).toString()
       log.info(s"$tpe: Writing started")
       val opts = getFormatOptions(sink.format, self.mode)
-      `==`(dataset).write(DataFrameSink(p, sink.format, opts, SaveMode.Ignore))
+      the(dataset).write(DataFrameSink(p, sink.format, opts, SaveMode.Ignore))
       log.info(s"$tpe: Writing completed")
     }(dataset.sparkSession)
   }
@@ -135,20 +135,16 @@ object graphs {
   }

   trait WriterInstances {
-    implicit def dataFrameGraphWriter[M <: Mode]
+    implicit def graphWriter[M <: Mode]
       (implicit ev: M#Layout =:= DataFrame): Writer.Aux[GraphSink, Graph[M]] = new GraphWriter[M]

-    implicit def batchedDataFrameGraphWriter[M <: Mode]
+    implicit def batchedGraphWriter[M <: Mode]
       (implicit ev: M#Layout =:= BatchedEntity): Writer.Aux[GraphSink, Graph[M]] = new BatchedGraphWriter[M]
   }

-  case class GraphSource[M <: Mode](
-    definition: GraphDef[M],
-    path: String,
-    format: String
-  )
+  case class GraphSource[M <: Mode](definition: GraphDef[M], path: String, format: String)

-  private final class DataFrameGraphReader[M <: Mode](implicit spark: SparkSession, ev: DataFrame =:= M#Layout)
+  private final class GraphReader[M <: Mode](implicit spark: SparkSession, ev: DataFrame =:= M#Layout)
     extends Reader[GraphSource[M]]
     with Logging {
     override type Ret = Graph[M]
@@ -188,9 +184,9 @@ object graphs {
   }

   trait ReaderInstances {
-    implicit def dataFrameGraphReader[M <: Mode]
+    implicit def graphReader[M <: Mode]
       (implicit spark: SparkSession, ev: DataFrame =:= M#Layout): Reader.Aux[GraphSource[M], Graph[M]] =
-      new DataFrameGraphReader[M]
+      new GraphReader[M]
   }

   trait Instances
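Note: the implicit renamed from `==` to the is an M#Layout =:= DataFrame evidence value; applying it as a function is what lets the writer treat the graph's generic layout as a concrete DataFrame. A standalone sketch of that evidence-application trick, using toy types instead of the datagen ones:

object EvidenceSketch {
  // Toy stand-ins for Mode and M#Layout.
  trait Mode { type Layout }
  trait Simple extends Mode { type Layout = String }

  // `the: M#Layout =:= String` proves the layout type is String and can be
  // applied as a function Layout => String, just like `the(dataset)` above.
  final class Printer[M <: Mode](implicit the: M#Layout =:= String) {
    def show(layout: M#Layout): Unit = println(the(layout).toUpperCase)
  }

  def main(args: Array[String]): Unit = {
    new Printer[Simple].show("hello") // prints HELLO
  }
}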

src/main/scala/ldbc/snb/datagen/model/package.scala

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 package ldbc.snb.datagen

 import ldbc.snb.datagen.syntax._
-import ldbc.snb.datagen.util.Utils.camel
+import ldbc.snb.datagen.util.camel
 import org.apache.spark.sql.{Column, DataFrame}

 import scala.language.higherKinds

src/main/scala/ldbc/snb/datagen/spark/LdbcDatagen.scala

Lines changed: 2 additions & 2 deletions
@@ -2,11 +2,11 @@ package ldbc.snb.datagen.spark

 import ldbc.snb.datagen.dictionary.Dictionaries
 import ldbc.snb.datagen.factors.FactorGenerationStage
-import ldbc.snb.datagen.{DatagenContext, SparkApp}
+import ldbc.snb.datagen.DatagenContext
 import ldbc.snb.datagen.generation.GenerationStage
 import ldbc.snb.datagen.transformation.TransformationStage
 import ldbc.snb.datagen.model.Mode
-import ldbc.snb.datagen.util.Utils.lower
+import ldbc.snb.datagen.util.{SparkApp, lower}
 import shapeless.lens


src/main/scala/ldbc/snb/datagen/transformation/TransformationStage.scala

Lines changed: 3 additions & 5 deletions
@@ -1,17 +1,15 @@
 package ldbc.snb.datagen.transformation

 import ldbc.snb.datagen.io.graphs.{GraphSink, GraphSource}
-import ldbc.snb.datagen.{SparkApp, model}
+import ldbc.snb.datagen.model
 import ldbc.snb.datagen.model.{BatchedEntity, Graph, Mode}
 import ldbc.snb.datagen.syntax._
 import ldbc.snb.datagen.transformation.transform.{ExplodeAttrs, ExplodeEdges, RawToBiTransform, RawToInteractiveTransform}
-import ldbc.snb.datagen.util.Logging
+import ldbc.snb.datagen.util.{DatagenStage, Logging}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import shapeless._

-object TransformationStage extends SparkApp with Logging {
-  override def appName: String = "LDBC SNB Datagen for Spark: TransformationStage"
-
+object TransformationStage extends DatagenStage with Logging {
   case class Args(
     outputDir: String = "out",
     explodeEdges: Boolean = false,

src/main/scala/ldbc/snb/datagen/util/SerializableConfiguration.scala

Lines changed: 2 additions & 2 deletions
@@ -5,12 +5,12 @@ import java.io.{ObjectInputStream, ObjectOutputStream}
 import org.apache.hadoop.conf.Configuration

 class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
-  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
+  private def writeObject(out: ObjectOutputStream): Unit = tryOrThrowIOException {
     out.defaultWriteObject()
     value.write(out)
   }

-  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+  private def readObject(in: ObjectInputStream): Unit = tryOrThrowIOException {
     value = new Configuration(false)
     value.readFields(in)
   }
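Note: writeObject and readObject are the JVM's custom serialization hooks; the class uses them to hand-serialize a Hadoop Configuration, which is not itself Serializable. A rough sketch of the same wrapper pattern, using an invented FakeConf class instead of Hadoop's Configuration so it runs standalone:

import java.io._

// Non-serializable stand-in for org.apache.hadoop.conf.Configuration.
final class FakeConf(var timeZone: String, var parallelism: Int)

// Serializable wrapper: the wrapped value is transient and is written/read by hand
// in the writeObject/readObject hooks, mirroring SerializableConfiguration.
class SerializableFakeConf(@transient var value: FakeConf) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeUTF(value.timeZone)
    out.writeInt(value.parallelism)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new FakeConf(in.readUTF(), in.readInt())
  }
}

object SerializableFakeConfDemo {
  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buf)
    oos.writeObject(new SerializableFakeConf(new FakeConf("GMT", 8)))
    oos.close()

    val restored = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[SerializableFakeConf]
    println(s"${restored.value.timeZone} / ${restored.value.parallelism}") // GMT / 8
  }
}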

src/main/scala/ldbc/snb/datagen/SparkApp.scala renamed to src/main/scala/ldbc/snb/datagen/util/SparkApp.scala

Lines changed: 4 additions & 1 deletion
@@ -1,4 +1,4 @@
-package ldbc.snb.datagen
+package ldbc.snb.datagen.util

 import ldbc.snb.datagen.syntax._
 import org.apache.spark.sql.SparkSession
@@ -18,5 +18,8 @@ trait SparkApp {
   def defaultSparkConf: Map[String, String] = Map(
     "spark.sql.session.timeZone" -> "GMT"
   )
+}

+trait DatagenStage extends SparkApp {
+  override val appName: String = s"LDBC SNB Datagen for Spark: ${this.getClass.getSimpleName.stripSuffix("$")}"
 }
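Note: the new DatagenStage trait is what lets the stage objects above drop their hand-written appName overrides; getSimpleName on a Scala object's class ends in "$", which stripSuffix("$") removes. A tiny sketch of just that naming logic, with placeholder stage objects (no Spark required):

object AppNameSketch {
  trait Stage {
    // Same derivation as DatagenStage: the concrete object's class name, minus the trailing "$".
    val appName: String = s"LDBC SNB Datagen for Spark: ${this.getClass.getSimpleName.stripSuffix("$")}"
  }

  object GenerationStage     extends Stage
  object TransformationStage extends Stage

  def main(args: Array[String]): Unit = {
    println(GenerationStage.appName)     // LDBC SNB Datagen for Spark: GenerationStage
    println(TransformationStage.appName) // LDBC SNB Datagen for Spark: TransformationStage
  }
}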

src/main/scala/ldbc/snb/datagen/util/Utils.scala renamed to src/main/scala/ldbc/snb/datagen/util/package.scala

Lines changed: 3 additions & 3 deletions
@@ -1,4 +1,4 @@
-package ldbc.snb.datagen.util
+package ldbc.snb.datagen

 import com.google.common.base.CaseFormat

@@ -7,8 +7,8 @@ import java.util.function.IntFunction
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal

-object Utils {
-  def tryOrIOException[T](block: => T): T = {
+package object util {
+  def tryOrThrowIOException[T](block: => T): T = {
     try {
       block
     } catch {
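Note: turning object Utils into package object util makes helpers such as camel, lower, simpleNameOf and tryOrThrowIOException package-level members, which is why the other files now import ldbc.snb.datagen.util.camel instead of ldbc.snb.datagen.util.Utils.camel. A toy illustration of the package-object pattern (the package and helper names here are hypothetical):

package com.example

// Members of a package object are visible throughout the package and can be
// imported directly as com.example.stringutil.shout — no Utils prefix needed.
package object stringutil {
  def shout(s: String): String = s.toUpperCase + "!"
}

package stringutil {
  object Demo {
    def main(args: Array[String]): Unit =
      println(shout("cleanup")) // prints CLEANUP!
  }
}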
