Skip to content

Commit bd87e33

Browse files
authored
Merge pull request #391 from ldbc/fix-large-factor-table
Fix: generate factor tables on multiple machines
2 parents 346d792 + 578a7c9 commit bd87e33

File tree

2 files changed

+30
-20
lines changed

2 files changed

+30
-20
lines changed

src/main/scala/ldbc/snb/datagen/factors/FactorGenerationStage.scala

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,15 @@ import shapeless._
1313

1414
import scala.util.matching.Regex
1515

16-
case class Factor(requiredEntities: EntityType*)(f: Seq[DataFrame] => DataFrame) extends (Seq[DataFrame] => DataFrame) {
16+
trait FactorTrait extends (Seq[DataFrame] => DataFrame) {
17+
def requiredEntities: Seq[EntityType]
18+
}
19+
20+
case class Factor(override val requiredEntities: EntityType*)(f: Seq[DataFrame] => DataFrame) extends FactorTrait {
21+
override def apply(v1: Seq[DataFrame]): DataFrame = f(v1).coalesce(1)
22+
}
23+
24+
case class LargeFactor(override val requiredEntities: EntityType*)(f: Seq[DataFrame] => DataFrame) extends FactorTrait {
1725
override def apply(v1: Seq[DataFrame]): DataFrame = f(v1)
1826
}
1927

@@ -160,8 +168,8 @@ object FactorGenerationStage extends DatagenStage with Logging {
160168
)
161169
},
162170
"cityPairsNumFriends" -> Factor(PersonKnowsPersonType, PersonType, PlaceType) { case Seq(personKnowsPerson, persons, places) =>
163-
val cities = places.where($"type" === "City").cache()
164-
val knows = undirectedKnows(personKnowsPerson)
171+
val cities = places.where($"type" === "City").cache()
172+
val knows = undirectedKnows(personKnowsPerson)
165173
val countries = places.where($"type" === "Country").cache()
166174

167175
frequency(
@@ -279,7 +287,8 @@ object FactorGenerationStage extends DatagenStage with Logging {
279287
)
280288
},
281289
"personNumFriends" -> Factor(PersonKnowsPersonType, PersonType) { case Seq(personKnowsPerson, person1) =>
282-
val knows = person1.as("Person1")
290+
val knows = person1
291+
.as("Person1")
283292
.join(undirectedKnows(personKnowsPerson).as("knows"), $"Person1.id" === $"knows.Person1Id", "leftouter")
284293
frequency(knows, value = $"knows.Person2Id", by = Seq($"Person1.id", $"Person1.creationDate", $"Person1.deletionDate"))
285294
},
@@ -318,7 +327,7 @@ object FactorGenerationStage extends DatagenStage with Logging {
318327
$"Company.name".alias("companyName"),
319328
$"Company.id".alias("companyId"),
320329
$"Person2.creationDate".alias("person2creationDate"),
321-
$"Person2.deletionDate".alias("person2deletionDate"),
330+
$"Person2.deletionDate".alias("person2deletionDate")
322331
)
323332
.distinct()
324333
},
@@ -331,8 +340,8 @@ object FactorGenerationStage extends DatagenStage with Logging {
331340
)
332341
},
333342
"people4Hops" -> Factor(PersonType, PlaceType, PersonKnowsPersonType) { case Seq(person, place, knows) =>
334-
val cities = place.where($"type" === "City").cache()
335-
val allKnows = undirectedKnows(knows).cache()
343+
val cities = place.where($"type" === "City").cache()
344+
val allKnows = undirectedKnows(knows).cache()
336345
val minSampleSize = 100.0
337346

338347
val chinesePeopleSample = (relations: DataFrame) => {
@@ -377,8 +386,8 @@ object FactorGenerationStage extends DatagenStage with Logging {
377386
.limit(10000)
378387
},
379388
"people2Hops" -> Factor(PersonType, PlaceType, PersonKnowsPersonType) { case Seq(person, place, knows) =>
380-
val cities = place.where($"type" === "City").cache()
381-
val allKnows = undirectedKnows(knows).cache()
389+
val cities = place.where($"type" === "City").cache()
390+
val allKnows = undirectedKnows(knows).cache()
382391
val minSampleSize = 100.0
383392

384393
val chinesePeopleSample = (relations: DataFrame) => {
@@ -422,16 +431,17 @@ object FactorGenerationStage extends DatagenStage with Logging {
422431
.sort($"Person1Id", $"Person2Id")
423432
.limit(10000)
424433
},
425-
"sameUniversityKnows" -> Factor(PersonKnowsPersonType, PersonStudyAtUniversityType) {
426-
case Seq(personKnowsPerson, studyAt) =>
427-
undirectedKnows(personKnowsPerson)
428-
.join(studyAt.as("studyAt1"), $"studyAt1.personId" === $"knows.person1Id")
429-
.join(studyAt.as("studyAt2"), $"studyAt2.personId" === $"knows.person2Id")
430-
.where($"studyAt1.universityId" === $"studyAt2.universityId")
431-
.select(
432-
$"knows.person1Id".as("person1Id"),
433-
$"knows.person2Id".as("person2Id")
434-
)
434+
"sameUniversityKnows" -> LargeFactor(PersonKnowsPersonType, PersonStudyAtUniversityType) { case Seq(personKnowsPerson, studyAt) =>
435+
val size = Math.max(Math.ceil(personKnowsPerson.rdd.getNumPartitions / 10).toInt, 1)
436+
undirectedKnows(personKnowsPerson)
437+
.join(studyAt.as("studyAt1"), $"studyAt1.personId" === $"knows.person1Id")
438+
.join(studyAt.as("studyAt2"), $"studyAt2.personId" === $"knows.person2Id")
439+
.where($"studyAt1.universityId" === $"studyAt2.universityId")
440+
.select(
441+
$"knows.person1Id".as("person1Id"),
442+
$"knows.person2Id".as("person2Id")
443+
)
444+
.coalesce(size)
435445
}
436446
)
437447
}

src/main/scala/ldbc/snb/datagen/factors/io/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package object io {
2121
val dfSink = if (sink.overwrite) {
2222
DataFrameSink(p, sink.format, mode = SaveMode.Overwrite)
2323
} else DataFrameSink(p, sink.format)
24-
self.data.coalesce(1).write(dfSink)
24+
self.data.write(dfSink)
2525
log.info(s"Factor table ${self.name} written")
2626
}
2727
}

0 commit comments

Comments
 (0)