Skip to content

Commit 16ad74a

Browse files
committed
filter not nn edges
1 parent ad99962 commit 16ad74a

File tree

7 files changed

+78
-51
lines changed

7 files changed

+78
-51
lines changed

src/main/scala/ldbc/snb/datagen/io/graphs.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import ldbc.snb.datagen.io.dataframes.{DataFrameSink, DataFrameSource}
44
import ldbc.snb.datagen.model.EntityType.{Attr, Edge, Node}
55
import ldbc.snb.datagen.model.Mode.Raw
66
import ldbc.snb.datagen.model._
7+
import ldbc.snb.datagen.model.raw.{ForumType, PersonType}
78
import ldbc.snb.datagen.syntax._
89
import ldbc.snb.datagen.util.{Logging, SparkUI}
910
import org.apache.spark.sql.functions.col
@@ -36,10 +37,9 @@ object graphs {
3637
// Implicit instance for Node built with `at`: maps a node to the sequence
// (hash of the node's name, the constant 0, hash of the node itself).
// NOTE(review): how `at` consumes this sequence is not visible in this hunk —
// presumably it is an ordering/placement key; confirm against the definition of `at`.
implicit val atNode = at[Node](n => Seq(n.name.hashCode, 0, n.hashCode()))
3738
implicit val atEdge = at[Edge](e => {
3839
val primary = e match {
39-
case Edge("Likes", "Person", "Comment", _, _) => "Comment"
40-
case Edge("Likes", "Person", "Post", _, _) => "Post"
41-
case Edge("ContainerOf", "Forum", "Post", _, _) => "Post"
42-
case Edge(_, source, _, _, _) => source
40+
case Edge("Likes", PersonType, dest, _, _, _, _) => dest.name
41+
case Edge("ContainerOf", ForumType, dest, _, _, _, _) => dest.name
42+
case Edge(_, source, _, _, _, _, _) => source.name
4343
}
4444
Seq(primary.hashCode, 2, e.hashCode)
4545
})

src/main/scala/ldbc/snb/datagen/model/package.scala

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,36 +48,47 @@ package object model {
4848
// Path segment for an entity's static flag: "static" when `isStatic` is true, "dynamic" otherwise.
// Used as the leading component of `entityPath` in the entity types below.
private def s(isStatic: Boolean) = if (isStatic) "static" else "dynamic"
4949

5050
final case class Node(name: String, isStatic: Boolean = false) extends EntityType {
51-
override val entityPath: String = s"${s(isStatic)}/${name}"
51+
override val entityPath: String = s"${s(isStatic)}/$name"
5252
override val primaryKey: Seq[String] = Seq("id")
5353
override def toString: String = s"$name"
5454
}
5555

5656
final case class Edge(
5757
`type`: String,
58-
source: String,
59-
destination: String,
58+
source: Node,
59+
destination: Node,
6060
cardinality: Cardinality,
61-
isStatic: Boolean = false
61+
isStatic: Boolean = false,
62+
sourceNameOverride: Option[String] = None,
63+
destinationNameOverride: Option[String] = None,
6264
) extends EntityType {
63-
override val entityPath: String = s"${s(isStatic)}/${source}_${pascalToCamel(`type`)}_${destination}"
65+
// Name used for the source side of this edge: `sourceNameOverride` when one is
// given, otherwise the name of the `source` node. This name (not `source.name`
// directly) feeds `entityPath`, `primaryKey` and `toString`.
val sourceName: String = sourceNameOverride match {
66+
case Some(name) => name
67+
case _ => source.name
68+
}
69+
// Name used for the destination side of this edge: `destinationNameOverride`
// when one is given, otherwise the name of the `destination` node. Lets an edge
// point at a node type while being labelled differently (e.g. an override of
// "University" or "Company" on an edge whose destination node is OrganisationType).
val destinationName: String = destinationNameOverride match {
70+
case Some(name) => name
71+
case _ => destination.name
72+
}
73+
74+
override val entityPath: String = s"${s(isStatic)}/${sourceName}_${pascalToCamel(`type`)}_${destinationName}"
6475

65-
override val primaryKey: Seq[String] = ((source, destination) match {
76+
override val primaryKey: Seq[String] = ((sourceName, destinationName) match {
6677
case (s, d) if s == d => Seq(s"${s}1", s"${d}2")
6778
case (s, d) => Seq(s, d)
6879
}).map(name => s"${name}Id")
6980

70-
override def toString: String = s"$source -[${`type`}]-> $destination"
81+
override def toString: String = s"${sourceName} -[${`type`}]-> ${destinationName}"
7182
}
7283

73-
final case class Attr(`type`: String, parent: String, attribute: String, isStatic: Boolean = false) extends EntityType {
74-
override val entityPath: String = s"${s(isStatic)}/${parent}_${pascalToCamel(`type`)}_${attribute}"
84+
final case class Attr(`type`: String, parent: Node, attribute: String, isStatic: Boolean = false) extends EntityType {
85+
override val entityPath: String = s"${s(isStatic)}/${parent.name}_${pascalToCamel(`type`)}_${attribute}"
7586

76-
override val primaryKey: Seq[String] = ((parent, attribute) match {
87+
override val primaryKey: Seq[String] = ((parent.name, attribute) match {
7788
case (s, d) if s == d => Seq(s"${s}1", s"${d}2")
7889
case (s, d) => Seq(s, d)
7990
}).map(name => s"${name}Id")
80-
override def toString: String = s"$parent ♢-[${`type`}]-> $attribute"
91+
override def toString: String = s"${parent.name} ♢-[${`type`}]-> $attribute"
8192
}
8293

8394
}

src/main/scala/ldbc/snb/datagen/model/raw.scala

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -174,19 +174,21 @@ object raw {
174174
val TagType = Node("Tag", isStatic = true)
175175
val TagClassType = Node("TagClass", isStatic = true)
176176
val CommentType = Node("Comment")
177-
val CommentHasTagType = Edge("HasTag", "Comment", "Tag", NN)
178-
val ForumType = Node("Forum")
179-
val ForumHasMemberType = Edge("HasMember", "Forum", "Person", NN)
180-
val ForumHasTagType = Edge("HasTag", "Forum", "Tag", NN)
181177
val PersonType = Node("Person")
182-
val PersonHasInterestTagType = Edge("HasInterest", "Person", "Tag", NN)
183-
val PersonKnowsPersonType = Edge("Knows", "Person", "Person", NN)
184-
val PersonLikesCommentType = Edge("Likes", "Person", "Comment", NN)
185-
val PersonLikesPostType = Edge("Likes", "Person", "Post", NN)
186-
val PersonStudyAtUniversityType = Edge("StudyAt", "Person", "University", OneN)
187-
val PersonWorkAtCompanyType = Edge("WorkAt", "Person", "Company", NN)
178+
val ForumType = Node("Forum")
188179
val PostType = Node("Post")
189-
val PostHasTagType = Edge("HasTag", "Post", "Tag", NN)
180+
181+
val CommentHasTagType = Edge("HasTag", CommentType, TagType, NN)
182+
val ForumHasMemberType = Edge("HasMember", ForumType, PersonType, NN)
183+
val ForumHasTagType = Edge("HasTag", ForumType, TagType, NN)
184+
val PersonHasInterestTagType = Edge("HasInterest", PersonType, TagType, NN)
185+
val PersonKnowsPersonType = Edge("Knows", PersonType, PersonType, NN)
186+
val PersonLikesCommentType = Edge("Likes", PersonType, CommentType, NN)
187+
val PersonLikesPostType = Edge("Likes", PersonType, PostType, NN)
188+
val PersonStudyAtUniversityType = Edge("StudyAt", PersonType, OrganisationType, OneN, destinationNameOverride = Some("University"))
189+
val PersonWorkAtCompanyType = Edge("WorkAt", PersonType, OrganisationType, NN, destinationNameOverride = Some("Company"))
190+
191+
val PostHasTagType = Edge("HasTag", PostType, TagType, NN)
190192

191193
trait EntityTraitsInstances {
192194
import EntityTraits._

src/main/scala/ldbc/snb/datagen/transformation/transform/ExplodeAttrs.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ object ExplodeAttrs extends Transform[Mode.Raw.type, Mode.Raw.type] {
1717
val updatedEntities = input.entities
1818
.collect { case (k @ Node("Person", false), v) =>
1919
Map(
20-
explodedAttr(Attr("Email", "Person", "EmailAddress"), v, $"email"),
21-
explodedAttr(Attr("Speaks", "Person", "Language"), v, $"language"),
20+
explodedAttr(Attr("Email", k, "EmailAddress"), v, $"email"),
21+
explodedAttr(Attr("Speaks", k, "Language"), v, $"language"),
2222
k -> v.drop("email", "language")
2323
)
2424
}

src/main/scala/ldbc/snb/datagen/transformation/transform/ExplodeEdges.scala

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import ldbc.snb.datagen.model.Cardinality.{NN, NOne, OneN}
44
import ldbc.snb.datagen.model.EntityType.{Edge, Node}
55
import ldbc.snb.datagen.model.Mode
66
import ldbc.snb.datagen.model.Mode.Raw.withRawColumns
7+
import ldbc.snb.datagen.model.raw._
78
import ldbc.snb.datagen.syntax._
89
import org.apache.spark.sql.functions._
910
import org.apache.spark.sql.{Column, DataFrame}
@@ -25,51 +26,51 @@ object ExplodeEdges extends Transform[Mode.Raw.type, Mode.Raw.type] {
2526

2627
val updatedEntities = entities
2728
.collect {
28-
case (k @ Node("Organisation", true), v) =>
29+
case (k @ OrganisationType, v) =>
2930
Map(
30-
explodedEdge(Edge("IsLocatedIn", "Organisation", "Place", OneN, isStatic = true), v, $"LocationPlaceId"),
31+
explodedEdge(Edge("IsLocatedIn", k, PlaceType, OneN, isStatic = true), v, $"LocationPlaceId"),
3132
k -> v.drop("LocationPlaceId")
3233
)
33-
case (k @ Node("Place", true), v) =>
34+
case (k @ PlaceType, v) =>
3435
Map(
35-
explodedEdge(Edge("IsPartOf", "Place", "Place", OneN, isStatic = true), v, $"PartOfPlaceId"),
36+
explodedEdge(Edge("IsPartOf", k, k, OneN, isStatic = true), v, $"PartOfPlaceId"),
3637
k -> v.drop("PartOfPlaceId")
3738
)
38-
case (k @ Node("Tag", true), v) =>
39+
case (k @ TagType, v) =>
3940
Map(
40-
explodedEdge(Edge("HasType", "Tag", "TagClass", OneN, isStatic = true), v, $"TypeTagClassId"),
41+
explodedEdge(Edge("HasType", k, TagClassType, OneN, isStatic = true), v, $"TypeTagClassId"),
4142
k -> v.drop("TypeTagClassId")
4243
)
43-
case (k @ Node("TagClass", true), v) =>
44+
case (k @ TagClassType, v) =>
4445
Map(
45-
explodedEdge(Edge("IsSubclassOf", "TagClass", "TagClass", OneN, isStatic = true), v, $"SubclassOfTagClassId"),
46+
explodedEdge(Edge("IsSubclassOf", k, k, OneN, isStatic = true), v, $"SubclassOfTagClassId"),
4647
k -> v.drop("SubclassOfTagClassId")
4748
)
48-
case (k @ Node("Comment", false), v) =>
49+
case (k @ CommentType, v) =>
4950
Map(
50-
explodedEdge(Edge("HasCreator", "Comment", "Person", OneN), v, $"CreatorPersonId"),
51-
explodedEdge(Edge("IsLocatedIn", "Comment", "Country", OneN), v, $"LocationCountryId"),
52-
explodedEdge(Edge("ReplyOf", "Comment", "Post", OneN), v, $"ParentPostId"),
53-
explodedEdge(Edge("ReplyOf", "Comment", "Comment", OneN), v, $"ParentCommentId"),
51+
explodedEdge(Edge("HasCreator", k, PersonType, OneN), v, $"CreatorPersonId"),
52+
explodedEdge(Edge("IsLocatedIn", k, PlaceType, OneN, destinationNameOverride = Some("Country")), v, $"LocationCountryId"),
53+
explodedEdge(Edge("ReplyOf", k, PostType, OneN), v, $"ParentPostId"),
54+
explodedEdge(Edge("ReplyOf", k, k, OneN), v, $"ParentCommentId"),
5455
k -> v.drop("CreatorPersonId", "LocationCountryId", "ParentPostId", "ParentCommentId")
5556
)
56-
case (k @ Node("Forum", false), v) =>
57+
case (k @ ForumType, v) =>
5758
Map(
58-
explodedEdge(Edge("HasModerator", "Forum", "Person", OneN), v, $"ModeratorPersonId"),
59+
explodedEdge(Edge("HasModerator", k, PersonType, OneN), v, $"ModeratorPersonId"),
5960
k -> v.drop("ModeratorPersonId")
6061
)
6162

62-
case (k @ Node("Person", false), v) =>
63+
case (k @ PersonType, v) =>
6364
Map(
64-
explodedEdge(Edge("IsLocatedIn", "Person", "City", OneN), v, $"LocationCityId"),
65+
explodedEdge(Edge("IsLocatedIn", k, PlaceType, OneN, destinationNameOverride = Some("City")), v, $"LocationCityId"),
6566
k -> v.drop("LocationCityId")
6667
)
6768

68-
case (k @ Node("Post", false), v) =>
69+
case (k @ PostType, v) =>
6970
Map(
70-
explodedEdge(Edge("HasCreator", "Post", "Person", OneN), v, $"CreatorPersonId"),
71-
explodedEdge(Edge("ContainerOf", "Forum", "Post", NOne), v, $"ContainerForumId"),
72-
explodedEdge(Edge("IsLocatedIn", "Post", "Country", OneN), v, $"LocationCountryId"),
71+
explodedEdge(Edge("HasCreator", k, PersonType, OneN), v, $"CreatorPersonId"),
72+
explodedEdge(Edge("ContainerOf", ForumType, k, NOne), v, $"ContainerForumId"),
73+
explodedEdge(Edge("IsLocatedIn", k, PlaceType, OneN, destinationNameOverride = Some("Country")), v, $"LocationCountryId"),
7374
k -> v.drop("CreatorPersonId", "LocationCountryId", "ContainerForumId")
7475
)
7576
}

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToBiTransform.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package ldbc.snb.datagen.transformation.transform
22

3+
import ldbc.snb.datagen.model.Cardinality.{NOne, OneN}
4+
import ldbc.snb.datagen.model.EntityType.Edge
35
import ldbc.snb.datagen.model.Mode.BI
46
import ldbc.snb.datagen.model._
7+
import ldbc.snb.datagen.model.graphs.Raw
8+
import ldbc.snb.datagen.model.raw._
59
import ldbc.snb.datagen.syntax._
610
import ldbc.snb.datagen.util.Logging
711
import ldbc.snb.datagen.util.sql._
@@ -24,6 +28,14 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
2428
case _ => throw new IllegalStateException("Unrecognized partition key")
2529
}
2630

31+
// Returns false for an Edge whose cardinality is OneN or NOne, and true for
// every other entity type (an Edge with any other cardinality, or anything
// that is not an Edge at all). `transform` uses this as a precondition for
// producing a delete batch: entities for which this is false get None.
private def notOneToManyEdge(entityType: EntityType): Boolean = entityType match {
32+
case Edge(_, _, _, OneN, _, _, _) => false
33+
case Edge(_, _, _, NOne, _, _, _) => false
34+
case _ => true
35+
}
36+
37+
// private def isRawEntity(entityType: EntityType): Boolean = Raw.graphDef.entities.contains(entityType)
38+
2739
override def transform(input: In): Out = {
2840
val batch_id = (col: Column) => date_format(date_trunc(mode.batchPeriod, to_timestamp(col / lit(1000L))), batchPeriodFormat(mode.batchPeriod))
2941

@@ -64,7 +76,7 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
6476
tpe -> BatchedEntity(
6577
RawToInteractiveTransform.snapshotPart(tpe, v, bulkLoadThreshold, filterDeletion = false),
6678
Some(Batched(insertBatchPart(tpe, v, bulkLoadThreshold, simulationEnd), Seq("batch_id"), Seq($"creationDate"))),
67-
if (keepImplicitDeletes || v.columns.contains("explicitlyDeleted"))
79+
if (notOneToManyEdge(tpe) && (keepImplicitDeletes || v.columns.contains("explicitlyDeleted")))
6880
Some(Batched(deleteBatchPart(tpe, v, bulkLoadThreshold, simulationEnd), Seq("batch_id"), Seq($"deletionDate")))
6981
else
7082
None

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToInteractiveTransform.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ldbc.snb.datagen.transformation.transform
22

33
import ldbc.snb.datagen.model.Cardinality.NN
44
import ldbc.snb.datagen.model.EntityType.Edge
5+
import ldbc.snb.datagen.model.raw.PersonType
56
import ldbc.snb.datagen.model.{EntityType, Graph, Mode}
67
import ldbc.snb.datagen.syntax._
78
import ldbc.snb.datagen.util.Logging
@@ -29,7 +30,7 @@ object RawToInteractiveTransform {
2930

3031
def columns(tpe: EntityType, cols: Seq[String]) = tpe match {
3132
case tpe if tpe.isStatic => cols
32-
case Edge("Knows", "Person", "Person", NN, false) =>
33+
case Edge("Knows", PersonType, PersonType, NN, false, _, _) =>
3334
val rawCols = Set("deletionDate", "explicitlyDeleted", "weight")
3435
cols.filter(!rawCols.contains(_))
3536
case _ =>

0 commit comments

Comments
 (0)