Skip to content

Commit 8461db7

Browse files
authored
Merge pull request #406 from ldbc/rm-redundant-deletes
Filter not NN edges when generating BI deletes
2 parents ad99962 + f5afc50 commit 8461db7

File tree

7 files changed

+74
-56
lines changed

7 files changed

+74
-56
lines changed

src/main/scala/ldbc/snb/datagen/io/graphs.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import ldbc.snb.datagen.io.dataframes.{DataFrameSink, DataFrameSource}
44
import ldbc.snb.datagen.model.EntityType.{Attr, Edge, Node}
55
import ldbc.snb.datagen.model.Mode.Raw
66
import ldbc.snb.datagen.model._
7+
import ldbc.snb.datagen.model.raw.{ForumType, PersonType}
78
import ldbc.snb.datagen.syntax._
89
import ldbc.snb.datagen.util.{Logging, SparkUI}
910
import org.apache.spark.sql.functions.col
@@ -36,10 +37,9 @@ object graphs {
3637
implicit val atNode = at[Node](n => Seq(n.name.hashCode, 0, n.hashCode()))
3738
implicit val atEdge = at[Edge](e => {
3839
val primary = e match {
39-
case Edge("Likes", "Person", "Comment", _, _) => "Comment"
40-
case Edge("Likes", "Person", "Post", _, _) => "Post"
41-
case Edge("ContainerOf", "Forum", "Post", _, _) => "Post"
42-
case Edge(_, source, _, _, _) => source
40+
case Edge("Likes", PersonType, dest, _, _, _, _) => dest.name
41+
case Edge("ContainerOf", ForumType, dest, _, _, _, _) => dest.name
42+
case Edge(_, source, _, _, _, _, _) => source.name
4343
}
4444
Seq(primary.hashCode, 2, e.hashCode)
4545
})

src/main/scala/ldbc/snb/datagen/model/package.scala

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,36 +48,41 @@ package object model {
4848
private def s(isStatic: Boolean) = if (isStatic) "static" else "dynamic"
4949

5050
final case class Node(name: String, isStatic: Boolean = false) extends EntityType {
51-
override val entityPath: String = s"${s(isStatic)}/${name}"
51+
override val entityPath: String = s"${s(isStatic)}/$name"
5252
override val primaryKey: Seq[String] = Seq("id")
5353
override def toString: String = s"$name"
5454
}
5555

5656
final case class Edge(
5757
`type`: String,
58-
source: String,
59-
destination: String,
58+
source: Node,
59+
destination: Node,
6060
cardinality: Cardinality,
61-
isStatic: Boolean = false
61+
isStatic: Boolean = false,
62+
sourceNameOverride: Option[String] = None,
63+
destinationNameOverride: Option[String] = None
6264
) extends EntityType {
63-
override val entityPath: String = s"${s(isStatic)}/${source}_${pascalToCamel(`type`)}_${destination}"
65+
val sourceName: String = sourceNameOverride.getOrElse(source.name)
66+
val destinationName: String = destinationNameOverride.getOrElse(destination.name)
6467

65-
override val primaryKey: Seq[String] = ((source, destination) match {
68+
override val entityPath: String = s"${s(isStatic)}/${sourceName}_${pascalToCamel(`type`)}_${destinationName}"
69+
70+
override val primaryKey: Seq[String] = ((sourceName, destinationName) match {
6671
case (s, d) if s == d => Seq(s"${s}1", s"${d}2")
6772
case (s, d) => Seq(s, d)
6873
}).map(name => s"${name}Id")
6974

70-
override def toString: String = s"$source -[${`type`}]-> $destination"
75+
override def toString: String = s"${sourceName} -[${`type`}]-> ${destinationName}"
7176
}
7277

73-
final case class Attr(`type`: String, parent: String, attribute: String, isStatic: Boolean = false) extends EntityType {
74-
override val entityPath: String = s"${s(isStatic)}/${parent}_${pascalToCamel(`type`)}_${attribute}"
78+
final case class Attr(`type`: String, parent: Node, attribute: String, isStatic: Boolean = false) extends EntityType {
79+
override val entityPath: String = s"${s(isStatic)}/${parent.name}_${pascalToCamel(`type`)}_${attribute}"
7580

76-
override val primaryKey: Seq[String] = ((parent, attribute) match {
81+
override val primaryKey: Seq[String] = ((parent.name, attribute) match {
7782
case (s, d) if s == d => Seq(s"${s}1", s"${d}2")
7883
case (s, d) => Seq(s, d)
7984
}).map(name => s"${name}Id")
80-
override def toString: String = s"$parent ♢-[${`type`}]-> $attribute"
85+
override def toString: String = s"${parent.name} ♢-[${`type`}]-> $attribute"
8186
}
8287

8388
}

src/main/scala/ldbc/snb/datagen/model/raw.scala

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -174,19 +174,21 @@ object raw {
174174
val TagType = Node("Tag", isStatic = true)
175175
val TagClassType = Node("TagClass", isStatic = true)
176176
val CommentType = Node("Comment")
177-
val CommentHasTagType = Edge("HasTag", "Comment", "Tag", NN)
178-
val ForumType = Node("Forum")
179-
val ForumHasMemberType = Edge("HasMember", "Forum", "Person", NN)
180-
val ForumHasTagType = Edge("HasTag", "Forum", "Tag", NN)
181177
val PersonType = Node("Person")
182-
val PersonHasInterestTagType = Edge("HasInterest", "Person", "Tag", NN)
183-
val PersonKnowsPersonType = Edge("Knows", "Person", "Person", NN)
184-
val PersonLikesCommentType = Edge("Likes", "Person", "Comment", NN)
185-
val PersonLikesPostType = Edge("Likes", "Person", "Post", NN)
186-
val PersonStudyAtUniversityType = Edge("StudyAt", "Person", "University", OneN)
187-
val PersonWorkAtCompanyType = Edge("WorkAt", "Person", "Company", NN)
178+
val ForumType = Node("Forum")
188179
val PostType = Node("Post")
189-
val PostHasTagType = Edge("HasTag", "Post", "Tag", NN)
180+
181+
val CommentHasTagType = Edge("HasTag", CommentType, TagType, NN)
182+
val ForumHasMemberType = Edge("HasMember", ForumType, PersonType, NN)
183+
val ForumHasTagType = Edge("HasTag", ForumType, TagType, NN)
184+
val PersonHasInterestTagType = Edge("HasInterest", PersonType, TagType, NN)
185+
val PersonKnowsPersonType = Edge("Knows", PersonType, PersonType, NN)
186+
val PersonLikesCommentType = Edge("Likes", PersonType, CommentType, NN)
187+
val PersonLikesPostType = Edge("Likes", PersonType, PostType, NN)
188+
val PersonStudyAtUniversityType = Edge("StudyAt", PersonType, OrganisationType, OneN, destinationNameOverride = Some("University"))
189+
val PersonWorkAtCompanyType = Edge("WorkAt", PersonType, OrganisationType, NN, destinationNameOverride = Some("Company"))
190+
191+
val PostHasTagType = Edge("HasTag", PostType, TagType, NN)
190192

191193
trait EntityTraitsInstances {
192194
import EntityTraits._

src/main/scala/ldbc/snb/datagen/transformation/transform/ExplodeAttrs.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ object ExplodeAttrs extends Transform[Mode.Raw.type, Mode.Raw.type] {
1717
val updatedEntities = input.entities
1818
.collect { case (k @ Node("Person", false), v) =>
1919
Map(
20-
explodedAttr(Attr("Email", "Person", "EmailAddress"), v, $"email"),
21-
explodedAttr(Attr("Speaks", "Person", "Language"), v, $"language"),
20+
explodedAttr(Attr("Email", k, "EmailAddress"), v, $"email"),
21+
explodedAttr(Attr("Speaks", k, "Language"), v, $"language"),
2222
k -> v.drop("email", "language")
2323
)
2424
}

src/main/scala/ldbc/snb/datagen/transformation/transform/ExplodeEdges.scala

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package ldbc.snb.datagen.transformation.transform
22

3-
import ldbc.snb.datagen.model.Cardinality.{NN, NOne, OneN}
4-
import ldbc.snb.datagen.model.EntityType.{Edge, Node}
3+
import ldbc.snb.datagen.model.Cardinality._
4+
import ldbc.snb.datagen.model.EntityType._
55
import ldbc.snb.datagen.model.Mode
66
import ldbc.snb.datagen.model.Mode.Raw.withRawColumns
7+
import ldbc.snb.datagen.model.raw._
78
import ldbc.snb.datagen.syntax._
89
import org.apache.spark.sql.functions._
910
import org.apache.spark.sql.{Column, DataFrame}
@@ -25,51 +26,51 @@ object ExplodeEdges extends Transform[Mode.Raw.type, Mode.Raw.type] {
2526

2627
val updatedEntities = entities
2728
.collect {
28-
case (k @ Node("Organisation", true), v) =>
29+
case (k @ OrganisationType, v) =>
2930
Map(
30-
explodedEdge(Edge("IsLocatedIn", "Organisation", "Place", OneN, isStatic = true), v, $"LocationPlaceId"),
31+
explodedEdge(Edge("IsLocatedIn", k, PlaceType, OneN, isStatic = true), v, $"LocationPlaceId"),
3132
k -> v.drop("LocationPlaceId")
3233
)
33-
case (k @ Node("Place", true), v) =>
34+
case (k @ PlaceType, v) =>
3435
Map(
35-
explodedEdge(Edge("IsPartOf", "Place", "Place", OneN, isStatic = true), v, $"PartOfPlaceId"),
36+
explodedEdge(Edge("IsPartOf", k, k, OneN, isStatic = true), v, $"PartOfPlaceId"),
3637
k -> v.drop("PartOfPlaceId")
3738
)
38-
case (k @ Node("Tag", true), v) =>
39+
case (k @ TagType, v) =>
3940
Map(
40-
explodedEdge(Edge("HasType", "Tag", "TagClass", OneN, isStatic = true), v, $"TypeTagClassId"),
41+
explodedEdge(Edge("HasType", k, TagClassType, OneN, isStatic = true), v, $"TypeTagClassId"),
4142
k -> v.drop("TypeTagClassId")
4243
)
43-
case (k @ Node("TagClass", true), v) =>
44+
case (k @ TagClassType, v) =>
4445
Map(
45-
explodedEdge(Edge("IsSubclassOf", "TagClass", "TagClass", OneN, isStatic = true), v, $"SubclassOfTagClassId"),
46+
explodedEdge(Edge("IsSubclassOf", k, k, OneN, isStatic = true), v, $"SubclassOfTagClassId"),
4647
k -> v.drop("SubclassOfTagClassId")
4748
)
48-
case (k @ Node("Comment", false), v) =>
49+
case (k @ CommentType, v) =>
4950
Map(
50-
explodedEdge(Edge("HasCreator", "Comment", "Person", OneN), v, $"CreatorPersonId"),
51-
explodedEdge(Edge("IsLocatedIn", "Comment", "Country", OneN), v, $"LocationCountryId"),
52-
explodedEdge(Edge("ReplyOf", "Comment", "Post", OneN), v, $"ParentPostId"),
53-
explodedEdge(Edge("ReplyOf", "Comment", "Comment", OneN), v, $"ParentCommentId"),
51+
explodedEdge(Edge("HasCreator", k, PersonType, OneN), v, $"CreatorPersonId"),
52+
explodedEdge(Edge("IsLocatedIn", k, PlaceType, OneN, destinationNameOverride = Some("Country")), v, $"LocationCountryId"),
53+
explodedEdge(Edge("ReplyOf", k, PostType, OneN), v, $"ParentPostId"),
54+
explodedEdge(Edge("ReplyOf", k, k, OneN), v, $"ParentCommentId"),
5455
k -> v.drop("CreatorPersonId", "LocationCountryId", "ParentPostId", "ParentCommentId")
5556
)
56-
case (k @ Node("Forum", false), v) =>
57+
case (k @ ForumType, v) =>
5758
Map(
58-
explodedEdge(Edge("HasModerator", "Forum", "Person", OneN), v, $"ModeratorPersonId"),
59+
explodedEdge(Edge("HasModerator", k, PersonType, OneN), v, $"ModeratorPersonId"),
5960
k -> v.drop("ModeratorPersonId")
6061
)
6162

62-
case (k @ Node("Person", false), v) =>
63+
case (k @ PersonType, v) =>
6364
Map(
64-
explodedEdge(Edge("IsLocatedIn", "Person", "City", OneN), v, $"LocationCityId"),
65+
explodedEdge(Edge("IsLocatedIn", k, PlaceType, OneN, destinationNameOverride = Some("City")), v, $"LocationCityId"),
6566
k -> v.drop("LocationCityId")
6667
)
6768

68-
case (k @ Node("Post", false), v) =>
69+
case (k @ PostType, v) =>
6970
Map(
70-
explodedEdge(Edge("HasCreator", "Post", "Person", OneN), v, $"CreatorPersonId"),
71-
explodedEdge(Edge("ContainerOf", "Forum", "Post", NOne), v, $"ContainerForumId"),
72-
explodedEdge(Edge("IsLocatedIn", "Post", "Country", OneN), v, $"LocationCountryId"),
71+
explodedEdge(Edge("HasCreator", k, PersonType, OneN), v, $"CreatorPersonId"),
72+
explodedEdge(Edge("ContainerOf", ForumType, k, NOne), v, $"ContainerForumId"),
73+
explodedEdge(Edge("IsLocatedIn", k, PlaceType, OneN, destinationNameOverride = Some("Country")), v, $"LocationCountryId"),
7374
k -> v.drop("CreatorPersonId", "LocationCountryId", "ContainerForumId")
7475
)
7576
}

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToBiTransform.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package ldbc.snb.datagen.transformation.transform
22

3+
import ldbc.snb.datagen.model.Cardinality._
4+
import ldbc.snb.datagen.model.EntityType._
35
import ldbc.snb.datagen.model.Mode.BI
46
import ldbc.snb.datagen.model._
57
import ldbc.snb.datagen.syntax._
@@ -24,6 +26,13 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
2426
case _ => throw new IllegalStateException("Unrecognized partition key")
2527
}
2628

29+
private def notDerived(entityType: EntityType): Boolean = entityType match {
30+
case Edge(_, _, _, OneN, _, _, _) => false
31+
case Edge(_, _, _, NOne, _, _, _) => false
32+
case Attr(_, _, _, _) => false
33+
case _ => true
34+
}
35+
2736
override def transform(input: In): Out = {
2837
val batch_id = (col: Column) => date_format(date_trunc(mode.batchPeriod, to_timestamp(col / lit(1000L))), batchPeriodFormat(mode.batchPeriod))
2938

@@ -64,7 +73,7 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
6473
tpe -> BatchedEntity(
6574
RawToInteractiveTransform.snapshotPart(tpe, v, bulkLoadThreshold, filterDeletion = false),
6675
Some(Batched(insertBatchPart(tpe, v, bulkLoadThreshold, simulationEnd), Seq("batch_id"), Seq($"creationDate"))),
67-
if (keepImplicitDeletes || v.columns.contains("explicitlyDeleted"))
76+
if (notDerived(tpe) && (keepImplicitDeletes || v.columns.contains("explicitlyDeleted")))
6877
Some(Batched(deleteBatchPart(tpe, v, bulkLoadThreshold, simulationEnd), Seq("batch_id"), Seq($"deletionDate")))
6978
else
7079
None

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToInteractiveTransform.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package ldbc.snb.datagen.transformation.transform
22

3-
import ldbc.snb.datagen.model.Cardinality.NN
4-
import ldbc.snb.datagen.model.EntityType.Edge
3+
import ldbc.snb.datagen.model.Cardinality._
4+
import ldbc.snb.datagen.model.EntityType._
5+
import ldbc.snb.datagen.model.raw._
56
import ldbc.snb.datagen.model.{EntityType, Graph, Mode}
67
import ldbc.snb.datagen.syntax._
78
import ldbc.snb.datagen.util.Logging
89
import ldbc.snb.datagen.util.sql._
910
import org.apache.spark.sql.DataFrame
10-
import org.apache.spark.sql.functions.{col, lit, to_timestamp}
11+
import org.apache.spark.sql.functions.{col, lit}
1112

1213
case class RawToInteractiveTransform(mode: Mode.Interactive, simulationStart: Long, simulationEnd: Long)
1314
extends Transform[Mode.Raw.type, Mode.Interactive]
@@ -29,7 +30,7 @@ object RawToInteractiveTransform {
2930

3031
def columns(tpe: EntityType, cols: Seq[String]) = tpe match {
3132
case tpe if tpe.isStatic => cols
32-
case Edge("Knows", "Person", "Person", NN, false) =>
33+
case Edge("Knows", PersonType, PersonType, NN, false, _, _) =>
3334
val rawCols = Set("deletionDate", "explicitlyDeleted", "weight")
3435
cols.filter(!rawCols.contains(_))
3536
case _ =>

0 commit comments

Comments
 (0)