Skip to content

Commit f495841

Browse files
authored
Expand MCP server from #145 (#152)
* MCP extensions * Refactor to make extensible * Better formatting & limit query call * Update query description
1 parent 9cb36dd commit f495841

File tree

9 files changed

+724
-135
lines changed

9 files changed

+724
-135
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ junit = "junit:junit:4.12"
2727
kafka-clients = "org.apache.kafka:kafka-clients:3.2.0"
2828
kubernetes-client = "io.kubernetes:client-java:18.0.0"
2929
kubernetes-extended-client = "io.kubernetes:client-java-extended:18.0.0"
30-
mcp-bom = "io.modelcontextprotocol.sdk:mcp-bom:0.10.0"
30+
mcp = "io.modelcontextprotocol.sdk:mcp:0.10.0"
3131
slf4j-simple = "org.slf4j:slf4j-simple:2.0.11"
3232
slf4j-api = "org.slf4j:slf4j-api:2.0.11"
3333
sqlline = "sqlline:sqlline:1.12.0"

hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package sqlline;
22

3+
import com.linkedin.hoptimator.jdbc.HoptimatorDdlUtils;
34
import java.nio.charset.StandardCharsets;
45
import java.sql.SQLException;
56
import java.util.Arrays;
@@ -13,6 +14,13 @@
1314

1415
import org.apache.calcite.plan.RelOptTable;
1516
import org.apache.calcite.rel.RelRoot;
17+
import org.apache.calcite.schema.SchemaPlus;
18+
import org.apache.calcite.schema.Table;
19+
import org.apache.calcite.sql.SqlKind;
20+
import org.apache.calcite.sql.SqlNode;
21+
import org.apache.calcite.sql.ddl.SqlCreateMaterializedView;
22+
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
23+
import org.apache.calcite.util.Pair;
1624
import org.jline.reader.Completer;
1725

1826
import com.linkedin.hoptimator.Pipeline;
@@ -95,19 +103,47 @@ public void execute(String line, DispatchCallback dispatchCallback) {
95103
}
96104
String sql = split[1];
97105
HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection();
106+
Pair<SchemaPlus, Table> schemaSnapshot = null;
107+
String viewName = null;
98108
try {
99-
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
109+
String querySql = sql;
110+
SqlCreateMaterializedView create = null;
111+
SqlNode sqlNode = HoptimatorDriver.parseQuery(conn, sql);
112+
if (sqlNode.getKind().belongsTo(SqlKind.DDL)) {
113+
if (sqlNode instanceof SqlCreateMaterializedView) {
114+
create = (SqlCreateMaterializedView) sqlNode;
115+
final SqlNode q = HoptimatorDdlUtils.renameColumns(create.columnList, create.query);
116+
querySql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
117+
viewName = HoptimatorDdlUtils.viewName(create.name);
118+
} else {
119+
sqlline.error("Unsupported DDL statement: " + sql);
120+
dispatchCallback.setToFailure();
121+
return;
122+
}
123+
}
124+
125+
RelRoot root = HoptimatorDriver.convert(conn, querySql).root;
100126
Properties connectionProperties = conn.connectionProperties();
101127
RelOptTable table = root.rel.getTable();
102128
if (table != null) {
103129
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, String.join(".", table.getQualifiedName()));
104130
}
105131
PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations(), connectionProperties);
132+
if (create != null) {
133+
schemaSnapshot = HoptimatorDdlUtils.snapshotAndSetSinkSchema(conn.createPrepareContext(),
134+
new HoptimatorDriver.Prepare(conn), plan, create, querySql);
135+
}
106136
sqlline.output(plan.sql(conn).apply(SqlDialect.ANSI));
107137
} catch (SQLException e) {
108138
sqlline.error(e);
109139
dispatchCallback.setToFailure();
110140
}
141+
if (schemaSnapshot != null) {
142+
if (schemaSnapshot.right != null) {
143+
schemaSnapshot.left.add(viewName, schemaSnapshot.right);
144+
}
145+
schemaSnapshot.left.removeTable(viewName);
146+
}
111147
}
112148

113149
@Override
@@ -241,14 +277,37 @@ public void execute(String line, DispatchCallback dispatchCallback) {
241277
}
242278
String sql = split[1];
243279
HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection();
244-
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
280+
Pair<SchemaPlus, Table> schemaSnapshot = null;
281+
String viewName = "sink";
245282
try {
283+
String querySql = sql;
284+
SqlCreateMaterializedView create = null;
285+
SqlNode sqlNode = HoptimatorDriver.parseQuery(conn, sql);
286+
if (sqlNode.getKind().belongsTo(SqlKind.DDL)) {
287+
if (sqlNode instanceof SqlCreateMaterializedView) {
288+
create = (SqlCreateMaterializedView) sqlNode;
289+
final SqlNode q = HoptimatorDdlUtils.renameColumns(create.columnList, create.query);
290+
querySql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
291+
viewName = HoptimatorDdlUtils.viewName(create.name);
292+
} else {
293+
sqlline.error("Unsupported DDL statement: " + sql);
294+
dispatchCallback.setToFailure();
295+
return;
296+
}
297+
}
298+
299+
RelRoot root = HoptimatorDriver.convert(conn, querySql).root;
246300
Properties connectionProperties = conn.connectionProperties();
247301
RelOptTable table = root.rel.getTable();
248302
if (table != null) {
249303
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, String.join(".", table.getQualifiedName()));
250304
}
251-
Pipeline pipeline = DeploymentService.plan(root, conn.materializations(), connectionProperties).pipeline("sink", conn);
305+
PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations(), connectionProperties);
306+
if (create != null) {
307+
schemaSnapshot = HoptimatorDdlUtils.snapshotAndSetSinkSchema(conn.createPrepareContext(),
308+
new HoptimatorDriver.Prepare(conn), plan, create, querySql);
309+
}
310+
Pipeline pipeline = plan.pipeline(viewName, conn);
252311
List<String> specs = new ArrayList<>();
253312
for (Source source : pipeline.sources()) {
254313
specs.addAll(DeploymentService.specify(source, conn));
@@ -260,6 +319,12 @@ public void execute(String line, DispatchCallback dispatchCallback) {
260319
sqlline.error(e);
261320
dispatchCallback.setToFailure();
262321
}
322+
if (schemaSnapshot != null) {
323+
if (schemaSnapshot.right != null) {
324+
schemaSnapshot.left.add(viewName, schemaSnapshot.right);
325+
}
326+
schemaSnapshot.left.removeTable(viewName);
327+
}
263328
}
264329

265330
@Override

hoptimator-cli/src/main/resources/intro.txt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ Try:
44
> !tables
55
> !schemas
66
> create view foo as select * from ads.ad_clicks natural join profile.members;
7-
> !specify select * from foo
8-
> !pipeline select * from foo
9-
10-
7+
> !specify create view foo as select * from ads.ad_clicks natural join profile.members;
8+
> !pipeline create view foo as select * from ads.ad_clicks natural join profile.members;
9+
> !resolve ADS.AD_CLICKS

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java

Lines changed: 14 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -30,45 +30,30 @@
3030
import java.io.Reader;
3131
import java.sql.SQLException;
3232
import java.util.ArrayList;
33-
import java.util.Arrays;
3433
import java.util.Collection;
35-
import java.util.Collections;
3634
import java.util.List;
37-
import java.util.Objects;
3835
import java.util.Properties;
3936
import org.apache.calcite.jdbc.CalcitePrepare;
4037
import org.apache.calcite.jdbc.CalciteSchema;
4138
import org.apache.calcite.rel.RelRoot;
42-
import org.apache.calcite.rel.type.RelDataType;
43-
import org.apache.calcite.rel.type.RelDataTypeFactory;
44-
import org.apache.calcite.rel.type.RelDataTypeImpl;
45-
import org.apache.calcite.rel.type.RelDataTypeSystem;
46-
import org.apache.calcite.rel.type.RelProtoDataType;
4739
import org.apache.calcite.schema.Function;
4840
import org.apache.calcite.schema.SchemaPlus;
4941
import org.apache.calcite.schema.Table;
5042
import org.apache.calcite.schema.impl.ViewTable;
5143
import org.apache.calcite.server.DdlExecutor;
5244
import org.apache.calcite.server.ServerDdlExecutor;
53-
import org.apache.calcite.sql.SqlCall;
54-
import org.apache.calcite.sql.SqlIdentifier;
5545
import org.apache.calcite.sql.SqlKind;
5646
import org.apache.calcite.sql.SqlNode;
57-
import org.apache.calcite.sql.SqlNodeList;
58-
import org.apache.calcite.sql.SqlSelect;
5947
import org.apache.calcite.sql.ddl.SqlCreateMaterializedView;
6048
import org.apache.calcite.sql.ddl.SqlCreateView;
6149
import org.apache.calcite.sql.ddl.SqlDropObject;
6250
import org.apache.calcite.sql.dialect.CalciteSqlDialect;
63-
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
6451
import org.apache.calcite.sql.parser.SqlAbstractParserImpl;
6552
import org.apache.calcite.sql.parser.SqlParserImplFactory;
6653
import org.apache.calcite.sql.parser.SqlParserPos;
6754
import org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl;
68-
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
6955
import org.apache.calcite.sql.validate.SqlConformanceEnum;
7056
import org.apache.calcite.util.Pair;
71-
import org.apache.calcite.util.Util;
7257

7358

7459
public final class HoptimatorDdlExecutor extends ServerDdlExecutor {
@@ -107,7 +92,7 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
10792
throw new DdlException(create, e.getMessage(), e);
10893
}
10994

110-
final Pair<CalciteSchema, String> pair = schema(context, true, create.name);
95+
final Pair<CalciteSchema, String> pair = HoptimatorDdlUtils.schema(context, true, create.name);
11196
if (pair.left == null) {
11297
throw new DdlException(create, "Schema for " + create.name + " not found.");
11398
}
@@ -126,15 +111,13 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
126111
}
127112
}
128113

129-
final SqlNode q = renameColumns(create.columnList, create.query);
114+
final SqlNode q = HoptimatorDdlUtils.renameColumns(create.columnList, create.query);
130115
final String sql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
131116
List<String> schemaPath = pair.left.path(null);
132117
String viewName = pair.right;
133118
List<String> viewPath = new ArrayList<>(schemaPath);
134119
viewPath.add(viewName);
135-
CalcitePrepare.AnalyzeViewResult analyzed = HoptimatorDriver.analyzeView(connection, sql);
136-
RelProtoDataType protoType = RelDataTypeImpl.proto(analyzed.rowType);
137-
ViewTable viewTable = new ViewTable(Object.class, protoType, sql, schemaPath, viewPath);
120+
ViewTable viewTable = HoptimatorDdlUtils.viewTable(context, sql, new HoptimatorDriver.Prepare(connection), schemaPath, viewPath);
138121
View view = new View(viewPath, sql);
139122
logger.info("Validated sql statement. The view is named {} and has path {}",
140123
viewName, viewPath);
@@ -177,7 +160,7 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
177160
throw new DdlException(create, e.getMessage(), e);
178161
}
179162

180-
final Pair<CalciteSchema, String> pair = schema(context, true, create.name);
163+
final Pair<CalciteSchema, String> pair = HoptimatorDdlUtils.schema(context, true, create.name);
181164
if (pair.left == null) {
182165
throw new DdlException(create, "Schema for " + create.name + " not found.");
183166
}
@@ -201,7 +184,7 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
201184
}
202185
}
203186

204-
final SqlNode q = renameColumns(create.columnList, create.query);
187+
final SqlNode q = HoptimatorDdlUtils.renameColumns(create.columnList, create.query);
205188
final String sql = q.toSqlString(CalciteSqlDialect.DEFAULT).getSql();
206189
final List<String> schemaPath = pair.left.path(null);
207190

@@ -212,21 +195,13 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
212195
List<String> viewPath = new ArrayList<>(schemaPath);
213196
viewPath.add(viewName);
214197

215-
Table currentViewTable = schemaPlus.getTable(viewName);
198+
Pair<SchemaPlus, Table> schemaSnapshot = null;
216199
try {
217200
if (!(pair.left.schema instanceof Database)) {
218201
throw new DdlException(create, schemaName + " is not a physical database.");
219202
}
220203
String database = ((Database) pair.left.schema).databaseName();
221204

222-
// Table does not exist. Create it.
223-
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
224-
CalcitePrepare.AnalyzeViewResult analyzed = HoptimatorDriver.analyzeView(connection, sql);
225-
RelProtoDataType protoType = RelDataTypeImpl.proto(analyzed.rowType);
226-
ViewTable viewTable = new ViewTable(Object.class, protoType, sql, schemaPath, viewPath);
227-
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTable);
228-
RelDataType viewRowType = materializedViewTable.getRowType(typeFactory);
229-
230205
logger.info("Validated sql statement. The view is named {} and has path {}",
231206
viewName, viewPath);
232207

@@ -241,26 +216,11 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
241216
logger.info("Pipeline name for view {} is {}", viewName, pipelineName);
242217
Properties connectionProperties = connection.connectionProperties();
243218
connectionProperties.setProperty(DeploymentService.PIPELINE_OPTION, pipelineName);
244-
List<String> sinkPath = new ArrayList<>(schemaPath);
245-
sinkPath.add(sinkName);
246-
Table sink = schemaPlus.getTable(sinkName);
247-
248-
final RelDataType rowType;
249-
if (sink != null) {
250-
// For "partial views", the sink may already exist. Use the existing row type.
251-
rowType = sink.getRowType(typeFactory);
252-
} else {
253-
// For normal views, we create the sink based on the view row type.
254-
rowType = viewRowType;
255-
}
256219

257220
// Plan a pipeline to materialize the view.
258221
RelRoot root = new HoptimatorDriver.Prepare(connection).convert(context, sql).root;
259222
PipelineRel.Implementor plan = DeploymentService.plan(root, connection.materializations(), connectionProperties);
260-
plan.setSink(database, sinkPath, rowType, Collections.emptyMap());
261-
262-
// Need to add the view table to the connection so that the ConnectorService can find it when resolving options.
263-
schemaPlus.add(viewName, materializedViewTable);
223+
schemaSnapshot = HoptimatorDdlUtils.snapshotAndSetSinkSchema(context, new HoptimatorDriver.Prepare(connection), plan, sql, pair);
264224
logger.info("Added view {} to schema {}", viewName, schemaPlus.getName());
265225
Pipeline pipeline = plan.pipeline(viewName, connection);
266226
MaterializedView hook = new MaterializedView(database, viewPath, sql, pipeline.job().sql(), pipeline);
@@ -282,10 +242,12 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
282242
if (deployers != null) {
283243
DeploymentService.restore(deployers);
284244
}
285-
if (currentViewTable == null) {
286-
schemaPlus.removeTable(viewName);
287-
} else {
288-
schemaPlus.add(viewName, currentViewTable);
245+
if (schemaSnapshot != null) {
246+
if (schemaSnapshot.right == null) {
247+
schemaSnapshot.left.removeTable(viewName);
248+
} else {
249+
schemaPlus.add(viewName, schemaSnapshot.right);
250+
}
289251
}
290252
throw new DdlException(create, e.getMessage(), e);
291253
}
@@ -304,7 +266,7 @@ public void execute(SqlDropObject drop, CalcitePrepare.Context context) {
304266
return;
305267
}
306268

307-
final Pair<CalciteSchema, String> pair = schema(context, false, drop.name);
269+
final Pair<CalciteSchema, String> pair = HoptimatorDdlUtils.schema(context, false, drop.name);
308270
String viewName = pair.right;
309271

310272
SchemaPlus schemaPlus = pair.left.plus();
@@ -344,41 +306,6 @@ public void execute(SqlDropObject drop, CalcitePrepare.Context context) {
344306
schemaPlus.removeTable(viewName);
345307
}
346308

347-
// N.B. copy-pasted from Apache Calcite
348-
349-
/** Returns the schema in which to create an object. */
350-
static Pair<CalciteSchema, String> schema(CalcitePrepare.Context context, boolean mutable, SqlIdentifier id) {
351-
final String name;
352-
final List<String> path;
353-
if (id.isSimple()) {
354-
path = context.getDefaultSchemaPath();
355-
name = id.getSimple();
356-
} else {
357-
path = Util.skipLast(id.names);
358-
name = Util.last(id.names);
359-
}
360-
CalciteSchema schema = mutable ? context.getMutableRootSchema() : context.getRootSchema();
361-
for (String p : path) {
362-
schema = Objects.requireNonNull(schema).getSubSchema(p, true);
363-
}
364-
return Pair.of(schema, name);
365-
}
366-
367-
// N.B. copy-pasted from Apache Calcite
368-
369-
/** Wraps a query to rename its columns. Used by CREATE VIEW and CREATE
370-
* MATERIALIZED VIEW. */
371-
static SqlNode renameColumns(SqlNodeList columnList, SqlNode query) {
372-
if (columnList == null) {
373-
return query;
374-
}
375-
final SqlParserPos p = query.getParserPosition();
376-
final SqlNodeList selectList = SqlNodeList.SINGLETON_STAR;
377-
final SqlCall from = SqlStdOperatorTable.AS.createCall(p,
378-
Arrays.asList(query, new SqlIdentifier("_", p), columnList));
379-
return new SqlSelect(p, null, selectList, from, null, null, null, null, null, null, null, null, null);
380-
}
381-
382309
/** Unchecked exception related to a DDL statement. */
383310
static public class DdlException extends RuntimeException {
384311

0 commit comments

Comments
 (0)