Commit 32ac282
option to use epochMillis for datetime types
1 parent 95be89f commit 32ac282

6 files changed (+61 −21 lines changed)


src/main/scala/ldbc/snb/datagen/LdbcDatagen.scala

Lines changed: 8 additions & 2 deletions

@@ -28,7 +28,8 @@ object LdbcDatagen extends SparkApp {
      format: String = "csv",
      generateFactors: Boolean = false,
      formatOptions: Map[String, String] = Map.empty,
-     oversizeFactor: Option[Double] = None
+     oversizeFactor: Option[Double] = None,
+     epochMillis: Boolean = false
  )

  override type ArgsType = Args
@@ -118,6 +119,10 @@ object LdbcDatagen extends SparkApp {
      .text("Generate factor tables")

    help('h', "help").text("prints this usage text")
+
+    opt[Unit]("epoch-millis")
+      .action((x, c) => args.epochMillis.set(c)(true))
+      .text("Use longs with millis since Unix epoch instead of native dates")
  }

  val parsedArgs = parser.parse(args, Args()).getOrElse(throw new RuntimeException("Invalid arguments"))
@@ -159,7 +164,8 @@ object LdbcDatagen extends SparkApp {
      },
      irFormat,
      args.format,
-     args.formatOptions
+     args.formatOptions,
+     args.epochMillis
    )

    TransformationStage.run(transformArgs)
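
Note: the new flag is a plain scopt Unit option that only flips Args.epochMillis, which is then forwarded to TransformationStage.Args. Below is a minimal, self-contained sketch of the same wiring, using a plain case-class copy in place of the project's lens-style args.epochMillis.set(c)(true); names are illustrative, not project code.

import scopt.OptionParser

case class SketchArgs(epochMillis: Boolean = false)

val parser = new OptionParser[SketchArgs]("datagen-sketch") {
  opt[Unit]("epoch-millis")
    .action((_, c) => c.copy(epochMillis = true)) // same effect as args.epochMillis.set(c)(true)
    .text("Use longs with millis since Unix epoch instead of native dates")
}

// "--epoch-millis" on the command line yields epochMillis = true in the parsed args
val parsed = parser.parse(Seq("--epoch-millis"), SketchArgs())
assert(parsed.exists(_.epochMillis))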

src/main/scala/ldbc/snb/datagen/factors/FactorGenerationStage.scala

Lines changed: 3 additions & 1 deletion

@@ -4,6 +4,7 @@ import ldbc.snb.datagen.factors.io.FactorTableSink
import ldbc.snb.datagen.io.graphs.GraphSource
import ldbc.snb.datagen.model
import ldbc.snb.datagen.model.EntityType
+import ldbc.snb.datagen.model.Mode.Raw
import ldbc.snb.datagen.syntax._
import ldbc.snb.datagen.transformation.transform.ConvertDates
import ldbc.snb.datagen.util.{DatagenStage, Logging}
@@ -74,9 +75,10 @@ object FactorGenerationStage extends DatagenStage with Logging {
  import ldbc.snb.datagen.io.Reader.ops._
  import ldbc.snb.datagen.io.Writer.ops._
  import ldbc.snb.datagen.io.instances._
+  import ldbc.snb.datagen.transformation.transform.ConvertDates.instances._

  GraphSource(model.graphs.Raw.graphDef, args.outputDir, args.irFormat).read
-    .pipe(ConvertDates.transform)
+    .pipe(ConvertDates[Raw.type].transform)
    .pipe(g =>
      rawFactors
        .collect {
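
Note: ConvertDates[Raw.type] is the typeclass summoner introduced in ConvertDates.scala below; its apply method just looks up the implicit ConvertDates instance for the Raw mode, supplied by the new ConvertDates.instances._ import. A generic, stand-alone sketch of the same summoner + instances pattern (illustrative only, not project code):

trait Show[A] { def show(a: A): String }

object Show {
  // summoner: Show[Int] means "give me the implicit Show[Int] in scope"
  def apply[A: Show]: Show[A] = implicitly[Show[A]]

  object instances {
    implicit val showInt: Show[Int] = (a: Int) => a.toString
  }
}

import Show.instances._
val rendered = Show[Int].show(42) // resolves the implicit, analogous to ConvertDates[Raw.type].transform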

src/main/scala/ldbc/snb/datagen/transformation/TransformationStage.scala

Lines changed: 13 additions & 3 deletions

@@ -6,7 +6,7 @@ import ldbc.snb.datagen.model.{BatchedEntity, Graph, Mode}
import ldbc.snb.datagen.syntax._
import ldbc.snb.datagen.transformation.transform._
import ldbc.snb.datagen.util.{DatagenStage, Logging}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.DataFrame
import shapeless._

object TransformationStage extends DatagenStage with Logging {
@@ -20,14 +20,16 @@ object TransformationStage extends DatagenStage with Logging {
      mode: Mode = Mode.Raw,
      irFormat: String = "parquet",
      format: String = "csv",
-     formatOptions: Map[String, String] = Map.empty
+     formatOptions: Map[String, String] = Map.empty,
+     epochMillis: Boolean = false
  )

  override type ArgsType = Args

  import ldbc.snb.datagen.io.Reader.ops._
  import ldbc.snb.datagen.io.Writer.ops._
  import ldbc.snb.datagen.io.instances._
+  import ldbc.snb.datagen.transformation.transform.ConvertDates.instances._

  def run(args: ArgsType) = {
    object write extends Poly1 {
@@ -38,12 +40,19 @@ object TransformationStage extends DatagenStage with Logging {
        at[Graph[M]](g => g.write(GraphSink(args.outputDir, args.format, args.formatOptions)))
    }

+    object convertDates extends Poly1 {
+      implicit def caseSimple[M <: Mode](implicit ev: DataFrame =:= M#Layout) =
+        at[Graph[M]](g => ConvertDates[M].transform(g))
+
+      implicit def caseBatched[M <: Mode](implicit ev: BatchedEntity =:= M#Layout) =
+        at[Graph[M]](g => ConvertDates[M].transform(g))
+    }
+
    type Out = Graph[Mode.Raw.type] :+: Graph[Mode.Interactive] :+: Graph[Mode.BI] :+: CNil

    GraphSource(model.graphs.Raw.graphDef, args.outputDir, args.irFormat).read
      .pipeFoldLeft(args.explodeAttrs.fork)((graph, _: Unit) => ExplodeAttrs.transform(graph))
      .pipeFoldLeft(args.explodeEdges.fork)((graph, _: Unit) => ExplodeEdges.transform(graph))
-      .pipe(ConvertDates.transform)
      .pipe[Out] { g =>
        args.mode match {
          case bi @ Mode.BI(_, _) => Coproduct[Out](RawToBiTransform(bi, args.simulationStart, args.simulationEnd, args.keepImplicitDeletes).transform(g))
@@ -52,6 +61,7 @@ object TransformationStage extends DatagenStage with Logging {
          case Mode.Raw => Coproduct[Out](g)
        }
      }
+      .pipeFoldLeft((!args.epochMillis).fork)((graph, _: Unit) => graph.map(convertDates))
      .map(write)
    ()
  }
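
Note the relocation: ConvertDates no longer runs unconditionally right after the explode steps, but as the last step before writing, and only when --epoch-millis is not set; with the flag on, the datetime columns stay as epoch-millisecond longs all the way to the sink. A rough paraphrase of the conditional step in plain Scala, assuming .fork lifts a Boolean to an Option[Unit] that pipeFoldLeft folds over (illustrative only, not project code):

def convertUnlessEpochMillis[G](graph: G, epochMillis: Boolean)(convert: G => G): G =
  if (!epochMillis) convert(graph) // default: millis longs become DateType/TimestampType columns
  else graph                       // --epoch-millis: leave the raw longs untouched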

Lines changed: 33 additions & 10 deletions

@@ -1,29 +1,52 @@
package ldbc.snb.datagen.transformation.transform

-import ldbc.snb.datagen.model.{EntityType, Mode}
+import ldbc.snb.datagen.model.{Batched, BatchedEntity, EntityType, Mode}
import ldbc.snb.datagen.util.sql.qcol
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{DateType, TimestampType}
import shapeless._

-object ConvertDates extends Transform[Mode.Raw.type, Mode.Raw.type] {
-
+trait ConvertDates[M <: Mode] extends Transform[M, M] {
  def convertDates(tpe: EntityType, df: DataFrame): DataFrame = {
    tpe match {
      case tpe if !tpe.isStatic =>
        df.select(df.columns.map {
-          case col @ ("creationDate" | "deletionDate") => (qcol(col) / lit(1000L)).cast(TimestampType).as(col)
-          case col @ "birthday" => (qcol(col) / lit(1000L)).cast(TimestampType).cast(DateType).as(col)
-          case col => qcol(col)
+          case col@("creationDate" | "deletionDate") => (qcol(col) / lit(1000L)).cast(TimestampType).as(col)
+          case col@"birthday" => (qcol(col) / lit(1000L)).cast(TimestampType).cast(DateType).as(col)
+          case col => qcol(col)
        }: _*)
      case _ => df
    }
  }
+}
+
+object ConvertDates {
+  def apply[T <: Mode : ConvertDates] = implicitly[ConvertDates[T]]

-  override def transform(input: In): Out = {
-    lens[In].entities.modify(input)(
-      _.map { case (tpe, v) => tpe -> convertDates(tpe, v) }
-    )
+  object instances {
+    implicit def batchedConvertDates[M <: Mode](implicit ev: BatchedEntity =:= M#Layout) = new ConvertDates[M] {
+      override def transform(input: In): Out = {
+        lens[In].entities.modify(input)(
+          _.map { case (tpe, layout) => tpe -> {
+            val be = layout.asInstanceOf[BatchedEntity]
+            ev(BatchedEntity(
+              convertDates(tpe, be.snapshot),
+              be.insertBatches.map(b => Batched(convertDates(tpe, b.entity), b.batchId, b.ordering)),
+              be.deleteBatches.map(b => Batched(convertDates(tpe, b.entity), b.batchId, b.ordering))
+            ))
+          }
+          }
+        )
+      }
+    }
+
+    implicit def simpleConvertDates[M <: Mode](implicit ev: DataFrame =:= M#Layout) = new ConvertDates[M] {
+      override def transform(input: In): Out = {
+        lens[In].entities.modify(input)(
+          _.map { case (tpe, v) => tpe -> ev(convertDates(tpe, v.asInstanceOf[DataFrame])) }
+        )
+      }
+    }
+  }
}
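
The conversion itself relies on Spark's cast rules: a numeric column cast to TimestampType is interpreted as seconds since the Unix epoch, hence the division by 1000 before casting, and a further cast to DateType truncates birthday to a calendar date. A small stand-alone sketch of the same expressions (illustrative only, not project code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{DateType, TimestampType}

val spark = SparkSession.builder.master("local[*]").appName("convert-dates-sketch").getOrCreate()
import spark.implicits._

// 1262304000000L is 2010-01-01 00:00:00 UTC expressed in epoch milliseconds
val df = Seq(1262304000000L).toDF("creationDate")

df.select(
  (col("creationDate") / lit(1000L)).cast(TimestampType).as("asTimestamp"), // millis -> seconds -> timestamp
  (col("creationDate") / lit(1000L)).cast(TimestampType).cast(DateType).as("asDate")
).show() // values rendered in the Spark session time zone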

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToBiTransform.scala

Lines changed: 2 additions & 3 deletions

@@ -25,11 +25,10 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
  }

  override def transform(input: In): Out = {
-    val batch_id = (col: Column) => date_format(date_trunc(mode.batchPeriod, col), batchPeriodFormat(mode.batchPeriod))
+    val batch_id = (col: Column) => date_format(date_trunc(mode.batchPeriod, to_timestamp(col / lit(1000L))), batchPeriodFormat(mode.batchPeriod))

    def inBatch(col: Column, batchStart: Long, batchEnd: Long) =
-      col >= to_timestamp(lit(batchStart / 1000)) &&
-        col < to_timestamp(lit(batchEnd / 1000))
+      col >= lit(batchStart) && col < lit(batchEnd)

    val batched = (df: DataFrame) =>
      df
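
Since creationDate/deletionDate now arrive as epoch-millisecond longs rather than timestamps, batch_id has to rebuild a timestamp (divide by 1000, then to_timestamp) before truncating, while the batch-window check compares the raw longs against the millisecond bounds directly. A small worked sketch of the updated batch_id expression, with an assumed "day" period and an assumed "yyyy-MM-dd" format string (illustrative only, not project code):

import org.apache.spark.sql.functions.{col, date_format, date_trunc, lit, to_timestamp}

// 1354147200000L is 2012-11-29 00:00:00 UTC in epoch milliseconds
val batchId = date_format(
  date_trunc("day", to_timestamp(col("creationDate") / lit(1000L))), // millis -> seconds -> timestamp -> day
  "yyyy-MM-dd"
)
// applied to 1354147200000L this yields "2012-11-29" (assuming a UTC session time zone)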

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToInteractiveTransform.scala

Lines changed: 2 additions & 2 deletions

@@ -45,8 +45,8 @@ object RawToInteractiveTransform {
    val filterBulkLoad = (ds: DataFrame) =>
      ds
        .filter(
-          $"creationDate" < to_timestamp(lit(bulkLoadThreshold / 1000)) &&
-            (!lit(filterDeletion) || $"deletionDate" >= to_timestamp(lit(bulkLoadThreshold / 1000)))
+          $"creationDate" < lit(bulkLoadThreshold) &&
+            (!lit(filterDeletion) || $"deletionDate" >= lit(bulkLoadThreshold))
        )

    tpe match {
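
The interactive bulk-load filter makes the same switch: bulkLoadThreshold is already a millisecond value, so instead of dividing it by 1000 and lifting both sides to timestamps, the predicate compares the long columns against the long threshold directly. A hedged sketch of the resulting predicate as a stand-alone helper (names are illustrative, not project code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// creationDate/deletionDate are assumed to hold epoch-millis longs here
def bulkLoadFilter(ds: DataFrame, bulkLoadThreshold: Long, filterDeletion: Boolean): DataFrame =
  ds.filter(
    col("creationDate") < lit(bulkLoadThreshold) &&
      (!lit(filterDeletion) || col("deletionDate") >= lit(bulkLoadThreshold))
  )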
