
Commit f19433b

committed: iteration 3
1 parent 333356a commit f19433b

File tree

1 file changed: +33 -28 lines changed


website/blog/2025-06-30-fluss-java-client.md

Lines changed: 33 additions & 28 deletions
@@ -25,18 +25,18 @@ authors: [giannis]
![Banner](assets/java_client/banner.png)

## Introduction
- Fluss is a streaming data storage system built for real-time analytics, serving as a low-latency data layer in modern data architectures.
+ Fluss is a streaming data storage system built for real-time analytics, serving as a low-latency data layer in modern data Lakehouses.
It supports sub-second streaming reads and writes, storing data in a columnar format for efficiency, and offers two flexible table types: **append-only Log Tables** and **updatable Primary Key Tables**.
In practice, this means Fluss can ingest high-throughput event streams *(using log tables)* while also maintaining *up-to-date* reference data or state *(using primary key tables)*, a combination ideal for
- scenarios like IoT, where you might stream sensor readings and look up metadata for those sensors in real-time, without
+ scenarios like IoT, where you might stream sensor readings and look up information for those sensors in real-time, without
the need for external K/V stores.
<!-- truncate -->

In this tutorial, we'll introduce the **Fluss Java Client** by walking through a simple home IoT system example.
- We will use `Fluss's Admin client` to create a primary key table for sensor metadata and a log table for sensor readings, then use the client
+ We will use `Fluss's Admin client` to create a primary key table for sensor information and a log table for sensor readings, then use the client
to write data to these tables and read/enrich the streaming sensor data.

- By the end, you'll see how a sensor reading can be ingested into a log table and immediately enriched with metadata from a primary key table (essentially performing a real-time lookup join for streaming data enrichment).
+ By the end, you'll see how a sensor reading can be ingested into a log table and immediately enriched with information from a primary key table (essentially performing a real-time lookup join for streaming data enrichment).

## Preflight Check
The full source code can be found [here](https://github.com/ververica/ververica-fluss-examples).
@@ -45,7 +45,7 @@ The full source code can be found [here](https://github.com/ververica/ververica-
docker compose up
```

- Then, establish a connection to the Fluss cluster.
+ The first thing we need to do is establish a connection to the Fluss cluster.
The `Connection` is the main entry point for the Fluss client, from which we obtain an `Admin` (for metadata operations) and Table instances (for data operations).

```java
@@ -58,8 +58,7 @@ Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
```
The above code snippet shows the bare minimum requirements for connecting and interacting with a Fluss Cluster.
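Since the configuration lines sit outside this diff, here is a minimal sketch of what the elided setup typically looks like; the bootstrap address `localhost:9123` matches the default local Docker Compose setup and is an assumption here, so adjust it to your cluster:

```java
// Minimal connection sketch (assumed default: local cluster reachable at localhost:9123)
Configuration conf = new Configuration();
conf.setString("bootstrap.servers", "localhost:9123");

Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
```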
-
- For our example we will use mock data which you can find below:
+ For our example we will use the following mock data - to keep things simple - which you can find below:
```java
public static final List<SensorReading> readings = List.of(
    new SensorReading(1, LocalDateTime.of(2025, 6, 23, 9, 15), 22.5, 45.0, 1013.2, 87.5),
@@ -94,7 +93,7 @@ public static final List<SensorInfo> sensorInfos = List.of(
);
```

- ## Creating a Database and Tables with the Admin Client
+ ## Operating The Cluster
Let's create a database for our IoT data, and within it define two tables:
* **Sensor Readings Table:** A log table that will collect time-series readings from sensors (like temperature and humidity readings). This table is append-only (new records are added continuously, with no updates/deletes), which is ideal for immutable event streams.
* **Sensor Information Table:** A primary key table that stores metadata for each sensor (like sensor ID, location, type). Each `sensorId` will be unique and acts as the primary key. This table can be updated as sensor info changes (e.g., sensor relocated or reconfigured).
@@ -103,7 +102,8 @@ Using the Admin client, we can programmatically create these tables.
First, we'll ensure the database exists (creating it if not), then define schemas for each table and create them:
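As a rough sketch of that first "ensure the database exists" step, before we move on to the schemas; the exact `createDatabase` signature, the `DatabaseDescriptor.EMPTY` argument, and the database name `fluss_iot` are assumptions for illustration, mirroring the ignore-if-exists flag used with `createTable` later:

```java
// Assumed sketch: create the database if it does not already exist
// (hypothetical name "fluss_iot"; the trailing boolean ignores the call if it is already there).
admin.createDatabase("fluss_iot", DatabaseDescriptor.EMPTY, true).get();
```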

- ### Define schema for the Log table (sensor readings)
+ ### Schema Definitions
+ #### Log table (sensor readings)

```java
public static Schema getSensorReadingsSchema() {
@@ -118,7 +118,7 @@ public static Schema getSensorReadingsSchema() {
}
```

- ### Define schema for the Primary Key table (sensor information)
+ #### Primary Key table (sensor information)
```java
public static Schema getSensorInfoSchema() {
    return Schema.newBuilder()
@@ -129,12 +129,12 @@ public static Schema getSensorInfoSchema() {
        .column("installationDate", DataTypes.DATE())
        .column("state", DataTypes.STRING())
        .column("lastUpdated", DataTypes.TIMESTAMP())
-         .primaryKey("sensorId")
+         .primaryKey("sensorId") // <-- define the primary key
        .build();
}
```

- ### Then let's go ahead and create those tables.
+ ### Table Creation
```java
public static void setupTables(Admin admin) throws ExecutionException, InterruptedException {
    TableDescriptor readingsDescriptor = TableDescriptor.builder()
@@ -158,19 +158,19 @@ public static void setupTables(Admin admin) throws ExecutionException, Interrupt
    admin.createTable(sensorInfoTablePath, sensorInfoDescriptor, true).get();
}
```
- We specify a distribution hint with `.distributedBy(3, "sensorId")`.
+ We specify a distribution with `.distributedBy(3, "sensorId")`.
Fluss tables are partitioned into buckets (similar to partitions in Kafka topics) for scalability.
Here we use 3 buckets. Multiple buckets allow for higher throughput and parallelized reads/writes.
- If using multiple buckets, Fluss would hash on the bucket key (sensorId in our case) to assign records to buckets.
+ If using multiple buckets, Fluss would hash on the bucket key (`sensorId` in our case) to assign records to buckets.
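As a toy illustration of the idea (this is not Fluss's actual hash function, just a sketch of why records with the same key always land in the same bucket):

```java
// Toy sketch only: the real bucket assignment is internal to Fluss.
int numBuckets = 3;
int sensorId = 42; // illustrative key value
int bucket = Math.floorMod(Integer.hashCode(sensorId), numBuckets);
// Every record with sensorId 42 maps to the same bucket,
// which is what gives us per-sensor ordering within that bucket.
```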

- For the sensor_readings table, we define a schema without any primary key. In Fluss, a table created without a primary key clause is a Log Table.
+ For the `sensor_readings` table, we define a schema without any primary key. In Fluss, a table created without a primary key clause is a Log Table.
A log table only supports appending new records (no updates or deletes), making it perfect for immutable time-series data or logs.

- In a log table, specifying a bucket key like `sensorId` ensures all readings from the same sensor end up to the same bucket providing strict ordering guarantees.
+ In the log table, specifying a bucket key like `sensorId` ensures all readings from the same sensor end up in the same bucket, providing strict ordering guarantees.

With our tables created, let's go and write some data.

- ## Writing Data to the Tables (Upserts and Appends)
+ ## Table Writes
With our tables in place, let's insert some data using the Fluss Java API.
The client allows us to write data to these tables and read it back.
We'll demonstrate two patterns:
@@ -180,7 +180,10 @@ We'll demonstrate two patterns:
Fluss provides specialized writer interfaces for each table type: an **UpsertWriter** for primary key tables and an **AppendWriter** for log tables.
Under the hood, the Fluss client currently expects data as **GenericRow** objects (a generic row data format).

- > **Note:** Discuss **InternalRow** And **GenericRow**
+ > **Note:** Internally Fluss uses **InternalRow** as an optimized, binary representation of data for better performance and memory efficiency.
+ > **GenericRow** is a generic implementation of InternalRow. This allows developers to interact with data easily while Fluss processes it efficiently using the underlying binary format.
+
+ Since we are creating **POJOs**, we need to convert them into GenericRow objects in order to write them into Fluss.

```java
public static GenericRow energyReadingToRow(SensorReading reading) {
@@ -193,7 +196,6 @@ public static GenericRow energyReadingToRow(SensorReading reading) {
    row.setField(5, reading.batteryLevel());
    return row;
}
-
public static GenericRow sensorInfoToRow(SensorInfo sensorInfo) {
    GenericRow row = new GenericRow(SensorInfo.class.getDeclaredFields().length);
    row.setField(0, sensorInfo.sensorId());
@@ -206,6 +208,9 @@ public static GenericRow sensorInfoToRow(SensorInfo sensorInfo) {
    return row;
}
```
+ **Note:** For certain data types like `String` or `LocalDateTime` we need to use conversion helpers such as
+ `BinaryString.fromString("string_value")` or `TimestampNtz.fromLocalDateTime(datetime)`; otherwise you might
+ come across conversion exceptions.
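As a tiny sketch of what those helpers look like inside a row-conversion method (the column positions and values here are illustrative only):

```java
GenericRow row = new GenericRow(2);
// STRING columns are set as BinaryString rather than plain java.lang.String
row.setField(0, BinaryString.fromString("Living Room"));
// TIMESTAMP columns are set as TimestampNtz rather than a raw LocalDateTime
row.setField(1, TimestampNtz.fromLocalDateTime(LocalDateTime.of(2025, 6, 23, 9, 15)));
```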

Let's start by writing data to the `Log Table`. This requires getting an `AppendWriter` as follows:

@@ -241,15 +246,15 @@ upsertWriter.flush();
At this point we have successfully written 10 sensor information records to our table, because
updates are handled on the primary key and merged.

- ## Fluss Scanner & Lookups
+ ## Scans & Lookups
Now comes the real-time data enrichment part of our example.
We want to simulate a process where each incoming sensor reading is immediately looked up against the sensor information table to add context (like location and type) to the raw reading.
This is a common pattern in streaming systems, often achieved with lookup joins.

With the Fluss Java client, we can do this by combining a **log scanner on the readings table** with **point lookups on the sensor information table**.

- To consume data from a Fluss table, we use a *Scanner*.
- For a log table, Fluss provides a *LogScanner* that allows us to *subscribe to one or more buckets* and poll for new records.
+ To consume data from a Fluss table, we use a **Scanner**.
+ For a log table, Fluss provides a **LogScanner** that allows us to **subscribe to one or more buckets** and poll for new records.

```java
LogScanner logScanner = readingsTable.newScan()
@@ -262,7 +267,7 @@ Lookuper sensorInforLookuper = sensorInfoTable
    .createLookuper();
```

- We set up a scanner on the sensor_readings table, and next we need to subscribe to all its buckets, and then poll for any available records:
+ We set up a scanner on the `sensor_readings` table; next we subscribe to all its buckets and then poll for any available records:
```java
int numBuckets = readingsTable.getTableInfo().getNumBuckets();
for (int i = 0; i < numBuckets; i++) {
@@ -271,7 +276,7 @@ for (int i = 0; i < numBuckets; i++) {
}
```

- Start polling for records. For each incoming record we will use the **Lookuper** to *lookup** sensor information from the primary key table,
+ Start polling for records. For each incoming record we will use the **Lookuper** to `lookup` sensor information from the primary key table,
and create a **SensorReadingEnriched** record.
```java
while (true) {
@@ -324,14 +329,14 @@ and creating a **SensorReadingEnriched** record.
```
Let's summarize what's happening here:
* We create a LogScanner for the `sensor_readings` table using `table.newScan().createLogScanner()`.
-  * We subscribe to each bucket of the table from the beginning (offset 0). Subscribing "from beginning" means we'll read all existing data from the start; alternatively, one could subscribe from the latest position to only get new incoming data or based on other attributes like time. In our case, since we just inserted data, from-beginning will capture those inserts.
+  * We subscribe to each bucket of the table from the beginning (offset 0). Subscribing `from beginning` means we'll read all existing data from the start; alternatively, one could subscribe from the latest position to only get new incoming data or based on other attributes like time. In our case, since we just inserted data, from-beginning will capture those inserts.
* We then call `poll(Duration)` on the scanner to retrieve available records, waiting up to the given timeout (1 second here). This returns a `ScanRecords` batch containing any records that were present. We iterate over each `TableBucket` and then over each `ScanRecord` within that bucket.
* For each record, we extract the fields via the InternalRow interface (which provides typed access to each column in the row) and **convert them into a Pojo**.
* Next, for each reading, we perform a **lookup** on the **sensor_information** table to get the sensor's info. We construct a key (GenericRow with just the sensor_id) and use `sensorTable.newLookup().createLookuper().lookup(key)`. This performs a point lookup by primary key and returns a `LookupResult` future; we call `.get()` to get the result synchronously. If present, we retrieve the InternalRow of the sensor information and **convert it into a Pojo** (see the sketch after this list).
* We then combine the data: logging an enriched message that includes the sensor's information alongside the reading values.
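A condensed sketch of that lookup step, reusing the `sensorInforLookuper` created earlier; the key value `1` is illustrative, and turning the returned row into a POJO follows the conversion step described above:

```java
// Key row holds only the primary key column (sensorId = 1 here, for illustration)
GenericRow key = new GenericRow(1);
key.setField(0, 1);

// Point lookup by primary key; .get() waits synchronously on the asynchronous LookupResult
LookupResult result = sensorInforLookuper.lookup(key).get();
// The matching sensor_information row (if any) is then converted into a SensorInfo POJO.
```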

Fluss's lookup API gives us quick primary-key retrieval from a table, which is exactly what we need to enrich the streaming data.
- In a real application, this enrichment could be done on the fly in a streaming job (and indeed Fluss is designed to support high-QPS lookup joins in real-time pipelines), but here we're simulating it with client calls for clarity.
+ In a real application, this enrichment could be done on the fly in a streaming job (and indeed **Fluss is designed to support high-QPS lookup joins in real-time pipelines**), but here we're simulating it with client calls for clarity.

If you run the above code found [here](https://github.com/ververica/ververica-fluss-examples), you should see an output like the following:
```shell
@@ -357,8 +362,8 @@ If you run the above code found [here](https://github.com/ververica/ververica-fl
16:07:14.229 INFO [main] com.ververica.scanner.FlussScanner - ---------------------------------------
```

- ## Reads with Column Pruning
- Column pruning lets you fetch only the columns you need, reducing network overhead and improving read performance. With Fluss’s Java client, you can specify a subset of columns in your scan:
+ ## Column Pruning Scans
+ Column pruning lets you fetch only the columns you need, **reducing network overhead and improving read performance**. With Fluss’s Java client, you can specify a subset of columns in your scan:
```java
LogScanner logScanner = readingsTable.newScan()
    .project(List.of("sensorId", "timestamp", "temperature"))
