Skip to content

Commit fed4187

Browse files
mbwhitenielspardon
andcommitted
feat(isthmus): support calcite 1.39.0
Signed-off-by: MBWhite <[email protected]> Update isthmus/src/test/java/io/substrait/isthmus/api/TestIsthmusEndToEnd.java Co-authored-by: Niels Pardon <[email protected]> Update isthmus/src/test/java/io/substrait/isthmus/api/TestIsthmusEndToEnd.java Co-authored-by: Niels Pardon <[email protected]> Update isthmus/src/main/java/io/substrait/isthmus/SubstraitCalciteSchema.java Co-authored-by: Niels Pardon <[email protected]> Update isthmus/src/main/java/io/substrait/isthmus/SubstraitCalciteSchema.java Co-authored-by: Niels Pardon <[email protected]> Update isthmus/src/main/java/io/substrait/isthmus/SubstraitCalciteSchema.java Co-authored-by: Niels Pardon <[email protected]> Update isthmus/src/test/java/io/substrait/isthmus/api/TestIsthmusEndToEnd.java Co-authored-by: Niels Pardon <[email protected]> Update isthmus/src/main/java/io/substrait/isthmus/SubstraitCalciteSchema.java Co-authored-by: Niels Pardon <[email protected]>
1 parent d737232 commit fed4187

File tree

12 files changed

+435
-123
lines changed

12 files changed

+435
-123
lines changed

.editorconfig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ trim_trailing_whitespace = true
1010
[*.{yaml,yml}]
1111
indent_size = 2
1212

13-
[{**/*.sql,**/OuterReferenceResolver.md,**gradlew.bat,**/*.parquet,**/*.orc}]
13+
[{**/*.sql,**/OuterReferenceResolver.md,**gradlew.bat,**/*.parquet,**/*.orc,**/*.plan}]
1414
charset = unset
1515
end_of_line = unset
1616
insert_final_newline = unset

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ com.github.vlsi.vlsi-release-plugins.version=1.74
1515

1616
# library version
1717
antlr.version=4.13.1
18-
calcite.version=1.38.0
18+
calcite.version=1.39.0
1919
guava.version=32.1.3-jre
2020
immutables.version=2.10.1
2121
jackson.version=2.16.1

isthmus/build.gradle.kts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,15 @@ plugins {
77
id("com.diffplug.spotless") version "6.19.0"
88
id("com.github.johnrengelman.shadow") version "8.1.1"
99
id("com.google.protobuf") version "0.9.4"
10+
id("com.adarshr.test-logger") version "4.0.0"
1011
signing
1112
}
1213

14+
testlogger {
15+
showStandardStreams = false
16+
showFailedStandardStreams = true
17+
}
18+
1319
publishing {
1420
publications {
1521
create<MavenPublication>("maven-publish") {

isthmus/src/main/java/io/substrait/isthmus/SqlConverterBase.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.apache.calcite.config.CalciteConnectionProperty;
1111
import org.apache.calcite.jdbc.CalciteSchema;
1212
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
13-
import org.apache.calcite.jdbc.LookupCalciteSchema;
1413
import org.apache.calcite.plan.Contexts;
1514
import org.apache.calcite.plan.RelOptCluster;
1615
import org.apache.calcite.plan.RelOptCostImpl;
@@ -23,7 +22,6 @@
2322
import org.apache.calcite.rel.type.RelDataTypeFactory;
2423
import org.apache.calcite.rex.RexBuilder;
2524
import org.apache.calcite.schema.Schema;
26-
import org.apache.calcite.schema.Table;
2725
import org.apache.calcite.schema.impl.AbstractTable;
2826
import org.apache.calcite.sql.SqlNode;
2927
import org.apache.calcite.sql.SqlNodeList;
@@ -95,19 +93,13 @@ Pair<SqlValidator, CalciteCatalogReader> registerCreateTables(List<String> table
9593

9694
Pair<SqlValidator, CalciteCatalogReader> registerCreateTables(
9795
Function<List<String>, NamedStruct> tableLookup) throws SqlParseException {
98-
Function<List<String>, Table> lookup =
99-
id -> {
100-
NamedStruct table = tableLookup.apply(id);
101-
if (table == null) {
102-
return null;
103-
}
104-
return new DefinedTable(
105-
id.get(id.size() - 1),
106-
factory,
107-
TypeConverter.DEFAULT.toCalcite(factory, table.struct(), table.names()));
108-
};
109-
110-
CalciteSchema rootSchema = LookupCalciteSchema.createRootSchema(lookup);
96+
CalciteSchema rootSchema =
97+
SubstraitCalciteSchema.builder()
98+
.withTableLookup(tableLookup)
99+
.withTypeFactory(factory)
100+
.withTypeConverter(TypeConverter.DEFAULT)
101+
.build()
102+
.getRootSchema();
111103
CalciteCatalogReader catalogReader =
112104
new CalciteCatalogReader(rootSchema, List.of(), factory, config);
113105
SqlValidator validator = Validator.create(factory, catalogReader, SqlValidator.Config.DEFAULT);
@@ -153,7 +145,8 @@ protected List<DefinedTable> parseCreateTable(
153145
for (SqlNode node : create.columnList) {
154146
if (!(node instanceof SqlColumnDeclaration)) {
155147
if (node instanceof SqlKeyConstraint) {
156-
// key constraints declarations, like primary key declaration, are valid and should not
148+
// key constraints declarations, like primary key declaration, are valid and
149+
// should not
157150
// result in parse exceptions. Ignore the constraint declaration.
158151
continue;
159152
}
@@ -217,9 +210,6 @@ public DefinedTable(String name, RelDataTypeFactory factory, RelDataType type) {
217210

218211
@Override
219212
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
220-
// if (factory != typeFactory) {
221-
// throw new IllegalStateException("Different type factory than previously used.");
222-
// }
223213
return type;
224214
}
225215

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
package io.substrait.isthmus;
2+
3+
import com.google.common.collect.ImmutableSet;
4+
import io.substrait.isthmus.SqlConverterBase.DefinedTable;
5+
import io.substrait.relation.NamedScan;
6+
import io.substrait.relation.Rel;
7+
import io.substrait.relation.RelCopyOnWriteVisitor;
8+
import io.substrait.type.NamedStruct;
9+
import java.util.ArrayList;
10+
import java.util.HashMap;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Optional;
14+
import java.util.Set;
15+
import java.util.function.Function;
16+
import java.util.stream.Collectors;
17+
import org.apache.calcite.jdbc.CalciteSchema;
18+
import org.apache.calcite.rel.type.RelDataTypeFactory;
19+
import org.apache.calcite.schema.Schema;
20+
import org.apache.calcite.schema.Table;
21+
import org.apache.calcite.schema.impl.AbstractSchema;
22+
import org.apache.calcite.schema.lookup.LikePattern;
23+
import org.apache.calcite.schema.lookup.Lookup;
24+
import org.apache.calcite.schema.lookup.Named;
25+
import org.checkerframework.checker.nullness.qual.Nullable;
26+
27+
/**
28+
* A subclass of the Calcite Schema for creation from a Substrait relation
29+
*
30+
* <p>Implementation note:
31+
*
32+
* <p>The external Isthmus API can take a function that will return the table schema when needed,
33+
* rather than it being available up front.
34+
*
35+
* <p>This was implemented by a special subclass of the Calcite simple schema. Since this was
36+
* changed in Calcite 1.39.0, it failed to work: the protected methods it extended from changed.
37+
*
38+
* <p>The change, ironically, was to support a lazy approach to looking up Calcite schemas. Good -
39+
* *but* the external function in Isthmus is assuming it's going to get called with a fully
40+
* namespaced table name, which Calcite, though, sees as being subschemas.
41+
*
42+
* <p>This results in some 'complex' code below to try and map the lazy way Calcite now works and
43+
* maintain the existing Isthmus API
44+
*
45+
* <p>If that Isthmus API hadn't existed, this code would be a lot simpler! Maybe a case for future
46+
* deprecation.
47+
*/
48+
public class SubstraitCalciteSchema extends AbstractSchema {
49+
50+
private Map<String, Table> tables;
51+
private Function<List<String>, NamedStruct> tableLookup;
52+
53+
private RelDataTypeFactory typeFactory;
54+
private TypeConverter typeConverter;
55+
56+
/**
57+
* Maintain a track of the 'prefix' of this schema... i.e. allows recreation of the fully
58+
* qualified name of this subschema
59+
*/
60+
private List<String> prefix = new ArrayList<>();
61+
62+
protected SubstraitCalciteSchema(Map<String, Table> tables) {
63+
this.tables = tables;
64+
}
65+
66+
protected SubstraitCalciteSchema(
67+
Function<List<String>, NamedStruct> tableLookup,
68+
RelDataTypeFactory typeFactory,
69+
TypeConverter typeConverter) {
70+
this.tableLookup = tableLookup;
71+
this.typeFactory = typeFactory;
72+
this.typeConverter = typeConverter;
73+
}
74+
75+
@Override
76+
protected Map<String, Table> getTableMap() {
77+
return tables;
78+
}
79+
80+
@Override
81+
public Lookup<? extends Schema> subSchemas() {
82+
var defaultLookup = super.subSchemas();
83+
84+
// Note ono the lookups, calcite prefers calling getIgnoreCase() initially
85+
86+
return new Lookup<>() {
87+
88+
@Override
89+
public @Nullable Schema get(String name) {
90+
91+
// before we create the next subschema, we need to check if this
92+
// is actually the final value. i.e. we need to call the lookup
93+
// if it is the final table, we then return null here.
94+
// Calcite sees that, knows there are no more schemas and instead
95+
// calls the tables() look up to get a table name.
96+
var lookupNameList = new ArrayList<String>(prefix);
97+
lookupNameList.add(name);
98+
99+
NamedStruct table = tableLookup.apply(lookupNameList);
100+
if (table != null) {
101+
return null;
102+
}
103+
104+
var scs = new SubstraitCalciteSchema(tableLookup, typeFactory, typeConverter);
105+
scs.prefix = lookupNameList;
106+
return scs;
107+
}
108+
109+
@Override
110+
public @Nullable Named<Schema> getIgnoreCase(String name) {
111+
112+
// before we create the next subschema, we need to check if this
113+
// is actually the final value. i.e. we need to call the lookup
114+
// if it is the final table, we then return null here/
115+
// Calcite sees that there's no more schemas and instead
116+
// calls the tables() lazy look up to get a table name.
117+
var lookupNameList = new ArrayList<String>(prefix);
118+
lookupNameList.add(name);
119+
120+
NamedStruct table = tableLookup.apply(lookupNameList);
121+
if (table != null) {
122+
return null;
123+
}
124+
125+
var scs = new SubstraitCalciteSchema(tableLookup, typeFactory, typeConverter);
126+
scs.prefix = lookupNameList;
127+
return new Named<>(name, scs);
128+
}
129+
130+
@Override
131+
public Set<String> getNames(LikePattern pattern) {
132+
return defaultLookup.getNames(pattern);
133+
}
134+
};
135+
}
136+
137+
@Override
138+
public Lookup<Table> tables() {
139+
if (this.tables != null) {
140+
// If we do have the list of tables already specified, delegate to the super class to return
141+
// those
142+
return super.tables();
143+
}
144+
145+
return new Lookup<Table>() {
146+
147+
@Override
148+
public @Nullable Table get(String name) {
149+
List<String> p = new ArrayList<>(prefix);
150+
p.add(name);
151+
152+
NamedStruct table = tableLookup.apply(p);
153+
if (table == null) {
154+
return null;
155+
}
156+
157+
return new DefinedTable(
158+
name, typeFactory, typeConverter.toCalcite(typeFactory, table.struct(), table.names()));
159+
}
160+
161+
@Override
162+
public @Nullable Named<Table> getIgnoreCase(String name) {
163+
/** Delegate to the noremal lookup */
164+
return new Named<Table>(name, get(name));
165+
}
166+
167+
@Override
168+
public Set<String> getNames(LikePattern pattern) {
169+
return ImmutableSet.of();
170+
}
171+
};
172+
}
173+
174+
/**
175+
* Turn this into a root Calciteschema Choice of settings is based on current isthmus behaviour
176+
*/
177+
public CalciteSchema getRootSchema() {
178+
return CalciteSchema.createRootSchema(false, false, "", this);
179+
}
180+
181+
public static Builder builder() {
182+
return new Builder();
183+
}
184+
185+
/**
186+
* Builder class to assist with creating the CalciteSchema
187+
*
188+
* <p>Can be created from a Rel or a Lookup function
189+
*/
190+
public static class Builder {
191+
192+
private Rel rel;
193+
private RelDataTypeFactory typeFactory;
194+
private TypeConverter typeConverter;
195+
private Function<List<String>, NamedStruct> tableLookup;
196+
197+
public Builder withTableLookup(Function<List<String>, NamedStruct> tableLookup) {
198+
this.tableLookup = tableLookup;
199+
return this;
200+
}
201+
202+
public Builder withTypeFactory(RelDataTypeFactory typeFactory) {
203+
this.typeFactory = typeFactory;
204+
return this;
205+
}
206+
207+
public Builder withTypeConverter(TypeConverter typeConverter) {
208+
this.typeConverter = typeConverter;
209+
return this;
210+
}
211+
212+
public Builder withSubstraitRel(Rel rel) {
213+
this.rel = rel;
214+
return this;
215+
}
216+
217+
public SubstraitCalciteSchema build() {
218+
if (typeConverter == null) {
219+
throw new IllegalArgumentException("TypeConverter must be specified");
220+
}
221+
222+
if (typeFactory == null) {
223+
throw new IllegalArgumentException("TypeFactory must be specified");
224+
}
225+
226+
if (rel != null && tableLookup != null) {
227+
throw new IllegalArgumentException("Specify either 'rel' or 'tableLookup' ");
228+
}
229+
230+
if (rel != null) {
231+
// If there are any named structs within the relation, gather these and convert
232+
// them to a map of tables
233+
// index by name; note that the name of the table is 'un-namespaced' here.
234+
// This was the existing logic so it has not been altered.
235+
Map<List<String>, NamedStruct> tableMap = NamedStructGatherer.gatherTables(rel);
236+
237+
Map<String, Table> tables =
238+
tableMap.entrySet().stream()
239+
.map(
240+
entry -> {
241+
var id = entry.getKey();
242+
var name = id.get(id.size() - 1);
243+
var table = entry.getValue();
244+
var value =
245+
new SqlConverterBase.DefinedTable(
246+
name,
247+
typeFactory,
248+
typeConverter.toCalcite(typeFactory, table.struct(), table.names()));
249+
return Map.entry(name, value);
250+
})
251+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
252+
253+
return new SubstraitCalciteSchema(tables);
254+
} else {
255+
return new SubstraitCalciteSchema(tableLookup, typeFactory, typeConverter);
256+
}
257+
}
258+
}
259+
260+
private static final class NamedStructGatherer extends RelCopyOnWriteVisitor<RuntimeException> {
261+
Map<List<String>, NamedStruct> tableMap;
262+
263+
private NamedStructGatherer() {
264+
super();
265+
this.tableMap = new HashMap<>();
266+
}
267+
268+
public static Map<List<String>, NamedStruct> gatherTables(Rel rel) {
269+
var visitor = new NamedStructGatherer();
270+
rel.accept(visitor);
271+
return visitor.tableMap;
272+
}
273+
274+
@Override
275+
public Optional<Rel> visit(NamedScan namedScan) {
276+
Optional<Rel> result = super.visit(namedScan);
277+
278+
List<String> tableName = namedScan.getNames();
279+
tableMap.put(tableName, namedScan.getInitialSchema());
280+
281+
return result;
282+
}
283+
}
284+
}

0 commit comments

Comments
 (0)