Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import static org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_REGISTER;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.lancedb.lance.Dataset;
import com.lancedb.lance.WriteParams;
import com.lancedb.lance.index.DistanceType;
import com.lancedb.lance.index.IndexParams;
import com.lancedb.lance.index.IndexType;
import com.lancedb.lance.index.vector.VectorIndexParams;
import com.lancedb.lance.schema.ColumnAlteration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -40,6 +42,7 @@
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.catalog.ManagedSchemaOperations;
import org.apache.gravitino.catalog.ManagedTableOperations;
import org.apache.gravitino.connector.GenericTable;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
Expand Down Expand Up @@ -160,31 +163,14 @@ public Table createTable(
/**
 * Alters a Lance table: the changes are applied to the underlying Lance dataset first and are
 * then recorded in Gravitino's table metadata.
 *
 * <p>Change types that the Lance dataset handling does not support are rejected by {@code
 * handleLanceTableChange} with an {@link UnsupportedOperationException} before the Gravitino
 * metadata is touched.
 *
 * @param ident the identifier of the table to alter
 * @param changes the changes to apply
 * @return the altered table, with the Lance dataset version produced by the changes stored under
 *     the {@code LanceConstants.LANCE_TABLE_VERSION} property
 */
@Override
public Table alterTable(NameIdentifier ident, TableChange... changes)
    throws NoSuchSchemaException, TableAlreadyExistsException {
  Table loadedTable = super.loadTable(ident);
  long version = handleLanceTableChange(loadedTable, changes);
  // After making changes to the Lance dataset, we need to update the table metadata in
  // Gravitino. If there's any failure during this process, the code will throw an exception
  // and the update won't be applied in Gravitino.
  GenericTable table = (GenericTable) super.alterTable(ident, changes);
  return table.setProperties(LanceConstants.LANCE_TABLE_VERSION, String.valueOf(version));
}

@Override
Expand Down Expand Up @@ -316,11 +302,48 @@ private org.apache.arrow.vector.types.pojo.Schema convertColumnsToArrowSchema(Co
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}

private void addLanceIndex(Table table, List<Index> addedIndexes) {
// Note: this method can't guarantee the atomicity of the operations on Lance dataset. For
// example, only a subset of changes may be applied if an exception occurs during the process.
/**
* Handle the table changes on the underlying Lance dataset.
*
* @param table the table to be altered
* @param changes the changes to be applied
* @return the new version id of the Lance dataset after applying the changes
*/
private long handleLanceTableChange(Table table, TableChange[] changes) {
List<String> dropColumns = Lists.newArrayList();
List<Index> indexToAdd = Lists.newArrayList();
List<ColumnAlteration> renameColumns = Lists.newArrayList();

for (TableChange change : changes) {
if (change instanceof TableChange.DeleteColumn deleteColumn) {
dropColumns.add(String.join(".", deleteColumn.fieldName()));
} else if (change instanceof TableChange.AddIndex addIndex) {
indexToAdd.add(
Indexes.IndexImpl.builder()
.withIndexType(addIndex.getType())
.withName(addIndex.getName())
.withFieldNames(addIndex.getFieldNames())
.build());
} else if (change instanceof TableChange.RenameColumn renameColumn) {
// Currently, only renaming columns is supported.
// TODO: Support change column type once we have a clear knowledge about the means of
// castTo in Lance.
ColumnAlteration lanceColumnAlter =
new ColumnAlteration.Builder(String.join(".", renameColumn.fieldName()))
.rename(renameColumn.getNewName())
.build();
renameColumns.add(lanceColumnAlter);
} else {
throw new UnsupportedOperationException(
"Unsupported changes to lance table: " + change.getClass().getSimpleName());
}
}

String location = table.properties().get(Table.PROPERTY_LOCATION);
try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
// For Lance, we only support adding indexes, so in fact, we can't handle drop index here.
for (Index index : addedIndexes) {
for (Index index : indexToAdd) {
IndexType indexType = IndexType.valueOf(index.type().name());
IndexParams indexParams = getIndexParamsByIndexType(indexType);

Expand All @@ -333,9 +356,21 @@ private void addLanceIndex(Table table, List<Index> addedIndexes) {
indexParams,
true);
}

if (!dropColumns.isEmpty()) {
dataset.dropColumns(dropColumns);
}

if (!renameColumns.isEmpty()) {
dataset.alterColumns(renameColumns);
}

return dataset.getVersion().getId();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(
"Failed to add indexes to Lance dataset at location " + location, e);
"Failed to handle alterations to Lance dataset at location " + location, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,20 @@ protected GenericTable internalBuild() {
return table;
}
}

/**
 * Returns a copy of this table with one additional property set.
 *
 * <p>All fields other than the properties are shared with this table. The properties map is
 * copied before the new entry is added, so this table's own properties are never modified and
 * the call also works when the current map is unmodifiable or {@code null}.
 *
 * @param key the property key to set on the copy
 * @param value the property value to set on the copy
 * @return a new {@code GenericTable} carrying this table's state plus the given property
 */
public GenericTable setProperties(String key, String value) {
  GenericTable table = new GenericTable();
  table.columns = this.columns;
  table.comment = this.comment;
  table.auditInfo = this.auditInfo;
  table.distribution = this.distribution;
  table.indexes = this.indexes;
  table.name = this.name;
  table.partitioning = this.partitioning;
  table.sortOrders = this.sortOrders;
  // Copy instead of aliasing: writing into the shared map would silently change the source
  // table as well, and would fail outright on an immutable map.
  table.properties =
      this.properties == null
          ? new java.util.HashMap<>()
          : new java.util.HashMap<>(this.properties);
  table.properties.put(key, value);
  return table;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
*/
package org.apache.gravitino.lance.common.ops;

import com.lancedb.lance.namespace.model.AlterTableAddColumnsRequest;
import com.lancedb.lance.namespace.model.AlterTableAddColumnsResponse;
import com.lancedb.lance.namespace.model.AlterTableAlterColumnsRequest;
import com.lancedb.lance.namespace.model.AlterTableAlterColumnsResponse;
import com.lancedb.lance.namespace.model.AlterTableDropColumnsRequest;
import com.lancedb.lance.namespace.model.AlterTableDropColumnsResponse;
import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
import com.lancedb.lance.namespace.model.CreateTableRequest;
import com.lancedb.lance.namespace.model.CreateTableResponse;
Expand Down Expand Up @@ -113,4 +119,37 @@ RegisterTableResponse registerTable(
* @return the response of the drop table operation
*/
DropTableResponse dropTable(String tableId, String delimiter);

/**
 * Alter a table by dropping columns.
 *
 * @param tableId the table id, in the format of "{namespace}{delimiter}{table_name}"
 * @param delimiter the delimiter that separates the levels of {@code tableId}
 * @param request the request containing the columns to be dropped
 * @return the response of the alter table drop columns operation
 */
AlterTableDropColumnsResponse alterTableDropColumns(
String tableId, String delimiter, AlterTableDropColumnsRequest request);

/**
 * Alter a table by updating existing columns.
 *
 * @param tableId the table id, in the format of "{namespace}{delimiter}{table_name}"
 * @param delimiter the delimiter that separates the levels of {@code tableId}
 * @param request the request containing the column alterations to apply
 * @return the response of the alter table alter columns operation
 */
AlterTableAlterColumnsResponse alterTableAlterColumns(
String tableId, String delimiter, AlterTableAlterColumnsRequest request);

/**
 * Alter a table by adding columns.
 *
 * <p>NOTE(review): implementations may not support this yet and may reject the call with an
 * {@link UnsupportedOperationException} - confirm against the implementation in use.
 *
 * @param tableId the table id, in the format of "{namespace}{delimiter}{table_name}"
 * @param delimiter the delimiter that separates the levels of {@code tableId}
 * @param request the request containing the columns to be added
 * @return the response of the alter table add columns operation
 */
AlterTableAddColumnsResponse alterTableAddColumns(
String tableId, String delimiter, AlterTableAddColumnsRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@
import com.google.common.collect.Maps;
import com.lancedb.lance.namespace.LanceNamespaceException;
import com.lancedb.lance.namespace.ObjectIdentifier;
import com.lancedb.lance.namespace.model.AlterTableAddColumnsRequest;
import com.lancedb.lance.namespace.model.AlterTableAddColumnsResponse;
import com.lancedb.lance.namespace.model.AlterTableAlterColumnsRequest;
import com.lancedb.lance.namespace.model.AlterTableAlterColumnsResponse;
import com.lancedb.lance.namespace.model.AlterTableDropColumnsRequest;
import com.lancedb.lance.namespace.model.AlterTableDropColumnsResponse;
import com.lancedb.lance.namespace.model.ColumnAlteration;
import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
import com.lancedb.lance.namespace.model.CreateTableRequest;
import com.lancedb.lance.namespace.model.CreateTableRequest.ModeEnum;
Expand All @@ -52,17 +59,24 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.lance.common.ops.LanceTableOperations;
import org.apache.gravitino.lance.common.utils.ArrowUtils;
import org.apache.gravitino.lance.common.utils.LanceConstants;
import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GravitinoLanceTableOperations implements LanceTableOperations {

public static final Logger LOG = LoggerFactory.getLogger(GravitinoLanceTableOperations.class);

private final GravitinoLanceNamespaceWrapper namespaceWrapper;

public GravitinoLanceTableOperations(GravitinoLanceNamespaceWrapper namespaceWrapper) {
Expand Down Expand Up @@ -294,6 +308,72 @@ public DropTableResponse dropTable(String tableId, String delimiter) {
return response;
}

/**
 * Drops the columns named in the request from the table.
 *
 * @param tableId the table id, in the format of "{namespace}{delimiter}{table_name}"
 * @param delimiter the delimiter that separates the levels of {@code tableId}
 * @param request the request listing the columns to drop
 * @return a response carrying the Lance table version reported after the change, or a null
 *     version when the altered table exposes no version property
 */
@Override
public AlterTableDropColumnsResponse alterTableDropColumns(
    String tableId, String delimiter, AlterTableDropColumnsRequest request) {
  Long version =
      alterTableAndGetVersion(
          tableId,
          delimiter,
          () ->
              request.getColumns().stream()
                  .map(colName -> TableChange.deleteColumn(new String[] {colName}, false))
                  .toArray(TableChange[]::new));
  AlterTableDropColumnsResponse alterTableDropColumnsResponse =
      new AlterTableDropColumnsResponse();
  alterTableDropColumnsResponse.setVersion(version);
  return alterTableDropColumnsResponse;
}

/**
 * Applies the column alterations in the request to the table.
 *
 * @param tableId the table id, in the format of "{namespace}{delimiter}{table_name}"
 * @param delimiter the delimiter that separates the levels of {@code tableId}
 * @param request the request listing the column alterations
 * @return a response carrying the Lance table version reported after the change, or a null
 *     version when the altered table exposes no version property
 * @throws IllegalArgumentException if the request yields no applicable alteration
 */
@Override
public AlterTableAlterColumnsResponse alterTableAlterColumns(
    String tableId, String delimiter, AlterTableAlterColumnsRequest request) {
  Long version =
      alterTableAndGetVersion(
          tableId,
          delimiter,
          () -> {
            List<TableChange> changes = buildAlterColumnChanges(request);
            if (changes.isEmpty()) {
              throw new IllegalArgumentException("No valid alterations found in the request.");
            }
            return changes.toArray(new TableChange[0]);
          });
  AlterTableAlterColumnsResponse response = new AlterTableAlterColumnsResponse();
  response.setVersion(version);
  return response;
}

/**
 * Adding columns is not supported yet; this method always throws.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public AlterTableAddColumnsResponse alterTableAddColumns(
    String tableId, String delimiter, AlterTableAddColumnsRequest request) {
  // We need to parse NewColumnTransform to Column, however, NewColumnTransform only contains
  // the name and a string expression.
  // More please see: https://docs.lancedb.com/api-reference/data/add-columns
  throw new UnsupportedOperationException("Adding columns is not supported yet.");
}

/**
 * Shared path of the alter-table operations: resolves the 3-level table id, loads the catalog,
 * applies the supplied changes and reads the resulting Lance table version.
 *
 * <p>The changes are supplied lazily so that they are only built after the table id has been
 * validated and the catalog has been loaded.
 *
 * @param tableId the table id, in the format of "{namespace}{delimiter}{table_name}"
 * @param delimiter the delimiter that separates the levels of {@code tableId}
 * @param changesSupplier builds the table changes to apply
 * @return the value of the {@code LanceConstants.LANCE_TABLE_VERSION} property of the altered
 *     table, or {@code null} if that property is absent
 */
private Long alterTableAndGetVersion(
    String tableId,
    String delimiter,
    java.util.function.Supplier<TableChange[]> changesSupplier) {
  ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
  Preconditions.checkArgument(
      nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels());

  String catalogName = nsId.levelAtListPos(0);
  Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);

  NameIdentifier tableIdentifier =
      NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));

  // Build the changes before touching the table catalog, matching the order in which request
  // problems and catalog problems were reported before the extraction.
  TableChange[] changes = changesSupplier.get();
  Table table = catalog.asTableCatalog().alterTable(tableIdentifier, changes);
  return Optional.ofNullable(table.properties().get(LanceConstants.LANCE_TABLE_VERSION))
      .map(Long::valueOf)
      .orElse(null);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementations of these 3 methods are basically the same; can they be merged together?

Copy link
Contributor Author

@yuqi1129 yuqi1129 Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean we can define the following interface to replace them all?

    @Override
  public AlterTableAlterColumnsResponse alterTableOperations(
      String tableId, String delimiter, Object request) {
    if (request instanceof AlterTableAddColumnsRequest addColumnsRequest) {
      // add column
    } else if (request instanceof AlterTableAlterColumnsRequest alterColumnsRequest) {
      return alterTableAlterColumns(tableId, delimiter, alterColumnsRequest);
    } else if (request instanceof AlterTableDropColumnsRequest dropColumnsRequest) {
      // drop column
      return alterTableDropColumns(tableId, delimiter, dropColumnsRequest);
    }
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, something like this. Since most of the logic is similar, if we continue to add more alter-related operations, there will be a lot of duplicated code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't change much. Most of the code is still duplicated, such as checking the id, loading the catalog, and getting the latest version. Please think seriously about how to do it.

private List<Column> extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
List<Column> columns = new ArrayList<>();

Expand All @@ -319,4 +399,28 @@ private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
return JsonArrowSchemaConverter.convertToJsonArrowSchema(
new org.apache.arrow.vector.types.pojo.Schema(fields));
}

/**
 * Translates the column alterations of a request into Gravitino table changes.
 *
 * <p>Only renames are translated. An alteration without a new name and without a cast
 * contributes no change. A request whose alteration list is {@code null} is treated as having
 * no alterations.
 *
 * @param request the alter-columns request to translate
 * @return the table changes derived from the request, possibly empty
 * @throws UnsupportedOperationException if any alteration asks for a data type change
 */
private List<TableChange> buildAlterColumnChanges(AlterTableAlterColumnsRequest request) {
  List<TableChange> changes = new ArrayList<>();

  List<ColumnAlteration> columns = request.getAlterations();
  if (columns == null) {
    // A missing list would otherwise fail with a NullPointerException in the loop below;
    // returning no changes lets the caller report it as an invalid request.
    return changes;
  }

  for (ColumnAlteration column : columns) {
    // Column name will not be null according to LanceDB spec.
    String columnName = column.getColumn();
    String newName = column.getRename();
    if (StringUtils.isNotBlank(newName)) {
      changes.add(TableChange.renameColumn(new String[] {columnName}, newName));
    }

    // The format of ColumnAlteration#castTo is unclear, so we will skip it now
    // for more, please see:
    // https://github.com/lance-format/lance-namespace/blob/9d9cde12520caea2fd80ea5f41a20a4db9b92524/java/lance-namespace-apache-client/api/openapi.yaml#L4508-L4511
    if (StringUtils.isNotBlank(column.getCastTo())) {
      LOG.error(
          "Altering column '{}' data type is not supported yet due to unclear spec.", columnName);
      throw new UnsupportedOperationException("Altering column data type is not supported yet.");
    }
  }
  return changes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class LanceConstants {

public static final String LANCE_TABLE_REGISTER = "lance.register";

public static final String LANCE_TABLE_VERSION = "lance.version";
// Marks whether to create an empty Lance table (no data files)
public static final String LANCE_TABLE_CREATE_EMPTY = "lance.create-empty";

Expand Down
Loading