Skip to content

Commit 7a9611f

Browse files
committed
[docs] Add Documentation for Fluss Java Client (apache#734)
1 parent 82726dd commit 7a9611f

File tree

3 files changed

+234
-0
lines changed

3 files changed

+234
-0
lines changed

website/docs/apis/_category_.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"label": "Fluss APIs",
3+
"position": 9
4+
}

website/docs/apis/java-client.md

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
---
2+
sidebar_label: "Java Client"
3+
sidebar_position: 1
4+
---
5+
6+
<!--
7+
Copyright (c) 2025 Alibaba Group Holding Ltd.
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
http://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
-->
21+
22+
# Fluss Java Client
23+
## Overview
24+
Fluss `Admin` API that supports asynchronous operations for managing and inspecting Fluss resources. It communicates with the Fluss cluster and provides methods for:
25+
26+
* Managing databases (create, drop, list)
27+
* Managing tables (create, drop, list)
28+
* Managing partitions (create, drop, list)
29+
* Retrieving metadata (schemas, snapshots, server information)
30+
31+
Fluss `Table` API allows you to interact with Fluss tables for reading and writing data.
32+
## Dependency
33+
In order to use the client, you need to add the following dependency to your `pom.xml` file.
34+
35+
```xml
36+
<!-- https://mvnrepository.com/artifact/com.alibaba.fluss/fluss-client -->
37+
<dependency>
38+
<groupId>com.alibaba.fluss</groupId>
39+
<artifactId>fluss-client</artifactId>
40+
<version>0.6.0</version>
41+
</dependency>
42+
```
43+
44+
## Initialization
45+
46+
`Connection` is the main entry point for the Fluss Java client. It is used to create `Admin` and `Table` instances.
47+
The `Connection` object is created using the `ConnectionFactory` class, which takes a `Configuration` object as an argument.
48+
The `Configuration` object contains the necessary configuration parameters for connecting to the Fluss cluster, such as the bootstrap servers.
49+
50+
The `Connection` object is thread-safe and can be shared across multiple threads. It is recommended to create a
51+
single `Connection` instance per application and use it to create multiple `Admin` and `Table` instances.
52+
`Table` and `Admin` instances, on the other hand, are not thread-safe and should be created for each thread that needs to access them.
53+
Caching or pooling of `Table` and `Admin` is not recommended.
54+
55+
Create a new `Admin` instance:
56+
```java
57+
// creating Connection object to connect with Fluss cluster
58+
Configuration conf = new Configuration();
59+
conf.setString("bootstrap.servers", "localhost:9123");
60+
Connection connection = ConnectionFactory.createConnection(conf);
61+
62+
// obtain Admin instance from the Connection
63+
Admin admin = connection.getAdmin();
64+
admin.listDatabases().get().forEach(System.out::println);
65+
66+
// obtain Table instance from the Connection
67+
Table table = connection.getTable(TablePath.of("my_db", "my_table");
68+
System.out.println(table.getTableInfo());
69+
```
70+
71+
## Working Operations
72+
All methods in `FlussAdmin` return `CompletableFuture` objects. You can handle these in two ways:
73+
74+
### Blocking Operations
75+
For synchronous behavior, use the `get()` method:
76+
```java
77+
// Blocking call
78+
List<String> databases = admin.listDatabases().get();
79+
```
80+
81+
### Asynchronous Operations
82+
For non-blocking behavior, use the `thenAccept`, `thenApply`, or other methods:
83+
```java
84+
admin.listDatabases()
85+
.thenAccept(databases -> {
86+
System.out.println("Available databases:");
87+
databases.forEach(System.out::println);
88+
})
89+
.exceptionally(ex -> {
90+
System.err.println("Failed to list databases: " + ex.getMessage());
91+
return null;
92+
});
93+
```
94+
95+
## Creating Databases and Tables
96+
### Creating a Database
97+
```java
98+
99+
// Create database descriptor
100+
DatabaseDescriptor descriptor = DatabaseDescriptor.builder()
101+
.comment("This is a test database")
102+
.customProperty("owner", "data-team")
103+
.build();
104+
105+
// Create database (true means ignore if exists)
106+
admin.createDatabase("my_db", descriptor, true) // non-blocking call
107+
.thenAccept(unused -> System.out.println("Database created successfully"))
108+
.exceptionally(ex -> {
109+
System.err.println("Failed to create database: " + ex.getMessage());
110+
return null;
111+
});
112+
```
113+
114+
115+
### Creating a Table
116+
```java
117+
Schema schema = Schema.newBuilder()
118+
.column("id", DataTypes.STRING())
119+
.column("age", DataTypes.INT())
120+
.column("created_at", DataTypes.TIMESTAMP())
121+
.column("is_active", DataTypes.BOOLEAN())
122+
.primaryKey("id")
123+
.build();
124+
125+
// Use the schema in a table descriptor
126+
TableDescriptor tableDescriptor = TableDescriptor.builder()
127+
.schema(schema)
128+
.distributedBy(1, "id") // Distribute by the id column with 1 buckets
129+
// .partitionedBy("") // Partition by the partition key
130+
.build();
131+
132+
TablePath tablePath = TablePath.of("my_db", "user_table");
133+
admin.createTable(tablePath, tableDescriptor, false).get(); // blocking call
134+
135+
TableInfo tableInfo = admin.getTableInfo(tablePath).get(); // blocking call
136+
System.out.println(tableInfo);
137+
```
138+
139+
## Table API
140+
### Writers
141+
In order to write data to Fluss tables, first you need to create a Table instance.
142+
```java
143+
TablePath tablePath = TablePath.of("my_db", "user_table");
144+
Table table = connection.getTable(tablePath);
145+
```
146+
147+
In Fluss we have both Primary Key Tables and Log Tables, so the client provides different functionality depending on the table type.
148+
You can use an `UpsertWriter` to write data to a Primary Key table, and an `AppendWriter` to write data to a Log Table.
149+
````java
150+
table.newUpsert().createWriter();
151+
table.newAppend().createWriter();
152+
````
153+
154+
Let's take a look at how to write data to a Primary Key table.
155+
```java
156+
List<User> users = List.of(
157+
new User("1", 20, LocalDateTime.now() , true),
158+
new User("2", 22, LocalDateTime.now() , true),
159+
new User("3", 23, LocalDateTime.now() , true),
160+
new User("4", 24, LocalDateTime.now() , true),
161+
new User("5", 25, LocalDateTime.now() , true)
162+
);
163+
```
164+
165+
**Note:** Currently data in Fluss is written in the form of `rows`, so we need to convert our POJO to `GenericRow`, while the Fluss community is working to provide
166+
a more user-friendly API for writing data.
167+
```java
168+
Table table = connection.getTable(tablePath);
169+
170+
List<GenericRow> rows = users.stream().map(user -> {
171+
GenericRow row = new GenericRow(4);
172+
row.setField(0, BinaryString.fromString(user.getId()));
173+
row.setField(1, user.getAge());
174+
row.setField(2, TimestampNtz.fromLocalDateTime(user.getCreatedAt()));
175+
row.setField(3, user.isActive());
176+
return row;
177+
}).collect(Collectors.toList());
178+
179+
System.out.println("Upserting rows to the table");
180+
UpsertWriter writer = table.newUpsert().createWriter();
181+
182+
// upsert() is a non-blocking call that sends data to Fluss server with batching and timeout
183+
rows.forEach(writer::upsert);
184+
185+
// call flush() to blocking the thread until all data is written successfully
186+
writer.flush();
187+
```
188+
189+
For a Log table you can use the `AppendWriter` API to write data.
190+
```java
191+
table.newAppend().createWriter().append(row);
192+
```
193+
194+
### Scanner
195+
In order to read data from Fluss tables, first you need to create a Scanner instance. Then users can subscribe to the table buckets and
196+
start polling for records.
197+
```java
198+
LogScanner logScanner = table.newScan()
199+
.createLogScanner();
200+
201+
int numBuckets = table.getTableInfo().getNumBuckets();
202+
System.out.println("Number of buckets: " + numBuckets);
203+
for (int i = 0; i < numBuckets; i++) {
204+
System.out.println("Subscribing to bucket " + i);
205+
logScanner.subscribeFromBeginning(i);
206+
}
207+
208+
long scanned = 0;
209+
Map<Integer, List<String>> rowsMap = new HashMap<>();
210+
211+
while (true) {
212+
System.out.println("Polling for records...");
213+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
214+
for (TableBucket bucket : scanRecords.buckets()) {
215+
for (ScanRecord record : scanRecords.records(bucket)) {
216+
InternalRow row = record.getRow();
217+
// Process the row
218+
...
219+
}
220+
}
221+
scanned += scanRecords.count();
222+
}
223+
```
224+
225+
### Lookup
226+
You can also use the Fluss API to perform lookups on a table. This is useful for querying specific records based on their primary key.
227+
```java
228+
LookupResult lookup = table.newLookup().createLookuper().lookup(rowKey).get();
229+
```

website/docusaurus.config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ const config: Config = {
152152
prism: {
153153
theme: prismThemes.vsDark,
154154
darkTheme: prismThemes.dracula,
155+
additionalLanguages: ['java']
155156
},
156157
algolia: {
157158
appId: "D8RXQUTC99",

0 commit comments

Comments
 (0)