
Commit d3147da

fix FactorGenerationStage
Parent: 638b24d

2 files changed (+5, -3 lines)

src/main/scala/ldbc/snb/datagen/LdbcDatagen.scala

Lines changed: 1 addition & 1 deletion
@@ -145,7 +145,7 @@ object LdbcDatagen extends SparkApp {
     GenerationStage.run(generatorArgs, generatorConfig)

     if (args.generateFactors) {
-      val factorArgs = FactorGenerationStage.Args(outputDir = args.outputDir)
+      val factorArgs = FactorGenerationStage.Args(outputDir = args.outputDir, irFormat = irFormat)
       FactorGenerationStage.run(factorArgs)
     }

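What this hunk changes in LdbcDatagen.scala: the intermediate-representation (IR) format selected for the run is now forwarded to the factor stage instead of being dropped. A minimal sketch of the call site after the fix, assuming irFormat is already bound earlier in LdbcDatagen.run (its derivation is not part of this diff):

    // Sketch only: irFormat is assumed to name the serializer used for the raw/IR
    // graph produced by GenerationStage (e.g. "csv" or "parquet").
    if (args.generateFactors) {
      val factorArgs = FactorGenerationStage.Args(
        outputDir = args.outputDir, // same directory GenerationStage wrote the IR graph to
        irFormat  = irFormat        // previously omitted, so the factor stage always read CSV
      )
      FactorGenerationStage.run(factorArgs)
    }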
src/main/scala/ldbc/snb/datagen/factors/FactorGenerationStage.scala

Lines changed: 4 additions & 2 deletions
@@ -5,6 +5,7 @@ import ldbc.snb.datagen.io.graphs.GraphSource
 import ldbc.snb.datagen.model
 import ldbc.snb.datagen.model.EntityType
 import ldbc.snb.datagen.syntax._
+import ldbc.snb.datagen.transformation.transform.ConvertDates
 import ldbc.snb.datagen.util.{DatagenStage, Logging}
 import org.apache.spark.sql.functions.{broadcast, count, date_trunc, sum}
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
@@ -15,15 +16,16 @@ case class Factor(requiredEntities: EntityType*)(f: Seq[DataFrame] => DataFrame)

 object FactorGenerationStage extends DatagenStage with Logging {

-  case class Args(outputDir: String = "out")
+  case class Args(outputDir: String = "out", irFormat: String = "parquet")

   def run(args: Args)(implicit spark: SparkSession): Unit = {
     import ldbc.snb.datagen.factors.io.instances._
     import ldbc.snb.datagen.io.Reader.ops._
     import ldbc.snb.datagen.io.Writer.ops._
     import ldbc.snb.datagen.io.instances._

-    GraphSource(model.graphs.Raw.graphDef, args.outputDir, "csv").read
+    GraphSource(model.graphs.Raw.graphDef, args.outputDir, args.irFormat).read
+      .pipe(ConvertDates.transform)
       .pipe(g =>
         rawFactors.map { case (name, calc) =>
           val resolvedEntities = calc.requiredEntities.foldLeft(Seq.empty[DataFrame])((args, et) => args :+ g.entities(et))

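Together, the two hunks above let FactorGenerationStage read the raw (IR) graph in whichever format it was serialized and apply the ConvertDates transform before computing factors. A minimal sketch of invoking the stage on its own, assuming GenerationStage has already written the IR graph to the given output directory (the object name, app name, and the "csv" choice below are illustrative):

    import org.apache.spark.sql.SparkSession
    import ldbc.snb.datagen.factors.FactorGenerationStage

    object RunFactorsOnly {
      def main(cliArgs: Array[String]): Unit = {
        // FactorGenerationStage.run takes the SparkSession as an implicit parameter.
        implicit val spark: SparkSession = SparkSession.builder()
          .appName("factor-generation")
          .getOrCreate()

        // irFormat must match how the IR graph under outputDir was written;
        // it defaults to "parquet", so pass "csv" explicitly for a CSV IR.
        val factorArgs = FactorGenerationStage.Args(outputDir = "out", irFormat = "csv")
        FactorGenerationStage.run(factorArgs)

        spark.stop()
      }
    }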