Skip to content

Commit 89c8a9d

Browse files
authored
Add ResolvedTable (#151)
1 parent 814c3f5 commit 89c8a9d

File tree

4 files changed

+135
-0
lines changed

4 files changed

+135
-0
lines changed

hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.nio.charset.StandardCharsets;
44
import java.sql.SQLException;
5+
import java.util.Arrays;
56
import java.util.ArrayList;
67
import java.util.Collection;
78
import java.util.Collections;
@@ -19,6 +20,7 @@
1920
import com.linkedin.hoptimator.SqlDialect;
2021
import com.linkedin.hoptimator.jdbc.HoptimatorConnection;
2122
import com.linkedin.hoptimator.jdbc.HoptimatorDriver;
23+
import com.linkedin.hoptimator.jdbc.ResolvedTable;
2224
import com.linkedin.hoptimator.util.DeploymentService;
2325
import com.linkedin.hoptimator.util.planner.PipelineRel;
2426

@@ -41,6 +43,7 @@ public Collection<CommandHandler> getCommandHandlers(SqlLine sqlline) {
4143
Collection<CommandHandler> list = new ArrayList<>(super.getCommandHandlers(sqlline));
4244
list.add(new IntroCommandHandler(sqlline));
4345
list.add(new PipelineCommandHandler(sqlline));
46+
list.add(new ResolveCommandHandler(sqlline));
4447
list.add(new SpecifyCommandHandler(sqlline));
4548
return list;
4649
}
@@ -118,6 +121,75 @@ public boolean echoToFile() {
118121
}
119122
}
120123

124+
private static final class ResolveCommandHandler implements CommandHandler {
125+
126+
private final SqlLine sqlline;
127+
128+
private ResolveCommandHandler(SqlLine sqlline) {
129+
this.sqlline = sqlline;
130+
}
131+
132+
@Override
133+
public String getName() {
134+
return "resolve";
135+
}
136+
137+
@Override
138+
public List<String> getNames() {
139+
return Collections.singletonList(getName());
140+
}
141+
142+
@Override
143+
public String getHelpText() {
144+
return "Resolve a table path.";
145+
}
146+
147+
@Override
148+
public String matches(String line) {
149+
if (startsWith(line, "!resolve") || startsWith(line, "resolve")) {
150+
return line;
151+
} else {
152+
return null;
153+
}
154+
}
155+
156+
@Override
157+
public void execute(String line, DispatchCallback dispatchCallback) {
158+
if (!(sqlline.getConnection() instanceof HoptimatorConnection)) {
159+
sqlline.error("This connection doesn't support `!resolve`.");
160+
dispatchCallback.setToFailure();
161+
return;
162+
}
163+
String[] split = line.split("\\s+", 2);
164+
if (split.length < 2) {
165+
sqlline.error("Missing argument.");
166+
dispatchCallback.setToFailure();
167+
return;
168+
}
169+
List<String> tablePath = Arrays.asList(split[1].split("\\."));
170+
HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection();
171+
try {
172+
ResolvedTable resolved = conn.resolve(tablePath, Collections.emptyMap());
173+
sqlline.output("Avro schema:\n");
174+
sqlline.output(resolved.avroSchemaString());
175+
} catch (SQLException e) {
176+
sqlline.error(e);
177+
dispatchCallback.setToFailure();
178+
}
179+
}
180+
181+
@Override
182+
public List<Completer> getParameterCompleters() {
183+
return Collections.emptyList();
184+
}
185+
186+
@Override
187+
public boolean echoToFile() {
188+
return false;
189+
}
190+
}
191+
192+
121193
private static final class SpecifyCommandHandler implements CommandHandler {
122194

123195
private final SqlLine sqlline;

hoptimator-jdbc/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ plugins {
66

77
dependencies {
88
implementation project(':hoptimator-api')
9+
implementation project(':hoptimator-avro')
910
implementation project(':hoptimator-util')
11+
implementation libs.avro
1012
implementation libs.calcite.core
1113
implementation libs.calcite.server
1214
implementation libs.slf4j.api

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
import java.sql.SQLException;
55
import java.sql.Statement;
66
import java.util.ArrayList;
7+
import java.util.Collections;
78
import java.util.List;
9+
import java.util.Locale;
10+
import java.util.Map;
811
import java.util.Properties;
912
import java.util.stream.Collectors;
1013

@@ -13,6 +16,7 @@
1316
import org.apache.calcite.plan.RelOptMaterialization;
1417
import org.apache.calcite.rel.RelNode;
1518

19+
import com.linkedin.hoptimator.avro.AvroConverter;
1620
import com.linkedin.hoptimator.util.DelegatingConnection;
1721

1822

@@ -28,6 +32,26 @@ public HoptimatorConnection(CalciteConnection connection, Properties connectionP
2832
this.connectionProperties = connectionProperties;
2933
}
3034

35+
public ResolvedTable resolve(List<String> tablePath, Map<String, String> hints) throws SQLException {
36+
try {
37+
String tableSql = "SELECT * FROM " + tablePath.stream()
38+
.map(x -> "\"" + x + "\"")
39+
.collect(Collectors.joining("."));
40+
RelNode tableRel = HoptimatorDriver.convert(this, tableSql).root.rel;
41+
String namespace = "com.linkedin."
42+
+ tablePath.stream()
43+
.map(x -> x.toLowerCase(Locale.ROOT))
44+
.limit(tablePath.size() - 1)
45+
.collect(Collectors.joining("."));
46+
String schemaName = tablePath.get(tablePath.size() - 1).toLowerCase(Locale.ROOT);
47+
org.apache.avro.Schema avroSchema = AvroConverter.avro(namespace, schemaName, tableRel.getRowType());
48+
// TODO: generate source and sink configs via ConnectionService
49+
return new ResolvedTable(tablePath, avroSchema, Collections.emptyMap(), Collections.emptyMap());
50+
} catch (Exception e) {
51+
throw new SQLException("Failed to resolve " + String.join(".", tablePath) + ": " + e.getMessage(), e);
52+
}
53+
}
54+
3155
@Override
3256
public Statement createStatement() throws SQLException {
3357
return connection.createStatement();
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.linkedin.hoptimator.jdbc;
2+
3+
import java.util.List;
4+
import java.util.Map;
5+
6+
/** Metadata required to access a table in the data plane. */
7+
public class ResolvedTable {
8+
9+
private final List<String> tablePath;
10+
private org.apache.avro.Schema avroSchema;
11+
private final Map<String, String> sourceConnectorConfigs;
12+
private final Map<String, String> sinkConnectorConfigs;
13+
14+
public ResolvedTable(List<String> tablePath, org.apache.avro.Schema avroSchema,
15+
Map<String, String> sourceConnectorConfigs, Map<String, String> sinkConnectorConfigs) {
16+
this.tablePath = tablePath;
17+
this.avroSchema = avroSchema;
18+
this.sourceConnectorConfigs = sourceConnectorConfigs;
19+
this.sinkConnectorConfigs = sinkConnectorConfigs;
20+
}
21+
22+
public Map<String, String> sourceConnectorConfigs() {
23+
return sourceConnectorConfigs;
24+
}
25+
26+
public Map<String, String> sinkConnectorConfigs() {
27+
return sinkConnectorConfigs;
28+
}
29+
30+
public org.apache.avro.Schema avroSchema() {
31+
return avroSchema;
32+
}
33+
34+
public String avroSchemaString() {
35+
return avroSchema.toString(true);
36+
}
37+
}

0 commit comments

Comments
 (0)