Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,35 @@

import org.apache.calcite.rel.RelNode;
import org.apache.wayang.api.sql.calcite.rel.*;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.plan.wayangplan.Operator;

public class WayangRelConverter {
private final Configuration configuration;

public Operator convert(RelNode node) {
if(node instanceof WayangTableScan) {
return new WayangTableScanVisitor(this).visit((WayangTableScan)node);
public WayangRelConverter(final Configuration configuration) {
this.configuration = configuration;
}

public WayangRelConverter() {
this.configuration = null;
}

/**
* Some visitors may rely on configuration like the
* {@link WayangTableScanVisitor}, that uses it
* to specify its calcite schema when fetching from files on disk
*
* @return {@link Configuration}, or null if {@link WayangRelConverter} is not
* constructed with one.
*/
public Configuration getConfiguration() {
return configuration;
}

public Operator convert(final RelNode node) {
if (node instanceof WayangTableScan) {
return new WayangTableScanVisitor(this).visit((WayangTableScan) node);
} else if (node instanceof WayangProject) {
return new WayangProjectVisitor(this).visit((WayangProject) node);
} else if (node instanceof WayangFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,52 +28,56 @@
import org.apache.wayang.postgres.operators.PostgresTableSource;
import org.apache.wayang.basic.data.Record;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import java.util.stream.Collectors;

//TODO: create tablesource with column types
//TODO: support other sources
public class WayangTableScanVisitor extends WayangRelNodeVisitor<WayangTableScan> {
WayangTableScanVisitor(WayangRelConverter wayangRelConverter) {
WayangTableScanVisitor(final WayangRelConverter wayangRelConverter) {
super(wayangRelConverter);
}

@Override
Operator visit(WayangTableScan wayangRelNode) {
Operator visit(final WayangTableScan wayangRelNode) {

String tableName = wayangRelNode.getTableName();
List<String> columnNames = wayangRelNode.getColumnNames();
final String tableName = wayangRelNode.getTableName();
final List<String> columnNames = wayangRelNode.getColumnNames();

// Get the source platform for this table
String tableSource = wayangRelNode.getTable().getQualifiedName().get(0);
final String tableSource = wayangRelNode.getTable().getQualifiedName().get(0);

if (tableSource.equals("postgres")) {
return new PostgresTableSource(tableName, columnNames.toArray(new String[]{}));
}

if (tableSource.equals("fs")) {
ModelParser modelParser;
try {
modelParser = new ModelParser();
} catch (Exception e) {
modelParser = this.wayangRelConverter.getConfiguration() == null
? new ModelParser()
: new ModelParser(this.wayangRelConverter.getConfiguration());
} catch (final Exception e) {
throw new RuntimeException(e);
}
RelDataType rowType = wayangRelNode.getRowType();
List<RelDataType> fieldTypes = new ArrayList<>();
for (RelDataTypeField field : rowType.getFieldList()) {
fieldTypes.add(field.getType());
}
String url = String.format("file:/%s/%s.csv", modelParser.getFsPath(), wayangRelNode.getTableName());

String separator = modelParser.getSeparator();
final List<RelDataType> fieldTypes = wayangRelNode.getRowType().getFieldList().stream()
.map(RelDataTypeField::getType)
.collect(Collectors.toList());

final String url = String.format("file:/%s/%s.csv", modelParser.getFsPath(), wayangRelNode.getTableName());

final String separator = modelParser.getSeparator();

if (Objects.equals(separator, "")) {
return new JavaCSVTableSource(url,
return new JavaCSVTableSource<>(url,
DataSetType.createDefault(Record.class), fieldTypes);
} else {
return new JavaCSVTableSource(url,
return new JavaCSVTableSource<>(url,
DataSetType.createDefault(Record.class), fieldTypes, separator.charAt(0));
}
} else throw new RuntimeException("Source not supported");
} else
throw new RuntimeException("Source not supported");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.wayang.api.sql.calcite.schema.WayangSchema;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.operators.LocalCallbackSink;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;

Expand Down Expand Up @@ -224,6 +225,16 @@ public WayangPlan convert(RelNode relNode, Collection<Record> collector) {
return new WayangPlan(sink);
}

public WayangPlan convertWithConfig(RelNode relNode, Configuration configuration, Collection<Record> collector) {

LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);

Operator op = new WayangRelConverter(configuration).convert(relNode);

op.connectTo(0, sink, 0);
return new WayangPlan(sink);
}


public static class ConfigProperties {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,32 @@ public ModelParser() throws IOException, ParseException {
}

public ModelParser(Configuration configuration) throws IOException, ParseException {
String calciteModel = "{\"calcite\"" + configuration.getStringProperty("wayang.calcite.model") + ",\"separator\":\";\"}";

this.configuration = configuration;
Object obj = new JSONParser().parse(new FileReader("wayang-api/wayang-api-sql/src/main/resources/model.json"));
Object obj = new JSONParser().parse(calciteModel);
System.out.println("obj: " + obj);
this.json = (JSONObject) obj;
}

/**
* This method allows you to specify the Calcite path, useful for testing.
* See also {@link #ModelParser(Configuration)} and {@link #ModelParser()}.
*
* @param configuration An empty configuration. Usage:
* {@code Configuration configuration = new ModelParser(new Configuration(), calciteModelPath).setProperties();}
* @param calciteModelPath Path to the JSON object containing the Calcite
* model/schema.
* @throws IOException If an I/O error occurs.
* @throws ParseException If unable to parse the file at
* {@code calciteModelPath}.
*/
public ModelParser(Configuration configuration, String calciteModelPath) throws IOException, ParseException {
this.configuration = configuration;
FileReader fr = new FileReader(calciteModelPath);
Object obj = new JSONParser().parse(fr);

this.json = (JSONObject) obj;
}

public Configuration setProperties() {
Expand Down
Loading
Loading