diff --git a/website/docs/apis/_category_.json b/website/docs/apis/_category_.json new file mode 100644 index 0000000000..d1f67432bb --- /dev/null +++ b/website/docs/apis/_category_.json @@ -0,0 +1,4 @@ +{ + "label": "Fluss APIs", + "position": 9 +} diff --git a/website/docs/apis/java-client.md b/website/docs/apis/java-client.md new file mode 100644 index 0000000000..79e380dc1c --- /dev/null +++ b/website/docs/apis/java-client.md @@ -0,0 +1,229 @@ +--- +sidebar_label: "Java Client" +sidebar_position: 1 +--- + + + +# Fluss Java Client +## Overview +Fluss `Admin` API that supports asynchronous operations for managing and inspecting Fluss resources. It communicates with the Fluss cluster and provides methods for: + +* Managing databases (create, drop, list) +* Managing tables (create, drop, list) +* Managing partitions (create, drop, list) +* Retrieving metadata (schemas, snapshots, server information) + +Fluss `Table` API allows you to interact with Fluss tables for reading and writing data. +## Dependency +In order to use the client, you need to add the following dependency to your `pom.xml` file. + +```xml + + + com.alibaba.fluss + fluss-client + 0.6.0 + +``` + +## Initialization + +`Connection` is the main entry point for the Fluss Java client. It is used to create `Admin` and `Table` instances. +The `Connection` object is created using the `ConnectionFactory` class, which takes a `Configuration` object as an argument. +The `Configuration` object contains the necessary configuration parameters for connecting to the Fluss cluster, such as the bootstrap servers. + +The `Connection` object is thread-safe and can be shared across multiple threads. It is recommended to create a +single `Connection` instance per application and use it to create multiple `Admin` and `Table` instances. +`Table` and `Admin` instances, on the other hand, are not thread-safe and should be created for each thread that needs to access them. + Caching or pooling of `Table` and `Admin` is not recommended. + +Create a new `Admin` instance: +```java +// creating Connection object to connect with Fluss cluster +Configuration conf = new Configuration(); +conf.setString("bootstrap.servers", "localhost:9123"); +Connection connection = ConnectionFactory.createConnection(conf); + +// obtain Admin instance from the Connection +Admin admin = connection.getAdmin(); +admin.listDatabases().get().forEach(System.out::println); + +// obtain Table instance from the Connection +Table table = connection.getTable(TablePath.of("my_db", "my_table"); +System.out.println(table.getTableInfo()); +``` + +## Working Operations +All methods in `FlussAdmin` return `CompletableFuture` objects. You can handle these in two ways: + +### Blocking Operations +For synchronous behavior, use the `get()` method: +```java +// Blocking call +List databases = admin.listDatabases().get(); +``` + +### Asynchronous Operations +For non-blocking behavior, use the `thenAccept`, `thenApply`, or other methods: +```java +admin.listDatabases() + .thenAccept(databases -> { + System.out.println("Available databases:"); + databases.forEach(System.out::println); + }) + .exceptionally(ex -> { + System.err.println("Failed to list databases: " + ex.getMessage()); + return null; + }); +``` + +## Creating Databases and Tables +### Creating a Database +```java + +// Create database descriptor +DatabaseDescriptor descriptor = DatabaseDescriptor.builder() + .comment("This is a test database") + .customProperty("owner", "data-team") + .build(); + +// Create database (true means ignore if exists) +admin.createDatabase("my_db", descriptor, true) // non-blocking call + .thenAccept(unused -> System.out.println("Database created successfully")) + .exceptionally(ex -> { + System.err.println("Failed to create database: " + ex.getMessage()); + return null; + }); +``` + + +### Creating a Table +```java +Schema schema = Schema.newBuilder() + .column("id", DataTypes.STRING()) + .column("age", DataTypes.INT()) + .column("created_at", DataTypes.TIMESTAMP()) + .column("is_active", DataTypes.BOOLEAN()) + .primaryKey("id") + .build(); + +// Use the schema in a table descriptor +TableDescriptor tableDescriptor = TableDescriptor.builder() + .schema(schema) + .distributedBy(1, "id") // Distribute by the id column with 1 buckets +// .partitionedBy("") // Partition by the partition key + .build(); + +TablePath tablePath = TablePath.of("my_db", "user_table"); +admin.createTable(tablePath, tableDescriptor, false).get(); // blocking call + +TableInfo tableInfo = admin.getTableInfo(tablePath).get(); // blocking call +System.out.println(tableInfo); +``` + +## Table API +### Writers +In order to write data to Fluss tables, first you need to create a Table instance. +```java +TablePath tablePath = TablePath.of("my_db", "user_table"); +Table table = connection.getTable(tablePath); +``` + +In Fluss we have both Primary Key Tables and Log Tables, so the client provides different functionality depending on the table type. +You can use an `UpsertWriter` to write data to a Primary Key table, and an `AppendWriter` to write data to a Log Table. +````java +table.newUpsert().createWriter(); +table.newAppend().createWriter(); +```` + +Let's take a look at how to write data to a Primary Key table. +```java +List users = List.of( + new User("1", 20, LocalDateTime.now() , true), + new User("2", 22, LocalDateTime.now() , true), + new User("3", 23, LocalDateTime.now() , true), + new User("4", 24, LocalDateTime.now() , true), + new User("5", 25, LocalDateTime.now() , true) +); +``` + +**Note:** Currently data in Fluss is written in the form of `rows`, so we need to convert our POJO to `GenericRow`, while the Fluss community is working to provide +a more user-friendly API for writing data. +```java +Table table = connection.getTable(tablePath); + +List rows = users.stream().map(user -> { + GenericRow row = new GenericRow(4); + row.setField(0, BinaryString.fromString(user.getId())); + row.setField(1, user.getAge()); + row.setField(2, TimestampNtz.fromLocalDateTime(user.getCreatedAt())); + row.setField(3, user.isActive()); + return row; +}).collect(Collectors.toList()); + +System.out.println("Upserting rows to the table"); +UpsertWriter writer = table.newUpsert().createWriter(); + +// upsert() is a non-blocking call that sends data to Fluss server with batching and timeout +rows.forEach(writer::upsert); + +// call flush() to blocking the thread until all data is written successfully +writer.flush(); +``` + +For a Log table you can use the `AppendWriter` API to write data. +```java +table.newAppend().createWriter().append(row); +``` + +### Scanner +In order to read data from Fluss tables, first you need to create a Scanner instance. Then users can subscribe to the table buckets and +start polling for records. +```java +LogScanner logScanner = table.newScan() + .createLogScanner(); + +int numBuckets = table.getTableInfo().getNumBuckets(); +System.out.println("Number of buckets: " + numBuckets); +for (int i = 0; i < numBuckets; i++) { + System.out.println("Subscribing to bucket " + i); + logScanner.subscribeFromBeginning(i); +} + +long scanned = 0; +Map> rowsMap = new HashMap<>(); + +while (true) { + System.out.println("Polling for records..."); + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + for (TableBucket bucket : scanRecords.buckets()) { + for (ScanRecord record : scanRecords.records(bucket)) { + InternalRow row = record.getRow(); + // Process the row + ... + } + } + scanned += scanRecords.count(); +} +``` + +### Lookup +You can also use the Fluss API to perform lookups on a table. This is useful for querying specific records based on their primary key. +```java +LookupResult lookup = table.newLookup().createLookuper().lookup(rowKey).get(); +``` \ No newline at end of file diff --git a/website/docusaurus.config.ts b/website/docusaurus.config.ts index 42dedd9919..68f24b4a53 100644 --- a/website/docusaurus.config.ts +++ b/website/docusaurus.config.ts @@ -152,6 +152,7 @@ const config: Config = { prism: { theme: prismThemes.vsDark, darkTheme: prismThemes.dracula, + additionalLanguages: ['java'] }, algolia: { appId: "D8RXQUTC99",