@@ -101,6 +101,15 @@ object FactorGenerationStage extends DatagenStage with Logging {
101
101
.alias(" Knows" )
102
102
.cache()
103
103
104
/**
 * Expands the Person-knows-Person edges into both directions while keeping
 * each edge's lifespan columns.
 *
 * Every input row `(Person1Id, Person2Id, creationDate, deletionDate)` yields
 * two output rows: the original one and one with the two person ids swapped;
 * both rows carry the same creationDate and deletionDate.
 *
 * The result is aliased as "Knows" and cached. Callers address it with a
 * lower-case `knows.` qualifier, which presumably relies on Spark's default
 * case-insensitive name resolution -- confirm if `spark.sql.caseSensitive`
 * is ever enabled.
 *
 * @param personKnowsPerson edges with columns Person1Id, Person2Id,
 *                          creationDate and deletionDate
 * @return the symmetric edge list with the same four columns
 */
private def undirectedKnowsTemporal(personKnowsPerson: DataFrame): DataFrame =
  personKnowsPerson
    .select(
      // stack(2, ...) spreads the eight listed values over two rows of four columns.
      expr("stack(2, Person1Id, Person2Id, creationDate, deletionDate, Person2Id, Person1Id, creationDate, deletionDate)")
        .as(Seq("Person1Id", "Person2Id", "creationDate", "deletionDate"))
    )
    .alias("Knows")
    .cache()
112
+
104
113
private def nHops (relationships : DataFrame , n : Int , joinKeys : (String , String ), sample : Option [DataFrame => DataFrame ] = None ): DataFrame = {
105
114
val (leftKey, rightKey) = joinKeys
106
115
relationships
@@ -433,14 +442,17 @@ object FactorGenerationStage extends DatagenStage with Logging {
433
442
},
434
443
// Pairs of persons who know each other and are both linked to the same
// university through studyAt, together with the time window in which the
// knows edge and both studyAt rows exist at the same time.
"sameUniversityKnows" -> LargeFactor(PersonKnowsPersonType, PersonStudyAtUniversityType) { case Seq(personKnowsPerson, studyAt) =>
  // Target roughly one output partition per ten input partitions, never fewer
  // than one. The division must be floating-point: with integer division the
  // quotient is already truncated and Math.ceil has nothing left to round up.
  val size = Math.max(Math.ceil(personKnowsPerson.rdd.getNumPartitions / 10.0).toInt, 1)
  undirectedKnowsTemporal(personKnowsPerson)
    .join(studyAt.as("studyAt1"), $"studyAt1.personId" === $"knows.person1Id")
    .join(studyAt.as("studyAt2"), $"studyAt2.personId" === $"knows.person2Id")
    .where($"studyAt1.universityId" === $"studyAt2.universityId")
    .select(
      $"knows.person1Id".as("person1Id"),
      $"knows.person2Id".as("person2Id"),
      // The combined row starts when the latest of the three inputs starts ...
      functions.greatest($"knows.creationDate", $"studyAt1.creationDate", $"studyAt2.creationDate").alias("creationDate"),
      // ... and ends when the earliest of the three inputs ends.
      functions.least($"knows.deletionDate", $"studyAt1.deletionDate", $"studyAt2.deletionDate").alias("deletionDate")
    )
    // Keep only rows whose three lifespans actually overlap (non-empty window).
    .where($"creationDate" < $"deletionDate")
    .coalesce(size)
}
446
458
)
0 commit comments