@@ -10,10 +10,12 @@ import org.apache.jena.riot.Lang
1010import org .apache .spark .sql .{Dataset , Encoder , SaveMode , SparkSession }
1111import org .apache .spark .{SparkConf , SparkContext }
1212import org .slf4j .LoggerFactory
13-
1413import scala .language .implicitConversions
14+
1515import org .apache .jena .vocabulary .RDF
1616
17+ import net .sansa_stack .rdf .spark .io .NTripleReader
18+
1719/**
1820 * A class that provides methods to load an RDF graph from disk.
1921 *
@@ -37,17 +39,7 @@ object RDFGraphLoader {
3739 * @return an RDF graph
3840 */
3941 def loadFromDisk (session : SparkSession , path : String , minPartitions : Int = 2 ): RDFGraph = {
40- logger.info(" loading triples from disk..." )
41- val startTime = System .currentTimeMillis()
42-
43- val triples = session.sparkContext
44- .textFile(path, minPartitions) // read the text file
45- .filter(line => ! line.trim().isEmpty & ! line.startsWith(" #" ))
46- .map(new NTriplesStringToJenaTriple ()) // convert to triple object
47- // .repartition(minPartitions)
48-
49- // logger.info("finished loading " + triples.count() + " triples in " + (System.currentTimeMillis()-startTime) + "ms.")
50- RDFGraph (triples)
42+ RDFGraph (NTripleReader .load(session, path))
5143 }
5244
5345 /**
@@ -85,18 +77,7 @@ object RDFGraphLoader {
8577 * @return an RDF graph
8678 */
8779 def loadFromDiskAsRDD (session : SparkSession , path : String , minPartitions : Int ): RDFGraphNative = {
88- logger.info(" loading triples from disk..." )
89- val startTime = System .currentTimeMillis()
90-
91- val converter = new NTriplesStringToJenaTriple ()
92-
93- val triples = session.sparkContext
94- .textFile(path, minPartitions) // read the text file
95- .map(line => converter.apply(line)) // convert to triple object
96-
97- // logger.info("finished loading " + triples.count() + " triples in " +
98- // (System.currentTimeMillis()-startTime) + "ms.")
99- new RDFGraphNative (triples)
80+ new RDFGraphNative (NTripleReader .load(session, path))
10081 }
10182
10283 private case class RDFTriple2 (s : String , p : String , o : String ) extends Product3 [String , String , String ] {
@@ -132,11 +113,8 @@ object RDFGraphLoader {
132113 val spark = session.sqlContext
133114
134115
135-
136- val triples = session.read
137- .textFile(path) // read the text file
138- .map(new NTriplesStringToJenaTriple ())
139- .as[Triple ](rdfTripleEncoder)
116+ val triples = session
117+ .createDataset(NTripleReader .load(session, path))(rdfTripleEncoder)
140118 .as(" triples" )
141119 // (rdfTripleEncoder)
142120 // val rowRDD = session.sparkContext
0 commit comments