Skip to content

Commit 81e3f3a

Browse files
[kv] Add Producer Offset Snapshot for Exactly-Once semantics (#2434)
1 parent 62db319 commit 81e3f3a

File tree

30 files changed

+4124
-22
lines changed

30 files changed

+4124
-22
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171

7272
import java.util.Collection;
7373
import java.util.List;
74+
import java.util.Map;
7475
import java.util.Optional;
7576
import java.util.concurrent.CompletableFuture;
7677

@@ -604,4 +605,65 @@ CompletableFuture<Optional<RebalanceProgress>> listRebalanceProgress(
604605
* NoRebalanceInProgressException} will be thrown.
605606
*/
606607
CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId);
608+
609+
// ==================================================================================
610+
// Producer Offset Management APIs (for Exactly-Once Semantics)
611+
// ==================================================================================
612+
613+
/**
614+
* Register producer offset snapshot.
615+
*
616+
* <p>This method provides atomic "check and register" semantics:
617+
*
618+
* <ul>
619+
* <li>If snapshot does not exist: create new snapshot and return {@link
620+
* RegisterResult#CREATED}
621+
* <li>If snapshot already exists: do NOT overwrite and return {@link
622+
* RegisterResult#ALREADY_EXISTS}
623+
* </ul>
624+
*
625+
* <p>The atomicity is guaranteed by the server implementation. This enables the caller to
626+
* determine whether undo recovery is needed based on the return value.
627+
*
628+
* <p>The snapshot will be automatically cleaned up after the configured TTL expires.
629+
*
630+
* <p>This API is typically used by Flink Operator Coordinator at job startup to register the
631+
* initial offset snapshot before any data is written.
632+
*
633+
* @param producerId the ID of the producer (typically Flink job ID)
634+
* @param offsets map of TableBucket to offset for all tables
635+
* @return a CompletableFuture containing the registration result indicating whether the
636+
* snapshot was newly created or already existed
637+
* @since 0.9
638+
*/
639+
CompletableFuture<RegisterResult> registerProducerOffsets(
640+
String producerId, Map<TableBucket, Long> offsets);
641+
642+
/**
643+
* Get producer offset snapshot.
644+
*
645+
* <p>This method retrieves the registered offset snapshot for a producer. Returns null if no
646+
* snapshot exists for the given producer ID.
647+
*
648+
* <p>This API is typically used by Flink Operator Coordinator at job startup to check if a
649+
* previous snapshot exists (indicating a failover before first checkpoint).
650+
*
651+
* @param producerId the ID of the producer
652+
* @return a CompletableFuture containing the producer offsets, or null if not found
653+
* @since 0.9
654+
*/
655+
CompletableFuture<ProducerOffsetsResult> getProducerOffsets(String producerId);
656+
657+
/**
658+
* Delete producer offset snapshot.
659+
*
660+
* <p>This method deletes the registered offset snapshot for a producer. This is typically
661+
* called after the first checkpoint completes successfully, as the checkpoint state will be
662+
* used for recovery instead of the initial snapshot.
663+
*
664+
* @param producerId the ID of the producer
665+
* @return a CompletableFuture that completes when deletion succeeds
666+
* @since 0.9
667+
*/
668+
CompletableFuture<Void> deleteProducerOffsets(String producerId);
607669
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.fluss.rpc.messages.CreateTableRequest;
5858
import org.apache.fluss.rpc.messages.DatabaseExistsRequest;
5959
import org.apache.fluss.rpc.messages.DatabaseExistsResponse;
60+
import org.apache.fluss.rpc.messages.DeleteProducerOffsetsRequest;
6061
import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest;
6162
import org.apache.fluss.rpc.messages.DropAclsRequest;
6263
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
@@ -65,6 +66,7 @@
6566
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
6667
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsRequest;
6768
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotRequest;
69+
import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest;
6870
import org.apache.fluss.rpc.messages.GetTableInfoRequest;
6971
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
7072
import org.apache.fluss.rpc.messages.ListAclsRequest;
@@ -105,6 +107,7 @@
105107
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
106108
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
107109
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
110+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeRegisterProducerOffsetsRequest;
108111
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.toConfigEntries;
109112
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
110113
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
@@ -588,6 +591,49 @@ public CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId) {
588591
return gateway.cancelRebalance(request).thenApply(r -> null);
589592
}
590593

594+
// ==================================================================================
595+
// Producer Offset Management APIs (for Exactly-Once Semantics)
596+
// ==================================================================================
597+
598+
@Override
599+
public CompletableFuture<RegisterResult> registerProducerOffsets(
600+
String producerId, Map<TableBucket, Long> offsets) {
601+
checkNotNull(producerId, "producerId must not be null");
602+
checkNotNull(offsets, "offsets must not be null");
603+
604+
return gateway.registerProducerOffsets(
605+
makeRegisterProducerOffsetsRequest(producerId, offsets))
606+
.thenApply(
607+
response -> {
608+
int code =
609+
response.hasResult()
610+
? response.getResult()
611+
: RegisterResult.CREATED.getCode();
612+
return RegisterResult.fromCode(code);
613+
});
614+
}
615+
616+
@Override
617+
public CompletableFuture<ProducerOffsetsResult> getProducerOffsets(String producerId) {
618+
checkNotNull(producerId, "producerId must not be null");
619+
620+
GetProducerOffsetsRequest request = new GetProducerOffsetsRequest();
621+
request.setProducerId(producerId);
622+
623+
return gateway.getProducerOffsets(request)
624+
.thenApply(ClientRpcMessageUtils::toProducerOffsetsResult);
625+
}
626+
627+
@Override
628+
public CompletableFuture<Void> deleteProducerOffsets(String producerId) {
629+
checkNotNull(producerId, "producerId must not be null");
630+
631+
DeleteProducerOffsetsRequest request = new DeleteProducerOffsetsRequest();
632+
request.setProducerId(producerId);
633+
634+
return gateway.deleteProducerOffsets(request).thenApply(r -> null);
635+
}
636+
591637
@Override
592638
public void close() {
593639
// nothing to do yet
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.admin;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.Collections;
24+
import java.util.Map;
25+
26+
/**
27+
* Result containing producer offset snapshot data.
28+
*
29+
* <p>This class holds the offset snapshot for a producer, which is used for undo recovery when a
30+
* Flink job fails over before completing its first checkpoint.
31+
*
32+
* <p>The snapshot contains bucket offsets organized by table ID, allowing the Flink Operator
33+
* Coordinator to coordinate undo recovery across all subtasks.
34+
*
35+
* @since 0.9
36+
*/
37+
@PublicEvolving
38+
public class ProducerOffsetsResult {
39+
40+
private final String producerId;
41+
private final Map<Long, Map<TableBucket, Long>> tableOffsets;
42+
private final long expirationTime;
43+
44+
/**
45+
* Creates a new ProducerOffsetsResult.
46+
*
47+
* @param producerId the producer ID (typically Flink job ID)
48+
* @param tableOffsets map of table ID to bucket offsets
49+
* @param expirationTime the expiration timestamp in milliseconds
50+
*/
51+
public ProducerOffsetsResult(
52+
String producerId,
53+
Map<Long, Map<TableBucket, Long>> tableOffsets,
54+
long expirationTime) {
55+
this.producerId = producerId;
56+
this.tableOffsets = Collections.unmodifiableMap(tableOffsets);
57+
this.expirationTime = expirationTime;
58+
}
59+
60+
/**
61+
* Get the producer ID.
62+
*
63+
* @return the producer ID
64+
*/
65+
public String getProducerId() {
66+
return producerId;
67+
}
68+
69+
/**
70+
* Get the offset snapshot for all tables.
71+
*
72+
* @return unmodifiable map of table ID to bucket offsets
73+
*/
74+
public Map<Long, Map<TableBucket, Long>> getTableOffsets() {
75+
return tableOffsets;
76+
}
77+
78+
/**
79+
* Get the expiration timestamp.
80+
*
81+
* @return the expiration timestamp in milliseconds since epoch
82+
*/
83+
public long getExpirationTime() {
84+
return expirationTime;
85+
}
86+
87+
@Override
88+
public String toString() {
89+
return "ProducerOffsetsResult{"
90+
+ "producerId='"
91+
+ producerId
92+
+ '\''
93+
+ ", tableCount="
94+
+ tableOffsets.size()
95+
+ ", expirationTime="
96+
+ expirationTime
97+
+ '}';
98+
}
99+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.admin;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
/**
23+
* Result of producer offset registration.
24+
*
25+
* <p>This enum indicates whether a producer offset snapshot was newly created or already existed
26+
* when calling registerProducerOffsets API.
27+
*
28+
* @since 0.9
29+
*/
30+
@PublicEvolving
31+
public enum RegisterResult {
32+
/**
33+
* Snapshot was newly created.
34+
*
35+
* <p>This indicates a first startup scenario where no previous snapshot existed. The caller
36+
* does not need to perform undo recovery.
37+
*/
38+
CREATED(0),
39+
40+
/**
41+
* Snapshot already existed and was not overwritten.
42+
*
43+
* <p>This indicates a failover scenario where a previous snapshot exists. The caller should
44+
* perform undo recovery using the existing snapshot offsets.
45+
*/
46+
ALREADY_EXISTS(1);
47+
48+
/** The code used in RPC messages. */
49+
private final int code;
50+
51+
RegisterResult(int code) {
52+
this.code = code;
53+
}
54+
55+
/** Returns the code used in RPC messages. */
56+
public int getCode() {
57+
return code;
58+
}
59+
60+
/** Returns the RegisterResult for the given code. */
61+
public static RegisterResult fromCode(int code) {
62+
for (RegisterResult result : values()) {
63+
if (result.code == code) {
64+
return result;
65+
}
66+
}
67+
throw new IllegalArgumentException("Unknown RegisterResult code: " + code);
68+
}
69+
}

0 commit comments

Comments
 (0)