Skip to content

Commit e22f6ed

Browse files
author
zhuanglw
committed
add test
1 parent 30515f3 commit e22f6ed

File tree

6 files changed

+325
-0
lines changed

6 files changed

+325
-0
lines changed

dinky-admin/openlineage.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# OpenLineage client configuration for the dinky-admin test run.
#serverUrl: "http://192.168.0.84:5000"
transport:
  # type: console
  type: "http"
  url: "http://hadoop804:5000/api/v1/lineage"
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.apache.flink.table.test;
2+
3+
import org.apache.flink.table.test.utils.TestUtil;
4+
import org.junit.Ignore;
5+
import org.junit.Test;
6+
7+
/**
8+
* Author: lwjhn
9+
* Date: 2023/10/19 14:05
10+
* Description:
11+
*/
12+
13+
@Ignore
14+
public class TestDemo1UDF {
15+
@Test
16+
public void test1() {
17+
TestUtil.executeSql("./sql/TestDemoUnnest.sql");
18+
}
19+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package org.apache.flink.table.test.utils;
2+
3+
import java.util.regex.Pattern;
4+
5+
/**
 * Classifies a SQL statement by matching it against a per-type regular
 * expression. Every pattern is compiled once at enum initialization,
 * case-insensitively and in DOTALL mode, so multi-line statements are
 * matched as a whole.
 *
 * Author: lwjhn
 * Date: 2024/6/15 10:21
 */
public enum SqlType {
    SELECT("SELECT", "^SELECT.*"),

    // CREATE deliberately excludes "CREATE TABLE ... AS SELECT" — that is CTAS.
    CREATE("CREATE", "^CREATE(?!\\s+TABLE.*AS SELECT).*$"),

    DROP("DROP", "^DROP.*"),

    ALTER("ALTER", "^ALTER.*"),

    INSERT("INSERT", "^INSERT.*"),

    DESC("DESC", "^DESC.*"),

    DESCRIBE("DESCRIBE", "^DESCRIBE.*"),

    EXPLAIN("EXPLAIN", "^EXPLAIN.*"),

    USE("USE", "^USE.*"),

    SHOW("SHOW", "^SHOW.*"),

    LOAD("LOAD", "^LOAD.*"),

    UNLOAD("UNLOAD", "^UNLOAD.*"),

    SET("SET", "^SET.*"),

    RESET("RESET", "^RESET.*"),

    EXECUTE("EXECUTE", "^EXECUTE.*"),

    ADD_JAR("ADD_JAR", "^ADD\\s+JAR\\s+\\S+"),

    ADD("ADD", "^ADD\\s+CUSTOMJAR\\s+\\S+"),

    ADD_FILE("ADD_FILE", "^ADD\\s+FILE\\s+\\S+"),

    PRINT("PRINT", "^PRINT.*"),

    CTAS("CTAS", "^CREATE\\s.*AS\\sSELECT.*$"),

    WITH("WITH", "^WITH.*"),

    UNKNOWN("UNKNOWN", "^UNKNOWN.*");

    // Display name of the statement type; mutable only because of setType().
    private String type;
    // Compiled matcher for this type; never reassigned after construction.
    private final Pattern pattern;

    SqlType(String type, String regexp) {
        this.type = type;
        // CASE_INSENSITIVE: SQL keywords may be any case; DOTALL: '.' spans newlines.
        this.pattern = Pattern.compile(regexp, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
    }

    /**
     * @deprecated mutating shared enum state affects every user of this
     *     constant; kept only for backward compatibility with existing callers.
     */
    @Deprecated
    public void setType(String type) {
        this.type = type;
    }

    public String getType() {
        return type;
    }

    /** Returns {@code true} when the whole statement matches this type's pattern. */
    public boolean match(String statement) {
        return pattern.matcher(statement).matches();
    }
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package org.apache.flink.table.test.utils;
2+
3+
import org.apache.commons.lang3.StringUtils;
4+
5+
import java.util.Map;
6+
import java.util.regex.Pattern;
7+
8+
/**
 * Helpers for splitting, comment-stripping, and parameterizing SQL scripts.
 *
 * Author: lwjhn
 * Date: 2024/5/30 11:30
 */
public class SqlUtil {

    private static final String SEMICOLON = ";";

    // A statement ends at ';' followed by a newline or a trailing "--" comment.
    private static final String SQL_SEPARATOR = ";\\s*(?:\\n|--.*)";

    // Keeps quoted literals ('...' with '' escapes, captured as $1) intact while
    // removing "--" line comments and /* ... */ block comments (except /*+ hints).
    private static final Pattern NOTE_PATTERN =
            Pattern.compile("(?ms)('(?:''|[^'])*')|--.*?$|/\\*[^+].*?\\*/");

    private SqlUtil() {}

    /** Splits a script into statements using the default separator. */
    public static String[] getStatements(String sql) {
        return getStatements(sql, SQL_SEPARATOR);
    }

    /**
     * Splits a script into individual statements.
     *
     * @param sql the script; blank input yields an empty array
     * @param sqlSeparator regex used to split statements; falls back to the
     *     default separator when blank. (The previous implementation ignored
     *     this parameter and always used a hard-coded local separator.)
     * @return the statements; the last one has any trailing ';' removed
     */
    public static String[] getStatements(String sql, String sqlSeparator) {
        if (isBlank(sql)) {
            return new String[0];
        }

        String separator = isBlank(sqlSeparator) ? SQL_SEPARATOR : sqlSeparator;
        String[] splits = sql.replace("\r\n", "\n").split(separator);
        // The final statement may end in ';' with no following newline; strip it.
        String lastStatement = splits[splits.length - 1].trim();
        if (lastStatement.endsWith(SEMICOLON)) {
            splits[splits.length - 1] = lastStatement.substring(0, lastStatement.length() - 1);
        }

        return splits;
    }

    /**
     * Removes SQL comments while preserving quoted string literals, then trims.
     * Blank input is returned unchanged.
     */
    public static String removeNote(String sql) {
        if (isBlank(sql)) {
            return sql;
        }
        // Normalize non-breaking spaces and collapse CR/LF runs to single '\n'.
        String normalized = sql.replaceAll("\u00A0", " ").replaceAll("[\r\n]+", "\n");
        // Replace each match with its captured literal ($1), dropping comments.
        return NOTE_PATTERN.matcher(normalized).replaceAll("$1").trim();
    }

    /** Comment-strips and splits a script in one call. */
    public static String[] preparedStatement(String sql) {
        return getStatements(removeNote(sql));
    }

    /**
     * Substitutes every ${name} placeholder using the given map.
     *
     * @return "" when the script is blank
     */
    public static String replaceAllParam(String sql, Map<String, String> values) {
        if (isBlank(sql)) {
            return "";
        }
        for (Map.Entry<String, String> entry : values.entrySet()) {
            sql = replaceAllParam(sql, entry.getKey(), entry.getValue());
        }
        return sql;
    }

    /**
     * Replaces all ${name} placeholders with {@code value}.
     * NOTE(review): {@code name} and {@code value} are interpreted as regex and
     * replacement text respectively; callers must not pass metacharacters.
     */
    public static String replaceAllParam(String sql, String name, String value) {
        return sql.replaceAll("\\$\\{" + name + "}", value);
    }

    /** Prefixes each line of the input with a left-aligned, width-4 line number. */
    public static String addLineNumber(String input) {
        String[] lines = input.split("\n");
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < lines.length; i++) {
            sb.append(String.format("%-4d", i + 1));
            sb.append(" ");
            sb.append(lines[i]);
            sb.append("\n");
        }
        return sb.toString();
    }

    /** Null-safe blank check (same contract as StringUtils.isBlank). */
    private static boolean isBlank(String s) {
        if (s == null) {
            return true;
        }
        for (int i = 0; i < s.length(); i++) {
            if (!Character.isWhitespace(s.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package org.apache.flink.table.test.utils;
2+
3+
import org.apache.commons.io.IOUtils;
4+
import org.apache.commons.lang3.StringUtils;
5+
import org.apache.flink.streaming.api.CheckpointingMode;
6+
import org.apache.flink.streaming.api.environment.CheckpointConfig;
7+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
8+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
9+
10+
import java.io.IOException;
11+
import java.io.InputStream;
12+
import java.nio.charset.StandardCharsets;
13+
import java.util.concurrent.CountDownLatch;
14+
15+
/**
16+
* Author: lwjhn
17+
* Date: 2024/5/30 10:11
18+
* Description:
19+
*/
20+
public class TestUtil {
21+
public static StreamExecutionEnvironment getLocalExecutionEnvironment(){
22+
// StreamTableEnvironment tEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.createLocalEnvironment());
23+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
24+
// "decimal.handling.mode", "string"
25+
env.enableCheckpointing(10000);
26+
// 设置模式为exactly-once (这是默认值)
27+
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
28+
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
29+
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
30+
// 检查点必须在10分钟内完成,或者被丢弃【checkpoint的超时时间】
31+
env.getCheckpointConfig().setCheckpointTimeout(600000);
32+
// 同一时间只允许进行一个检查点
33+
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
34+
// cancel后,保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
35+
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
36+
// scan.incremental.snapshot.enabled
37+
env.getCheckpointConfig().setCheckpointStorage("file:///dist/test/checkpoint");
38+
return env;
39+
}
40+
41+
public static StreamTableEnvironment getLocalStreamTableEnvironment(){
42+
return StreamTableEnvironment.create(getLocalExecutionEnvironment());
43+
}
44+
45+
public static String loadFile(String path){
46+
try (InputStream inputStream =
47+
Thread.currentThread().getContextClassLoader().getResourceAsStream(path)){
48+
assert inputStream != null;
49+
return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
50+
/*
51+
String text = new BufferedReader(
52+
new InputStreamReader(Objects.requireNonNull(TestUtil.class.getResourceAsStream(path)), StandardCharsets.UTF_8))
53+
.lines()
54+
.collect(Collectors.joining("\n"));
55+
*/
56+
} catch (IOException e) {
57+
throw new RuntimeException(e);
58+
}
59+
60+
}
61+
62+
public static String[] getStatements(String path){
63+
return SqlUtil.preparedStatement(loadFile(path));
64+
}
65+
66+
public static void executeSql(String path) {
67+
StreamTableEnvironment environment = getLocalStreamTableEnvironment();
68+
String[] statements = TestUtil.getStatements(path);
69+
for(String statement : statements){
70+
if(StringUtils.isBlank(statement=statement.trim()))
71+
continue;
72+
System.out.printf("%nFlink SQL [%d]> %s;%n", System.currentTimeMillis(), statement);
73+
environment.executeSql(statement).print();
74+
}
75+
System.out.printf("[%d] END...%n", System.currentTimeMillis());
76+
try {
77+
new CountDownLatch(1).await();
78+
} catch (InterruptedException e) {
79+
throw new RuntimeException(e);
80+
}
81+
}
82+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
-- Source table: reads a JSON file where each record carries a paged array of
-- transit-card transactions in `data`.
CREATE TABLE json_table (
    total BIGINT,
    `data` ARRAY<ROW<
        deal_date STRING,
        close_date STRING,
        card_no STRING,
        deal_value STRING,
        deal_type STRING,
        company_name STRING,
        car_no STRING,
        station STRING,
        conn_mark STRING,
        deal_money STRING,
        equ_no STRING
    >>,
    `page` BIGINT,
    `rows` BIGINT
)
WITH
(
    'connector' = 'filesystem',
    'path' = 'hdfs:///userFiles/U0000001/2018record3.jsons',
    'format' = 'json'
);

-- CTAS sink: flattens the `data` array with UNNEST (row fields are renamed
-- positionally by the alias list, e.g. deal_date -> d1, card_no -> card_no1)
-- and discards the output via the blackhole connector.
CREATE TABLE `default_catalog`.`default_database`.szt_data1 WITH (
    'connector' = 'blackhole'
) AS
SELECT
    total,
    data_row.d1,
    data_row.close_date,
    data_row.card_no1,
    data_row.deal_value,
    data_row.deal_type,
    data_row.company_name,
    data_row.car_no,
    data_row.station,
    data_row.conn_mark,
    data_row.deal_money,
    data_row.equ_no,
    page,
    `rows`
FROM
    json_table,
    UNNEST(json_table.data) AS data_row (
        d1,
        close_date,
        card_no1,
        deal_value,
        deal_type,
        company_name,
        car_no,
        station,
        conn_mark,
        deal_money,
        equ_no
    );

0 commit comments

Comments
 (0)