@@ -8,8 +8,9 @@ import ldbc.snb.datagen.model.Mode.Raw
 import ldbc.snb.datagen.syntax._
 import ldbc.snb.datagen.transformation.transform.ConvertDates
 import ldbc.snb.datagen.util.{DatagenStage, Logging}
+import org.apache.spark.graphx
 import org.apache.spark.sql.functions.{broadcast, col, count, date_trunc, expr, floor, lit, sum}
-import org.apache.spark.sql.{Column, DataFrame, functions}
+import org.apache.spark.sql.{Column, DataFrame, Row, functions}
 import shapeless._
 
 import scala.util.matching.Regex
@@ -141,6 +142,20 @@ object FactorGenerationStage extends DatagenStage with Logging {
     ).select($"Tag.id".as("tagId"), $"Tag.name".as("tagName"), $"frequency")
   }
 
+  private def sameUniversityKnows(personKnowsPerson: DataFrame, studyAt: DataFrame) = {
+    undirectedKnowsTemporal(personKnowsPerson)
+      .join(studyAt.as("studyAt1"), $"studyAt1.personId" === $"knows.person1Id")
+      .join(studyAt.as("studyAt2"), $"studyAt2.personId" === $"knows.person2Id")
+      .where($"studyAt1.universityId" === $"studyAt2.universityId")
+      .select(
+        $"knows.person1Id".as("person1Id"),
+        $"knows.person2Id".as("person2Id"),
+        functions.greatest($"knows.creationDate", $"studyAt1.creationDate", $"studyAt2.creationDate").alias("creationDate"),
+        functions.least($"knows.deletionDate", $"studyAt1.deletionDate", $"studyAt2.deletionDate").alias("deletionDate")
+      )
+      .where($"creationDate" < $"deletionDate")
+  }
+
   import model.raw._
 
   private val rawFactors = Map(
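In the new helper, a knows edge survives only while the edge and both studyAt relationships coexist: the latest of the three creationDates becomes the start of the interval, the earliest of the three deletionDates its end, and empty intervals are filtered out. A minimal, self-contained sketch of that interval-intersection idiom with greatest/least follows (the object name, toy data and column names c1/d1 etc. are illustrative, not part of the patch):

import org.apache.spark.sql.{SparkSession, functions => F}

object IntervalIntersectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("interval-sketch").getOrCreate()
    import spark.implicits._

    // Three validity intervals per row (ISO date strings compare correctly as text).
    val df = Seq(
      (1L, 2L, "2010-01-01", "2012-01-01", "2009-01-01", "2011-06-01", "2010-06-01", "2013-01-01"),
      (3L, 4L, "2010-01-01", "2010-02-01", "2011-01-01", "2012-01-01", "2010-01-01", "2012-01-01")
    ).toDF("person1Id", "person2Id", "c1", "d1", "c2", "d2", "c3", "d3")

    val intersected = df
      .select(
        $"person1Id",
        $"person2Id",
        F.greatest($"c1", $"c2", $"c3").as("creationDate"), // latest start
        F.least($"d1", $"d2", $"d3").as("deletionDate")     // earliest end
      )
      .where($"creationDate" < $"deletionDate")             // keep non-empty intersections only

    intersected.show() // the second row is dropped: its intervals never overlap
    spark.stop()
  }
}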
@@ -446,21 +461,6 @@ object FactorGenerationStage extends DatagenStage with Logging {
       val sampleFractionPersonPairs = Math.min(10000.0 / personPairs.count(), 1.0)
       personPairs.sample(sampleFractionPersonPairs, 42)
     },
-    "sameUniversityKnows" -> LargeFactor(PersonKnowsPersonType, PersonStudyAtUniversityType) { case Seq(personKnowsPerson, studyAt) =>
-      val size = Math.max(Math.ceil(personKnowsPerson.rdd.getNumPartitions / 10).toInt, 1)
-      undirectedKnowsTemporal(personKnowsPerson)
-        .join(studyAt.as("studyAt1"), $"studyAt1.personId" === $"knows.person1Id")
-        .join(studyAt.as("studyAt2"), $"studyAt2.personId" === $"knows.person2Id")
-        .where($"studyAt1.universityId" === $"studyAt2.universityId")
-        .select(
-          $"knows.person1Id".as("person1Id"),
-          $"knows.person2Id".as("person2Id"),
-          functions.greatest($"knows.creationDate", $"studyAt1.creationDate", $"studyAt2.creationDate").alias("creationDate"),
-          functions.least($"knows.deletionDate", $"studyAt1.deletionDate", $"studyAt2.deletionDate").alias("deletionDate")
-        )
-        .where($"creationDate" < $"deletionDate")
-        .coalesce(size)
-    },
     // -- interactive --
     // first names
     "personFirstNames" -> Factor(PersonType) { case Seq(person) =>
@@ -652,5 +652,22 @@ object FactorGenerationStage extends DatagenStage with Logging {
       )
       numFriendOfFriendCompanies
     },
+    "sameUniversityConnected" -> LargeFactor(PersonType, PersonKnowsPersonType, PersonStudyAtUniversityType) { case Seq(person, personKnowsPerson, studyAt) =>
+      val s = spark
+      import s.implicits._
+      val vertices = person.select("id").rdd.map(row => (row.getAs[Long]("id"), ()))
+
+      val edges = sameUniversityKnows(personKnowsPerson, studyAt).rdd.map(row =>
+        graphx.Edge(row.getAs[Long]("person1Id"), row.getAs[Long]("person2Id"), ())
+      )
+      val graph = graphx.Graph(vertices, edges, ())
+      val cc = graph.connectedComponents().vertices
+        .toDF("PersonId", "Component")
+
+      val counts = cc.groupBy("Component").agg(count("*").as("count"))
+
+      cc.join(counts, Seq("Component")).select("PersonId", "Component", "count")
+
+    }
   )
 }
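The replacement factor hands the same-university knows edges to GraphX, labels each person with the connected component it belongs to, and attaches the size of that component. A self-contained sketch of the same connectedComponents-plus-size pattern on toy data (object name and example ids are illustrative only, not taken from the patch):

import org.apache.spark.graphx
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.count

object ConnectedComponentSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("cc-sketch").getOrCreate()
    import spark.implicits._

    // Vertices: person ids with no attribute; edges: "knows" pairs between them.
    val vertices = spark.sparkContext.parallelize(Seq(1L, 2L, 3L, 4L, 5L).map(id => (id, ())))
    val edges = spark.sparkContext.parallelize(Seq(
      graphx.Edge(1L, 2L, ()),
      graphx.Edge(2L, 3L, ()),
      graphx.Edge(4L, 5L, ())
    ))

    // connectedComponents labels every vertex with the smallest vertex id in its component.
    val cc = graphx.Graph(vertices, edges, ())
      .connectedComponents()
      .vertices
      .toDF("PersonId", "Component")

    // Attach the size of each component, mirroring the factor's output shape.
    val counts = cc.groupBy("Component").agg(count("*").as("count"))
    cc.join(counts, Seq("Component"))
      .select("PersonId", "Component", "count")
      .show()
    // Expected: persons 1-3 form one component (size 3), persons 4-5 another (size 2).

    spark.stop()
  }
}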