Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.exception.InvalidReplicationFactorException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.NonPrimaryKeyTableException;
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.SchemaNotExistException;
Expand Down Expand Up @@ -2016,4 +2017,40 @@ void testGetTableStatsForWalChangelogModeTable() throws Exception {
.isInstanceOf(InvalidTableException.class)
.hasMessageContaining("Row count is disabled for this table");
}

@Test
void testKvSnapshotLeaseForLogTable() throws Exception {
// Create a log table (without primary key)
TablePath logTablePath = TablePath.of("test_db", "test_log_table_kv_lease");
Schema logSchema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
TableDescriptor logTableDescriptor =
TableDescriptor.builder().schema(logSchema).distributedBy(3).build();
long tableId = createTable(logTablePath, logTableDescriptor, true);

// Create a KvSnapshotLease
KvSnapshotLease lease = admin.createKvSnapshotLease("test-lease-log", 60000);

// Test acquireSnapshots should fail for log table
Map<TableBucket, Long> snapshotIds = new HashMap<>();
snapshotIds.put(new TableBucket(tableId, 0), 0L);
assertThatThrownBy(() -> lease.acquireSnapshots(snapshotIds).get())
.cause()
.isInstanceOf(NonPrimaryKeyTableException.class)
.hasMessageContaining("is not a primary key table");

// Test releaseSnapshots should fail for log table
assertThatThrownBy(
() ->
lease.releaseSnapshots(
Collections.singleton(new TableBucket(tableId, 0)))
.get())
.cause()
.isInstanceOf(NonPrimaryKeyTableException.class)
.hasMessageContaining("is not a primary key table");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.fluss.exception.InvalidDatabaseException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.exception.NonPrimaryKeyTableException;
import org.apache.fluss.exception.SecurityDisabledException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
Expand All @@ -50,6 +51,7 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
Expand Down Expand Up @@ -300,6 +302,11 @@ public void authorizeTable(OperationType operationType, long tableId) {
*/
private void authorizeTableWithSession(
Session session, OperationType operationType, long tableId) {
TablePath tablePath = getTablePathById(tableId);
authorizer.authorize(session, operationType, Resource.table(tablePath));
}

private TablePath getTablePathById(long tableId) {
TablePath tablePath;
try {
// TODO: this will block on the coordinator event thread, consider refactor
Expand All @@ -320,8 +327,16 @@ private void authorizeTableWithSession(
+ "metadata cache in the server is not updated yet.",
name(), tableId));
}
return tablePath;
}

authorizer.authorize(session, operationType, Resource.table(tablePath));
private void validateKvTable(long tableId) {
TablePath tablePath = getTablePathById(tableId);
TableInfo tableInfo = metadataManager.getTable(tablePath);
if (!tableInfo.hasPrimaryKey()) {
throw new NonPrimaryKeyTableException(
"Table '" + tablePath + "' is not a primary key table");
}
}

@Override
Expand Down Expand Up @@ -881,13 +896,15 @@ public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
@Override
public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease(
AcquireKvSnapshotLeaseRequest request) {
// Authorization: require WRITE permission on all tables in the request
if (authorizer != null) {
for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
request.getSnapshotsToLeasesList()) {
long tableId = kvSnapshotLeaseForTable.getTableId();
for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
request.getSnapshotsToLeasesList()) {
long tableId = kvSnapshotLeaseForTable.getTableId();
if (authorizer != null) {
// Authorization: require WRITE permission on all tables in the request
authorizeTable(OperationType.READ, tableId);
}

validateKvTable(tableId);
}

String leaseId = request.getLeaseId();
Expand All @@ -913,12 +930,14 @@ public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease(
@Override
public CompletableFuture<ReleaseKvSnapshotLeaseResponse> releaseKvSnapshotLease(
ReleaseKvSnapshotLeaseRequest request) {
// Authorization: require WRITE permission on all tables in the request
if (authorizer != null) {
for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) {
long tableId = tableBucket.getTableId();
for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) {
long tableId = tableBucket.getTableId();
if (authorizer != null) {
// Authorization: require WRITE permission on all tables in the request.
authorizeTable(OperationType.READ, tableId);
}

validateKvTable(tableId);
}

String leaseId = request.getLeaseId();
Expand Down