Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c7f0503
Add "International Sales" to data generator
piotrpdev Jun 12, 2025
560916c
Add "CurrencyConverter" UDF code
piotrpdev Jun 12, 2025
981bff5
Add User Defined Function tutorial
piotrpdev Jun 12, 2025
ad2ae44
Make UDF `TEMPORARY`
piotrpdev Jun 13, 2025
c9fa11e
Persist query with UDF to Kafka
piotrpdev Jun 13, 2025
b80b7d2
Add note for Flink Archetype
piotrpdev Jun 13, 2025
073eadc
Change where UDF is mounted
piotrpdev Jun 13, 2025
8f72f9b
Apply suggestions from code review
piotrpdev Jun 17, 2025
ab382c5
Create `flink.version` pom property
piotrpdev Jun 17, 2025
0c7191c
Explain what first kafka-consumer command is doing
piotrpdev Jun 17, 2025
1ce56b3
Move solution after problem
piotrpdev Jun 17, 2025
cf0e74b
Ask for currency symbols
piotrpdev Jun 17, 2025
d43dc83
Change currency `Map` to `enum`
piotrpdev Jun 17, 2025
96ece59
Don't show `pom.xml`
piotrpdev Jun 17, 2025
adb7c80
Don't check Maven artefacts
piotrpdev Jun 17, 2025
4c9829d
Replace all `example` with `streamshub`
piotrpdev Jun 17, 2025
94563f7
Move separator to enum
piotrpdev Jun 18, 2025
676072c
Move parsing to `Currency.fromCurrencyAmount()`
piotrpdev Jun 18, 2025
72d5457
Combine `enum` and `Map` approaches
piotrpdev Jun 18, 2025
3b1d36a
Move all parsing logic to `enum`
piotrpdev Jun 18, 2025
8c3c961
Extract `enum` to separate file
piotrpdev Jun 18, 2025
54d5e90
Update tutorial with new code
piotrpdev Jun 18, 2025
52ff05d
Use latest flink docs everywhere
piotrpdev Jun 18, 2025
4f31fbb
Add UDF tests
piotrpdev Jun 18, 2025
fa5b023
Add `docker-maven-plugin`
piotrpdev Jun 18, 2025
b71320b
Add `FlinkDeployment`
piotrpdev Jun 18, 2025
144f477
Apply suggestions from code review
piotrpdev Jun 20, 2025
b76ef8b
Make tutorial code match example code better
piotrpdev Jun 23, 2025
6210b13
Change query for faster results
piotrpdev Jun 23, 2025
05a02e7
Build and push image in CI
piotrpdev Jun 23, 2025
b400a3b
Apply suggestions from code review
piotrpdev Jun 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ dependency-reduced-pom.xml

# Rendered Docs
*.html

# Maven Wrapper files
.mvn
685 changes: 685 additions & 0 deletions docs/user-defined-functions/index.md

Large diffs are not rendered by default.

114 changes: 114 additions & 0 deletions tutorials/currency-converter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.github.streamshub</groupId>
<artifactId>flink-sql-tutorials</artifactId>
<version>0.1.0-SNAPSHOT</version>
</parent>

<artifactId>currency-converter</artifactId>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.release>17</maven.compiler.release>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>5.11.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<!-- Optionally: parameterized tests support -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.46.0</version>
<configuration>
<images>
<image>
<name>flink-sql-runner-with-${project.artifactId}</name>
<build>
<from>quay.io/streamshub/flink-sql-runner:0.2.0</from>
<assembly>
<descriptorRef>artifact</descriptorRef>
<targetDir>/opt</targetDir>
</assembly>
</build>
</image>
</images>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.github.streamshub.flink.functions.currency-converter</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.github.streamshub.flink.enums;

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// https://www.unicode.org/charts/nameslist/n_20A0.html
// https://www.iso.org/iso-4217-currency-codes.html
public enum Currency {
EUR("€", "EUR"),
INR("₹", "INR"),
TRY("₺", "TRY"),
THB("฿", "THB"),
UAH("₴", "UAH"),
MNT("₮", "MNT"),
ERR("?", "ERR");

public static final String SEPARATOR = " ";
private static final Map<String, Currency> SYMBOL_TO_CURRENCY = Stream.of(Currency.values())
.collect(Collectors.toMap(Currency::getSymbol, c -> c));

private final String symbol;
private final String isoCode;

Currency(String symbol, String isoCode) {
this.symbol = symbol;
this.isoCode = isoCode;
}

public String getSymbol() {
return symbol;
}

public String getIsoCode() {
return isoCode;
}

public static Currency fromUnicodeAmount(String unicodeAmount) {
String currencySymbol = unicodeAmount.substring(0, 1); // "€100" -> "€"
try {
return SYMBOL_TO_CURRENCY.getOrDefault(currencySymbol, ERR); // "€100" -> EUR
} catch (Exception e) {
return ERR; // "]100" -> ERR
}
}

public String concatIsoCodeToAmount(String amount) {
return amount + SEPARATOR + isoCode; // "100" + EUR -> "100 EUR"
}

public static String unicodeAmountToIsoAmount(String unicodeAmount) {
String trimmedUnicodeAmount = unicodeAmount.trim();

Currency currency = fromUnicodeAmount(trimmedUnicodeAmount); // "€100" -> EUR
String amount = trimmedUnicodeAmount.substring(1); // "€100" -> "100"

return currency.concatIsoCodeToAmount(amount); // "100" + EUR -> "100 EUR"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.github.streamshub.flink.functions;

import org.apache.flink.table.functions.ScalarFunction;

import com.github.streamshub.flink.enums.Currency;

public class CurrencyConverter extends ScalarFunction {
// e.g. unicodeAmount = "€100"
public String eval(String unicodeAmount) {
return Currency.unicodeAmountToIsoAmount(unicodeAmount); // "€100" -> "100 EUR"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.github.streamshub.flink.functions;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

import com.github.streamshub.flink.enums.Currency;

public class CurrencyConverterTest {
public static final String VALID_UNICODE_AMOUNT = " €100 ";
public static final String VALID_ISO_AMOUNT = "100" + Currency.SEPARATOR + Currency.EUR.getIsoCode();

public static final String INVALID_UNICODE_AMOUNT = " ]100 ";
public static final String INVALID_ISO_AMOUNT = "100" + Currency.SEPARATOR + Currency.ERR.getIsoCode();

@Test
public void shouldConvertValidUnicodeAmount() throws Exception {
CurrencyConverter currencyConverter = new CurrencyConverter();

assertEquals(VALID_ISO_AMOUNT, currencyConverter.eval(VALID_UNICODE_AMOUNT));
}

@Test
public void shouldConvertInvalidUnicodeAmount() throws Exception {
CurrencyConverter currencyConverter = new CurrencyConverter();

assertEquals(INVALID_ISO_AMOUNT, currencyConverter.eval(INVALID_UNICODE_AMOUNT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.github.streamshub.kafka.data.generator.examples.ClickStreamData;
import com.github.streamshub.kafka.data.generator.examples.SalesData;
import com.github.streamshub.kafka.data.generator.examples.InternationalSalesData;

import java.util.Arrays;
import java.util.List;
Expand All @@ -21,6 +22,7 @@ private static Data getDataClass(String dataType) {
switch(dataType) {
case "clickStream" -> data = new ClickStreamData();
case "sales" -> data = new SalesData();
case "internationalSales" -> data = new InternationalSalesData();
default -> throw new RuntimeException("Unknown data type " + dataType);
}
return data;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.github.streamshub.kafka.data.generator.examples;

import com.github.streamshub.kafka.data.generator.Data;
import com.github.streamshub.kafka.data.generator.schema.InternationalSales;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;

import java.util.Random;

public class InternationalSalesData implements Data {
private static final char[] CURRENCY_SYMBOLS = {'€', '₹', '₺', '฿', '₴', '₮'};
private final Random random = new Random();

public String topic() {
return "flink.international.sales.records";
}
public Schema schema() {
return InternationalSales.SCHEMA$;
}

public SpecificRecord generate() {
return InternationalSales.newBuilder()
.setInvoiceId(generateInvoiceId())
.setUserId(generateUserId())
.setProductId(generateProductId())
.setQuantity(generateQuantity())
.setUnitCost(generateUnitCost())
.build();
}
public String generateCsv() {
return String.join(",",
generateInvoiceId(),
generateUserId(),
generateProductId(),
generateQuantity(),
generateUnitCost());
}

private String generateInvoiceId() {
return String.valueOf(Math.abs(random.nextLong()));
}

private String generateUserId() {
return "user-" + Math.abs(random.nextInt(100));
}

private String generateProductId() {
return String.valueOf(Math.abs(random.nextInt(200)));
}

private String generateQuantity() {
return String.valueOf(Math.abs(random.nextInt(3) + 1));
}

private String generateUnitCost() {
char randomCurrencySymbol = CURRENCY_SYMBOLS[random.nextInt(CURRENCY_SYMBOLS.length)];
return randomCurrencySymbol + String.valueOf(Math.abs(random.nextInt(1000) + 1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"namespace": "com.github.streamshub.kafka.data.generator.schema",
"type": "record",
"name": "InternationalSales",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "product_id", "type": "string"},
{"name": "invoice_id", "type": "string"},
{"name": "quantity", "type": "string"},
{"name": "unit_cost", "type": "string"}
]
}
2 changes: 2 additions & 0 deletions tutorials/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
<kafka.version>3.9.1</kafka.version>
<apicurio-registry.version>2.6.8.Final</apicurio-registry.version>
<avro.version>1.12.0</avro.version>
<flink.version>2.0.0</flink.version>
</properties>

<modules>
<module>data-generator</module>
<module>currency-converter</module>
</modules>

<build>
Expand Down
2 changes: 1 addition & 1 deletion tutorials/recommendation-app/data-generator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ spec:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-cluster-kafka-bootstrap.flink.svc:9092"
- name: DATA
value: "clickStream,sales"
value: "clickStream,sales,internationalSales"
- name: USE_APICURIO_REGISTRY
value: "true"
- name: REGISTRY_URL
Expand Down
18 changes: 18 additions & 0 deletions tutorials/user-defined-functions/flink-session-udf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: session-cluster-udf
spec:
image: quay.io/streamshub/flink-sql-runner:0.2.0
flinkVersion: v2_0
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 2
Loading