Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ __pycache__
_build
.coverage*
.env
.gradle
.DS_Store
.meltano
.pytest_cache
build
coverage.xml
mlruns/
archive/
Expand Down
11 changes: 11 additions & 0 deletions framework/flink/tour/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# A tour of Apache Flink and CrateDB

This folder includes concise executable examples demonstrating
how to use Apache Flink with CrateDB, with both Java and Python.

Usually, you will submit your Apache Flink job to a cluster for
execution. The examples in this tour are self-contained, and will
use the [Flink MiniCluster] for execution.


[Flink MiniCluster]: https://speakerdeck.com/rmetzger/tiny-flink-minimizing-the-memory-footprint-of-apache-flink
22 changes: 22 additions & 0 deletions framework/flink/tour/java/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Apache Flink and CrateDB with Java

Basic examples demonstrating how to read and write from/to
CrateDB when using Apache Flink.

The examples use both the [CrateDB JDBC] and the [PostgreSQL JDBC]
driver. CrateDB JDBC is needed for catalog operations, which are
required when reading from CrateDB using Flink.

```shell
uvx crash -c 'CREATE TABLE person (name STRING, age INT);'
```
```shell
gradle run write
```
```shell
gradle run read
```


[CrateDB JDBC]: https://cratedb.com/docs/guide/connect/java/cratedb-jdbc.html
[PostgreSQL JDBC]: https://cratedb.com/docs/guide/connect/java/postgresql-jdbc.html
16 changes: 16 additions & 0 deletions framework/flink/tour/java/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// https://medium.com/@santoshkotagiri/setting-up-flink-projects-with-gradle-a-step-by-step-guide-4d1ef85f017
// Gradle build for the Java Flink tour examples.
// The `application` plugin supplies the `run` task; the `read`/`write`
// tasks below only set `application.mainClass`, so `gradle run read`
// configures the main class and then runs it.
apply plugin : 'application'
repositories.mavenCentral()
dependencies {
// Flink job submission and local (MiniCluster) execution support.
implementation 'org.apache.flink:flink-clients:1.20.3'
// JDBC connector. NOTE(review): artifact targets Flink 1.19 while the
// other Flink artifacts are 1.20.3 — confirm this mix is intended.
implementation 'org.apache.flink:flink-connector-jdbc:3.2.0-1.19'
// Table API planner and runtime, needed for the SQL/Table read example.
implementation 'org.apache.flink:flink-table-planner_2.12:1.20.3'
implementation 'org.apache.flink:flink-table-runtime:1.20.3'
// Simple console logging backend for SLF4J.
implementation 'org.slf4j:slf4j-simple:1.7.36'
// CrateDB JDBC driver: required for catalog operations (read path).
runtimeOnly 'io.crate:crate-jdbc:2.7.0'
// PostgreSQL JDBC driver: used by the write example.
runtimeOnly 'org.postgresql:postgresql:42.7.8'
}
java.toolchain.languageVersion = JavaLanguageVersion.of(11)
// Compile the .java files straight from this folder (no src/main/java layout).
sourceSets.main.java.srcDirs = ['.']
// Helper tasks that select which main class `gradle run` executes.
tasks.register('read') { application.mainClass = 'read' }
tasks.register('write') { application.mainClass = 'write' }
51 changes: 51 additions & 0 deletions framework/flink/tour/java/read.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Read from CrateDB using Apache Flink.
// Invoke: gradle run read
// Source: https://github.com/crate/cratedb-examples/blob/main/framework/flink/tour/java/read.java

// https://tacnode.io/docs/guides/ecosystem/bigdata/flink#catalog-registration
// https://github.com/crate/cratedb-flink-jobs/blob/main/src/main/java/io/crate/flink/demo/SimpleTableApiJob.java
// https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/jdbc/#jdbc-catalog-for-postgresql
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Reads rows from CrateDB with the Flink Table API.
 *
 * <p>Registers a {@code JdbcCatalog} backed by the CrateDB JDBC driver,
 * then queries the {@code doc.person} table twice: once with plain SQL
 * and once with the fluent Table DSL.
 */
public class read {

    public static String CATALOG_NAME = "example_catalog";

    public static void main(String[] args) throws Exception {

        // Table API environment, streaming mode.
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // CrateDB exposes exactly one database called `crate`; concerns are
        // separated via schemata instead, with `doc` as the default schema.
        JdbcCatalog crateCatalog = new JdbcCatalog(
                CATALOG_NAME,
                "crate",
                "crate",
                "crate",
                "jdbc:crate://localhost:5432"
        );

        // Make the catalog known to Flink and select it as the default one.
        env.registerCatalog(CATALOG_NAME, crateCatalog);
        env.useCatalog(CATALOG_NAME);

        // Query using plain SQL.
        // FIXME: Currently does not work with `sys.summits`.
        // SqlValidatorException: Object 'sys.summits' not found
        env.executeSql("SELECT * FROM `doc.person` LIMIT 3").print();

        // Query using the fluent Table DSL.
        env.from("`doc.person`")
                .select($("name"), $("age"))
                .execute()
                .print();
    }

}
57 changes: 57 additions & 0 deletions framework/flink/tour/java/write.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Write to CrateDB using Apache Flink.
// Invoke: uvx crash -c 'CREATE TABLE person (name STRING, age INT);'
// Invoke: gradle run write
// Source: https://github.com/crate/cratedb-examples/blob/main/framework/flink/tour/java/write.java

// https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#full-example

import org.apache.flink.connector.jdbc.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Writes a few {@link Person} records into CrateDB using the Flink
 * DataStream API and the JDBC sink, speaking the PostgreSQL wire protocol.
 */
public class write {

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Batching and retry policy for the JDBC sink.
        var executionOptions = JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build();

        // Connection settings: CrateDB via the PostgreSQL JDBC driver.
        var connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://localhost:5432/doc?sslmode=disable")
                .withDriverName("org.postgresql.Driver")
                .withUsername("crate")
                .withPassword("crate")
                .build();

        // Source data flows straight into the CrateDB sink; each Person is
        // bound to the two parameters of the INSERT statement.
        env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2)
        ).addSink(
                JdbcSink.sink(
                        "INSERT INTO person (name, age) VALUES (?, ?)",
                        (statement, person) -> {
                            statement.setString(1, person.name);
                            statement.setInt(2, person.age);
                        },
                        executionOptions,
                        connectionOptions
                ));

        // Execute pipeline.
        env.execute();
    }

    /** Plain POJO carrying one row for the `person` table. */
    public static class Person {
        public String name;
        public Integer age;
        public Person() {}
        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

}
28 changes: 28 additions & 0 deletions framework/flink/tour/python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Apache Flink and CrateDB with Python

Basic examples demonstrating how to read and write from/to
CrateDB when using Apache Flink (PyFlink).

The examples use both the [CrateDB JDBC] and the [PostgreSQL JDBC]
driver. CrateDB JDBC is needed for catalog operations, which are
required when reading from CrateDB using Flink.

```shell
uvx crash -c 'CREATE TABLE person (name STRING, age INT);'
```
Flink >= 1.19 has problems combining JDBC with PyFlink, so these
examples use an older Flink release, which in turn requires a
correspondingly old Python version.
```shell
uv venv --python 3.10 --seed .venv310
source .venv310/bin/activate
uv pip install -r requirements.txt
```
```shell
python write.py
```
```shell
python read.py
```


[CrateDB JDBC]: https://cratedb.com/docs/guide/connect/java/cratedb-jdbc.html
[PostgreSQL JDBC]: https://cratedb.com/docs/guide/connect/java/postgresql-jdbc.html
6 changes: 6 additions & 0 deletions framework/flink/tour/python/read.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
def main():
    """Placeholder entry point for the PyFlink read example."""
    message = "Hello World"
    print(message)


if __name__ == "__main__":
    main()
2 changes: 2 additions & 0 deletions framework/flink/tour/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
apache-flink==1.18.1
avro<1.13
80 changes: 80 additions & 0 deletions framework/flink/tour/python/write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import dataclasses
import logging

from pathlib import Path

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions


logger = logging.getLogger(__name__)

JARS_PATH = Path(__file__).parent / 'jars'


@dataclasses.dataclass
class Person:
    """One record destined for the CrateDB `person` table."""
    name: str  # bound to the first INSERT parameter
    age: int   # bound to the second INSERT parameter


def main():
    """Write a few `Person` records into CrateDB's `person` table via PyFlink.

    Expects the JDBC driver/connector JARs in the `jars/` folder next to
    this file, and a CrateDB instance listening on localhost:5432.
    """

    env = StreamExecutionEnvironment.get_execution_environment()

    # Register every driver/connector JAR shipped alongside this example.
    jars = ["file://" + str(path) for path in JARS_PATH.glob("*.jar")]
    env.add_jars(*jars)

    # Define source data.
    ds = env.from_collection([
        Person("Fred", 35),
        Person("Wilma", 35),
        Person("Pebbles", 2),
    ])

    # Define CrateDB as data sink.
    row_type_info = Types.ROW_NAMED(['name', 'age'], [Types.STRING(), Types.INT()])
    ds.add_sink(
        JdbcSink.sink(
            "INSERT INTO person (name, age) VALUES (?, ?)",
            row_type_info,

            # FIXME (Flink >= 1.19): java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder

            # This is due to a bug in the python Flink library. In `flink-connector-jdbc` v3.1,
            # the `JdbcOutputFormat` was renamed to `RowJdbcOutputFormat`. This change has up till
            # now not been implemented in the python Flink library.
            # https://stackoverflow.com/questions/78960829/java-lang-nosuchmethodexception-in-python-flink-jdbc
            # https://stackoverflow.com/questions/79604252/issue-with-pyflink-api-code-for-inserting-data-into-sql

            # NOTE(review): the Java example connects to `/doc?sslmode=disable`
            # while this one uses `/crate` — confirm which path is intended.
            JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .with_url('jdbc:postgresql://localhost:5432/crate')
            .with_driver_name('org.postgresql.Driver')
            .with_user_name("crate")
            .with_password("crate")
            .build(),
            # Batch settings aligned with the Java example: flush up to
            # 1000 records, or every 200 ms, with up to 5 retries.
            # (Previously size/interval were swapped: interval=1000, size=200.)
            JdbcExecutionOptions.builder()
            .with_batch_size(1000)
            .with_batch_interval_ms(200)
            .with_max_retries(5)
            .build()
        )
    )

    # Execute pipeline.
    env.execute()


if __name__ == "__main__":
    # Verbose logging so the Flink MiniCluster's activity is visible.
    log_format = '[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=log_format)

    logger.info("Start")
    main()
    logger.info("Ready")