@@ -385,12 +385,13 @@ object FactorGenerationStage extends DatagenStage with Logging {
385
385
.select($" knows.Person1Id" .alias(" Person1Id" ), $" knows.Person2Id" .alias(" Person2Id" ))
386
386
}
387
387
388
- nHops(
389
- allKnows,
390
- n = 4 ,
391
- joinKeys = (" Person2Id" , " Person1Id" ),
392
- sample = Some (chinesePeopleSample)
393
- ).join(person.as(" Person1" ), $" Person1.id" === $" Person1Id" )
388
+ val personPairs = nHops(
389
+ allKnows,
390
+ n = 4 ,
391
+ joinKeys = (" Person2Id" , " Person1Id" ),
392
+ sample = Some (chinesePeopleSample)
393
+ )
394
+ .join(person.as(" Person1" ), $" Person1.id" === $" Person1Id" )
394
395
.join(person.as(" Person2" ), $" Person2.id" === $" Person1Id" )
395
396
.select(
396
397
$" Person1Id" ,
@@ -400,8 +401,9 @@ object FactorGenerationStage extends DatagenStage with Logging {
400
401
$" Person2.creationDate" .as(" Person2CreationDate" ),
401
402
$" Person2.deletionDate" .as(" Person2DeletionDate" )
402
403
)
403
- .sort($" Person1Id" , $" Person2Id" )
404
- .limit(10000 )
404
+
405
+ val sampleFractionPersonPairs = 10000.0 / personPairs.count()
406
+ personPairs.sample(sampleFractionPersonPairs, 42 )
405
407
},
406
408
" people2Hops" -> Factor (PersonType , PlaceType , PersonKnowsPersonType ) { case Seq (person, place, knows) =>
407
409
val cities = place.where($" type" === " City" ).cache()
@@ -431,12 +433,13 @@ object FactorGenerationStage extends DatagenStage with Logging {
431
433
.select($" knows.Person1Id" .alias(" Person1Id" ), $" knows.Person2Id" .alias(" Person2Id" ))
432
434
}
433
435
434
- nHops(
435
- allKnows,
436
- n = 2 ,
437
- joinKeys = (" Person2Id" , " Person1Id" ),
438
- sample = Some (chinesePeopleSample)
439
- ).join(person.as(" Person1" ), $" Person1.id" === $" Person1Id" )
436
+ val personPairs = nHops(
437
+ allKnows,
438
+ n = 2 ,
439
+ joinKeys = (" Person2Id" , " Person1Id" ),
440
+ sample = Some (chinesePeopleSample)
441
+ )
442
+ .join(person.as(" Person1" ), $" Person1.id" === $" Person1Id" )
440
443
.join(person.as(" Person2" ), $" Person2.id" === $" Person1Id" )
441
444
.select(
442
445
$" Person1Id" ,
@@ -446,8 +449,9 @@ object FactorGenerationStage extends DatagenStage with Logging {
446
449
$" Person2.creationDate" .as(" Person2CreationDate" ),
447
450
$" Person2.deletionDate" .as(" Person2DeletionDate" )
448
451
)
449
- .sort($" Person1Id" , $" Person2Id" )
450
- .limit(10000 )
452
+
453
+ val sampleFractionPersonPairs = 10000.0 / personPairs.count()
454
+ personPairs.sample(sampleFractionPersonPairs, 42 )
451
455
},
452
456
" sameUniversityKnows" -> LargeFactor (PersonKnowsPersonType , PersonStudyAtUniversityType ) { case Seq (personKnowsPerson, studyAt) =>
453
457
val size = Math .max(Math .ceil(personKnowsPerson.rdd.getNumPartitions / 10 ).toInt, 1 )
0 commit comments