Skip to content

Commit 15accb6

Browse files
committed
refactor date conversion
1 parent 652cc55 commit 15accb6

File tree

4 files changed

+4
-9
lines changed

4 files changed

+4
-9
lines changed

src/main/scala/ldbc/snb/datagen/transformation/TransformationStage.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import ldbc.snb.datagen.io.graphs.{GraphSink, GraphSource}
44
import ldbc.snb.datagen.model
55
import ldbc.snb.datagen.model.{BatchedEntity, Graph, Mode}
66
import ldbc.snb.datagen.syntax._
7-
import ldbc.snb.datagen.transformation.transform.{ExplodeAttrs, ExplodeEdges, IrToRawTransform, RawToBiTransform, RawToInteractiveTransform}
7+
import ldbc.snb.datagen.transformation.transform.{ExplodeAttrs, ExplodeEdges, ConvertDates, RawToBiTransform, RawToInteractiveTransform}
88
import ldbc.snb.datagen.util.{DatagenStage, Logging}
99
import org.apache.spark.sql.{DataFrame, SparkSession}
1010
import shapeless._
@@ -44,11 +44,12 @@ object TransformationStage extends DatagenStage with Logging {
4444
GraphSource(model.graphs.Raw.graphDef, args.outputDir, "parquet").read
4545
.pipeFoldLeft(args.explodeAttrs.fork)((graph, _: Unit) => ExplodeAttrs.transform(graph))
4646
.pipeFoldLeft(args.explodeEdges.fork)((graph, _: Unit) => ExplodeEdges.transform(graph))
47+
.pipe(ConvertDates.transform)
4748
.pipe[OutputTypes] { g =>
4849
args.mode match {
4950
case bi @ Mode.BI(_, _) => Inr(Inr(Inl(RawToBiTransform(bi, args.simulationStart, args.simulationEnd, args.keepImplicitDeletes).transform(g))))
5051
case interactive @ Mode.Interactive(_) => Inr(Inl(RawToInteractiveTransform(interactive, args.simulationStart, args.simulationEnd).transform(g)))
51-
case Mode.Raw => Inl(IrToRawTransform.transform(g))
52+
case Mode.Raw => Inl(g)
5253
}
5354
}
5455
.map(write)

src/main/scala/ldbc/snb/datagen/transformation/transform/IrToRawTransform.scala renamed to src/main/scala/ldbc/snb/datagen/transformation/transform/ConvertDates.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import org.apache.spark.sql.functions.lit
77
import org.apache.spark.sql.types.{DateType, TimestampType}
88
import shapeless._
99

10-
object IrToRawTransform extends Transform[Mode.Raw.type, Mode.Raw.type] {
10+
object ConvertDates extends Transform[Mode.Raw.type, Mode.Raw.type] {
1111

1212
def convertDates(tpe: EntityType, df: DataFrame): DataFrame = {
1313
tpe match {

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToBiTransform.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,6 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
6363
}
6464

6565
val entities = input.entities
66-
.map { case (tpe, v) =>
67-
tpe -> IrToRawTransform.convertDates(tpe, v)
68-
}
6966
.map {
7067
case (tpe, v) if tpe.isStatic => tpe -> BatchedEntity(v, None, None)
7168
case (tpe, v) =>

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToInteractiveTransform.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ case class RawToInteractiveTransform(mode: Mode.Interactive, simulationStart: Lo
1818

1919
override def transform(input: In): Out = {
2020
val entities = input.entities
21-
.map { case (tpe, v) =>
22-
tpe -> IrToRawTransform.convertDates(tpe, v)
23-
}
2421
.map { case (tpe, v) =>
2522
tpe -> RawToInteractiveTransform.snapshotPart(tpe, v, bulkLoadThreshold, filterDeletion = true)
2623
}

0 commit comments

Comments (0)