|
| 1 | +// Read from CrateDB using Apache Flink. |
| 2 | +// Invoke: gradle run read |
| 3 | +// Source: https://github.com/crate/cratedb-examples/blob/main/framework/flink/tour/java/read.java |
| 4 | + |
| 5 | +// https://tacnode.io/docs/guides/ecosystem/bigdata/flink#catalog-registration |
| 6 | +// https://github.com/crate/cratedb-flink-jobs/blob/main/src/main/java/io/crate/flink/demo/SimpleTableApiJob.java |
| 7 | +// https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/table/jdbc/#jdbc-catalog-for-postgresql |
| 8 | +import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; |
| 9 | +import org.apache.flink.table.api.*; |
| 10 | + |
| 11 | +import static org.apache.flink.table.api.Expressions.$; |
| 12 | + |
| 13 | +public class read { |
| 14 | + |
| 15 | + public static String CATALOG_NAME = "example_catalog"; |
| 16 | + |
| 17 | + public static void main(String[] args) throws Exception { |
| 18 | + |
| 19 | + // Create Flink Table API environment. |
| 20 | + EnvironmentSettings settings = EnvironmentSettings.inStreamingMode(); |
| 21 | + TableEnvironment env = TableEnvironment.create(settings); |
| 22 | + |
| 23 | + // Define catalog. |
| 24 | + // CrateDB only knows a single database called `crate`, |
| 25 | + // but you can separate concerns using schemata. The |
| 26 | + // default schema is `doc`. |
| 27 | + JdbcCatalog catalog = new JdbcCatalog( |
| 28 | + CATALOG_NAME, |
| 29 | + "crate", |
| 30 | + "crate", |
| 31 | + "crate", |
| 32 | + "jdbc:crate://localhost:5432" |
| 33 | + ); |
| 34 | + |
| 35 | + // Register catalog and set as default. |
| 36 | + env.registerCatalog(CATALOG_NAME, catalog); |
| 37 | + env.useCatalog(CATALOG_NAME); |
| 38 | + |
| 39 | + // Invoke query using plain SQL. |
| 40 | + // FIXME: Currently does not work with `sys.summits`. |
| 41 | + // SqlValidatorException: Object 'sys.summits' not found |
| 42 | + env.executeSql("SELECT * FROM `doc.person` LIMIT 3").print(); |
| 43 | + |
| 44 | + // Invoke query using DSL. |
| 45 | + env.from("`doc.person`") |
| 46 | + .select($("name"), $("age")) |
| 47 | + .execute() |
| 48 | + .print(); |
| 49 | + } |
| 50 | + |
| 51 | +} |
0 commit comments