
Commit 908cde7

refactor io classes
1 parent 2b440bb commit 908cde7

24 files changed: 626 additions, 339 deletions
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
package ldbc.snb.datagen.factors

import ldbc.snb.datagen.model.Mode.Raw
import ldbc.snb.datagen.model.{Graph, graphs}
import ldbc.snb.datagen.syntax._
import org.apache.spark.sql.functions.{broadcast, count}
import org.apache.spark.sql.{Column, DataFrame}

object Factors {

  def frequency(df: DataFrame, value: Column, by: Seq[Column], sortBy: Seq[Column]) =
    df
      .groupBy(by: _*).agg(count(value).as("count"))
      .select(by :+ $"count": _*)
      .orderBy($"count".desc +: by: _*)

  def countryNumPersons(graph: Graph[Raw.type]): FactorTable[Raw.type] = {
    val places = graph.entities(graphs.Raw.entities.Place).cache()
    val cities = places.where($"type" === "City")
    val countries = places.where($"type" === "Country")

    val persons = graph.entities(graphs.Raw.entities.Person)
    val df = frequency(
      persons.as("Person")
        .join(broadcast(cities.as("City")), $"City.id" === $"Person.LocationCityId")
        .join(broadcast(countries.as("Country")), $"Country.id" === $"City.PartOfPlaceId"),
      value = $"Person.id",
      by = Seq($"Country.id", $"Country.name"),
      sortBy = Seq($"Country.id")
    )
    FactorTable[Raw.type](name = "countryNumPersons", data = df, source = graph)
  }
}
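Note (not part of the commit): a small illustration of what frequency computes, on a throwaway DataFrame. The column names and values below are made up for the example.

import ldbc.snb.datagen.factors.Factors
import org.apache.spark.sql.SparkSession

object FrequencyExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val persons = Seq((1L, "RO"), (2L, "RO"), (3L, "CN")).toDF("personId", "country")

    // Groups by `country`, counts `personId` into a `count` column, and orders
    // by descending count, yielding (RO, 2) then (CN, 1).
    Factors.frequency(
      persons,
      value = $"personId",
      by = Seq($"country"),
      sortBy = Seq($"country")
    ).show()
  }
}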
Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
package ldbc.snb.datagen.factors

import ldbc.snb.datagen.SparkApp
import ldbc.snb.datagen.factors.Factors.countryNumPersons
import ldbc.snb.datagen.factors.io.FactorTableSink
import ldbc.snb.datagen.io.graphs.GraphSource
import ldbc.snb.datagen.model
import ldbc.snb.datagen.syntax._
import ldbc.snb.datagen.util.Logging
import org.apache.spark.sql.SparkSession

object FactorGenerationStage extends SparkApp with Logging {
  override def appName: String = "LDBC SNB Datagen for Spark: Factor Generation Stage"

  case class Args(outputDir: String = "out")

  def run(args: Args)(implicit spark: SparkSession): Unit = {
    import ldbc.snb.datagen.io.instances._
    import ldbc.snb.datagen.io.Reader.ops._
    import ldbc.snb.datagen.io.Writer.ops._
    import ldbc.snb.datagen.factors.io.instances._

    GraphSource(model.graphs.Raw.graphDef, args.outputDir, "csv")
      .read
      .pipe(countryNumPersons)
      .write(FactorTableSink(args.outputDir))
  }
}
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
package ldbc.snb.datagen.factors

import ldbc.snb.datagen.model.{Graph, GraphDef, Mode}
import org.apache.spark.sql.DataFrame

case class FactorTableDef[M <: Mode](
  name: String,
  sourceDef: GraphDef[M]
)

case class FactorTable[M <: Mode](
  name: String,
  data: DataFrame,
  source: Graph[M]
)
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
package ldbc.snb.datagen.factors

import ldbc.snb.datagen.io.{PathComponent, Writer}
import ldbc.snb.datagen.util.Logging
import ldbc.snb.datagen.model.{GraphLike, Mode}
import better.files._
import ldbc.snb.datagen.io.dataframes.DataFrameSink

package object io {
  case class FactorTableSink(path: String, format: String = "csv")

  import ldbc.snb.datagen.io.dataframes.instances._
  import ldbc.snb.datagen.io.Writer.ops._

  private final class FactorTableWriter[M <: Mode] extends Writer[FactorTableSink] with Logging {
    override type CoRet = FactorTable[M]

    override def write(self: FactorTable[M], sink: FactorTableSink): Unit = {
      val p = (sink.path / "factors" / sink.format / PathComponent[GraphLike[M]].path(self.source) / self.name).toString()
      self.data.coalesce(1).write(DataFrameSink(p, sink.format))
    }
  }

  trait WriterInstances {
    implicit def factorTableWriter[M <: Mode]: Writer.Aux[FactorTableSink, FactorTable[M]] = new FactorTableWriter[M]
  }

  object instances extends WriterInstances
}

src/main/scala/ldbc/snb/datagen/transformation/io/PathComponent.scala renamed to src/main/scala/ldbc/snb/datagen/io/PathComponent.scala

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
-package ldbc.snb.datagen.transformation.io
+package ldbc.snb.datagen.io
 
-import ldbc.snb.datagen.transformation.model.{GraphLike, Mode}
+import ldbc.snb.datagen.model.{GraphLike, Mode}
 
 trait PathComponent[A] {
   def path(a: A): String
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
package ldbc.snb.datagen.io

trait Reader[T] {
  type Ret

  def read(self: T): Ret
  def exists(self: T): Boolean
}

object Reader {
  type Aux[T, R] = Reader[T] { type Ret = R }

  def apply[T, R](implicit r: Reader.Aux[T, R]): Reader.Aux[T, R] = implicitly[Reader.Aux[T, R]]

  trait ReaderOps[T] {
    type Ret
    def tcInstance: Reader.Aux[T, Ret]
    def self: T
    def read: Ret = tcInstance.read(self)
  }

  object ReaderOps {
    type Aux[T, R] = ReaderOps[T] { type Ret = R }
  }

  object ops {
    import scala.language.implicitConversions
    implicit def toReaderOps[T, R](target: T)(implicit tc: Reader.Aux[T, R]): ReaderOps.Aux[T, R] = new ReaderOps[T] {
      override type Ret = R
      override def tcInstance: Aux[T, R] = tc
      override def self: T = target
    }
  }
}
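Note (not part of the commit): a minimal, self-contained sketch of how the Aux pattern plus the implicit conversion in ops turns read into postfix syntax with a concrete result type. StringSource and its reader instance are made-up names, used only to illustrate the wiring.

import ldbc.snb.datagen.io.Reader
import ldbc.snb.datagen.io.Reader.ops._

// Hypothetical source type, for illustration only.
case class StringSource(lines: Seq[String])

object StringSourceExample {
  // Reader.Aux[StringSource, String] pins the abstract `Ret` member to String,
  // so implicit resolution can recover the result type at the call site.
  implicit val stringReader: Reader.Aux[StringSource, String] = new Reader[StringSource] {
    override type Ret = String
    override def read(self: StringSource): String = self.lines.mkString("\n")
    override def exists(self: StringSource): Boolean = self.lines.nonEmpty
  }

  def main(args: Array[String]): Unit = {
    // toReaderOps converts the source; `.read` returns String, not an existential type.
    val s: String = StringSource(Seq("a", "b")).read
    println(s)
  }
}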
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
package ldbc.snb.datagen.io

trait Writer[S] {
  type CoRet
  def write(self: CoRet, sink: S): Unit
}

object Writer {
  type Aux[S, C] = Writer[S] { type CoRet = C }
  def apply[S, C](implicit r: Writer.Aux[S, C]): Writer.Aux[S, C] = implicitly[Writer.Aux[S, C]]

  trait WriterOps[CoRet] {
    type Sink
    def tcInstance: Writer.Aux[Sink, CoRet]
    def self: CoRet
    def write(sink: Sink): Unit = tcInstance.write(self, sink)
  }

  object WriterOps {
    type Aux[CoRet, S] = WriterOps[CoRet] { type Sink = S }
  }

  object ops {
    import scala.language.implicitConversions
    implicit def toWriterOps[CoRet, S](target: CoRet)(implicit tc: Writer.Aux[S, CoRet]): WriterOps.Aux[CoRet, S] = new WriterOps[CoRet] {
      override type Sink = S
      override def tcInstance: Aux[S, CoRet] = tc
      override def self: CoRet = target
    }
  }
}
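Note (not part of the commit): the Writer side mirrors Reader, except that the Aux parameters are ordered sink-first and resolution is driven by the value being written. ConsoleSink below is a made-up example sink.

import ldbc.snb.datagen.io.Writer
import ldbc.snb.datagen.io.Writer.ops._

// Hypothetical sink, for illustration only.
case class ConsoleSink(prefix: String)

object ConsoleSinkExample {
  // Writer.Aux[ConsoleSink, String]: how to write a String to a ConsoleSink.
  implicit val consoleWriter: Writer.Aux[ConsoleSink, String] = new Writer[ConsoleSink] {
    override type CoRet = String
    override def write(self: String, sink: ConsoleSink): Unit = println(s"${sink.prefix}$self")
  }

  def main(args: Array[String]): Unit = {
    // toWriterOps wraps the String; `.write` then expects a ConsoleSink.
    "hello".write(ConsoleSink("> "))
  }
}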
Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
package ldbc.snb.datagen.io

import ldbc.snb.datagen.syntax.fluentSyntaxOps
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object dataframes {

  case class DataFrameSource(
    path: String,
    format: String,
    formatOptions: Map[String, String] = Map.empty,
    schema: Option[StructType] = None
  )

  private class DataFrameReader(implicit spark: SparkSession) extends Reader[DataFrameSource] {
    override type Ret = DataFrame

    override def read(self: DataFrameSource): DataFrame = {
      spark.read
        .format(self.format)
        .options(self.formatOptions)
        .pipeFoldLeft(self.schema)(_ schema _)
        .load(self.path)
    }

    override def exists(self: DataFrameSource): Boolean = utils.fileExists(self.path)
  }

  trait ReaderInstances {
    implicit def dataFrameReader(implicit spark: SparkSession): Reader.Aux[DataFrameSource, DataFrame] = new DataFrameReader
  }

  case class DataFrameSink(
    path: String,
    format: String,
    formatOptions: Map[String, String] = Map.empty,
    mode: SaveMode = SaveMode.ErrorIfExists,
    partitionBy: Seq[String] = Seq.empty
  )

  case class DataFrameWriterOptions(
    format: String,
    partitionBy: Seq[String] = Seq.empty,
    formatOptions: Map[String, String] = Map.empty,
    mode: SaveMode = SaveMode.ErrorIfExists
  )

  private object DataFrameWriter extends Writer[DataFrameSink] {
    override type CoRet = DataFrame
    override def write(self: DataFrame, sink: DataFrameSink): Unit = {
      self.write
        .partitionBy(sink.partitionBy: _*)
        .format(sink.format)
        .options(sink.formatOptions)
        .mode(sink.mode)
        .save(sink.path)
    }
  }

  trait WriterInstances {
    implicit val dataFrameWriter: Writer.Aux[DataFrameSink, DataFrame] = DataFrameWriter
  }

  trait Instances extends WriterInstances with ReaderInstances

  object instances extends Instances
}
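Note (not part of the commit): a hedged end-to-end sketch of the two instances above, in the same style as FactorGenerationStage. The paths and the local master setting are assumptions made for the example.

import org.apache.spark.sql.{DataFrame, SparkSession}
import ldbc.snb.datagen.io.Reader.ops._
import ldbc.snb.datagen.io.Writer.ops._
import ldbc.snb.datagen.io.dataframes.{DataFrameSink, DataFrameSource}
import ldbc.snb.datagen.io.dataframes.instances._

object DataFrameRoundTrip {
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

    // dataFrameReader needs the implicit SparkSession; `.read` yields a DataFrame.
    val df: DataFrame = DataFrameSource("out/example-input", "csv").read

    // dataFrameWriter writes it back out through the DataFrameSink options
    // (same postfix style the FactorTableWriter in this commit uses).
    df.write(DataFrameSink("out/example-output", "parquet"))
  }
}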
