Commit 3c98665

remove unnecessary entities from deletes and add cmdline flag
1 parent 0342a8c

3 files changed: +16 -4 lines changed

src/main/scala/ldbc/snb/datagen/spark/LdbcDatagen.scala (8 additions, 0 deletions)

@@ -20,6 +20,7 @@ object LdbcDatagen extends SparkApp {
     bulkloadPortion: Double = 0.1,
     explodeEdges: Boolean = false,
     explodeAttrs: Boolean = false,
+    keepImplicitDeletes: Boolean = false,
     mode: String = "raw",
     batchPeriod: String = "day",
     numThreads: Option[Int] = None,

@@ -83,6 +84,12 @@ object LdbcDatagen extends SparkApp {
       .action((x, c) => args.format.set(c)(x))
       .text("Output format. Currently, Spark Datasource formats are supported, such as 'csv', 'parquet' or 'orc'.")

+    opt[Unit]("keep-implicit-deletes")
+      .action((x, c) => args.keepImplicitDeletes.set(c)(true))
+      .text("Keep implicit deletes. Only applicable to BI mode. By default the BI output doesn't contain deletes for dynamic entities " +
+        "without the explicitlyDeleted attribute, and it filters out dynamic entities where explicitlyDeleted is false. " +
+        "Setting this flag retains all deletes.")
+
     opt[Map[String,String]]("format-options")
       .action((x, c) => args.formatOptions.set(c)(x))
       .text("Output format options specified as key=value1[,key=value...]. See format options for specific formats " +

@@ -116,6 +123,7 @@ object LdbcDatagen extends SparkApp {
       outputDir = args.outputDir,
       explodeEdges = args.explodeEdges,
       explodeAttrs = args.explodeAttrs,
+      keepImplicitDeletes = args.keepImplicitDeletes,
       simulationStart = Dictionaries.dates.getSimulationStart,
       simulationEnd = Dictionaries.dates.getSimulationEnd,
       mode = args.mode match {
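
Usage note: the flag is a plain scopt Unit option, so a BI-mode run simply adds --keep-implicit-deletes to the datagen command line. Below is a minimal, self-contained sketch of the same scopt pattern; the Args case class, the program name, and the copy-based action are illustrative assumptions, not the datagen code (which uses a lens-style set as shown in the diff).

// Hedged sketch of the scopt pattern in the diff above: a Unit option that
// flips a Boolean field. Args, the program name, and the copy-based action
// are placeholders, not the actual datagen definitions.
case class Args(keepImplicitDeletes: Boolean = false)

object KeepImplicitDeletesSketch extends App {
  val parser = new scopt.OptionParser[Args]("ldbc-datagen-sketch") {
    opt[Unit]("keep-implicit-deletes")
      .action((_, c) => c.copy(keepImplicitDeletes = true))
      .text("Keep implicit deletes (only meaningful in BI mode).")
  }

  // e.g. run with: --keep-implicit-deletes
  parser.parse(args, Args()).foreach { parsed =>
    println(s"keepImplicitDeletes = ${parsed.keepImplicitDeletes}")
  }
}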

src/main/scala/ldbc/snb/datagen/transformation/TransformationStage.scala (2 additions, 1 deletion)

@@ -18,6 +18,7 @@ object TransformationStage extends SparkApp with Logging {
     outputDir: String = "out",
     explodeEdges: Boolean = false,
     explodeAttrs: Boolean = false,
+    keepImplicitDeletes: Boolean = false,
     simulationStart: Long = 0,
     simulationEnd: Long = 0,
     mode: Mode = Mode.Raw,

@@ -110,7 +111,7 @@ object TransformationStage extends SparkApp with Logging {
       .pipe[OutputTypes] {
         g =>
           args.mode match {
-            case bi@Mode.BI(_, _) => Inr(Inr(Inl(RawToBiTransform(bi, args.simulationStart, args.simulationEnd).transform(g))))
+            case bi@Mode.BI(_, _) => Inr(Inr(Inl(RawToBiTransform(bi, args.simulationStart, args.simulationEnd, args.keepImplicitDeletes).transform(g))))
             case interactive@Mode.Interactive(_) => Inr(Inl(RawToInteractiveTransform(interactive, args.simulationStart, args.simulationEnd).transform(g)))
             case Mode.Raw => Inl(g)
           }
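
For readers unfamiliar with the Inr(Inr(Inl(...))) wrapping above: it is shapeless Coproduct injection, where each Inr skips one alternative of the OutputTypes coproduct and Inl selects the current one. A minimal sketch with placeholder alternatives (the real member types behind OutputTypes are not shown in this diff):

// Sketch of shapeless Coproduct injection; Int/String/Boolean stand in for
// the three graph types behind OutputTypes.
import shapeless.{:+:, CNil, Inl, Inr}

object CoproductSketch extends App {
  type ThreeWay = Int :+: String :+: Boolean :+: CNil

  val first: ThreeWay  = Inl(0)                // like Mode.Raw => Inl(g)
  val second: ThreeWay = Inr(Inl("mid"))       // like the Interactive case
  val third: ThreeWay  = Inr(Inr(Inl(true)))   // like the BI case

  println(Seq(first, second, third))
}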

src/main/scala/ldbc/snb/datagen/transformation/transform/RawToBiTransform.scala (6 additions, 3 deletions)

@@ -8,7 +8,7 @@ import ldbc.snb.datagen.util.Logging
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.functions._

-case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long) extends Transform[Mode.Raw.type, Mode.BI] with Logging {
+case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long, keepImplicitDeletes: Boolean) extends Transform[Mode.Raw.type, Mode.BI] with Logging {
   log.debug(s"BI Transformation parameters: $mode")

   val bulkLoadThreshold = Interactive.calculateBulkLoadThreshold(mode.bulkloadPortion, simulationStart, simulationEnd)

@@ -52,7 +52,7 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
       val idColumns = tpe.primaryKey.map(qcol)
       df
         .filter(inBatch($"deletionDate", batchStart, batchEnd))
-        .filter(if (df.columns.contains("explicitlyDeleted")) col("explicitlyDeleted") else lit(false))
+        .filter(if (df.columns.contains("explicitlyDeleted")) col("explicitlyDeleted") else lit(true))
         .pipe(batched)
         .select(Seq($"delete_batch_id".as("batch_id"), $"deletionDate") ++ idColumns: _*)
         .repartitionByRange($"batch_id")

@@ -65,7 +65,10 @@ case class RawToBiTransform(mode: BI, simulationStart: Long, simulationEnd: Long
         tpe -> BatchedEntity(
           Interactive.snapshotPart(tpe, v, bulkLoadThreshold, filterDeletion = false),
           Some(Batched(insertBatchPart(tpe, v, bulkLoadThreshold, simulationEnd), Seq("batch_id"))),
-          Some(Batched(deleteBatchPart(tpe, v, bulkLoadThreshold, simulationEnd), Seq("batch_id")))
+          if (keepImplicitDeletes || v.columns.contains("explicitlyDeleted"))
+            Some(Batched(deleteBatchPart(tpe, v, bulkLoadThreshold, simulationEnd), Seq("batch_id")))
+          else
+            None
         )
       }
       Graph[Mode.BI, DataFrame](isAttrExploded = input.isAttrExploded, isEdgesExploded = input.isEdgesExploded, mode, entities)
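
The behavioural core of the commit is in this file. filter(lit(false)) discards every row, so entities lacking an explicitlyDeleted column previously produced empty delete batches; filter(lit(true)) keeps all their rows instead. The new Option-valued third argument to BatchedEntity then omits the delete batch entirely unless the entity carries explicitlyDeleted or keepImplicitDeletes is set, which is what removes the "unnecessary entities" from the deletes. A minimal standalone sketch of the filter semantics (the local SparkSession and toy data are assumptions, not datagen code):

// Standalone sketch of the filter(lit(...)) semantics behind the
// lit(false) -> lit(true) flip, using toy data.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object LitFilterSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("lit-filter").getOrCreate()
  import spark.implicits._

  // An entity with no explicitlyDeleted column, i.e. only implicit deletes.
  val df = Seq((1L, "2012-01-01"), (2L, "2012-06-01")).toDF("id", "deletionDate")

  val hasFlag = df.columns.contains("explicitlyDeleted")

  // Old fallback: lit(false) drops every implicit delete.
  val oldDeletes = df.filter(if (hasFlag) col("explicitlyDeleted") else lit(false))
  // New fallback: lit(true) keeps them all (emitted only when requested).
  val newDeletes = df.filter(if (hasFlag) col("explicitlyDeleted") else lit(true))

  println(s"old: ${oldDeletes.count()}, new: ${newDeletes.count()}") // old: 0, new: 2

  spark.stop()
}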
