Skip to content

Commit d864f5c

Browse files
ryannedolanclaude
andcommitted
Add integration test for SqlJob with inline UDF files
Tests that CREATE FUNCTION with CODE option produces a SqlJob containing the files map. The inline Python code is extracted from the CODE option and mapped to a filename derived from the AS clause (e.g., 'my_udf.transform' -> 'my_udf.py'). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6d2a7ac commit d864f5c

File tree

2 files changed

+52
-0
lines changed

2 files changed

+52
-0
lines changed

hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ public void k8sDdlScriptUdfDemo() throws Exception {
2929
run("k8s-ddl-udf-demo.id");
3030
}
3131

32+
@Test
33+
public void k8sDdlScriptUdfFiles() throws Exception {
34+
run("k8s-ddl-udf-files.id");
35+
}
36+
3237
@Test
3338
public void k8sValidationScript() throws Exception {
3439
run("k8s-validation.id");
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
!set outputformat mysql
2+
!use k8s
3+
4+
# Register a Python UDF with inline CODE that should appear in SqlJob files
5+
create function my_py_udf as 'my_udf.transform' language python with (CODE='x = 1');
6+
(0 rows modified)
7+
8+
!update
9+
10+
# UDF validates against real data (opaque placeholder returns null)
11+
select my_py_udf("FIRST_NAME") as val from profile.members;
12+
+-----+
13+
| VAL |
14+
+-----+
15+
| |
16+
| |
17+
| |
18+
+-----+
19+
(3 rows)
20+
21+
!ok
22+
23+
# Pipeline generates SqlJob with files containing the inline CODE
24+
insert into ads.page_views select my_py_udf("FIRST_NAME") as page_urn, "MEMBER_URN" as member_urn from profile.members;
25+
apiVersion: hoptimator.linkedin.com/v1alpha1
26+
kind: SqlJob
27+
metadata:
28+
name: ads-database-pageviews
29+
spec:
30+
dialect: Flink
31+
executionMode: Streaming
32+
sql:
33+
- CREATE FUNCTION IF NOT EXISTS `MY_PY_UDF` AS 'my_udf.transform' LANGUAGE PYTHON
34+
- CREATE DATABASE IF NOT EXISTS `PROFILE` WITH ()
35+
- CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
36+
- CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
37+
- CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole')
38+
- INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `MY_PY_UDF`(`FIRST_NAME`) AS `PAGE_URN`, `MEMBER_URN` FROM `PROFILE`.`MEMBERS`
39+
files:
40+
{my_udf.py: x = 1}
41+
!specify PAGE_VIEWS
42+
43+
# Clean up
44+
drop function my_py_udf;
45+
(0 rows modified)
46+
47+
!update

0 commit comments

Comments
 (0)