
Commit 21b4070

Merge pull request #330 from ldbc/distinguish-graphs

2 parents: 8f3f3ea + 76241a3

This commit adds a "graphs" component to the serializer output path and to the graph reader/writer paths, so that generated graph data lands under a dedicated graphs/ subdirectory of the output directory.
File tree: 2 files changed, +5 −5 lines

  src/main/java/ldbc/snb/datagen/serializer/LdbcSerializer.java
  src/main/scala/ldbc/snb/datagen/io/graphs.scala

src/main/java/ldbc/snb/datagen/serializer/LdbcSerializer.java

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ public Map<FileName, HdfsCsvWriter> initialize(
         for (FileName f : fileNames) {
             writers.put(f, new HdfsCsvWriter(
                     fs,
-                    outputDir + "/csv/raw/composite-merged-fk" + (dynamic ? "/dynamic/" : "/static/") + f.name + "/",
+                    outputDir + "/graphs/csv/raw/composite-merged-fk" + (dynamic ? "/dynamic/" : "/static/") + f.name + "/",
                     String.valueOf(reducerId),
                     (int)Math.ceil(f.size / oversizeFactor),
                     isCompressed,
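For illustration, a minimal sketch of the path the serializer now targets. This is not part of the commit; outputDir, dynamic, and the file name are assumed values.

  // Hypothetical values, for illustration only.
  val outputDir = "out"
  val dynamic   = true
  val fName     = "person"
  // Same concatenation as the Java change above, with the new "graphs" prefix.
  val p = outputDir + "/graphs/csv/raw/composite-merged-fk" +
    (if (dynamic) "/dynamic/" else "/static/") + fName + "/"
  // p == "out/graphs/csv/raw/composite-merged-fk/dynamic/person/"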

src/main/scala/ldbc/snb/datagen/io/graphs.scala

Lines changed: 4 additions & 4 deletions
@@ -90,7 +90,7 @@ object graphs {
       TreeMap(self.entities.toSeq: _*).foreach {
         case (tpe, dataset) =>
           SparkUI.job(getClass.getSimpleName, s"write $tpe") {
-            val p = (sink.path / sink.format / PathComponent[GraphLike[M]].path(self) / tpe.entityPath).toString
+            val p = (sink.path / "graphs" / sink.format / PathComponent[GraphLike[M]].path(self) / tpe.entityPath).toString
             log.info(s"$tpe: Writing started")
             val opts = getFormatOptions(sink.format, self.mode)
             the(dataset).write(DataFrameSink(p, sink.format, opts, SaveMode.Ignore))
@@ -113,7 +113,7 @@ object graphs {
       TreeMap(self.entities.mapValues(ev).toSeq: _*).foreach {
         case (tpe, BatchedEntity(snapshot, insertBatches, deleteBatches)) =>
           SparkUI.job(getClass.getSimpleName, s"write $tpe snapshot") {
-            val p = (sink.path / sink.format / PathComponent[GraphLike[M]].path(self) / "initial_snapshot" / tpe.entityPath).toString
+            val p = (sink.path / "graphs" / sink.format / PathComponent[GraphLike[M]].path(self) / "initial_snapshot" / tpe.entityPath).toString
             log.info(s"$tpe: Writing snapshot")
             snapshot.write(DataFrameSink(p, sink.format, opts, SaveMode.Ignore))
             log.info(s"$tpe: Writing snapshot completed")
@@ -123,7 +123,7 @@ object graphs {
           batches.foreach {
             case Batched(entity, partitionKeys) =>
               SparkUI.job(getClass.getSimpleName, s"write $tpe $operation") {
-                val p = (sink.path / sink.format / PathComponent[GraphLike[M]].path(self) / operation / tpe.entityPath).toString
+                val p = (sink.path / "graphs" / sink.format / PathComponent[GraphLike[M]].path(self) / operation / tpe.entityPath).toString
                 log.info(f"$tpe: Writing $operation")
                 entity.write(DataFrameSink(p, sink.format, opts, SaveMode.Ignore, partitionBy = partitionKeys))
                 log.info(f"$tpe: Writing $operation completed")
@@ -151,7 +151,7 @@ object graphs {
 
   override def read(self: GraphSource[M]): Graph[M] = {
     val entities = for {(entity, schema) <- self.definition.entities} yield {
-      val p = (self.path / self.format / PathComponent[GraphLike[M]].path(self.definition) / entity.entityPath).toString()
+      val p = (self.path / "graphs" / self.format / PathComponent[GraphLike[M]].path(self.definition) / entity.entityPath).toString()
       log.info(s"Reading $entity")
       val opts = getFormatOptions(self.format, self.definition.mode)
       val df = DataFrameSource(p, self.format, opts, schema.map(StructType.fromDDL)).read
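The net effect on the Scala side is sketched below with assumed values. None of these literals come from the commit; the real components are supplied by sink.path, sink.format, PathComponent[GraphLike[M]].path(self), and tpe.entityPath. All four touched sites now resolve under an extra "graphs" path segment.

  import java.nio.file.Paths

  // Assumed example values, for illustration only.
  val base      = "out"
  val format    = "csv"
  val graphPath = "interactive/sf1"  // stands in for PathComponent[...].path(self)
  val entity    = "Person"           // stands in for tpe.entityPath

  val before = Paths.get(base, format, graphPath, "initial_snapshot", entity)
  val after  = Paths.get(base, "graphs", format, graphPath, "initial_snapshot", entity)
  // before: out/csv/interactive/sf1/initial_snapshot/Person
  // after:  out/graphs/csv/interactive/sf1/initial_snapshot/Person

Since read and write were changed symmetrically, data written by this version of the datagen is read back from the same graphs/-prefixed location.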
