
Commit 11397c2

Merge pull request #417 from ldbc/minor-model-improvement
Minor transformation model improvements
2 parents (0596fd2 + 55dcb00), commit 11397c2

File tree: 11 files changed (+107 −50 lines)

src/main/scala/ldbc/snb/datagen/factors/io/package.scala

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@ package ldbc.snb.datagen.factors
 
 import ldbc.snb.datagen.io.dataframes.DataFrameSink
 import ldbc.snb.datagen.io.{PathComponent, Writer}
-import ldbc.snb.datagen.model.{GraphLike, Mode}
+import ldbc.snb.datagen.model.{GraphDef, Mode}
 import ldbc.snb.datagen.syntax._
 import ldbc.snb.datagen.util.Logging
 import org.apache.spark.sql.SaveMode
@@ -17,7 +17,7 @@ package object io {
     override type Data = FactorTable[M]
 
     override def write(self: FactorTable[M], sink: FactorTableSink): Unit = {
-      val p = (sink.path / "factors" / sink.format / PathComponent[GraphLike[M]].path(self.source) / self.name).toString
+      val p = (sink.path / "factors" / sink.format / PathComponent[GraphDef[M]].path(self.source.definition) / self.name).toString
       val dfSink = if (sink.overwrite) {
        DataFrameSink(p, sink.format, mode = SaveMode.Overwrite)
      } else DataFrameSink(p, sink.format)

src/main/scala/ldbc/snb/datagen/io/PathComponent.scala

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 package ldbc.snb.datagen.io
 
-import ldbc.snb.datagen.model.{GraphLike, Mode}
+import ldbc.snb.datagen.model.{GraphDef, Mode}
 
 trait PathComponent[A] {
   def path(a: A): String
@@ -11,7 +11,7 @@ object PathComponent {
 
   private def pure[A](f: A => String) = new PathComponent[A] { override def path(a: A): String = f(a) }
 
-  implicit def pathComponentForGraphDef[M <: Mode] = pure((g: GraphLike[M]) => {
+  implicit def pathComponentForGraphDef[M <: Mode] = pure((g: GraphDef[M]) => {
    val explodedPart = g match {
      case g if g.isAttrExploded && g.isEdgesExploded => "singular-projected-fk"
      case g if g.isAttrExploded => "singular-merged-fk"
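For orientation, here is a simplified, self-contained sketch of the PathComponent typeclass pattern this change touches. GraphDef is reduced to the two flags the instance inspects, and the fallback directory name is an assumption, so it mirrors but is not the project's code:

object PathComponentSketch {
  // Reduced stand-in for the real GraphDef: only the flags the path instance looks at.
  case class GraphDef(isAttrExploded: Boolean, isEdgesExploded: Boolean)

  trait PathComponent[A] { def path(a: A): String }

  object PathComponent {
    def apply[A](implicit ev: PathComponent[A]): PathComponent[A] = ev

    private def pure[A](f: A => String): PathComponent[A] =
      new PathComponent[A] { override def path(a: A): String = f(a) }

    // Instance keyed on the graph definition, mirroring pathComponentForGraphDef above.
    implicit val pathComponentForGraphDef: PathComponent[GraphDef] = pure { g =>
      if (g.isAttrExploded && g.isEdgesExploded) "singular-projected-fk"
      else if (g.isAttrExploded) "singular-merged-fk"
      else "composite-merged-fk" // assumed default branch; the real instance has more cases
    }
  }

  def main(args: Array[String]): Unit = {
    val spec = GraphDef(isAttrExploded = false, isEdgesExploded = false)
    // Callers now pass the definition (e.g. self.source.definition) instead of the graph itself.
    println(PathComponent[GraphDef].path(spec))
  }
}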

src/main/scala/ldbc/snb/datagen/io/graphs.scala

Lines changed: 9 additions & 7 deletions
@@ -89,11 +89,12 @@ object graphs {
     import CacheFriendlyEntityOrdering._
 
     override def write(self: Graph[M], sink: GraphSink): Unit = {
+      val spec = self.definition
       TreeMap(self.entities.toSeq: _*).foreach { case (tpe, dataset) =>
         SparkUI.job(getClass.getSimpleName, s"write $tpe") {
-          val p = (sink.path / "graphs" / sink.format / PathComponent[GraphLike[M]].path(self) / tpe.entityPath).toString
+          val p = (sink.path / "graphs" / sink.format / PathComponent[GraphDef[M]].path(spec) / tpe.entityPath).toString
           log.info(s"$tpe: Writing started")
-          val opts = getFormatOptions(sink.format, self.mode, sink.formatOptions)
+          val opts = getFormatOptions(sink.format, spec.mode, sink.formatOptions)
           the(dataset).write(DataFrameSink(p, sink.format, opts, SaveMode.Ignore))
           log.info(s"$tpe: Writing completed")
         }(dataset.sparkSession)
@@ -112,10 +113,11 @@ object graphs {
     import CacheFriendlyEntityOrdering._
 
     override def write(self: Graph[M], sink: GraphSink): Unit = {
-      val opts = getFormatOptions(sink.format, self.mode, sink.formatOptions)
+      val spec = self.definition
+      val opts = getFormatOptions(sink.format, spec.mode, sink.formatOptions)
       TreeMap(self.entities.mapValues(ev).toSeq: _*).foreach { case (tpe, BatchedEntity(snapshot, insertBatches, deleteBatches)) =>
         SparkUI.job(getClass.getSimpleName, s"write $tpe snapshot") {
-          val p = (sink.path / "graphs" / sink.format / PathComponent[GraphLike[M]].path(self) / "initial_snapshot" / tpe.entityPath).toString
+          val p = (sink.path / "graphs" / sink.format / PathComponent[GraphDef[M]].path(spec) / "initial_snapshot" / tpe.entityPath).toString
           log.info(s"$tpe: Writing snapshot")
           snapshot.write(DataFrameSink(p, sink.format, opts, SaveMode.Ignore))
           log.info(s"$tpe: Writing snapshot completed")
@@ -132,7 +134,7 @@ object graphs {
         for { (operation, (batches, sizeFactor)) <- operations } {
           batches.foreach { case Batched(entity, partitionKeys, orderingKeys) =>
             SparkUI.job(getClass.getSimpleName, s"write $tpe $operation") {
-              val p = (sink.path / "graphs" / sink.format / PathComponent[GraphLike[M]].path(self) / operation / tpe.entityPath).toString
+              val p = (sink.path / "graphs" / sink.format / PathComponent[GraphDef[M]].path(spec) / operation / tpe.entityPath).toString
               val numPartitions = Math.max(1.0, entity.rdd.getNumPartitions * sizeFactor).toInt
               log.info(f"$tpe: Writing $operation")
               entity
@@ -160,13 +162,13 @@
 
     override def read(self: GraphSource[M]): Graph[M] = {
       val entities = for { (entity, schema) <- self.definition.entities } yield {
-        val p = (self.path / "graphs" / self.format / PathComponent[GraphLike[M]].path(self.definition) / entity.entityPath).toString()
+        val p = (self.path / "graphs" / self.format / PathComponent[GraphDef[M]].path(self.definition) / entity.entityPath).toString()
         log.info(s"Reading $entity")
         val opts = getFormatOptions(self.format, self.definition.mode)
         val df = DataFrameSource(p, self.format, opts, schema.map(StructType.fromDDL)).read
         entity -> ev(df)
       }
-      Graph[M](self.definition.isAttrExploded, self.definition.isEdgesExploded, self.definition.mode, entities)
+      Graph[M](self.definition, entities)
     }
 
     override def exists(self: GraphSource[M]): Boolean = utils.fileExists(self.path)

src/main/scala/ldbc/snb/datagen/model/graphs.scala

Lines changed: 1 addition & 0 deletions
@@ -12,6 +12,7 @@ object graphs {
   val graphDef = GraphDef(
     isAttrExploded = false,
     isEdgesExploded = false,
+    useTimestamp = false,
     Mode.Raw,
     UntypedEntities[RawEntity].value.map { case (k, v) => (k, Some(v.toDDL)) }
   )

src/main/scala/ldbc/snb/datagen/model/package.scala

Lines changed: 5 additions & 12 deletions
@@ -149,25 +149,18 @@ package object model {
     }
   }
 
-  trait GraphLike[+M <: Mode] {
-    def isAttrExploded: Boolean
-    def isEdgesExploded: Boolean
-    def mode: M
-  }
-
   case class Graph[+M <: Mode](
-    isAttrExploded: Boolean,
-    isEdgesExploded: Boolean,
-    mode: M,
+    definition: GraphDef[M],
     entities: Map[EntityType, M#Layout]
-  ) extends GraphLike[M]
+  )
 
-  case class GraphDef[M <: Mode](
+  case class GraphDef[+M <: Mode](
     isAttrExploded: Boolean,
     isEdgesExploded: Boolean,
+    useTimestamp: Boolean,
     mode: M,
     entities: Map[EntityType, Option[String]]
-  ) extends GraphLike[M]
+  )
 
   sealed trait BatchPeriod
   object BatchPeriod {
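To see what the reshaping buys, a minimal standalone sketch of the new model shape (Mode, EntityType and the per-mode Layout are collapsed to plain strings here, so the types are illustrative only):

object ModelSketch {
  type EntityType = String
  type Layout     = String

  // GraphDef carries the schema-level facts: flags, the new useTimestamp marker, mode, and entity DDL.
  case class GraphDef(
    isAttrExploded: Boolean,
    isEdgesExploded: Boolean,
    useTimestamp: Boolean,
    mode: String,
    entities: Map[EntityType, Option[String]]
  )

  // A Graph is now just its definition plus the concrete per-entity data.
  case class Graph(definition: GraphDef, entities: Map[EntityType, Layout])

  def main(args: Array[String]): Unit = {
    val spec  = GraphDef(isAttrExploded = false, isEdgesExploded = false, useTimestamp = false, mode = "Raw", entities = Map("Person" -> None))
    val graph = Graph(spec, Map("Person" -> "<person dataset>"))
    // Consumers read the flags through the definition rather than from the graph itself.
    println(graph.definition.useTimestamp)
  }
}

With GraphLike gone, readers and writers key their behaviour (and their PathComponent instances) on GraphDef alone, and Graph[M] no longer duplicates the flags it shares with its definition.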

src/main/scala/ldbc/snb/datagen/transformation/transform/ConvertDates.scala

Lines changed: 37 additions & 17 deletions
@@ -1,13 +1,15 @@
 package ldbc.snb.datagen.transformation.transform
 
-import ldbc.snb.datagen.model.{Batched, BatchedEntity, EntityType, Mode}
+import ldbc.snb.datagen.model.{Batched, BatchedEntity, EntityType, Graph, Mode}
 import ldbc.snb.datagen.util.sql.qcol
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types.{DateType, TimestampType}
 import shapeless._
 
 trait ConvertDates[M <: Mode] extends Transform[M, M] {
+  def convertDatesInEntities(entities: Map[EntityType, M#Layout]): Map[EntityType, (Option[String], M#Layout)]
+
   def convertDates(tpe: EntityType, df: DataFrame): DataFrame = {
     tpe match {
       case tpe if !tpe.isStatic =>
@@ -19,33 +21,51 @@ trait ConvertDates[M <: Mode] extends Transform[M, M] {
       case _ => df
     }
   }
+
+  override def transform(input: Graph[M]): Graph[M] = {
+    if (input.definition.useTimestamp) {
+      throw new AssertionError("Already using timestamp for dates")
+    }
+
+    val updatedEntities = convertDatesInEntities(input.entities)
+
+    val modifiedEntities = updatedEntities
+      .map { case (k, (_, data)) => k -> data }
+
+    val modifiedEntityDefinitions = updatedEntities
+      .map { case (k, (schema, _)) => k -> schema }
+
+    val l = lens[In]
+    (l.definition.useTimestamp ~ l.definition.entities ~ l.entities).set(input)((true, modifiedEntityDefinitions, modifiedEntities))
+  }
 }
 
 object ConvertDates {
   def apply[T <: Mode : ConvertDates] = implicitly[ConvertDates[T]]
 
   object instances {
     implicit def batchedConvertDates[M <: Mode](implicit ev: BatchedEntity =:= M#Layout) = new ConvertDates[M] {
-      override def transform(input: In): Out = {
-        lens[In].entities.modify(input)(
-          _.map { case (tpe, layout) => tpe -> {
-            val be = layout.asInstanceOf[BatchedEntity]
-            ev(BatchedEntity(
-              convertDates(tpe, be.snapshot),
-              be.insertBatches.map(b => Batched(convertDates(tpe, b.entity), b.batchId, b.ordering)),
-              be.deleteBatches.map(b => Batched(convertDates(tpe, b.entity), b.batchId, b.ordering))
-            ))
-          }
-          }
-        )
+      override def convertDatesInEntities(entities: Map[EntityType,M#Layout]): Map[EntityType, (Option[String], M#Layout)] = {
+        entities.map { case (tpe, layout) =>
+          val be = layout.asInstanceOf[BatchedEntity]
+          val convertedSnapshot = convertDates(tpe, be.snapshot)
+
+          val convertedInserts = be.insertBatches.map(b => Batched(convertDates(tpe, b.entity), b.batchId, b.ordering))
+          val convertedDeletes = be.deleteBatches.map(b => Batched(convertDates(tpe, b.entity), b.batchId, b.ordering))
+          (tpe, (
+            Some(convertedSnapshot.schema.toDDL),
+            ev(BatchedEntity(convertedSnapshot, convertedInserts, convertedDeletes))
+          ))
+        }
       }
     }
 
     implicit def simpleConvertDates[M <: Mode](implicit ev: DataFrame =:= M#Layout) = new ConvertDates[M] {
-      override def transform(input: In): Out = {
-        lens[In].entities.modify(input)(
-          _.map { case (tpe, v) => tpe -> ev(convertDates(tpe, v.asInstanceOf[DataFrame])) }
-        )
+      override def convertDatesInEntities(entities: Map[EntityType, M#Layout]): Map[EntityType, (Option[String], M#Layout)] = {
+        entities.map { case (tpe, v) =>
+          val convertedSnapshot = convertDates(tpe, v.asInstanceOf[DataFrame])
+          (tpe, (Some(convertedSnapshot.schema.toDDL), ev(convertedSnapshot)))
+        }
       }
     }
   }
 }
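The restructuring above is essentially a template method: the base trait now owns transform (the useTimestamp guard, the split of converted results into data and schema DDL, and the lens update), while the instances only implement convertDatesInEntities. A dependency-free sketch of that shape, with the datagen types collapsed to strings and plain copy used instead of shapeless lenses:

object ConvertDatesSketch {
  type EntityType = String
  type Layout     = String

  case class GraphDef(useTimestamp: Boolean, entities: Map[EntityType, Option[String]])
  case class Graph(definition: GraphDef, entities: Map[EntityType, Layout])

  trait ConvertDates {
    // Hook supplied per layout: new schema DDL and converted data for each entity.
    def convertDatesInEntities(entities: Map[EntityType, Layout]): Map[EntityType, (Option[String], Layout)]

    // The base logic owns the guard, the split, and the definition update.
    def transform(input: Graph): Graph = {
      if (input.definition.useTimestamp)
        throw new AssertionError("Already using timestamp for dates")

      val updated = convertDatesInEntities(input.entities)
      val data    = updated.map { case (k, (_, d)) => k -> d }
      val schemas = updated.map { case (k, (s, _)) => k -> s }

      input.copy(
        definition = input.definition.copy(useTimestamp = true, entities = schemas),
        entities   = data
      )
    }
  }

  // A stub "instance": pretend converting dates just tags the payload.
  object StubConvertDates extends ConvertDates {
    def convertDatesInEntities(entities: Map[EntityType, Layout]): Map[EntityType, (Option[String], Layout)] =
      entities.map { case (k, v) => k -> (Some(s"$k schema as DDL"), s"$v [dates as timestamps]") }
  }

  def main(args: Array[String]): Unit = {
    val in  = Graph(GraphDef(useTimestamp = false, Map("Person" -> None)), Map("Person" -> "raw person data"))
    val out = StubConvertDates.transform(in)
    println(out.definition.useTimestamp) // true, and definition.entities now holds the refreshed DDL
  }
}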

src/main/scala/ldbc/snb/datagen/transformation/transform/ExplodeAttrs.scala

Lines changed: 12 additions & 2 deletions
@@ -10,22 +10,32 @@ import shapeless.lens
 
 object ExplodeAttrs extends Transform[Mode.Raw.type, Mode.Raw.type] {
   override def transform(input: In): Out = {
+    if (input.definition.isAttrExploded) {
+      throw new AssertionError("Attributes already exploded in the input graph")
+    }
 
     def explodedAttr(attr: Attr, node: DataFrame, column: Column) =
       attr -> node.select(withRawColumns(attr, $"id".as(s"${attr.parent}Id"), explode(split(column, ";")).as(s"${attr.attribute}Id")))
 
-    val updatedEntities = input.entities
+    val modifiedEntities = input.entities
       .collect { case (k @ Node("Person", false), v) =>
         Map(
           explodedAttr(Attr("Email", k, "EmailAddress"), v, $"email"),
           explodedAttr(Attr("Speaks", k, "Language"), v, $"language"),
           k -> v.drop("email", "language")
         )
       }
+
+    val updatedEntities = modifiedEntities
       .foldLeft(input.entities)(_ ++ _)
 
+    val updatedEntityDefinitions = modifiedEntities
+      .foldLeft(input.definition.entities) { (e, v) =>
+        e ++ v.map{ case (k, v) => k -> Some(v.schema.toDDL) }
+      }
+
     val l = lens[In]
 
-    (l.isAttrExploded ~ l.entities).set(input)((true, updatedEntities))
+    (l.definition.isAttrExploded ~ l.definition.entities ~ l.entities).set(input)((true, updatedEntityDefinitions, updatedEntities))
  }
 }
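The composite update (l.definition.isAttrExploded ~ l.definition.entities ~ l.entities).set(input)(...) composes shapeless lenses so one set writes a nested flag, the nested entity-schema map, and the top-level entity map in a single pass. A minimal standalone illustration of that idiom (the case classes are invented for the example; it needs the shapeless 2.x library on the classpath):

import shapeless._

object LensSketch {
  case class Definition(isAttrExploded: Boolean, entities: Map[String, Option[String]])
  case class Graph(definition: Definition, entities: Map[String, String])

  def main(args: Array[String]): Unit = {
    val input = Graph(Definition(isAttrExploded = false, entities = Map.empty), entities = Map.empty)

    val l = lens[Graph]
    // `~` pairs the three lenses; `set` then takes one tuple with a value per lens.
    val updated = (l.definition.isAttrExploded ~ l.definition.entities ~ l.entities)
      .set(input)((true, Map("Person_Email" -> Some("ddl for exploded attr")), Map("Person_Email" -> "exploded rows")))

    println(updated.definition.isAttrExploded) // true
  }
}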

src/main/scala/ldbc/snb/datagen/transformation/transform/ExplodeEdges.scala

Lines changed: 12 additions & 2 deletions
@@ -12,6 +12,9 @@ import shapeless.lens
 
 object ExplodeEdges extends Transform[Mode.Raw.type, Mode.Raw.type] {
   override def transform(input: In): Out = {
+    if (input.definition.isEdgesExploded) {
+      throw new AssertionError("Edges already exploded in the input graph")
+    }
     val entities = input.entities
 
     def explodedEdge(edge: Edge, node: DataFrame, column: Column) = {
@@ -24,7 +27,7 @@ object ExplodeEdges extends Transform[Mode.Raw.type, Mode.Raw.type] {
       }
     }
 
-    val updatedEntities = entities
+    val modifiedEntities = entities
       .collect {
         case (k @ OrganisationType, v) =>
           Map(
@@ -74,9 +77,16 @@
           k -> v.drop("CreatorPersonId", "LocationCountryId", "ContainerForumId")
         )
       }
+
+    val updatedEntities = modifiedEntities
       .foldLeft(entities)(_ ++ _)
 
+    val updatedEntityDefinitions = modifiedEntities
+      .foldLeft(input.definition.entities) { (e, v) =>
+        e ++ v.map{ case (k, v) => k -> Some(v.schema.toDDL) }
+      }
+
     val l = lens[In]
-    (l.isEdgesExploded ~ l.entities).set(input)((true, updatedEntities))
+    (l.definition.isEdgesExploded ~ l.definition.entities ~ l.entities).set(input)((true, updatedEntityDefinitions, updatedEntities))
   }
 }

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToBiTransform.scala

Lines changed: 12 additions & 2 deletions
@@ -23,7 +23,7 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
       case "day" => "yyyy-MM-dd"
       case "hour" => "yyyy-MM-dd'T'hh"
       case "minute" => "yyyy-MM-dd'T'hh:mm"
-      case _ => throw new IllegalStateException("Unrecognized partition key")
+      case _ => throw new IllegalArgumentException("Unrecognized partition key")
     }
 
   private def notDerived(entityType: EntityType): Boolean = entityType match {
@@ -79,6 +79,16 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
         None
       )
     }
-    Graph[Mode.BI](isAttrExploded = input.isAttrExploded, isEdgesExploded = input.isEdgesExploded, mode, entities)
+
+    Graph[Mode.BI](
+      GraphDef[Mode.BI](
+        isAttrExploded = input.definition.isAttrExploded,
+        isEdgesExploded = input.definition.isEdgesExploded,
+        useTimestamp = input.definition.useTimestamp,
+        mode = mode,
+        entities = input.definition.entities
+      ),
+      entities
+    )
   }
 }

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToInteractiveTransform.scala

Lines changed: 12 additions & 3 deletions
@@ -3,7 +3,7 @@ package ldbc.snb.datagen.transformation.transform
 import ldbc.snb.datagen.model.Cardinality._
 import ldbc.snb.datagen.model.EntityType._
 import ldbc.snb.datagen.model.raw._
-import ldbc.snb.datagen.model.{EntityType, Graph, Mode}
+import ldbc.snb.datagen.model.{EntityType, Graph, GraphDef, Mode}
 import ldbc.snb.datagen.syntax._
 import ldbc.snb.datagen.util.Logging
 import ldbc.snb.datagen.util.sql._
@@ -22,8 +22,17 @@ case class RawToInteractiveTransform(mode: Mode.Interactive, simulationStart: Lo
       .map { case (tpe, v) =>
         tpe -> RawToInteractiveTransform.snapshotPart(tpe, v, bulkLoadThreshold, filterDeletion = true)
       }
-    Graph[Mode.Interactive](isAttrExploded = input.isAttrExploded, isEdgesExploded = input.isEdgesExploded, mode, entities)
-  }
+    Graph[Mode.Interactive](
+      GraphDef[Mode.Interactive](
+        isAttrExploded = input.definition.isAttrExploded,
+        isEdgesExploded = input.definition.isEdgesExploded,
+        useTimestamp = input.definition.useTimestamp,
+        mode = mode,
+        entities = input.definition.entities
+      ),
+      entities
+    )
+  }
 }
 
 object RawToInteractiveTransform {

0 commit comments
