@@ -90,7 +90,7 @@ object graphs {
90
90
TreeMap (self.entities.toSeq: _* ).foreach {
91
91
case (tpe, dataset) =>
92
92
SparkUI .job(getClass.getSimpleName, s " write $tpe" ) {
93
- val p = (sink.path / sink.format / PathComponent [GraphLike [M ]].path(self) / tpe.entityPath).toString
93
+ val p = (sink.path / " graphs " / sink.format / PathComponent [GraphLike [M ]].path(self) / tpe.entityPath).toString
94
94
log.info(s " $tpe: Writing started " )
95
95
val opts = getFormatOptions(sink.format, self.mode)
96
96
the(dataset).write(DataFrameSink (p, sink.format, opts, SaveMode .Ignore ))
@@ -113,7 +113,7 @@ object graphs {
113
113
TreeMap (self.entities.mapValues(ev).toSeq: _* ).foreach {
114
114
case (tpe, BatchedEntity (snapshot, insertBatches, deleteBatches)) =>
115
115
SparkUI .job(getClass.getSimpleName, s " write $tpe snapshot " ) {
116
- val p = (sink.path / sink.format / PathComponent [GraphLike [M ]].path(self) / " initial_snapshot" / tpe.entityPath).toString
116
+ val p = (sink.path / " graphs " / sink.format / PathComponent [GraphLike [M ]].path(self) / " initial_snapshot" / tpe.entityPath).toString
117
117
log.info(s " $tpe: Writing snapshot " )
118
118
snapshot.write(DataFrameSink (p, sink.format, opts, SaveMode .Ignore ))
119
119
log.info(s " $tpe: Writing snapshot completed " )
@@ -123,7 +123,7 @@ object graphs {
123
123
batches.foreach {
124
124
case Batched (entity, partitionKeys) =>
125
125
SparkUI .job(getClass.getSimpleName, s " write $tpe $operation" ) {
126
- val p = (sink.path / sink.format / PathComponent [GraphLike [M ]].path(self) / operation / tpe.entityPath).toString
126
+ val p = (sink.path / " graphs " / sink.format / PathComponent [GraphLike [M ]].path(self) / operation / tpe.entityPath).toString
127
127
log.info(f " $tpe: Writing $operation" )
128
128
entity.write(DataFrameSink (p, sink.format, opts, SaveMode .Ignore , partitionBy = partitionKeys))
129
129
log.info(f " $tpe: Writing $operation completed " )
@@ -151,7 +151,7 @@ object graphs {
151
151
152
152
override def read (self : GraphSource [M ]): Graph [M ] = {
153
153
val entities = for {(entity, schema) <- self.definition.entities} yield {
154
- val p = (self.path / self.format / PathComponent [GraphLike [M ]].path(self.definition) / entity.entityPath).toString()
154
+ val p = (self.path / " graphs " / self.format / PathComponent [GraphLike [M ]].path(self.definition) / entity.entityPath).toString()
155
155
log.info(s " Reading $entity" )
156
156
val opts = getFormatOptions(self.format, self.definition.mode)
157
157
val df = DataFrameSource (p, self.format, opts, schema.map(StructType .fromDDL)).read
0 commit comments