Skip to content

Commit a80da94

Browse files
committed
cc on pkp graph
1 parent 153d98b commit a80da94

File tree

1 file changed

+17
-0
lines changed

1 file changed

+17
-0
lines changed

src/main/scala/ldbc/snb/datagen/factors/FactorGenerationStage.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,23 @@ object FactorGenerationStage extends DatagenStage with Logging {
668668

669669
cc.join(counts, Seq("Component")).select("PersonId", "Component", "count")
670670

671+
},
672+
"personKnowsPersonConnected" -> LargeFactor(PersonType, PersonKnowsPersonType) { case Seq(person, personKnowsPerson) =>
673+
val s = spark
674+
import s.implicits._
675+
val vertices = person.select("id").rdd.map(row => (row.getAs[Long]("id"), ()))
676+
677+
val edges = personKnowsPerson.rdd.map(row =>
678+
graphx.Edge(row.getAs[Long]("Person1Id"), row.getAs[Long]("Person2Id"), ())
679+
)
680+
val graph = graphx.Graph(vertices, edges, ())
681+
val cc = graph.connectedComponents().vertices
682+
.toDF("PersonId", "Component")
683+
684+
val counts = cc.groupBy("Component").agg(count("*").as("count"))
685+
686+
cc.join(counts, Seq("Component")).select("PersonId", "Component", "count")
687+
671688
}
672689
)
673690
}

0 commit comments

Comments
 (0)