Merged — changes from all commits
@@ -25,12 +25,13 @@ import java.util.{Collection => JavaCollection}
import org.apache.commons.lang3.Validate
import org.apache.wayang.api.util.DataQuantaBuilderCache
import org.apache.wayang.basic.data.Record
-import org.apache.wayang.basic.operators.{TableSource, TextFileSource, ParquetSource, KafkaTopicSource, GoogleCloudStorageSource, AmazonS3Source, AzureBlobStorageSource}
+import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, GoogleCloudStorageSource, KafkaTopicSource, ParquetSource, TableSource, TextFileSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
import org.apache.wayang.core.types.DataSetType

+import java.util
import scala.reflect.ClassTag

/**
@@ -60,7 +61,17 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
* @return [[DataQuantaBuilder]] for the file
*/
def readTextFile(url: String): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, String], String] =
createSourceBuilder(new TextFileSource(url))(ClassTag(classOf[String]))

+/**
+* Read a Parquet file and provide it as a dataset of [[Record]]s.
+*
+* @param url the URL of the Parquet file
+* @param projection the projection, if any
+* @return [[DataQuantaBuilder]] for the file
+*/
+def readParquet(url: String, projection: Array[String]): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
+createSourceBuilder(ParquetSource.create(url, projection))(ClassTag(classOf[Record]))

/**
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
@@ -71,7 +82,7 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
* @return [[DataQuantaBuilder]] for the file
*/
def readGoogleCloudStorageFile(bucket: String, blobName: String, filePathToCredentialsFile: String): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, String], String] =
createSourceBuilder(new GoogleCloudStorageSource(bucket, blobName, filePathToCredentialsFile))(ClassTag(classOf[String]))

/**
* Read a text file from an Amazon S3 bucket and provide it as a dataset of [[String]]s, one per line.
@@ -82,7 +93,7 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
* @return [[DataQuantaBuilder]] for the file
*/
def readAmazonS3File(bucket: String, blobName: String, filePathToCredentialsFile: String): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, String], String] =
createSourceBuilder(new AmazonS3Source(bucket, blobName, filePathToCredentialsFile))(ClassTag(classOf[String]))

/**
* Read a text file from an Azure Blob Storage container and provide it as a dataset of [[String]]s, one per line.
@@ -93,23 +104,23 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
* @return [[DataQuantaBuilder]] for the file
*/
def readAzureBlobStorageFile(storageContainer: String, blobName: String, filePathToCredentialsFile: String): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, String], String] =
createSourceBuilder(new AzureBlobStorageSource(storageContainer, blobName, filePathToCredentialsFile))(ClassTag(classOf[String]))

/**
* Read text messages from a Kafka topic and provide them as a dataset of [[String]]s, one per message.
*
* @param topicName the topic's name
* @return [[DataQuantaBuilder]] for the content in the topic
*/
def readKafkaTopic(topicName: String): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, String], String] =
createSourceBuilder(new KafkaTopicSource(topicName))(ClassTag(classOf[String]))

/**
* Read a remote text file and provide it as a dataset of [[String]]s, one per line.
*
* @param url the URL of the text file
* @return [[DataQuantaBuilder]] for the file
*/
def readRemoteTextFile(url: String): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, String], String] =
createSourceBuilder(new TextFileSource(url))(ClassTag(classOf[String]))

@@ -121,15 +132,6 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
*/
def readTable(source: TableSource) = createSourceBuilder(source)(ClassTag(classOf[Record])).asRecords

-/**
-* Read a parquet file and provide it as a dataset of [[Record]]s.
-*
-* @param source from that the [[Record]]s should be read
-* @return [[DataQuantaBuilder]] for the file
-*/
-def readParquet(source: ParquetSource): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
-createSourceBuilder(source)(ClassTag(classOf[Record]))

/**
* Load [[DataQuanta]] from an arbitrary [[UnarySource]].
*
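
For orientation, here is a hypothetical end-to-end use of the new builder method from Java, modeled on the usual Wayang setup. It is not part of the PR: the job name, file path, and projected columns are invented for illustration, and it assumes the Java platform plugin is on the classpath.

import java.util.Collection;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;

public class ReadParquetExample {
    public static void main(String[] args) {
        // Set up a context that executes on the Java platform.
        WayangContext context = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin());
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(context)
                .withJobName("read-parquet-example");

        // Only materialize the "id" and "name" columns (early projection).
        Collection<Record> records = planBuilder
                .readParquet("file:/tmp/example.parquet", new String[]{"id", "name"})
                .collect();

        records.forEach(System.out::println);
    }
}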
@@ -24,7 +24,7 @@ package org.apache.wayang.api
import org.apache.commons.lang3.Validate
import org.apache.wayang.api
import org.apache.wayang.basic.data.Record
-import org.apache.wayang.basic.operators.{CollectionSource, ObjectFileSource, TableSource, TextFileSource, ParquetSource, GoogleCloudStorageSource, AmazonS3Source, AzureBlobStorageSource}
+import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, CollectionSource, GoogleCloudStorageSource, ObjectFileSource, ParquetSource, TableSource, TextFileSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
@@ -122,6 +122,15 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
*/
def readTextFile(url: String): DataQuanta[String] = load(new TextFileSource(url))

+/**
+* Read a Parquet file and provide it as a dataset of [[Record]]s.
+*
+* @param url the URL of the Parquet file
+* @param projection the projection, if any
+* @return [[DataQuanta]] of [[Record]]s representing the file
+*/
+def readParquet(url: String, projection: Array[String]): RecordDataQuanta = load(ParquetSource.create(url, projection))

/**
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
*
@@ -142,7 +151,6 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
*/
def readAmazonS3File(bucket: String, blobName: String, filePathToCredentialsFile: String ): DataQuanta[String] = load(new AmazonS3Source(bucket, blobName, filePathToCredentialsFile))

/**
* Read a text file from an Azure Blob Storage container and provide it as a dataset of [[String]]s, one per line.
*
@@ -153,7 +161,6 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
*/
def readAzureBlobStorageFile(storageContainer: String, blobName: String, filePathToCredentialsFile: String ): DataQuanta[String] = load(new AzureBlobStorageSource(storageContainer, blobName, filePathToCredentialsFile))

/**
* Read a text file and provide it as a dataset of [[String]]s, one per line.
*
@@ -162,7 +169,6 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
*/
def readRemoteTextFile(url: String): DataQuanta[String] = load(new TextFileSource(url))

/**
* Read an object file and provide it as a dataset of [[Object]]s.
*
@@ -179,14 +185,6 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
*/
def readTable(source: TableSource): DataQuanta[Record] = load(source)

-/**
-* Read a parquet file and provide it as a dataset of [[Record]]s.
-*
-* @param source from that the [[Record]]s should be read
-* @return [[DataQuanta]] of [[Record]]s in the file
-*/
-def readParquet(source: ParquetSource): DataQuanta[Record] = load(source)

/**
* Loads a [[java.util.Collection]] into Wayang and represents them as [[DataQuanta]].
*
@@ -61,8 +61,7 @@ public static void main(String[] args){
/* Start building the Apache WayangPlan */
Collection<Tuple2<String, Integer>> wordcounts = planBuilder
/* Read the Parquet file */
-// .readParquet(new ParquetSource(args[1], new String[] { projectionColumns }, Arrays.copyOfRange(args, 2, args.length))) // In case of projection
-.readParquet(new ParquetSource(args[1], null, Arrays.copyOfRange(args, 2, args.length)))
+.readParquet(args[1], Arrays.copyOfRange(args, 2, args.length))
.withName("Load file")

/* Split each line by non-word characters */
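
Worth noting: the trailing CLI arguments previously populated the source's column names, whereas with the new overload they act as the projection. A small, self-contained sketch of the resulting argument layout (the concrete values are invented for illustration):

import java.util.Arrays;

public class ArgsLayoutDemo {
    public static void main(String[] ignored) {
        // Hypothetical invocation: args[0] = platform, args[1] = input URL, args[2..] = projected columns.
        String[] args = {"java", "file:/tmp/lineitems.parquet", "l_comment"};
        String inputUrl = args[1];
        String[] projection = Arrays.copyOfRange(args, 2, args.length); // -> {"l_comment"}
        System.out.println(inputUrl + " with projection " + Arrays.toString(projection));
    }
}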
@@ -23,23 +23,27 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.types.RecordType;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.UnarySource;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.fs.FileSystems;

import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;

/**
-* This source reads a parquet file and outputs the lines as data units.
+* This source reads a Parquet file and outputs its rows as {@link Record} units.
*/
public class ParquetSource extends UnarySource<Record> {

Expand All @@ -49,16 +53,34 @@ public class ParquetSource extends UnarySource<Record> {

private final String[] projection;

+private ParquetMetadata metadata;
+
+private MessageType schema;

/**
* Creates a new instance.
*
* @param inputUrl name of the file to be read
* @param projection names of the columns to filter; can be omitted but allows for an early projection
* @param columnNames names of the columns in the tables; can be omitted but allows to inject schema information
* into Wayang, so as to allow specific optimizations
*/
-public ParquetSource(String inputUrl, String[] projection, String... columnNames) {
-this(inputUrl, projection, createOutputDataSetType(columnNames));
+public static ParquetSource create(String inputUrl, String[] projection) {
+ParquetMetadata metadata = readMetadata(inputUrl);
+MessageType schema = metadata.getFileMetaData().getSchema();
+
+String[] columnNames = schema.getFields().stream()
+.map(Type::getName)
+.toArray(String[]::new);
+
+ParquetSource instance = new ParquetSource(inputUrl, projection, createOutputDataSetType(columnNames));
+
+instance.metadata = metadata;
+instance.schema = schema;
+
+return instance;
}

+public ParquetSource(String inputUrl, String[] projection, String... fieldNames) {
+this(inputUrl, projection, createOutputDataSetType(fieldNames));
+}

public ParquetSource(String inputUrl, String[] projection, DataSetType<Record> type) {
@@ -67,10 +89,24 @@ public ParquetSource(String inputUrl, String[] projection, DataSetType<Record> type) {
this.projection = projection;
}

+private static ParquetMetadata readMetadata(String inputUrl) {
+Path path = new Path(inputUrl);
+
+try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new org.apache.hadoop.conf.Configuration()))) {
+return reader.getFooter();
+} catch (Exception e) {
+throw new WayangException("Could not read metadata.", e);
+}
+}

public String getInputUrl() { return this.inputUrl; }

public String[] getProjection() { return this.projection; }

+public ParquetMetadata getMetadata() { return this.metadata; }
+
+public MessageType getSchema() { return this.schema; }

private static DataSetType<Record> createOutputDataSetType(String[] columnNames) {
return columnNames.length == 0 ?
DataSetType.createDefault(Record.class) :
@@ -86,6 +122,8 @@ public ParquetSource(ParquetSource that) {
super(that);
this.inputUrl = that.getInputUrl();
this.projection = that.getProjection();
+this.metadata = that.getMetadata();
+this.schema = that.getSchema();
}

@Override
@@ -156,22 +194,15 @@ public CardinalityEstimate estimate(OptimizationContext optimizationContext, Car
* @return the number of rows in the file
*/
private OptionalLong extractNumberRows() {
-Path path = new Path(ParquetSource.this.inputUrl);
-
-try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new org.apache.hadoop.conf.Configuration()))) {
-
-long rowCount = reader.getRecordCount();
-
-if (rowCount == 0) {
-ParquetSource.this.logger.warn("Could not find any row in {}.", ParquetSource.this.inputUrl);
-return OptionalLong.empty();
-}
-return OptionalLong.of(rowCount);
-} catch (IOException e) {
-ParquetSource.this.logger.error("Could not extract the number of rows in the input file.", e);
-}
-
-return OptionalLong.empty();
+long rowCount = ParquetSource.this.metadata.getBlocks().stream()
+.mapToLong(BlockMetaData::getRowCount)
+.sum();
+
+if (rowCount == 0) {
+ParquetSource.this.logger.warn("Could not find any row in {}.", ParquetSource.this.inputUrl);
+return OptionalLong.empty();
+}
+
+return OptionalLong.of(rowCount);
}
}
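
Because create() reads the Parquet footer eagerly, the schema and per-row-group counts are available before execution, so extractNumberRows() no longer has to reopen the file on every estimate. A minimal sketch of what the factory exposes; the file path is hypothetical, and it assumes the file exists and the Parquet and Hadoop libraries are on the classpath:

import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;
import org.apache.wayang.basic.operators.ParquetSource;

public class InspectParquetSource {
    public static void main(String[] args) {
        // A null projection keeps all columns; the schema still comes from the footer.
        ParquetSource source = ParquetSource.create("file:/tmp/example.parquet", null);

        MessageType schema = source.getSchema();
        long rows = source.getMetadata().getBlocks().stream()
                .mapToLong(BlockMetaData::getRowCount) // same per-row-group sum the cardinality estimator uses
                .sum();

        System.out.printf("%s: %d columns, %d rows%n",
                source.getInputUrl(), schema.getFieldCount(), rows);
    }
}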

@@ -45,7 +45,7 @@ public Collection<PlanTransformation> getTransformations() {

private SubplanPattern createSubplanPattern() {
final OperatorPattern operatorPattern = new OperatorPattern(
"source", new org.apache.wayang.basic.operators.ParquetSource((String) null, (String[]) null), false
"source", new ParquetSource((String) null, (String[]) null), false
);
return SubplanPattern.createSingleton(operatorPattern);
}