Commit ddaa53b

Merge pull request #342 from ldbc/apply-scalafmt

2 parents 576b5a6 + e30ee69

37 files changed: +380 −378 lines

.scalafmt.conf

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
+version = 3.1.2
+runner.dialect = scala212
+align.preset = more
+maxColumn = 160
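
Note: "version" pins the scalafmt release so local runs and CI agree; "runner.dialect = scala212" parses the sources as Scala 2.12; "align.preset = more" vertically aligns tokens such as the "=" of adjacent definitions; "maxColumn = 160" permits long lines before wrapping. The alignment preset accounts for most of the whitespace-only hunks below, e.g. (taken from the FactorGenerationStage diff):

// before
val cities = places.where($"type" === "City").cache()
val countries = places.where($"type" === "Country").cache()

// after: align.preset = more pads the '=' of adjacent vals
val cities    = places.where($"type" === "City").cache()
val countries = places.where($"type" === "Country").cache()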

project/plugins.sbt

Lines changed: 1 addition & 0 deletions

@@ -1,2 +1,3 @@
 addSbtPlugin("com.typesafe.sbt" % "sbt-pom-reader" % "2.1.0")
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
+addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3")

src/main/scala/ldbc/snb/datagen/factors/FactorGenerationStage.scala

Lines changed: 22 additions & 20 deletions

@@ -9,8 +9,7 @@ import ldbc.snb.datagen.util.{DatagenStage, Logging}
 import org.apache.spark.sql.functions.{broadcast, count, date_trunc, sum}
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 
-
-case class Factor(requiredEntities: EntityType*)(f: Seq[DataFrame] => DataFrame) extends (Seq[DataFrame] => DataFrame) {
+case class Factor(requiredEntities: EntityType*)(f: Seq[DataFrame] => DataFrame) extends (Seq[DataFrame] => DataFrame) {
   override def apply(v1: Seq[DataFrame]): DataFrame = f(v1)
 }
 
@@ -24,22 +23,23 @@ object FactorGenerationStage extends DatagenStage with Logging {
   import ldbc.snb.datagen.io.Writer.ops._
   import ldbc.snb.datagen.io.instances._
 
-    GraphSource(model.graphs.Raw.graphDef, args.outputDir, "csv")
-      .read
-      .pipe(g => rawFactors.map { case (name, calc) =>
-        val resolvedEntities = calc.requiredEntities.foldLeft(Seq.empty[DataFrame])((args, et) => args :+ g.entities(et))
-        FactorTable(name, calc(resolvedEntities), g)
-      })
+    GraphSource(model.graphs.Raw.graphDef, args.outputDir, "csv").read
+      .pipe(g =>
+        rawFactors.map { case (name, calc) =>
+          val resolvedEntities = calc.requiredEntities.foldLeft(Seq.empty[DataFrame])((args, et) => args :+ g.entities(et))
+          FactorTable(name, calc(resolvedEntities), g)
+        }
+      )
       .foreach(_.write(FactorTableSink(args.outputDir)))
   }
 
   private def frequency(df: DataFrame, value: Column, by: Seq[Column], agg: Column => Column = count) =
     df
-      .groupBy(by: _*).agg(agg(value).as("frequency"))
+      .groupBy(by: _*)
+      .agg(agg(value).as("frequency"))
       .select(by :+ $"frequency": _*)
      .orderBy($"frequency".desc +: by.map(_.asc): _*)
 
-
   private def messageTags(commentHasTag: DataFrame, postHasTag: DataFrame, tag: DataFrame) = {
     val messageHasTag = commentHasTag.select($"CommentId".as("id"), $"TagId") |+| postHasTag.select($"PostId".as("id"), $"TagId")
 
@@ -54,11 +54,12 @@ object FactorGenerationStage extends DatagenStage with Logging {
 
   private val rawFactors = Map(
     "countryNumPersons" -> Factor(Place, Person) { case Seq(places, persons) =>
-      val cities = places.where($"type" === "City").cache()
+      val cities    = places.where($"type" === "City").cache()
       val countries = places.where($"type" === "Country").cache()
 
       frequency(
-        persons.as("Person")
+        persons
+          .as("Person")
           .join(broadcast(cities.as("City")), $"City.id" === $"Person.LocationCityId")
           .join(broadcast(countries.as("Country")), $"Country.id" === $"City.PartOfPlaceId"),
         value = $"Person.id",
@@ -76,7 +77,8 @@ object FactorGenerationStage extends DatagenStage with Logging {
       val cities = places.where($"type" === "City").cache()
 
       frequency(
-        personKnowsPerson.alias("Knows")
+        personKnowsPerson
+          .alias("Knows")
           .join(persons.cache().as("Person1"), $"Person1.id" === $"Knows.Person1Id")
           .join(cities.cache().as("City1"), $"City1.id" === "Person1.LocationCityId")
           .join(persons.as("Person2"), $"Person2.id" === $"Knows.Person2Id")
@@ -93,11 +95,12 @@ object FactorGenerationStage extends DatagenStage with Logging {
       )
     },
     "countryPairsNumFriends" -> Factor(PersonKnowsPerson, Person, Place) { case Seq(personKnowsPerson, persons, places) =>
-      val cities = places.where($"type" === "City").cache()
+      val cities    = places.where($"type" === "City").cache()
       val countries = places.where($"type" === "Country").cache()
 
       frequency(
-        personKnowsPerson.alias("Knows")
+        personKnowsPerson
+          .alias("Knows")
           .join(persons.cache().as("Person1"), $"Person1.id" === $"Knows.Person1Id")
           .join(cities.cache().as("City1"), $"City1.id" === "Person1.LocationCityId")
           .join(countries.cache().as("Country1"), $"Country1.id" === "City1.PartOfPlaceId")
@@ -132,7 +135,8 @@ object FactorGenerationStage extends DatagenStage with Logging {
     },
     "messageTagClasses" -> Factor(CommentHasTag, PostHasTag, Tag, TagClass) { case Seq(commentHasTag, postHasTag, tag, tagClass) =>
       frequency(
-        messageTags(commentHasTag, postHasTag, tag).as("MessageTags")
+        messageTags(commentHasTag, postHasTag, tag)
+          .as("MessageTags")
           .join(tag.as("Tag"), $"MessageTags.tagId" === $"Tag.id")
           .join(tagClass.as("TagClass"), $"Tag.TypeTagClassId" === $"TagClass.id"),
         value = $"frequency",
@@ -141,10 +145,10 @@ object FactorGenerationStage extends DatagenStage with Logging {
       )
     },
     "personNumFriends" -> Factor(PersonKnowsPerson) { case Seq(knows) =>
-      frequency(knows, value=$"Person2Id", by=Seq($"Person1Id"))
+      frequency(knows, value = $"Person2Id", by = Seq($"Person1Id"))
     },
     "postLanguages" -> Factor(Post) { case Seq(post) =>
-      frequency(post.where($"language".isNotNull), value=$"id", by=Seq($"language"))
+      frequency(post.where($"language".isNotNull), value = $"id", by = Seq($"language"))
     },
     "tagClassNumTags" -> Factor(TagClass, Tag) { case Seq(tagClass, tag) =>
       frequency(
@@ -163,5 +167,3 @@ object FactorGenerationStage extends DatagenStage with Logging {
     }
   )
 }
-
-
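
Most hunks in this file only reflow the frequency helper and its call sites. For orientation, here is a self-contained sketch of the same group-count-order pattern; the helper body is copied from the diff, while the knows data and the SparkSession setup are hypothetical:

import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.count

object FrequencySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("frequency-sketch").getOrCreate()
    import spark.implicits._

    // Same shape as the helper in the diff: group by the key columns,
    // aggregate the value column, order by descending frequency.
    def frequency(df: DataFrame, value: Column, by: Seq[Column], agg: Column => Column = count): DataFrame =
      df
        .groupBy(by: _*)
        .agg(agg(value).as("frequency"))
        .select(by :+ $"frequency": _*)
        .orderBy($"frequency".desc +: by.map(_.asc): _*)

    // Hypothetical "knows" edges: Person1Id knows Person2Id.
    val knows = Seq((1L, 2L), (1L, 3L), (2L, 3L)).toDF("Person1Id", "Person2Id")

    // Mirrors the "personNumFriends" factor: friend count per person, largest first.
    frequency(knows, value = $"Person2Id", by = Seq($"Person1Id")).show()

    spark.stop()
  }
}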

src/main/scala/ldbc/snb/datagen/factors/FactorTable.scala

Lines changed: 7 additions & 8 deletions

@@ -3,14 +3,13 @@ package ldbc.snb.datagen.factors
 import ldbc.snb.datagen.model.{Graph, GraphDef, Mode}
 import org.apache.spark.sql.DataFrame
 
-
 case class FactorTableDef[M <: Mode](
-  name: String,
-  sourceDef: GraphDef[M]
-)
+    name: String,
+    sourceDef: GraphDef[M]
+)
 
 case class FactorTable[M <: Mode](
-  name: String,
-  data: DataFrame,
-  source: Graph[M]
-)
+    name: String,
+    data: DataFrame,
+    source: Graph[M]
+)

src/main/scala/ldbc/snb/datagen/generation/GenerationStage.scala

Lines changed: 12 additions & 12 deletions

@@ -14,34 +14,34 @@ object GenerationStage extends DatagenStage with Logging {
   val optimalPersonsPerFile = 500000
 
   case class Args(
-    scaleFactor: String = "1",
-    numThreads: Option[Int] = None,
-    params: Map[String, String] = Map.empty,
-    paramFile: Option[String] = None,
-    outputDir: String = "out",
-    oversizeFactor: Option[Double] = None
+      scaleFactor: String = "1",
+      numThreads: Option[Int] = None,
+      params: Map[String, String] = Map.empty,
+      paramFile: Option[String] = None,
+      outputDir: String = "out",
+      oversizeFactor: Option[Double] = None
   )
 
   def run(args: Args, config: GeneratorConfiguration)(implicit spark: SparkSession) = {
-    val numPartitions = config.getInt("hadoop.numThreads", spark.sparkContext.defaultParallelism)
+    val numPartitions   = config.getInt("hadoop.numThreads", spark.sparkContext.defaultParallelism)
     val idealPartitions = DatagenParams.numPersons.toDouble / optimalPersonsPerFile
 
     val oversizeFactor = args.oversizeFactor.getOrElse(Math.max(numPartitions / idealPartitions, 1.0))
 
     val persons = SparkPersonGenerator(config)
 
-    val percentages = Seq(0.45f, 0.45f, 0.1f)
+    val percentages             = Seq(0.45f, 0.45f, 0.1f)
     val knowsGeneratorClassName = DatagenParams.getKnowsGenerator
 
     import ldbc.snb.datagen.entities.Keys._
 
-    val uniRanker = SparkRanker.create(_.byUni)
+    val uniRanker      = SparkRanker.create(_.byUni)
     val interestRanker = SparkRanker.create(_.byInterest)
-    val randomRanker = SparkRanker.create(_.byRandomId)
+    val randomRanker   = SparkRanker.create(_.byRandomId)
 
-    val uniKnows = SparkKnowsGenerator(persons, uniRanker, config, percentages, 0, knowsGeneratorClassName)
+    val uniKnows      = SparkKnowsGenerator(persons, uniRanker, config, percentages, 0, knowsGeneratorClassName)
     val interestKnows = SparkKnowsGenerator(persons, interestRanker, config, percentages, 1, knowsGeneratorClassName)
-    val randomKnows = SparkKnowsGenerator(persons, randomRanker, config, percentages, 2, knowsGeneratorClassName)
+    val randomKnows   = SparkKnowsGenerator(persons, randomRanker, config, percentages, 2, knowsGeneratorClassName)
 
     val merged = SparkKnowsMerger(uniKnows, interestKnows, randomKnows).cache()

src/main/scala/ldbc/snb/datagen/generation/generator/SparkKnowsGenerator.scala

Lines changed: 13 additions & 13 deletions

@@ -15,12 +15,12 @@ import scala.reflect.ClassTag
 
 object SparkKnowsGenerator {
   def apply(
-    persons: RDD[Person],
-    ranker: SparkRanker,
-    conf: GeneratorConfiguration,
-    percentages: Seq[Float],
-    stepIndex: Int,
-    knowsGeneratorClassName: String
+      persons: RDD[Person],
+      ranker: SparkRanker,
+      conf: GeneratorConfiguration,
+      percentages: Seq[Float],
+      stepIndex: Int,
+      knowsGeneratorClassName: String
   )(implicit spark: SparkSession) = {
     val blockSize = DatagenParams.blockSize
 
@@ -33,14 +33,14 @@ object SparkKnowsGenerator {
       // groupByKey wouldn't guarantee keeping the order inside groups
      // TODO check if it actually has better performance than sorting inside mapPartitions (probably not)
      .combineByKeyWithClassTag(
-          personByRank => SortedMap(personByRank),
-          (map: SortedMap[Long, Person], personByRank) => map + personByRank,
-          (a: SortedMap[Long, Person], b: SortedMap[Long, Person]) => a ++ b
-        )
+        personByRank => SortedMap(personByRank),
+        (map: SortedMap[Long, Person], personByRank) => map + personByRank,
+        (a: SortedMap[Long, Person], b: SortedMap[Long, Person]) => a ++ b
+      )
      .mapPartitions(groups => {
        DatagenContext.initialize(conf)
        val knowsGeneratorClass = Class.forName(knowsGeneratorClassName)
-        val knowsGenerator = knowsGeneratorClass.getConstructor().newInstance().asInstanceOf[KnowsGenerator]
+        val knowsGenerator      = knowsGeneratorClass.getConstructor().newInstance().asInstanceOf[KnowsGenerator]
        knowsGenerator.initialize(conf)
        val personSimilarity = DatagenParams.getPersonSimularity
 
@@ -53,9 +53,9 @@ object SparkKnowsGenerator {
          clonedPersons
        }
 
-        for {
+        for {
          persons <- personGroups
-          person <- persons.iterator().asScala
+          person  <- persons.iterator().asScala
        } yield person
      })
 }
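
Behaviour note on the block above: as the inline comment says, groupByKey would not keep elements ordered within a group, so the code combines (blockId, (rank, person)) pairs into a rank-keyed SortedMap instead. A minimal standalone sketch of the same pattern using plain combineByKey and made-up data:

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.SortedMap

object SortedGroupsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("sorted-groups"))

    // (blockId, (rank, name)) pairs arriving in arbitrary order.
    val byBlock = sc.parallelize(Seq((0L, (2L, "carol")), (0L, (1L, "bob")), (1L, (3L, "dave")), (0L, (0L, "alice"))))

    // Combining into a SortedMap keyed by rank restores rank order per block,
    // no matter how partitions deliver the elements.
    val grouped = byBlock.combineByKey(
      (kv: (Long, String)) => SortedMap(kv),                             // create combiner
      (m: SortedMap[Long, String], kv: (Long, String)) => m + kv,        // merge a value
      (a: SortedMap[Long, String], b: SortedMap[Long, String]) => a ++ b // merge combiners
    )

    grouped.collect().foreach { case (block, people) => println(s"block $block: ${people.values.mkString(", ")}") }
    sc.stop()
  }
}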

src/main/scala/ldbc/snb/datagen/generation/generator/SparkPersonGenerator.scala

Lines changed: 1 addition & 1 deletion

@@ -21,7 +21,7 @@ object SparkPersonGenerator {
 
     for {
       i <- blocks
-      _ = println(s"Processing person block $i (${DatagenParams.blockSize})")
+      _    = println(s"Processing person block $i (${DatagenParams.blockSize})")
       size = Math.min(DatagenParams.numPersons - DatagenParams.blockSize * i, DatagenParams.blockSize).toInt
       person <- personGenerator.generatePersonBlock(i.toInt, DatagenParams.blockSize).asScala.take(size)
     } yield person
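
The changed line uses a mid-comprehension value binding (_ = println(...)) purely for its logging side effect; scalafmt only re-aligned its = with the adjacent size binding. A standalone illustration of the idiom:

object ForBindSketch extends App {
  val blocks = Seq(0L, 1L, 2L)

  val people = for {
    i <- blocks
    _ = println(s"processing block $i") // side-effecting binding, result discarded
    person <- Seq(s"person-$i-a", s"person-$i-b")
  } yield person

  println(people)
}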

src/main/scala/ldbc/snb/datagen/generation/generator/SparkRanker.scala

Lines changed: 3 additions & 3 deletions

@@ -24,9 +24,9 @@ object SparkRanker {
       .mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true)
       .collectAsMap()
 
-    val aggregatedCounts = SortedMap(counts.toSeq : _*)
-      .foldLeft((0L, Map.empty[Int, Long])) {
-        case ((total, map), (i, c)) => (total + c, map + (i -> total))
+    val aggregatedCounts = SortedMap(counts.toSeq: _*)
+      .foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) =>
+        (total + c, map + (i -> total))
       }
       ._2
src/main/scala/ldbc/snb/datagen/generation/serializer/SparkActivitySerializer.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ import scala.collection.JavaConverters._
2020

2121
object SparkActivitySerializer {
2222

23-
def apply(persons: RDD[Person], ranker: SparkRanker, conf: GeneratorConfiguration, partitions: Option[Int] = None, oversizeFactor: Double = 1.0)(implicit spark: SparkSession) = {
23+
def apply(persons: RDD[Person], ranker: SparkRanker, conf: GeneratorConfiguration, partitions: Option[Int] = None, oversizeFactor: Double = 1.0)(implicit
24+
spark: SparkSession
25+
) = {
2426

2527
val blockSize = DatagenParams.blockSize
2628
val blocks = ranker(persons)
@@ -33,8 +35,8 @@ object SparkActivitySerializer {
3335
blocks.foreachPartition(groups => {
3436
DatagenContext.initialize(conf)
3537
val partitionId = TaskContext.getPartitionId()
36-
val hadoopConf = serializableHadoopConf.value
37-
val buildDir = conf.getOutputDir
38+
val hadoopConf = serializableHadoopConf.value
39+
val buildDir = conf.getOutputDir
3840

3941
val fs = FileSystem.get(new URI(buildDir), hadoopConf)
4042
fs.mkdirs(new Path(buildDir))
@@ -44,10 +46,10 @@ object SparkActivitySerializer {
4446
dynamicActivitySerializer.initialize(fs, conf.getOutputDir, partitionId, oversizeFactor, false)
4547

4648
val generator = new PersonActivityGenerator
47-
val exporter = new PersonActivityExporter(dynamicActivitySerializer)
49+
val exporter = new PersonActivityExporter(dynamicActivitySerializer)
4850

4951
try {
50-
for {(blockId, persons) <- groups} {
52+
for { (blockId, persons) <- groups } {
5153
val clonedPersons = new util.ArrayList[Person]
5254
for (p <- persons) {
5355
clonedPersons.add(new Person(p))

src/main/scala/ldbc/snb/datagen/generation/serializer/SparkPersonSerializer.scala

Lines changed: 8 additions & 8 deletions

@@ -15,20 +15,20 @@ import java.net.URI
 object SparkPersonSerializer {
 
   def apply(
-    persons: RDD[Person],
-    conf: GeneratorConfiguration,
-    partitions: Option[Int] = None,
-    oversizeFactor: Double = 1.0
+      persons: RDD[Person],
+      conf: GeneratorConfiguration,
+      partitions: Option[Int] = None,
+      oversizeFactor: Double = 1.0
   )(implicit spark: SparkSession): Unit = {
     val serializableHadoopConf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)
 
     persons
       .pipeFoldLeft(partitions)((rdd: RDD[Person], p: Int) => rdd.coalesce(p))
       .foreachPartition(persons => {
         val dynamicPersonSerializer = new DynamicPersonSerializer
-        val hadoopConf = serializableHadoopConf.value
-        val partitionId = TaskContext.getPartitionId()
-        val buildDir = conf.getOutputDir
+        val hadoopConf  = serializableHadoopConf.value
+        val partitionId = TaskContext.getPartitionId()
+        val buildDir    = conf.getOutputDir
 
         val fs = FileSystem.get(new URI(buildDir), hadoopConf)
         fs.mkdirs(new Path(buildDir))
@@ -45,7 +45,7 @@ object SparkPersonSerializer {
 
       personExporter use { pe =>
         DatagenContext.initialize(conf)
-        for {p <- persons} {
+        for { p <- persons } {
           pe.export(p)
         }
       }
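
The personExporter use { pe => ... } call above is a loan pattern: the resource is lent to the block and released afterwards. The repo's actual use combinator is not shown in this diff, so the following is an assumed shape, a sketch of how such a helper is typically written:

object UseSketch {
  // Hypothetical loan-pattern helper; the repo's `use` may differ.
  implicit class UseOps[A <: AutoCloseable](val resource: A) extends AnyVal {
    def use[B](f: A => B): B =
      try f(resource)
      finally resource.close() // always release, even if f throws
  }

  def main(args: Array[String]): Unit = {
    val writer = new java.io.PrintWriter("out.txt")
    writer.use(w => w.println("hello")) // writer is closed when the block exits
  }
}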
