Skip to content

Commit b3c1008

Browse files
authored
Add insert-into command for testing purposes (#14)
* Publish to LinkedInJFrog * Publish hoptimator-cli and hoptimator-operator * Add insert-into command to CLI for testing purposes * Case insensitive regex h/t @ehoner * Fix integration test workflow * Fix sample kafka topic spec
1 parent 8a825df commit b3c1008

File tree

6 files changed

+113
-4
lines changed

6 files changed

+113
-4
lines changed

.github/workflows/integration-tests.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ jobs:
3535
run: make deploy-dev-environment
3636
- name: Deploy Hoptimator
3737
run: make deploy
38-
- name: Wait for Readiness
39-
run: kubectl wait pod hoptimator --for condition=Ready --timeout=10m
4038
- name: Deploy Samples
4139
run: make deploy-samples
40+
- name: Wait for Readiness
41+
run: kubectl wait pod hoptimator --for condition=Ready --timeout=10m
4242
- name: Wait for Flink Jobs
4343
run: |
4444
i=0
@@ -48,7 +48,7 @@ jobs:
4848
i=$(($i+1))
4949
echo "No stable Flink jobs after $i tries..."
5050
done
51-
- name: Integration Tests
51+
- name: Run Integration Tests
5252
run: make integration-tests
5353
- name: Capture Cluster State
5454
if: always()

deploy/samples/kafkatopics.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
apiVersion: hoptimator.linkedin.com/v1alpha1
2+
kind: KafkaTopic
3+
metadata:
4+
name: test-sink
5+
spec:
6+
topicName: test-sink
7+
clientOverrides:
8+
bootstrap.servers: my-cluster-kafka-bootstrap.kafka.svc:9092
9+

etc/integration-tests.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,7 @@ SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
1313
-- MySQL CDC -> Kafka
1414
SELECT * FROM RAWKAFKA."products" LIMIT 1;
1515

16+
-- test insert into command
17+
!insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
18+
SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;
19+

etc/readiness-probe.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
SELECT * FROM DATAGEN.PERSON;
55
SELECT * FROM DATAGEN.COMPANY;
66
SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
7+
SELECT * FROM RAWKAFKA."test-sink" LIMIT 0;

hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.ArrayList;
2121
import java.util.Collections;
2222
import java.util.List;
23+
import java.util.Iterator;
2324
import java.util.Scanner;
2425
import java.util.Properties;
2526
import java.io.IOException;
@@ -47,6 +48,7 @@ protected int run(String[] args) throws IOException {
4748
commandHandlers.add(new YamlCommandHandler());
4849
commandHandlers.add(new PipelineCommandHandler());
4950
commandHandlers.add(new IntroCommandHandler());
51+
commandHandlers.add(new InsertCommandHandler());
5052
sqlline.updateCommandHandlers(commandHandlers);
5153
return sqlline.begin(args, null, true).ordinal();
5254
}
@@ -264,6 +266,98 @@ public boolean echoToFile() {
264266
}
265267
}
266268

269+
private class InsertCommandHandler implements CommandHandler {
270+
271+
@Override
272+
public String getName() {
273+
return "insert";
274+
}
275+
276+
@Override
277+
public List<String> getNames() {
278+
return Collections.singletonList(getName());
279+
}
280+
281+
@Override
282+
public String getHelpText() {
283+
return "Run an ephemeral pipeline with an existing sink.";
284+
}
285+
286+
@Override
287+
public String matches(String line) {
288+
String sql = line;
289+
if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
290+
sql = sql.substring(1);
291+
}
292+
293+
if (sql.startsWith("insert into")) {
294+
sql = sql.substring("insert into".length() + 1);
295+
return sql;
296+
}
297+
298+
return null;
299+
}
300+
301+
@Override
302+
public void execute(String line, DispatchCallback dispatchCallback) {
303+
String sql = line;
304+
if (sql.startsWith(SqlLine.COMMAND_PREFIX)) {
305+
sql = sql.substring(1);
306+
}
307+
308+
if (sql.startsWith("insert into")) {
309+
sql = sql.substring("insert into".length() + 1);
310+
}
311+
312+
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
313+
try {
314+
String[] parts = sql.split("(?i)SELECT"); // case insensitive
315+
if (parts.length != 2) {
316+
throw new IllegalArgumentException("Expected ... SELECT ...");
317+
}
318+
String[] parts2 = parts[0].split("\\.");
319+
if (parts2.length != 2) {
320+
throw new IllegalArgumentException("Expected ... DATABASE.TABLE ...");
321+
}
322+
// TODO unquote correctly
323+
String database = parts2[0].replaceAll("[\\\"']", "").trim();
324+
String table = parts2[1].replaceAll("[\\\"']", "").trim();
325+
String query = parts[1];
326+
327+
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, new Properties());
328+
PipelineRel plan = planner.pipeline("SELECT " + query);
329+
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
330+
HopTable sink = planner.database(database).makeTable(table, impl.rowType());
331+
String pipelineSql = impl.insertInto(sink) + "\nSELECT 'SUCCESS';";
332+
FlinkIterable iterable = new FlinkIterable(pipelineSql);
333+
Iterator<String> iter = iterable.<String>field(0).iterator();
334+
if (iter.hasNext()) {
335+
dispatchCallback.setToSuccess();
336+
} else {
337+
throw new IllegalArgumentException("No result from:\n" + pipelineSql);
338+
}
339+
while (iter.hasNext()) {
340+
sqlline.output(iter.next());
341+
}
342+
} catch (Exception e) {
343+
sqlline.error(e.toString());
344+
e.printStackTrace();
345+
dispatchCallback.setToFailure();
346+
}
347+
}
348+
349+
@Override
350+
public List<Completer> getParameterCompleters() {
351+
return Collections.emptyList();
352+
}
353+
354+
@Override
355+
public boolean echoToFile() {
356+
return false;
357+
}
358+
}
359+
360+
267361
private class IntroCommandHandler implements CommandHandler {
268362

269363
@Override

hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public String query() {
7878

7979
/** Script ending in INSERT INTO ... */
8080
public String insertInto(HopTable sink) {
81-
return script.insert(sink.database(), sink.name(), relNode).sql(OUTPUT_DIALECT);
81+
return script.database(sink.database()).with(sink)
82+
.insert(sink.database(), sink.name(), relNode).sql(OUTPUT_DIALECT);
8283
}
8384

8485
/** Add any resources, SQL, DDL etc required to access the table. */

0 commit comments

Comments
 (0)