JavaPlanBuilder.scala
@@ -25,7 +25,7 @@ import java.util.{Collection => JavaCollection}
import org.apache.commons.lang3.Validate
import org.apache.wayang.api.util.DataQuantaBuilderCache
import org.apache.wayang.basic.data.Record
-import org.apache.wayang.basic.operators.{TableSource, TextFileSource, KafkaTopicSource, GoogleCloudStorageSource, AmazonS3Source, AzureBlobStorageSource}
+import org.apache.wayang.basic.operators.{TableSource, TextFileSource, ParquetSource, KafkaTopicSource, GoogleCloudStorageSource, AmazonS3Source, AzureBlobStorageSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
@@ -121,6 +121,14 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
*/
def readTable(source: TableSource) = createSourceBuilder(source)(ClassTag(classOf[Record])).asRecords

/**
* Read a Parquet file and provide it as a dataset of [[Record]]s.
*
* @param source the source from which the [[Record]]s should be read
* @return [[DataQuantaBuilder]] for the file
*/
def readParquet(source: ParquetSource): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
createSourceBuilder(source)(ClassTag(classOf[Record]))
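
For illustration, a minimal usage sketch of this new entry point (the file URL and column names are hypothetical; context setup and imports follow the WordCountParquet example further down):

WayangContext context = new WayangContext().with(Java.basicPlugin());
JavaPlanBuilder builder = new JavaPlanBuilder(context).withJobName("ReadParquetSketch");
Collection<Record> records = builder
        .readParquet(new ParquetSource("file:/tmp/data.parquet", null, "name", "age"))
        .collect();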

/**
* Load [[DataQuanta]] from an arbitrary [[UnarySource]].
PlanBuilder.scala
@@ -24,7 +24,7 @@ package org.apache.wayang.api
import org.apache.commons.lang3.Validate
import org.apache.wayang.api
import org.apache.wayang.basic.data.Record
-import org.apache.wayang.basic.operators.{CollectionSource, ObjectFileSource, TableSource, TextFileSource, GoogleCloudStorageSource, AmazonS3Source, AzureBlobStorageSource}
+import org.apache.wayang.basic.operators.{CollectionSource, ObjectFileSource, TableSource, TextFileSource, ParquetSource, GoogleCloudStorageSource, AmazonS3Source, AzureBlobStorageSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
@@ -152,7 +152,7 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
* @return [[DataQuanta]] for the file
*/
def readAzureBlobStorageFile(storageContainer: String, blobName: String, filePathToCredentialsFile: String ): DataQuanta[String] = load(new AzureBlobStorageSource(storageContainer, blobName, filePathToCredentialsFile))
/**
* Read a text file and provide it as a dataset of [[String]]s, one per line.
@@ -179,6 +179,14 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
*/
def readTable(source: TableSource): DataQuanta[Record] = load(source)

/**
* Read a Parquet file and provide it as a dataset of [[Record]]s.
*
* @param source the source from which the [[Record]]s should be read
* @return [[DataQuanta]] of [[Record]]s in the file
*/
def readParquet(source: ParquetSource): DataQuanta[Record] = load(source)

/**
* Loads a [[java.util.Collection]] into Wayang and represents them as [[DataQuanta]].
*
WordCountParquet.java
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.apps.wordcount;

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.ParquetSource;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import java.util.Arrays;
import java.util.Collection;

public class WordCountParquet {

    public static void main(String[] args) {

        if (args.length < 2) {
            System.err.println("Usage: <platform(s)> <input file URL> [<column name>...]");
            System.exit(1);
        }

WayangContext wayangContext = new WayangContext();
for (String platform : args[0].split(",")) {
switch (platform) {
case "java":
wayangContext.register(Java.basicPlugin());
break;
case "spark":
wayangContext.register(Spark.basicPlugin());
break;
default:
System.err.format("Unknown platform: \"%s\"\n", platform);
System.exit(3);
return;
}
}

/* Get a plan builder */
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName("WordCount")
.withUdfJarOf(WordCountParquet.class);

/* Start building the Apache WayangPlan */
Collection<Tuple2<String, Integer>> wordcounts = planBuilder
                /* Read the Parquet file */
                // .readParquet(new ParquetSource(args[1], new String[] { "someColumn" }, Arrays.copyOfRange(args, 2, args.length))) // Variant with an early projection; "someColumn" is a placeholder
.readParquet(new ParquetSource(args[1], null, Arrays.copyOfRange(args, 2, args.length)))
.withName("Load file")

/* Split each line by non-word characters */
.flatMap(record -> Arrays.asList(record.getString(0).split("\\W+")))
.withSelectivity(1, 100, 0.9)
.withName("Split words")

/* Filter empty tokens */
.filter(token -> !token.isEmpty())
.withName("Filter empty words")

/* Attach counter to each word */
.map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

/* Sum up counters for every word */
.reduceByKey(
Tuple2::getField0,
(t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
)
.withName("Add counters")

/* Execute the plan and collect the results */
.collect();

System.out.printf("Found %d words:\n", wordcounts.size());
wordcounts.forEach(wc -> System.out.printf("%dx %s\n", wc.field1, wc.field0));
}
}
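
A note on the argument layout this example assumes: args[0] is a comma-separated platform list ("java", "spark"), args[1] is the Parquet file URL, and any remaining arguments are treated as column names for the schema hint. A hypothetical invocation (class path omitted) could look like: WordCountParquet java file:/tmp/words.parquet word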

wayang-commons/wayang-basic/pom.xml
@@ -80,6 +80,11 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Record.java
@@ -23,6 +23,7 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
@@ -36,6 +37,10 @@ public Record(Object... values) {
this.values = values;
}

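/**
 * Creates a new instance with the given field values.
 *
 * @param values the values of the fields; they are copied into a fresh backing array
 */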
public Record(List<Object> values) {
this.values = values.toArray();
}

@Override
public Record copy() {
return new Record(this.values.clone());
@@ -114,6 +119,28 @@ public String getString(int index) {
return field == null ? null : field.toString();
}

/**
* Set the field of this instance at the given index.
*
* @param index the index of the field
* @param field the new value of the field to be set
*/
public void setField(int index, Object field) {
this.values[index] = field;
}

/**
* Append a field to this instance.
*
* @param field the field to add
*/
public void addField(Object field) {
int size = this.size();
Object[] newValues = Arrays.copyOf(this.values, size + 1);
newValues[size] = field;
this.values = newValues;
}
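
To make the semantics of the two new mutators concrete, a small hypothetical snippet (values invented for illustration): setField overwrites a slot in place, while addField copies the backing array to grow it by one:

Record record = new Record("alice", 30);  // fields: ["alice", 30]
record.setField(1, 31);                   // in-place update: ["alice", 31]
record.addField("engineer");              // copy-and-grow: ["alice", 31, "engineer"]
assert record.size() == 3;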

/**
* Retrieve the size of this instance.
*
ParquetSource.java
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.basic.operators;

import org.apache.commons.lang3.Validate;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.types.RecordType;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.UnarySource;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.fs.FileSystems;

import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;

/**
* This source reads a Parquet file and outputs its records as data units.
*/
public class ParquetSource extends UnarySource<Record> {

private final Logger logger = LogManager.getLogger(this.getClass());

private final String inputUrl;

private final String[] projection;

/**
* Creates a new instance.
*
* @param inputUrl URL of the file to be read
* @param projection names of the columns to project; may be {@code null}, but providing it allows for an early projection
* @param columnNames names of the columns in the table; can be omitted but allows schema information to be injected
* into Wayang, so as to enable specific optimizations
*/
public ParquetSource(String inputUrl, String[] projection, String... columnNames) {
this(inputUrl, projection, createOutputDataSetType(columnNames));
}

public ParquetSource(String inputUrl, String[] projection, DataSetType<Record> type) {
super(type);
this.inputUrl = inputUrl;
this.projection = projection;
}

public String getInputUrl() { return this.inputUrl; }

public String[] getProjection() { return this.projection; }

private static DataSetType<Record> createOutputDataSetType(String[] columnNames) {
return columnNames.length == 0 ?
DataSetType.createDefault(Record.class) :
DataSetType.createDefault(new RecordType(columnNames));
}

/**
* Copies an instance (exclusive of broadcasts).
*
* @param that the instance that should be copied
*/
public ParquetSource(ParquetSource that) {
super(that);
this.inputUrl = that.getInputUrl();
this.projection = that.getProjection();
}

@Override
public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
final int outputIndex,
final Configuration configuration) {
Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
return Optional.of(new ParquetSource.CardinalityEstimator());
}

/**
* Custom {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for {@link ParquetSource}s.
*/
protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {

public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);

@Override
public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) {
Validate.isTrue(ParquetSource.this.getNumInputs() == inputEstimates.length);

// see Job for StopWatch measurements
final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
"Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
);

// Query the job cache first to see if there is already an estimate.
String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(), ParquetSource.this.inputUrl);
CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey, CardinalityEstimate.class);
if (cardinalityEstimate != null) return cardinalityEstimate;

// Otherwise calculate the cardinality.
// First, inspect the size of the file.
OptionalLong fileSize = FileSystems.getFileSize(ParquetSource.this.inputUrl);
if (fileSize.isEmpty()) {
ParquetSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.",
ParquetSource.this.inputUrl);
timeMeasurement.stop();
return this.FALLBACK_ESTIMATE;

} else if (fileSize.getAsLong() == 0L) {
timeMeasurement.stop();
return new CardinalityEstimate(0L, 0L, 1d);
}

OptionalLong numberRows = this.extractNumberRows();
if (numberRows.isEmpty()) {
ParquetSource.this.logger.warn("Could not determine the cardinality of {}... deliver fallback estimate.",
ParquetSource.this.inputUrl);
timeMeasurement.stop();
return this.FALLBACK_ESTIMATE;
}

// Create an exact cardinality estimate for the complete file.
long rowCount = numberRows.getAsLong();
cardinalityEstimate = new CardinalityEstimate(rowCount, rowCount, 1d);

// Cache the result, so that it will not be recalculated again.
optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate);

timeMeasurement.stop();
return cardinalityEstimate;
}

/**
* Extract the number of rows in the file.
*
* @return the number of rows in the file, or an empty {@link OptionalLong} if it cannot be determined
*/
private OptionalLong extractNumberRows() {
Path path = new Path(ParquetSource.this.inputUrl);

try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new org.apache.hadoop.conf.Configuration()))) {

long rowCount = reader.getRecordCount();

if (rowCount == 0) {
ParquetSource.this.logger.warn("Could not find any row in {}.", ParquetSource.this.inputUrl);
return OptionalLong.empty();
}
return OptionalLong.of(rowCount);
} catch (IOException e) {
ParquetSource.this.logger.error("Could not extract the number of rows in the input file.", e);
}

return OptionalLong.empty();
}
}

}
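
For reference, a hedged sketch of the two ways this source can be configured (file URL and column names are placeholders): with an early projection plus a schema hint, or with neither, which falls back to an untyped Record dataset:

// Early projection onto two columns, plus a schema hint for the optimizer:
ParquetSource projected = new ParquetSource(
        "file:/tmp/users.parquet",          // placeholder URL
        new String[]{"name", "age"},        // columns to project early
        "name", "age", "city");             // schema hint, yields a RecordType

// No projection, no schema information: plain Record dataset type.
ParquetSource plain = new ParquetSource("file:/tmp/users.parquet", null);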