Skip to content

Commit 59262ff

Browse files
committed
Flink Tour: Add Java examples
1 parent 6531d0c commit 59262ff

File tree

5 files changed

+148
-0
lines changed

5 files changed

+148
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ __pycache__
44
_build
55
.coverage*
66
.env
7+
.gradle
78
.DS_Store
89
.meltano
910
.pytest_cache
11+
build
1012
coverage.xml
1113
mlruns/
1214
archive/
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Apache Flink and CrateDB with Java
2+
3+
Basic examples demonstrating how to read and write from/to
4+
CrateDB when using Apache Flink.
5+
6+
The examples use both the [CrateDB JDBC] and the [PostgreSQL JDBC]
7+
driver. CrateDB JDBC is needed for catalog operations, which are
8+
required when reading from CrateDB using Flink.
9+
10+
```sql
11+
uvx crash -c 'CREATE TABLE person (name STRING, age INT);'
12+
```
13+
```shell
14+
gradle run write
15+
```
16+
```shell
17+
gradle run read
18+
```
19+
20+
21+
[CrateDB JDBC]: https://cratedb.com/docs/guide/connect/java/cratedb-jdbc.html
22+
[PostgreSQL JDBC]: https://cratedb.com/docs/guide/connect/java/postgresql-jdbc.html
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// https://medium.com/@santoshkotagiri/setting-up-flink-projects-with-gradle-a-step-by-step-guide-4d1ef85f017
2+
apply plugin : 'application'
3+
repositories.mavenCentral()
4+
dependencies {
5+
implementation 'org.apache.flink:flink-clients:1.20.3'
6+
implementation 'org.apache.flink:flink-connector-jdbc:3.2.0-1.19'
7+
implementation 'org.apache.flink:flink-table-planner_2.12:1.20.3'
8+
implementation 'org.apache.flink:flink-table-runtime:1.20.3'
9+
implementation 'org.slf4j:slf4j-simple:1.7.36'
10+
runtimeOnly 'io.crate:crate-jdbc:2.7.0'
11+
runtimeOnly 'org.postgresql:postgresql:42.7.8'
12+
}
13+
java.toolchain.languageVersion = JavaLanguageVersion.of(11)
14+
sourceSets.main.java.srcDirs = ['.']
15+
tasks.register('read') { application.mainClass = 'read' }
16+
tasks.register('write') { application.mainClass = 'write' }
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Read from CrateDB using Apache Flink.
2+
// Invoke: gradle run read
3+
// Source: https://github.com/crate/cratedb-examples/blob/main/framework/flink/tour/java/read.java
4+
5+
// https://tacnode.io/docs/guides/ecosystem/bigdata/flink#catalog-registration
6+
// https://github.com/crate/cratedb-flink-jobs/blob/main/src/main/java/io/crate/flink/demo/SimpleTableApiJob.java
7+
// https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/jdbc/#jdbc-catalog-for-postgresql
8+
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
9+
import org.apache.flink.table.api.*;
10+
11+
import static org.apache.flink.table.api.Expressions.$;
12+
13+
public class read {
14+
15+
public static String CATALOG_NAME = "example_catalog";
16+
17+
public static void main(String[] args) throws Exception {
18+
19+
// Create Flink Table API environment.
20+
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
21+
TableEnvironment env = TableEnvironment.create(settings);
22+
23+
// Define catalog.
24+
// CrateDB only knows a single database called `crate`,
25+
// but you can separate concerns using schemata. The
26+
// default schema is `doc`.
27+
JdbcCatalog catalog = new JdbcCatalog(
28+
CATALOG_NAME,
29+
"crate",
30+
"crate",
31+
"crate",
32+
"jdbc:crate://localhost:5432"
33+
);
34+
35+
// Register catalog and set as default.
36+
env.registerCatalog(CATALOG_NAME, catalog);
37+
env.useCatalog(CATALOG_NAME);
38+
39+
// Invoke query using plain SQL.
40+
// FIXME: Currently does not work with `sys.summits`.
41+
// SqlValidatorException: Object 'sys.summits' not found
42+
env.executeSql("SELECT * FROM `doc.person` LIMIT 3").print();
43+
44+
// Invoke query using DSL.
45+
env.from("`doc.person`")
46+
.select($("name"), $("age"))
47+
.execute()
48+
.print();
49+
}
50+
51+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Write to CrateDB using Apache Flink.
2+
// Invoke: uvx crash -c 'CREATE TABLE person (name STRING, age INT);'
3+
// Invoke: gradle run write
4+
// Source: https://github.com/crate/cratedb-examples/blob/main/framework/flink/tour/java/write.java
5+
6+
// https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#full-example
7+
8+
import org.apache.flink.connector.jdbc.*;
9+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10+
11+
public class write {
12+
13+
public static void main(String[] args) throws Exception {
14+
var env = StreamExecutionEnvironment.getExecutionEnvironment();
15+
16+
// Define source data.
17+
env.fromElements(
18+
new Person("Fred", 35),
19+
new Person("Wilma", 35),
20+
new Person("Pebbles", 2)
21+
22+
// Define CrateDB as data sink.
23+
).addSink(
24+
JdbcSink.sink(
25+
"INSERT INTO person (name, age) VALUES (?, ?)",
26+
(statement, person) -> {
27+
statement.setString(1, person.name);
28+
statement.setInt(2, person.age);
29+
},
30+
JdbcExecutionOptions.builder()
31+
.withBatchSize(1000)
32+
.withBatchIntervalMs(200)
33+
.withMaxRetries(5)
34+
.build(),
35+
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
36+
.withUrl("jdbc:postgresql://localhost:5432/doc?sslmode=disable")
37+
.withDriverName("org.postgresql.Driver")
38+
.withUsername("crate")
39+
.withPassword("crate")
40+
.build()
41+
));
42+
43+
// Execute pipeline.
44+
env.execute();
45+
}
46+
47+
public static class Person {
48+
public String name;
49+
public Integer age;
50+
public Person() {}
51+
public Person(String name, Integer age) {
52+
this.name = name;
53+
this.age = age;
54+
}
55+
}
56+
57+
}

0 commit comments

Comments
 (0)