Skip to content

Commit b5fbc6e

Browse files
committed
Polish
1 parent a00fb04 commit b5fbc6e

File tree

6 files changed

+228
-22
lines changed

6 files changed

+228
-22
lines changed

catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.gravitino.catalog.lakehouse.lance;
2020

2121
import com.google.common.base.Preconditions;
22+
import com.google.common.collect.Lists;
2223
import com.lancedb.lance.Dataset;
2324
import com.lancedb.lance.WriteParams;
2425
import com.lancedb.lance.index.DistanceType;
@@ -144,27 +145,9 @@ public Table createTable(
144145
@Override
145146
public Table alterTable(NameIdentifier ident, TableChange... changes)
146147
throws NoSuchSchemaException, TableAlreadyExistsException {
147-
// Lance only supports adding indexes for now.
148-
boolean onlyAddIndex =
149-
Arrays.stream(changes).allMatch(change -> change instanceof TableChange.AddIndex);
150-
Preconditions.checkArgument(onlyAddIndex, "Only adding indexes is supported for Lance tables");
151-
152-
List<Index> addedIndexes =
153-
Arrays.stream(changes)
154-
.filter(change -> change instanceof TableChange.AddIndex)
155-
.map(
156-
change -> {
157-
TableChange.AddIndex addIndexChange = (TableChange.AddIndex) change;
158-
return Indexes.IndexImpl.builder()
159-
.withIndexType(addIndexChange.getType())
160-
.withName(addIndexChange.getName())
161-
.withFieldNames(addIndexChange.getFieldNames())
162-
.build();
163-
})
164-
.collect(Collectors.toList());
165148

166149
Table loadedTable = super.loadTable(ident);
167-
addLanceIndex(loadedTable, addedIndexes);
150+
handleLanceTableChange(loadedTable, changes);
168151
// After adding the index to the Lance dataset, we need to update the table metadata in
169152
// Gravitino. If there's any failure during this process, the code will throw an exception
170153
// and the update won't be applied in Gravitino.
@@ -240,11 +223,31 @@ private org.apache.arrow.vector.types.pojo.Schema convertColumnsToArrowSchema(Co
240223
return new org.apache.arrow.vector.types.pojo.Schema(fields);
241224
}
242225

243-
private void addLanceIndex(Table table, List<Index> addedIndexes) {
226+
private void handleLanceTableChange(Table table, TableChange[] changes) {
227+
// Lance only supports adding indexes and dropping columns for now.
228+
List<String> dropColumns = Lists.newArrayList();
229+
List<Index> indexToAdd = Lists.newArrayList();
230+
231+
for (TableChange change : changes) {
232+
if (change instanceof TableChange.DeleteColumn deleteColumn) {
233+
dropColumns.add(deleteColumn.fieldName()[0]);
234+
} else if (change instanceof TableChange.AddIndex addIndex) {
235+
indexToAdd.add(
236+
Indexes.IndexImpl.builder()
237+
.withIndexType(addIndex.getType())
238+
.withName(addIndex.getName())
239+
.withFieldNames(addIndex.getFieldNames())
240+
.build());
241+
} else {
242+
throw new UnsupportedOperationException(
243+
"LanceTableOperations only supports adding indexes currently.");
244+
}
245+
}
246+
244247
String location = table.properties().get(Table.PROPERTY_LOCATION);
245248
try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
246249
// For Lance, we only support adding indexes, so in fact, we can't handle drop index here.
247-
for (Index index : addedIndexes) {
250+
for (Index index : indexToAdd) {
248251
IndexType indexType = IndexType.valueOf(index.type().name());
249252
IndexParams indexParams = getIndexParamsByIndexType(indexType);
250253

@@ -256,6 +259,10 @@ private void addLanceIndex(Table table, List<Index> addedIndexes) {
256259
Optional.of(index.name()),
257260
indexParams,
258261
true);
262+
263+
if (!dropColumns.isEmpty()) {
264+
dataset.dropColumns(dropColumns);
265+
}
259266
}
260267
} catch (Exception e) {
261268
throw new RuntimeException(

lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
*/
1919
package org.apache.gravitino.lance.common.ops;
2020

21+
import com.lancedb.lance.namespace.model.AlterTableAddColumnsRequest;
22+
import com.lancedb.lance.namespace.model.AlterTableAddColumnsResponse;
23+
import com.lancedb.lance.namespace.model.AlterTableDropColumnsRequest;
24+
import com.lancedb.lance.namespace.model.AlterTableDropColumnsResponse;
2125
import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
2226
import com.lancedb.lance.namespace.model.CreateTableRequest;
2327
import com.lancedb.lance.namespace.model.CreateTableResponse;
@@ -113,4 +117,26 @@ RegisterTableResponse registerTable(
113117
* @return the response of the drop table operation
114118
*/
115119
DropTableResponse dropTable(String tableId, String delimiter);
120+
121+
/**
122+
* Alter a table to drop columns.
123+
*
124+
* @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}"
125+
* @param delimiter the delimiter used in the namespace
126+
* @param request the request containing columns to be dropped
127+
* @return the response of the alter table drop columns operation.
128+
*/
129+
AlterTableDropColumnsResponse alterTableDropColumns(
130+
String tableId, String delimiter, AlterTableDropColumnsRequest request);
131+
132+
/**
133+
* Alter a table to add columns.
134+
*
135+
* @param tableId table ids are in the format of "{namespace}{delimiter}{table_name}"
136+
* @param delimiter the delimiter used in the namespace
137+
* @param request the request containing columns to be added
138+
* @return the response of the alter table add columns operation.
139+
*/
140+
AlterTableAddColumnsResponse alterTableAddColumns(
141+
String tableId, String delimiter, AlterTableAddColumnsRequest request);
116142
}

lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
import com.google.common.collect.Maps;
2929
import com.lancedb.lance.namespace.LanceNamespaceException;
3030
import com.lancedb.lance.namespace.ObjectIdentifier;
31+
import com.lancedb.lance.namespace.model.AlterTableAddColumnsRequest;
32+
import com.lancedb.lance.namespace.model.AlterTableAddColumnsResponse;
33+
import com.lancedb.lance.namespace.model.AlterTableDropColumnsRequest;
34+
import com.lancedb.lance.namespace.model.AlterTableDropColumnsResponse;
3135
import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
3236
import com.lancedb.lance.namespace.model.CreateTableRequest;
3337
import com.lancedb.lance.namespace.model.CreateTableRequest.ModeEnum;
@@ -57,6 +61,7 @@
5761
import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
5862
import org.apache.gravitino.rel.Column;
5963
import org.apache.gravitino.rel.Table;
64+
import org.apache.gravitino.rel.TableChange;
6065
import org.slf4j.Logger;
6166
import org.slf4j.LoggerFactory;
6267

@@ -324,6 +329,38 @@ public DropTableResponse dropTable(String tableId, String delimiter) {
324329
return response;
325330
}
326331

332+
@Override
333+
public AlterTableDropColumnsResponse alterTableDropColumns(
334+
String tableId, String delimiter, AlterTableDropColumnsRequest request) {
335+
ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
336+
Preconditions.checkArgument(
337+
nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels());
338+
339+
String catalogName = nsId.levelAtListPos(0);
340+
Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
341+
342+
NameIdentifier tableIdentifier =
343+
NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
344+
345+
TableChange[] changes =
346+
request.getColumns().stream()
347+
.map(colName -> TableChange.deleteColumn(new String[] {colName}, false))
348+
.toArray(TableChange[]::new);
349+
350+
catalog.asTableCatalog().alterTable(tableIdentifier, changes);
351+
352+
return new AlterTableDropColumnsResponse();
353+
}
354+
355+
@Override
356+
public AlterTableAddColumnsResponse alterTableAddColumns(
357+
String tableId, String delimiter, AlterTableAddColumnsRequest request) {
358+
// We need to parse NewColumnTransform to Column, however, NewColumnTransform only contains
359+
// the name and a string expression.
360+
// More please see: https://docs.lancedb.com/api-reference/data/add-columns
361+
throw new UnsupportedOperationException("Adding columns is not supported yet.");
362+
}
363+
327364
private List<Column> extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
328365
List<Column> columns = new ArrayList<>();
329366

lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525

2626
import com.codahale.metrics.annotation.ResponseMetered;
2727
import com.codahale.metrics.annotation.Timed;
28+
import com.google.common.base.Preconditions;
2829
import com.google.common.collect.Maps;
30+
import com.lancedb.lance.namespace.model.AlterTableDropColumnsRequest;
31+
import com.lancedb.lance.namespace.model.AlterTableDropColumnsResponse;
2932
import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
3033
import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
3134
import com.lancedb.lance.namespace.model.CreateTableRequest;
@@ -235,6 +238,27 @@ public Response dropTable(
235238
}
236239
}
237240

241+
@POST
242+
@Path("/drop_columns")
243+
@Timed(name = "drop-columns." + MetricNames.HTTP_PROCESS_DURATION, absolute = true)
244+
@ResponseMetered(name = "drop-columns", absolute = true)
245+
public Response dropColumns(
246+
@PathParam("id") String tableId,
247+
@QueryParam("delimiter") @DefaultValue("$") String delimiter,
248+
@Context HttpHeaders headers,
249+
AlterTableDropColumnsRequest alterTableDropColumnsRequest) {
250+
try {
251+
validateDropColumnsRequest(alterTableDropColumnsRequest);
252+
AlterTableDropColumnsResponse response =
253+
lanceNamespace
254+
.asTableOps()
255+
.alterTableDropColumns(tableId, delimiter, alterTableDropColumnsRequest);
256+
return Response.ok(response).build();
257+
} catch (Exception e) {
258+
return LanceExceptionMapper.toRESTResponse(tableId, e);
259+
}
260+
}
261+
238262
private void validateCreateEmptyTableRequest(
239263
@SuppressWarnings("unused") CreateEmptyTableRequest request) {
240264
// No specific fields to validate for now
@@ -266,4 +290,9 @@ private void validateDropTableRequest(@SuppressWarnings("unused") DropTableReque
266290
// We will ignore the id in the request body since it's already provided in the path param
267291
// No specific fields to validate for now
268292
}
293+
294+
private void validateDropColumnsRequest(AlterTableDropColumnsRequest request) {
295+
Preconditions.checkArgument(
296+
!request.getColumns().isEmpty(), "Columns to drop cannot be empty.");
297+
}
269298
}

lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.lancedb.lance.namespace.LanceNamespaceException;
2626
import com.lancedb.lance.namespace.LanceNamespaces;
2727
import com.lancedb.lance.namespace.client.apache.ApiException;
28+
import com.lancedb.lance.namespace.client.apache.api.TableApi;
29+
import com.lancedb.lance.namespace.model.AlterTableDropColumnsRequest;
2830
import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
2931
import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
3032
import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
@@ -51,6 +53,7 @@
5153
import com.lancedb.lance.namespace.model.RegisterTableRequest.ModeEnum;
5254
import com.lancedb.lance.namespace.model.RegisterTableResponse;
5355
import com.lancedb.lance.namespace.model.TableExistsRequest;
56+
import com.lancedb.lance.namespace.rest.RestNamespace;
5457
import com.lancedb.lance.namespace.rest.RestNamespaceConfig;
5558
import java.io.File;
5659
import java.io.IOException;
@@ -69,6 +72,7 @@
6972
import org.apache.arrow.vector.types.pojo.ArrowType;
7073
import org.apache.arrow.vector.types.pojo.Field;
7174
import org.apache.commons.io.FileUtils;
75+
import org.apache.commons.lang3.reflect.FieldUtils;
7276
import org.apache.gravitino.Catalog;
7377
import org.apache.gravitino.NameIdentifier;
7478
import org.apache.gravitino.Schema;
@@ -467,7 +471,7 @@ void testCreateEmptyTable() throws ApiException {
467471
}
468472

469473
@Test
470-
void testCreateTable() throws IOException, ApiException {
474+
void testCreateTable() throws IOException, ApiException, IllegalAccessException {
471475
catalog = createCatalog(CATALOG_NAME);
472476
createSchema();
473477

@@ -592,6 +596,56 @@ void testCreateTable() throws IOException, ApiException {
592596
Set<String> stringSet = listResponse.getTables();
593597
Assertions.assertEquals(1, stringSet.size());
594598
Assertions.assertTrue(stringSet.contains(Joiner.on(".").join(ids)));
599+
600+
// Now try to drop columns in the table
601+
AlterTableDropColumnsRequest dropColumnsRequest = new AlterTableDropColumnsRequest();
602+
List.of(ids);
603+
dropColumnsRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "table"));
604+
dropColumnsRequest.setColumns(List.of("value"));
605+
606+
// No alterTableDropColumns in Namespace interface, so we need to get TableApi via reflection
607+
RestNamespace restNamespace = (RestNamespace) ns;
608+
TableApi tableApi = (TableApi) FieldUtils.readField(restNamespace, "tableApi", true);
609+
String delimiter = RestNamespaceConfig.DELIMITER_DEFAULT;
610+
611+
Assertions.assertDoesNotThrow(
612+
() ->
613+
tableApi.alterTableDropColumns(
614+
String.join(delimiter, ids), dropColumnsRequest, delimiter));
615+
616+
describeTableRequest.setId(ids);
617+
loadTable = ns.describeTable(describeTableRequest);
618+
Assertions.assertNotNull(loadTable);
619+
Assertions.assertEquals(newLocation, loadTable.getLocation());
620+
621+
jsonArrowFields = loadTable.getSchema().getFields();
622+
Assertions.assertEquals(1, jsonArrowFields.size());
623+
for (int i = 0; i < jsonArrowFields.size(); i++) {
624+
JsonArrowField jsonArrowField = jsonArrowFields.get(i);
625+
Field originalField = schema.getFields().get(i);
626+
Assertions.assertEquals(originalField.getName(), jsonArrowField.getName());
627+
628+
if (i == 0) {
629+
Assertions.assertEquals("int32", jsonArrowField.getType().getType());
630+
} else if (i == 1) {
631+
Assertions.assertEquals("utf8", jsonArrowField.getType().getType());
632+
}
633+
}
634+
635+
// Drop a non-existing column should fail
636+
AlterTableDropColumnsRequest dropNonExistingColumnsRequest = new AlterTableDropColumnsRequest();
637+
dropNonExistingColumnsRequest.setId(ids);
638+
dropNonExistingColumnsRequest.setColumns(List.of("non_existing_column"));
639+
Exception dropColumnException =
640+
Assertions.assertThrows(
641+
Exception.class,
642+
() ->
643+
tableApi.alterTableDropColumns(
644+
String.join(delimiter, ids), dropNonExistingColumnsRequest, delimiter));
645+
Assertions.assertTrue(
646+
dropColumnException
647+
.getMessage()
648+
.contains("Column non_existing_column not found for deletion "));
595649
}
596650

597651
@Test

lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import com.google.common.collect.Lists;
3131
import com.google.common.collect.Sets;
3232
import com.lancedb.lance.namespace.LanceNamespaceException;
33+
import com.lancedb.lance.namespace.model.AlterTableDropColumnsRequest;
34+
import com.lancedb.lance.namespace.model.AlterTableDropColumnsResponse;
3335
import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
3436
import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
3537
import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
@@ -48,6 +50,7 @@
4850
import com.lancedb.lance.namespace.model.RegisterTableRequest;
4951
import com.lancedb.lance.namespace.model.RegisterTableResponse;
5052
import java.io.IOException;
53+
import java.util.List;
5154
import java.util.regex.Pattern;
5255
import javax.servlet.http.HttpServletRequest;
5356
import javax.ws.rs.client.Entity;
@@ -772,4 +775,54 @@ void testDropTable() {
772775
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp.getStatus());
773776
Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
774777
}
778+
779+
@Test
780+
void testDropColumns() {
781+
String tableIds = "catalog.scheme.alter_table_drop_columns";
782+
String delimiter = ".";
783+
784+
// first try to create a table and drop columns from it
785+
AlterTableDropColumnsResponse dropColumnsResponse = new AlterTableDropColumnsResponse();
786+
dropColumnsResponse.setVersion(2L);
787+
788+
AlterTableDropColumnsRequest dropColumnsRequest = new AlterTableDropColumnsRequest();
789+
dropColumnsRequest.setColumns(List.of("id"));
790+
when(tableOps.alterTableDropColumns(any(), any(), any())).thenReturn(dropColumnsResponse);
791+
Response resp =
792+
target(String.format("/v1/table/%s/drop_columns", tableIds))
793+
.queryParam("delimiter", delimiter)
794+
.request(MediaType.APPLICATION_JSON_TYPE)
795+
.post(Entity.entity(dropColumnsRequest, MediaType.APPLICATION_JSON_TYPE));
796+
797+
Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
798+
Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
799+
AlterTableDropColumnsResponse response = resp.readEntity(AlterTableDropColumnsResponse.class);
800+
Assertions.assertEquals(dropColumnsResponse.getVersion(), response.getVersion());
801+
802+
// Test runtime exception
803+
Mockito.reset(tableOps);
804+
when(tableOps.alterTableDropColumns(any(), any(), any()))
805+
.thenThrow(new RuntimeException("Runtime exception"));
806+
resp =
807+
target(String.format("/v1/table/%s/drop_columns", tableIds))
808+
.queryParam("delimiter", delimiter)
809+
.request(MediaType.APPLICATION_JSON_TYPE)
810+
.post(Entity.entity(dropColumnsRequest, MediaType.APPLICATION_JSON_TYPE));
811+
Assertions.assertEquals(
812+
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp.getStatus());
813+
Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
814+
815+
// Test No such table exception
816+
Mockito.reset(tableOps);
817+
when(tableOps.alterTableDropColumns(any(), any(), any()))
818+
.thenThrow(
819+
LanceNamespaceException.notFound(
820+
"Table not found", "NoSuchTableException", tableIds, ""));
821+
resp =
822+
target(String.format("/v1/table/%s/drop_columns", tableIds))
823+
.queryParam("delimiter", delimiter)
824+
.request(MediaType.APPLICATION_JSON_TYPE)
825+
.post(Entity.entity(dropColumnsRequest, MediaType.APPLICATION_JSON_TYPE));
826+
Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), resp.getStatus());
827+
}
775828
}

0 commit comments

Comments
 (0)