Commit ad99962

Merge pull request #401 from ldbc/dates-as-epoch-milli
Option to use epochMillis for datetime types
2 parents 95be89f + ca53806

8 files changed: +72 / -21 lines

.circleci/config.yml
Lines changed: 5 additions & 0 deletions

@@ -97,6 +97,11 @@ jobs:
           command: |
             tools/docker-run.sh --mode bi --scale-factor 0.003 --explode-edges --format-options header=false,quoteAll=true,compression=gzip
             mv out/ social-network-sf0.003-bi-composite-projected-fk-neo4j-compressed/
+      - run:
+          name: Generate SF0.003 / BI / compressed composite-projected CSVs for Neo4j with epoch milli timestamps
+          command: |
+            tools/docker-run.sh --mode bi --scale-factor 0.003 --explode-edges --epoch-millis --format-options header=false,quoteAll=true,compression=gzip
+            mv out/ social-network-sf0.003-bi-composite-projected-fk-neo4j-compressed-epoch-millis/
       # Interactive
       - run:
           name: Generate SF0.003 / Interactive / singular-projected CSVs

README.md
Lines changed: 6 additions & 0 deletions

@@ -136,6 +136,12 @@ To get a complete list of the arguments, pass `--help` to the JAR file:
   ./tools/run.py ./target/ldbc_snb_datagen_${PLATFORM_VERSION}-${DATAGEN_VERSION}.jar -- --format parquet --scale-factor 0.003 --mode bi
   ```
 
+* Use epoch milliseconds encoded as longs (née `LongDateFormatter`) for serializing date and datetime values:
+
+  ```bash
+  ./tools/run.py ./target/ldbc_snb_datagen_${PLATFORM_VERSION}-${DATAGEN_VERSION}.jar -- --format csv --scale-factor 0.003 --mode bi --epoch-millis
+  ```
+
 * For the `interactive` and `bi` formats, the `--format-options` argument allows passing formatting options such as timestamp/date formats, the presence/absence of headers (see the [Spark formatting options](https://spark.apache.org/docs/2.4.8/api/scala/index.html#org.apache.spark.sql.DataFrameWriter) for details), and whether quoting the fields in the CSV is required:
 
   ```bash

src/main/scala/ldbc/snb/datagen/LdbcDatagen.scala
Lines changed: 8 additions & 2 deletions

@@ -28,7 +28,8 @@ object LdbcDatagen extends SparkApp {
     format: String = "csv",
     generateFactors: Boolean = false,
     formatOptions: Map[String, String] = Map.empty,
-    oversizeFactor: Option[Double] = None
+    oversizeFactor: Option[Double] = None,
+    epochMillis: Boolean = false
   )

   override type ArgsType = Args
@@ -118,6 +119,10 @@ object LdbcDatagen extends SparkApp {
       .text("Generate factor tables")

     help('h', "help").text("prints this usage text")
+
+    opt[Unit]("epoch-millis")
+      .action((x, c) => args.epochMillis.set(c)(true))
+      .text("Use longs with millis since Unix epoch instead of native dates")
   }

   val parsedArgs = parser.parse(args, Args()).getOrElse(throw new RuntimeException("Invalid arguments"))
@@ -159,7 +164,8 @@ object LdbcDatagen extends SparkApp {
       },
       irFormat,
       args.format,
-      args.formatOptions
+      args.formatOptions,
+      args.epochMillis
     )

     TransformationStage.run(transformArgs)
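The new `--epoch-millis` flag is wired in through scopt and then forwarded to `TransformationStage`. The following is a minimal standalone sketch of the same flag with plain scopt, using `copy` on a tiny case class instead of the datagen's lens-style `set` helper; the names `ExampleArgs` and `EpochMillisFlagSketch` are illustrative, not part of the project:

```scala
import scopt.OptionParser

// Illustrative only: a plain-scopt rendering of the --epoch-millis flag.
// The real LdbcDatagen.Args has many more fields and uses a lens-based setter.
case class ExampleArgs(epochMillis: Boolean = false)

object EpochMillisFlagSketch {
  val parser = new OptionParser[ExampleArgs]("ldbc-datagen-example") {
    opt[Unit]("epoch-millis")
      .action((_, c) => c.copy(epochMillis = true))
      .text("Use longs with millis since Unix epoch instead of native dates")
  }

  def main(args: Array[String]): Unit = {
    // Parse the command line; fall back to exiting on invalid arguments.
    val parsed = parser.parse(args, ExampleArgs()).getOrElse(sys.exit(1))
    println(s"epochMillis = ${parsed.epochMillis}")
  }
}
```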

src/main/scala/ldbc/snb/datagen/factors/FactorGenerationStage.scala
Lines changed: 3 additions & 1 deletion

@@ -4,6 +4,7 @@ import ldbc.snb.datagen.factors.io.FactorTableSink
 import ldbc.snb.datagen.io.graphs.GraphSource
 import ldbc.snb.datagen.model
 import ldbc.snb.datagen.model.EntityType
+import ldbc.snb.datagen.model.Mode.Raw
 import ldbc.snb.datagen.syntax._
 import ldbc.snb.datagen.transformation.transform.ConvertDates
 import ldbc.snb.datagen.util.{DatagenStage, Logging}
@@ -74,9 +75,10 @@ object FactorGenerationStage extends DatagenStage with Logging {
   import ldbc.snb.datagen.io.Reader.ops._
   import ldbc.snb.datagen.io.Writer.ops._
   import ldbc.snb.datagen.io.instances._
+  import ldbc.snb.datagen.transformation.transform.ConvertDates.instances._

   GraphSource(model.graphs.Raw.graphDef, args.outputDir, args.irFormat).read
-    .pipe(ConvertDates.transform)
+    .pipe(ConvertDates[Raw.type].transform)
     .pipe(g =>
       rawFactors
         .collect {

src/main/scala/ldbc/snb/datagen/transformation/TransformationStage.scala
Lines changed: 13 additions & 3 deletions

@@ -6,7 +6,7 @@ import ldbc.snb.datagen.model.{BatchedEntity, Graph, Mode}
 import ldbc.snb.datagen.syntax._
 import ldbc.snb.datagen.transformation.transform._
 import ldbc.snb.datagen.util.{DatagenStage, Logging}
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.DataFrame
 import shapeless._

 object TransformationStage extends DatagenStage with Logging {
@@ -20,14 +20,16 @@ object TransformationStage extends DatagenStage with Logging {
     mode: Mode = Mode.Raw,
     irFormat: String = "parquet",
     format: String = "csv",
-    formatOptions: Map[String, String] = Map.empty
+    formatOptions: Map[String, String] = Map.empty,
+    epochMillis: Boolean = false
   )

   override type ArgsType = Args

   import ldbc.snb.datagen.io.Reader.ops._
   import ldbc.snb.datagen.io.Writer.ops._
   import ldbc.snb.datagen.io.instances._
+  import ldbc.snb.datagen.transformation.transform.ConvertDates.instances._

   def run(args: ArgsType) = {
     object write extends Poly1 {
@@ -38,12 +40,19 @@ object TransformationStage extends DatagenStage with Logging {
         at[Graph[M]](g => g.write(GraphSink(args.outputDir, args.format, args.formatOptions)))
     }

+    object convertDates extends Poly1 {
+      implicit def caseSimple[M <: Mode](implicit ev: DataFrame =:= M#Layout) =
+        at[Graph[M]](g => ConvertDates[M].transform(g))
+
+      implicit def caseBatched[M <: Mode](implicit ev: BatchedEntity =:= M#Layout) =
+        at[Graph[M]](g => ConvertDates[M].transform(g))
+    }
+
     type Out = Graph[Mode.Raw.type] :+: Graph[Mode.Interactive] :+: Graph[Mode.BI] :+: CNil

     GraphSource(model.graphs.Raw.graphDef, args.outputDir, args.irFormat).read
       .pipeFoldLeft(args.explodeAttrs.fork)((graph, _: Unit) => ExplodeAttrs.transform(graph))
       .pipeFoldLeft(args.explodeEdges.fork)((graph, _: Unit) => ExplodeEdges.transform(graph))
-      .pipe(ConvertDates.transform)
       .pipe[Out] { g =>
         args.mode match {
           case bi @ Mode.BI(_, _) => Coproduct[Out](RawToBiTransform(bi, args.simulationStart, args.simulationEnd, args.keepImplicitDeletes).transform(g))
@@ -52,6 +61,7 @@ object TransformationStage extends DatagenStage with Logging {
           case Mode.Raw => Coproduct[Out](g)
         }
       }
+      .pipeFoldLeft((!args.epochMillis).fork)((graph, _: Unit) => graph.map(convertDates))
       .map(write)
     ()
   }
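The date conversion now runs *after* the mode-specific transforms, and only when `--epoch-millis` is not set. Below is a self-contained sketch of that conditional-step idiom, under the assumption that `fork` (from `ldbc.snb.datagen.syntax`) lifts a `Boolean` into an `Option[Unit]` and that `pipeFoldLeft` applies its function once per element of that option; the project's actual syntax helpers may differ in detail:

```scala
// Illustrative re-implementation of the conditional pipeline step, not the
// project's actual syntax helpers from ldbc.snb.datagen.syntax.
object ConditionalPipelineSketch {
  implicit class BoolOps(val b: Boolean) extends AnyVal {
    // Assumed semantics: true -> Some(()), false -> None.
    def fork: Option[Unit] = if (b) Some(()) else None
  }

  implicit class PipeOps[A](val a: A) extends AnyVal {
    // Apply f once per element of opt, i.e. at most once for an Option.
    def pipeFoldLeft[B](opt: Option[B])(f: (A, B) => A): A =
      opt.fold(a)(b => f(a, b))
  }

  // Dates are converted to native Spark types only when --epoch-millis is NOT
  // set; with the flag, the epoch-millis longs pass through untouched.
  def maybeConvert(graph: String, epochMillis: Boolean): String =
    graph.pipeFoldLeft((!epochMillis).fork)((g, _: Unit) => g + " [dates converted]")
}
```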
src/main/scala/ldbc/snb/datagen/transformation/transform/ConvertDates.scala
Lines changed: 33 additions & 10 deletions

@@ -1,29 +1,52 @@
 package ldbc.snb.datagen.transformation.transform

-import ldbc.snb.datagen.model.{EntityType, Mode}
+import ldbc.snb.datagen.model.{Batched, BatchedEntity, EntityType, Mode}
 import ldbc.snb.datagen.util.sql.qcol
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types.{DateType, TimestampType}
 import shapeless._

-object ConvertDates extends Transform[Mode.Raw.type, Mode.Raw.type] {
-
+trait ConvertDates[M <: Mode] extends Transform[M, M] {
   def convertDates(tpe: EntityType, df: DataFrame): DataFrame = {
     tpe match {
       case tpe if !tpe.isStatic =>
         df.select(df.columns.map {
-          case col @ ("creationDate" | "deletionDate") => (qcol(col) / lit(1000L)).cast(TimestampType).as(col)
-          case col @ "birthday" => (qcol(col) / lit(1000L)).cast(TimestampType).cast(DateType).as(col)
-          case col => qcol(col)
+          case col@("creationDate" | "deletionDate") => (qcol(col) / lit(1000L)).cast(TimestampType).as(col)
+          case col@"birthday" => (qcol(col) / lit(1000L)).cast(TimestampType).cast(DateType).as(col)
+          case col => qcol(col)
         }: _*)
       case _ => df
     }
   }
+}
+
+object ConvertDates {
+  def apply[T <: Mode : ConvertDates] = implicitly[ConvertDates[T]]

-  override def transform(input: In): Out = {
-    lens[In].entities.modify(input)(
-      _.map { case (tpe, v) => tpe -> convertDates(tpe, v) }
-    )
+  object instances {
+    implicit def batchedConvertDates[M <: Mode](implicit ev: BatchedEntity =:= M#Layout) = new ConvertDates[M] {
+      override def transform(input: In): Out = {
+        lens[In].entities.modify(input)(
+          _.map { case (tpe, layout) => tpe -> {
+            val be = layout.asInstanceOf[BatchedEntity]
+            ev(BatchedEntity(
+              convertDates(tpe, be.snapshot),
+              be.insertBatches.map(b => Batched(convertDates(tpe, b.entity), b.batchId, b.ordering)),
+              be.deleteBatches.map(b => Batched(convertDates(tpe, b.entity), b.batchId, b.ordering))
+            ))
+          }
+          }
+        )
+      }
+    }
+
+    implicit def simpleConvertDates[M <: Mode](implicit ev: DataFrame =:= M#Layout) = new ConvertDates[M] {
+      override def transform(input: In): Out = {
+        lens[In].entities.modify(input)(
+          _.map { case (tpe, v) => tpe -> ev(convertDates(tpe, v.asInstanceOf[DataFrame])) }
+        )
+      }
+    }
   }
 }
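`convertDates` relies on Spark's numeric-to-timestamp cast, which interprets the value as seconds since the Unix epoch; that is why millisecond columns are divided by 1000 first. A small self-contained illustration of that cast semantics (local `SparkSession`, made-up sample value; only the column names `creationDate`/`birthday` come from the code above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{DateType, TimestampType}

// Illustration of the cast semantics ConvertDates relies on: casting a numeric
// column to TimestampType interprets it as seconds since the Unix epoch.
object EpochMillisCastSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("epoch-millis-cast").getOrCreate()
    import spark.implicits._

    // 1262304000000 ms since the epoch = 2010-01-01T00:00:00Z
    val df = Seq(1262304000000L).toDF("creationDate")

    // Millis -> seconds -> timestamp (rendered in the session time zone).
    df.select((col("creationDate") / lit(1000L)).cast(TimestampType).as("creationDate")).show(false)

    // The same value truncated to a date, as done for the birthday column.
    df.select((col("creationDate") / lit(1000L)).cast(TimestampType).cast(DateType).as("birthday")).show(false)

    spark.stop()
  }
}
```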

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToBiTransform.scala
Lines changed: 2 additions & 3 deletions

@@ -25,11 +25,10 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
   }

   override def transform(input: In): Out = {
-    val batch_id = (col: Column) => date_format(date_trunc(mode.batchPeriod, col), batchPeriodFormat(mode.batchPeriod))
+    val batch_id = (col: Column) => date_format(date_trunc(mode.batchPeriod, to_timestamp(col / lit(1000L))), batchPeriodFormat(mode.batchPeriod))

     def inBatch(col: Column, batchStart: Long, batchEnd: Long) =
-      col >= to_timestamp(lit(batchStart / 1000)) &&
-        col < to_timestamp(lit(batchEnd / 1000))
+      col >= lit(batchStart) && col < lit(batchEnd)

     val batched = (df: DataFrame) =>
       df
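With `creationDate`/`deletionDate` kept as epoch-millis longs, batch membership becomes a plain long comparison against millisecond bounds, and only the human-readable batch id still needs a timestamp. A hypothetical daily-batch rendering of the two helpers above, where `"day"` and `"yyyy-MM-dd"` stand in for `mode.batchPeriod` and `batchPeriodFormat(mode.batchPeriod)`:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{date_format, date_trunc, lit, to_timestamp}

// Hypothetical daily-batch variant; "day" and "yyyy-MM-dd" are placeholders for
// the values supplied by the BI mode configuration.
object BatchIdSketch {
  // Millis -> seconds -> timestamp, truncated to the batch period, formatted as id.
  def dailyBatchId(epochMillisCol: Column): Column =
    date_format(date_trunc("day", to_timestamp(epochMillisCol / lit(1000L))), "yyyy-MM-dd")

  // Batch membership is a direct comparison of epoch-millis longs.
  def inBatch(epochMillisCol: Column, batchStartMillis: Long, batchEndMillis: Long): Column =
    epochMillisCol >= lit(batchStartMillis) && epochMillisCol < lit(batchEndMillis)
}
```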

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToInteractiveTransform.scala
Lines changed: 2 additions & 2 deletions

@@ -45,8 +45,8 @@ object RawToInteractiveTransform {
     val filterBulkLoad = (ds: DataFrame) =>
       ds
         .filter(
-          $"creationDate" < to_timestamp(lit(bulkLoadThreshold / 1000)) &&
-            (!lit(filterDeletion) || $"deletionDate" >= to_timestamp(lit(bulkLoadThreshold / 1000)))
+          $"creationDate" < lit(bulkLoadThreshold) &&
+            (!lit(filterDeletion) || $"deletionDate" >= lit(bulkLoadThreshold))
         )

     tpe match {
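The Interactive bulk-load cut-off follows the same pattern: since dates stay as epoch millis until the final (optional) conversion, the threshold, itself a millisecond value, is compared directly. A standalone rendering of the filter above, assuming `bulkLoadThreshold` is epoch millis:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Standalone sketch of the bulk-load filter: creationDate/deletionDate are
// epoch-millis longs, so the millisecond threshold compares directly.
object BulkLoadFilterSketch {
  def filterBulkLoad(ds: DataFrame, bulkLoadThreshold: Long, filterDeletion: Boolean): DataFrame =
    ds.filter(
      ds("creationDate") < lit(bulkLoadThreshold) &&
        (!lit(filterDeletion) || ds("deletionDate") >= lit(bulkLoadThreshold))
    )
}
```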
