Skip to content

Commit f3f4a9c

Browse files
authored
[kv] Validate non-primary-key table for KvSnapshotLease acquire and release APIs (#2723) (#2736)
1 parent abf4c91 commit f3f4a9c

File tree

2 files changed

+66
-10
lines changed

2 files changed

+66
-10
lines changed

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.fluss.exception.InvalidPartitionException;
4242
import org.apache.fluss.exception.InvalidReplicationFactorException;
4343
import org.apache.fluss.exception.InvalidTableException;
44+
import org.apache.fluss.exception.NonPrimaryKeyTableException;
4445
import org.apache.fluss.exception.PartitionAlreadyExistsException;
4546
import org.apache.fluss.exception.PartitionNotExistException;
4647
import org.apache.fluss.exception.SchemaNotExistException;
@@ -2016,4 +2017,40 @@ void testGetTableStatsForWalChangelogModeTable() throws Exception {
20162017
.isInstanceOf(InvalidTableException.class)
20172018
.hasMessageContaining("Row count is disabled for this table");
20182019
}
2020+
2021+
@Test
2022+
void testKvSnapshotLeaseForLogTable() throws Exception {
2023+
// Create a log table (without primary key)
2024+
TablePath logTablePath = TablePath.of("test_db", "test_log_table_kv_lease");
2025+
Schema logSchema =
2026+
Schema.newBuilder()
2027+
.column("id", DataTypes.INT())
2028+
.column("name", DataTypes.STRING())
2029+
.column("age", DataTypes.INT())
2030+
.build();
2031+
TableDescriptor logTableDescriptor =
2032+
TableDescriptor.builder().schema(logSchema).distributedBy(3).build();
2033+
long tableId = createTable(logTablePath, logTableDescriptor, true);
2034+
2035+
// Create a KvSnapshotLease
2036+
KvSnapshotLease lease = admin.createKvSnapshotLease("test-lease-log", 60000);
2037+
2038+
// Test acquireSnapshots should fail for log table
2039+
Map<TableBucket, Long> snapshotIds = new HashMap<>();
2040+
snapshotIds.put(new TableBucket(tableId, 0), 0L);
2041+
assertThatThrownBy(() -> lease.acquireSnapshots(snapshotIds).get())
2042+
.cause()
2043+
.isInstanceOf(NonPrimaryKeyTableException.class)
2044+
.hasMessageContaining("is not a primary key table");
2045+
2046+
// Test releaseSnapshots should fail for log table
2047+
assertThatThrownBy(
2048+
() ->
2049+
lease.releaseSnapshots(
2050+
Collections.singleton(new TableBucket(tableId, 0)))
2051+
.get())
2052+
.cause()
2053+
.isInstanceOf(NonPrimaryKeyTableException.class)
2054+
.hasMessageContaining("is not a primary key table");
2055+
}
20192056
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.exception.InvalidDatabaseException;
3434
import org.apache.fluss.exception.InvalidTableException;
3535
import org.apache.fluss.exception.LakeTableAlreadyExistException;
36+
import org.apache.fluss.exception.NonPrimaryKeyTableException;
3637
import org.apache.fluss.exception.SecurityDisabledException;
3738
import org.apache.fluss.exception.TableAlreadyExistException;
3839
import org.apache.fluss.exception.TableNotPartitionedException;
@@ -50,6 +51,7 @@
5051
import org.apache.fluss.metadata.TableBucket;
5152
import org.apache.fluss.metadata.TableChange;
5253
import org.apache.fluss.metadata.TableDescriptor;
54+
import org.apache.fluss.metadata.TableInfo;
5355
import org.apache.fluss.metadata.TablePath;
5456
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
5557
import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
@@ -300,6 +302,11 @@ public void authorizeTable(OperationType operationType, long tableId) {
300302
*/
301303
private void authorizeTableWithSession(
302304
Session session, OperationType operationType, long tableId) {
305+
TablePath tablePath = getTablePathById(tableId);
306+
authorizer.authorize(session, operationType, Resource.table(tablePath));
307+
}
308+
309+
private TablePath getTablePathById(long tableId) {
303310
TablePath tablePath;
304311
try {
305312
// TODO: this will block on the coordinator event thread, consider refactor
@@ -320,8 +327,16 @@ private void authorizeTableWithSession(
320327
+ "metadata cache in the server is not updated yet.",
321328
name(), tableId));
322329
}
330+
return tablePath;
331+
}
323332

324-
authorizer.authorize(session, operationType, Resource.table(tablePath));
333+
private void validateKvTable(long tableId) {
334+
TablePath tablePath = getTablePathById(tableId);
335+
TableInfo tableInfo = metadataManager.getTable(tablePath);
336+
if (!tableInfo.hasPrimaryKey()) {
337+
throw new NonPrimaryKeyTableException(
338+
"Table '" + tablePath + "' is not a primary key table");
339+
}
325340
}
326341

327342
@Override
@@ -881,13 +896,15 @@ public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
881896
@Override
882897
public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease(
883898
AcquireKvSnapshotLeaseRequest request) {
884-
// Authorization: require WRITE permission on all tables in the request
885-
if (authorizer != null) {
886-
for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
887-
request.getSnapshotsToLeasesList()) {
888-
long tableId = kvSnapshotLeaseForTable.getTableId();
899+
for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
900+
request.getSnapshotsToLeasesList()) {
901+
long tableId = kvSnapshotLeaseForTable.getTableId();
902+
if (authorizer != null) {
903+
// Authorization: require WRITE permission on all tables in the request
889904
authorizeTable(OperationType.READ, tableId);
890905
}
906+
907+
validateKvTable(tableId);
891908
}
892909

893910
String leaseId = request.getLeaseId();
@@ -913,12 +930,14 @@ public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease(
913930
@Override
914931
public CompletableFuture<ReleaseKvSnapshotLeaseResponse> releaseKvSnapshotLease(
915932
ReleaseKvSnapshotLeaseRequest request) {
916-
// Authorization: require WRITE permission on all tables in the request
917-
if (authorizer != null) {
918-
for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) {
919-
long tableId = tableBucket.getTableId();
933+
for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) {
934+
long tableId = tableBucket.getTableId();
935+
if (authorizer != null) {
936+
// Authorization: require WRITE permission on all tables in the request.
920937
authorizeTable(OperationType.READ, tableId);
921938
}
939+
940+
validateKvTable(tableId);
922941
}
923942

924943
String leaseId = request.getLeaseId();

0 commit comments

Comments
 (0)