Skip to content

Commit 8b44f3b

Browse files
authored
Add !check command to Hoptimator CLI (#21)
* Added check command * Added check integration test
1 parent a4a34cd commit 8b44f3b

File tree

3 files changed

+141
-0
lines changed

3 files changed

+141
-0
lines changed

etc/integration-tests.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ SELECT * FROM DATAGEN.COMPANY;
1010
-- MySQL CDC tables
1111
SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
1212

13+
-- Test check command
14+
!check not empty SELECT * FROM INVENTORY."products_on_hand";
15+
1316
-- MySQL CDC -> Kafka
1417
SELECT * FROM RAWKAFKA."products" LIMIT 1;
1518

hoptimator-cli/src/main/java/com/linkedin/hoptimator/FlinkIterable.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,23 @@ public Iterator<Row> rowIterator() {
5959
}
6060
}
6161

62+
/* Iterates over the selected field/column only, with a limit set on the number of collected elements */
63+
public <T> Iterable<T> field(int pos, Integer limit) {
64+
if(limit==null) {
65+
return this.field(pos);
66+
}
67+
return new Iterable<T>() {
68+
@Override
69+
public Iterator<T> iterator() {
70+
try {
71+
return datastream().map(r -> r.<T>getFieldAs(pos)).executeAndCollect(limit).iterator();
72+
} catch (Exception e) {
73+
return new ExceptionalIterator<>(e);
74+
}
75+
}
76+
};
77+
}
78+
6279
/** Iterates over the selected field/column only. */
6380
public <T> Iterable<T> field(int pos) {
6481
return new Iterable<T>() {

hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ protected int run(String[] args) throws IOException {
4949
commandHandlers.add(new PipelineCommandHandler());
5050
commandHandlers.add(new IntroCommandHandler());
5151
commandHandlers.add(new InsertCommandHandler());
52+
commandHandlers.add(new TestCommandHandler());
5253
sqlline.updateCommandHandlers(commandHandlers);
5354
return sqlline.begin(args, null, true).ordinal();
5455
}
@@ -266,6 +267,126 @@ public boolean echoToFile() {
266267
}
267268
}
268269

270+
private class TestCommandHandler implements CommandHandler {
271+
272+
@Override
273+
public String getName() {
274+
return "check";
275+
}
276+
277+
@Override
278+
public List<String> getNames() {
279+
return Collections.singletonList(getName());
280+
}
281+
282+
@Override
283+
public String getHelpText() {
284+
return "Usage: !check <value> <query>, !check empty <query>, !check not empty <query>";
285+
}
286+
287+
@Override
288+
public String matches(String line) {
289+
String sql = line;
290+
if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
291+
sql = sql.substring(1);
292+
}
293+
294+
if (sql.startsWith("check")) {
295+
sql = sql.substring("check".length() + 1);
296+
return sql;
297+
}
298+
299+
return null;
300+
}
301+
302+
@Override
303+
public void execute(String line, DispatchCallback dispatchCallback) {
304+
String sql = line;
305+
if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
306+
sql = sql.substring(1);
307+
}
308+
309+
if (sql.startsWith("check")) {
310+
sql = sql.substring("check".length() + 1);
311+
}
312+
313+
//remove semicolon from query if present
314+
if (sql.length() > 0 && sql.charAt(sql.length() - 1) == ';') {
315+
sql = sql.substring(0, sql.length() - 1);
316+
}
317+
318+
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
319+
try {
320+
String[] type = sql.split(" ", 2);
321+
if(type.length < 2) {
322+
throw new IllegalArgumentException("Invalid usage"); //TODO: expand
323+
}
324+
325+
String value = null;
326+
String query = null;
327+
328+
String checkType=type[0];
329+
switch (checkType) {
330+
case "not":
331+
query = type[1].split(" ", 2)[1].trim();
332+
break;
333+
case "empty":
334+
query = type[1].trim();
335+
break;
336+
case "value":
337+
String[] valueQuery = type[1].split(" ", 2);
338+
value = valueQuery[0].trim();
339+
query = valueQuery[1].trim();
340+
break;
341+
default:
342+
throw new IllegalArgumentException("Expected one of 'not', 'empty', or 'value'");
343+
}
344+
345+
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, new Properties());
346+
PipelineRel plan = planner.pipeline(query);
347+
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
348+
String pipelineSql = impl.query();
349+
FlinkIterable iterable = new FlinkIterable(pipelineSql);
350+
Iterator<String> iter = iterable.<String>field(0, 1).iterator();
351+
switch(checkType) {
352+
case "not":
353+
if (!iter.hasNext()) {
354+
throw new IllegalArgumentException("Expected >0 rows from query result");
355+
}
356+
break;
357+
case "empty":
358+
if (iter.hasNext()) {
359+
throw new IllegalArgumentException("Expected 0 rows from query result");
360+
}
361+
break;
362+
case "value":
363+
while (iter.hasNext()) {
364+
if(String.valueOf(iter.next()).contains(value)) {
365+
break;
366+
}
367+
}
368+
throw new IllegalArgumentException("Query result did not contain expected value");
369+
}
370+
sqlline.output("PASS");
371+
dispatchCallback.setToSuccess();
372+
} catch (Exception e) {
373+
sqlline.error(e.toString());
374+
e.printStackTrace();
375+
dispatchCallback.setToFailure();
376+
}
377+
}
378+
379+
@Override
380+
public List<Completer> getParameterCompleters() {
381+
return Collections.emptyList();
382+
}
383+
384+
@Override
385+
public boolean echoToFile() {
386+
return false;
387+
}
388+
}
389+
269390
private class InsertCommandHandler implements CommandHandler {
270391

271392
@Override

0 commit comments

Comments
 (0)