Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@
public enum LookupType {
LOOKUP,
LOOKUP_WITH_INSERT_IF_NOT_EXISTS,
PREFIX_LOOKUP;
PREFIX_LOOKUP
}
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ void testProduceWithNoWriteAuthorization() throws Exception {
Collections.singletonList(noWriteAclTable)));
}

// 2. Try to write data to writeAclTable. It will success and writeId will be set.
// 2. Try to write data to writeAclTable. It will succeed and writeId will be set.
try (Table table = guestConn.getTable(writeAclTable)) {
AppendWriter appendWriter = table.newAppend().createWriter();
appendWriter.append(row(1, "a")).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public void testToArrayWithNullThrowsException() {
writer.writeInt(2, 30);
writer.complete();

assertThatThrownBy(() -> array.toIntArray())
assertThatThrownBy(array::toIntArray)
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Primitive array must not contain a null value");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void testNullInObjectArray() {
assertThat(array.isNullAt(1)).isTrue();
assertThat(array.isNullAt(2)).isFalse();

assertThatThrownBy(() -> array.toIntArray())
assertThatThrownBy(array::toIntArray)
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Primitive array must not contain a null value");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class TableBucketWriteResult<WriteResult> implements Serializable {
private final long maxTimestamp;

// the total number of write results in one round of tiering,
// used for downstream commiter operator to determine when all write results
// used for downstream committer operator to determine when all write results
// for the round of tiering is finished
private final int numberOfWriteResults;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ private List<TieringSplit> populateNumberOfTieringSplits(List<TieringSplit> tier

@Override
public TieringSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
// do nothing, the downstream lake commiter will snapshot the state to Fluss Cluster
// do nothing, the downstream lake committer will snapshot the state to Fluss Cluster
return new TieringSourceEnumeratorState();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ void beforeEach() throws Exception {
DEFAULT_DB, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
} catch (CatalogException e) {
// the auto partitioned manager may create the db zk node
// in an another thread, so if exception is NodeExistsException, just ignore
// in another thread, so if exception is NodeExistsException, just ignore
if (!ExceptionUtils.findThrowableWithMessage(e, "KeeperException$NodeExistsException")
.isPresent()) {
throw e;
Expand Down Expand Up @@ -275,7 +275,7 @@ void testCreateTable() throws Exception {
this.tableInDefaultDb, CATALOG_NAME));
// should be ok since we set ignoreIfNotExists = true
catalog.dropTable(this.tableInDefaultDb, true);
// create table from an non-exist db
// create table from a non-exist db
ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");

// remove bucket-key
Expand All @@ -300,7 +300,7 @@ void testCreateTable() throws Exception {
resolvedSchema);
catalog.createTable(this.tableInDefaultDb, table2, false);
tableCreated = catalog.getTable(this.tableInDefaultDb);
// need to over write the option
// need to overwrite the option
addedOptions.put(BUCKET_KEY.key(), "third");

expectedTable = addOptions(table2, addedOptions);
Expand Down Expand Up @@ -492,7 +492,7 @@ void testCreateAndDropMaterializedTable() throws Exception {
// should be ok since we set ignoreIfNotExists = true
catalog.dropTable(mt1, true);

// create table from an non-exist db
// create table from a non-exist db
ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");

// remove bucket-key
Expand Down Expand Up @@ -658,10 +658,10 @@ void testDatabase() throws Exception {
@Test
void testOperatePartitions() throws Exception {
catalog.createDatabase("db1", new CatalogDatabaseImpl(Collections.emptyMap(), null), false);
assertThatThrownBy(() -> catalog.listPartitions(new ObjectPath("db1", "unkown_table")))
assertThatThrownBy(() -> catalog.listPartitions(new ObjectPath("db1", "unknown_table")))
.isInstanceOf(TableNotExistException.class)
.hasMessage(
"Table (or view) db1.unkown_table does not exist in Catalog test-catalog.");
"Table (or view) db1.unknown_table does not exist in Catalog test-catalog.");

// create a non-partitioned table.
CatalogTable table = this.newCatalogTable(Collections.emptyMap());
Expand Down Expand Up @@ -843,7 +843,7 @@ void testConnectionFailureHandling() {
Collections::emptyMap);

// Test open() throws proper exception
assertThatThrownBy(() -> badCatalog.open())
assertThatThrownBy(badCatalog::open)
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining("No resolvable bootstrap urls");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ void testTableReachMaxTieringDuration() throws Throwable {
try (FlussMockSplitEnumeratorContext<TieringSplit> context =
new FlussMockSplitEnumeratorContext<>(numSubtasks);
TieringSourceEnumerator enumerator =
createTieringSourceEnumerator(flussConf, context); ) {
createTieringSourceEnumerator(flussConf, context)) {
enumerator.start();

// Register all readers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import java.io.Serializable;

/** The write result of Iceberg lake writer to pass to commiter to commit. */
/** The write result of Iceberg lake writer to pass to committer to commit. */
public class IcebergWriteResult implements Serializable {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.io.Serializable;
import java.util.List;

/** The write result of Lance lake writer to pass to commiter to commit. */
/** The write result of Lance lake writer to pass to committer to commit. */
public class LanceWriteResult implements Serializable {
private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public int getVersion() {
@Override
public byte[] serialize(LanceWriteResult lanceWriteResult) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos); ) {
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(lanceWriteResult);
return baos.toByteArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import java.io.Serializable;

/** The write result of Paimon lake writer to pass to commiter to commit. */
/** The write result of Paimon lake writer to pass to committer to commit. */
public class PaimonWriteResult implements Serializable {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void testMultipleEndpoint() throws Exception {
.get();
assertThat(nettyClient.connections().size()).isEqualTo(1);
try (NettyClient client =
new NettyClient(conf, TestingClientMetricGroup.newInstance(), false); ) {
new NettyClient(conf, TestingClientMetricGroup.newInstance(), false)) {
client.sendRequest(
new ServerNode(
2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
// it may happen during coordinator server initiation, the watcher watch a new tablet
// server register event and put it to event manager, but after that, the coordinator
// server read
// all tablet server nodes registered which contain the tablet server a; in this case,
// all tablet server nodes registered which contain the tablet server; in this case,
// we can ignore it.
return;
}
Expand Down Expand Up @@ -1398,7 +1398,7 @@ private void tryToCompleteRebalanceTask(TableBucket tableBucket) {
*
* <ul>
* <li>B1. Move all replicas in AR to OnlineReplica state.
* <li>B2. Send a LeaderAndIsr request with RS = ORS +TRS. The will make the origin leader
* <li>B2. Send a LeaderAndIsr request with RS = ORS +TRS. This will make the origin leader
* change to the new leader. this request will be sent to every tabletServer in ORS +TRS.
* <li>B3. Set RS = TRS, AR = [], RR = [] in memory.
* <li>Re-send LeaderAndIsr request with new leader and a new RS (using TRS) and same isr to
Expand All @@ -1408,8 +1408,8 @@ private void tryToCompleteRebalanceTask(TableBucket tableBucket) {
* Leader to notify it of the shrunk isr. After that, we send a StopReplica (delete =
* false and deleteRemote = false) to the replicas in RR.
* <li>B6. Move all replicas in RR to ReplicaMigrationStarted state. This will send a
* StopReplica (delete = true and deleteRemote = false) to he replicas in RR to physically
* delete the replicas on disk but don't delete the data in remote storage.
* StopReplica (delete = true and deleteRemote = false) to the replicas in RR to
* physically delete the replicas on disk but don't delete the data in remote storage.
* <li>B7. Update ZK with RS=TRS, AR=[], RR=[].
* <li>B8. After electing leader, the replicas and isr information changes. So resend the
* update metadata request to every tabletServer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,11 @@ public void addStopReplicaRequestForTabletServers(
non-partitioned table
* <li>case3: Table create and bucketAssignment don't generated, case will happen for new
* created partitioned table
* <li>case4: Table is queued for deletion, in this case we will set a empty tableBucket set
* <li>case4: Table is queued for deletion, in this case we will set an empty tableBucket set
and tableId set to {@link TableMetadata#DELETED_TABLE_ID} to avoid sending useless info to
* tabletServer
* <li>case5: Partition create and bucketAssignment of this partition generated.
* <li>case6: Partition is queued for deletion, in this case we will set a empty tableBucket
* <li>case6: Partition is queued for deletion, in this case we will set an empty tableBucket
* set and partitionId set to {@link PartitionMetadata#DELETED_PARTITION_ID } to avoid
sending useless info to tabletServer
* <li>case7: Leader and isr is changed for these input tableBuckets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import java.util.concurrent.CompletableFuture;

/** An event for receiving the request of commiting a completed snapshot to coordinator server. */
/** An event for receiving the request of committing a completed snapshot to coordinator server. */
public class CommitKvSnapshotEvent implements FencedCoordinatorEvent {

private final CommitKvSnapshotData commitKvSnapshotData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ public enum ActionType {
* Move leadership of a leader from a source tabletServer to a follower of the same replica
* residing in a destination tabletServer.
*/
LEADERSHIP_MOVEMENT;
LEADERSHIP_MOVEMENT
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void close() {
// shutdown asyncOperationsThreadPool now
asyncOperationsThreadPool.shutdownNow();
// close kvSnapshotScheduler, also stop any actively executing task immediately
// otherwise, a snapshot will still be take although it's closed, which will case exception
// otherwise, a snapshot will still be taken although it's closed, which will cause an exception
kvSnapshotScheduler.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ private void handleSnapshotCommitException(
// Fix for issue: https://github.com/apache/fluss/issues/1304
// Tablet server try to commit kv snapshot to coordinator server,
// coordinator server commit the kv snapshot to zk, then failover.
// Tablet server will got exception from coordinator server, but mistake it as a fail
// commit although coordinator server has committed to zk, then discard the commited kv
// Tablet server will get an exception from the coordinator server, but mistakes it as a failed
// commit although coordinator server has committed to zk, then discard the committed kv
// snapshot.
//
// Idempotent check: Double check ZK to verify if the snapshot actually exists before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ static int run(File buildResult, Path root) throws IOException {
"Extracted "
+ deployedModules.size()
+ " modules that were deployed and "
+ modulesWithBundledDependencies.keySet().size()
+ modulesWithBundledDependencies.size()
+ " modules which bundle dependencies with a total of "
+ modulesWithBundledDependencies.values().stream().mapToInt(Set::size).sum()
+ " dependencies");
Expand Down
4 changes: 2 additions & 2 deletions website/blog/2025-06-01-partial-updates.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Next, let's try and better understand how this works in practice with a concrete
### Example: Building a Unified Wide Table
> You can find the full source code on github [here](https://github.com/ververica/ververica-fluss-examples/tree/main/partial_updates).

Start by cloning the repository, run `docker compose up` to spin up the development enviroment and finally grab a terminal
Start by cloning the repository, run `docker compose up` to spin up the development environment and finally grab a terminal
into the `jobmanager` and start the Flink SQL cli, by running the following command:
```shell
./bin/sql-client.sh
Expand Down Expand Up @@ -149,7 +149,7 @@ and then run:
```sql
SELECT * FROM user_rec_wide;
```
to observe the output of the table, as we insert `partially` records into the it from the different sources.
to observe the output of the table, as we insert `partially` records into it from the different sources.

**Step 5:** Let's insert the records from the `recommendations` table into the `user_rec_wide` table.
```sql
Expand Down
2 changes: 1 addition & 1 deletion website/community/how-to-contribute/contribute-code.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Create an issue and reach consensus.

### Implement

Implement the change according to the Code Style and Quality(refer to the [Flink doc](https://flink.apache.org/how-to-contribute/code-style-and-quality-preamble/) Guide and the approach agreed upon in the issue.
Implement the change according to the Code Style and Quality (refer to the [Flink doc](https://flink.apache.org/how-to-contribute/code-style-and-quality-preamble/) Guide and the approach agreed upon in the issue).

1. Only start working on the implementation if there is consensus on the approach (e.g. you are assigned to the ticket)
2. If you are newer, can refer to [ide setup](/community/dev/ide-setup) to setup a Fluss dev environment.
Expand Down
2 changes: 1 addition & 1 deletion website/docs/maintenance/filesystems/oss.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ sidebar_position: 3

## Configurations setup

To enabled OSS as remote storage, there are some required configurations that must be add to Fluss' `server.yaml`:
To enable OSS as remote storage, there are some required configurations that must be added to Fluss' `server.yaml`:

```yaml
# The dir that used to be as the remote storage of Fluss
Expand Down