Skip to content

Commit 95f3830

Browse files
committed
add env switch
1 parent 15accb6 commit 95f3830

File tree

3 files changed: +24 additions, -9 deletions

src/main/scala/ldbc/snb/datagen/LdbcDatagen.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,18 @@ object LdbcDatagen extends SparkApp {
123123

124124
def run(args: Args): Unit = {
125125

126+
val irFormat = {
127+
val _f = System.getenv("LDBC_DATAGEN_IR_FORMAT")
128+
if (_f == null || _f == "") "parquet" else _f
129+
}
130+
126131
val generatorArgs = GenerationStage.Args(
127132
scaleFactor = args.scaleFactor,
128133
params = args.params,
129134
paramFile = args.paramFile,
130135
outputDir = args.outputDir,
131136
numThreads = args.numThreads,
137+
format = irFormat,
132138
oversizeFactor = args.oversizeFactor
133139
)
134140

@@ -155,6 +161,7 @@ object LdbcDatagen extends SparkApp {
155161
case "interactive" => Mode.Interactive(bulkLoadPortion = args.bulkloadPortion)
156162
case "raw" => Mode.Raw
157163
},
164+
irFormat,
158165
args.format,
159166
args.formatOptions
160167
)

src/main/scala/ldbc/snb/datagen/generator/GenerationStage.scala

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@ object GenerationStage extends DatagenStage with Logging {
1313
val optimalPersonsPerFile = 500000
1414

1515
case class Args(
16-
scaleFactor: String = "1",
17-
numThreads: Option[Int] = None,
18-
params: Map[String, String] = Map.empty,
19-
paramFile: Option[String] = None,
20-
outputDir: String = "out",
21-
oversizeFactor: Option[Double] = None
22-
)
16+
scaleFactor: String = "1",
17+
numThreads: Option[Int] = None,
18+
params: Map[String, String] = Map.empty,
19+
paramFile: Option[String] = None,
20+
outputDir: String = "out",
21+
format: String = "parquet",
22+
oversizeFactor: Option[Double] = None
23+
)
2324

2425
def run(args: Args, config: GeneratorConfiguration)(implicit spark: SparkSession) = {
2526
val numPartitions = config.getInt("hadoop.numThreads", spark.sparkContext.defaultParallelism)
@@ -44,9 +45,15 @@ object GenerationStage extends DatagenStage with Logging {
4445

4546
val merged = SparkKnowsMerger(uniKnows, interestKnows, randomKnows).cache()
4647

48+
val format = args.format match {
49+
case "csv" => Csv
50+
case "parquet" => Parquet
51+
case a => throw new IllegalArgumentException(s"Format `${a}` is not supported by the generator.")
52+
}
53+
4754
SparkUI.job(simpleNameOf[RawSerializer], "serialize persons") {
4855
val rawSerializer = new RawSerializer(randomRanker)
49-
rawSerializer.write(merged, RawSink(Parquet, Some(numPartitions), config, oversizeFactor))
56+
rawSerializer.write(merged, RawSink(format, Some(numPartitions), config, oversizeFactor))
5057
}
5158
}
5259

src/main/scala/ldbc/snb/datagen/transformation/TransformationStage.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ object TransformationStage extends DatagenStage with Logging {
1818
simulationStart: Long = 0,
1919
simulationEnd: Long = 0,
2020
mode: Mode = Mode.Raw,
21+
irFormat: String = "parquet",
2122
format: String = "csv",
2223
formatOptions: Map[String, String] = Map.empty
2324
)
@@ -41,7 +42,7 @@ object TransformationStage extends DatagenStage with Logging {
4142
Graph[Mode.BI] :+:
4243
CNil
4344

44-
GraphSource(model.graphs.Raw.graphDef, args.outputDir, "parquet").read
45+
GraphSource(model.graphs.Raw.graphDef, args.outputDir, args.irFormat).read
4546
.pipeFoldLeft(args.explodeAttrs.fork)((graph, _: Unit) => ExplodeAttrs.transform(graph))
4647
.pipeFoldLeft(args.explodeEdges.fork)((graph, _: Unit) => ExplodeEdges.transform(graph))
4748
.pipe(ConvertDates.transform)

0 commit comments

Comments (0)