Skip to content

Commit aa5fde1

Browse files
authored
Add runtime failing test jobs (#87)
1 parent 4f06064 commit aa5fde1

File tree

6 files changed

+57
-0
lines changed

6 files changed

+57
-0
lines changed

.github/workflows/build.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ jobs:
117117
compile -c aggregation-query-test-package.json
118118
compile -c aggregation-query-test-iceberg-package.json
119119
compile -c aggregation-query-subscription-test-package.json
120+
compile -c runtime-failing-connector-package.json
121+
compile -c runtime-failing-udf-package.json
120122
121123
- example: udf-jbang
122124
path: user-defined-function/jbang
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"version": "1",
3+
"enabled-engines": ["flink"],
4+
"script": {
5+
"main": "runtime-failing-connector.sqrl"
6+
}
7+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
CREATE TABLE _InputData (
2+
uniqueId STRING NOT NULL,
3+
measure DOUBLE NOT NULL,
4+
partitionId STRING NOT NULL,
5+
event_time TIMESTAMP_LTZ(3) NOT NULL METADATA FROM 'timestamp',
6+
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
7+
) WITH (
8+
'connector' = 'kafka',
9+
'topic' = 'failing-test-source',
10+
'format' = 'json',
11+
'properties.bootstrap.servers' = 'non-existent-broker:9092',
12+
'properties.request.timeout.ms' = '5000', -- Fail after 5sec
13+
'properties.retry.backoff.ms' = '500',
14+
'properties.metadata.max.age.ms' = '1000',
15+
'scan.startup.mode' = 'earliest-offset'
16+
);
17+
18+
_Res := SELECT * FROM _InputData;
19+
20+
EXPORT _Res TO logger.res;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"version": "1",
3+
"enabled-engines": ["flink"],
4+
"script": {
5+
"main": "runtime-failing-udf.sqrl"
6+
}
7+
}

test-jobs/runtime-failing-udf.sqrl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
IMPORT udfs.FailingFunction;
2+
3+
Res := SELECT FailingFunction('dummy text') AS text;
4+
5+
EXPORT Res TO logger.res;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
//DEPS org.apache.flink:flink-table-common:1.19.3
2+
3+
import org.apache.flink.table.functions.FunctionContext;
4+
import org.apache.flink.table.functions.ScalarFunction;
5+
6+
public class FailingFunction extends ScalarFunction {
7+
8+
@Override
9+
public void open(FunctionContext ctx) {
10+
throw new RuntimeException("UDF init failed");
11+
}
12+
13+
public String eval(String s) {
14+
return s;
15+
}
16+
}

0 commit comments

Comments
 (0)