Skip to content

Commit 40c926b

Browse files
Backport to branch(3.13) : Add table metadata service (#2504)
Co-authored-by: inv-jishnu <[email protected]> Co-authored-by: Jishnu J <[email protected]>
1 parent 76dbc58 commit 40c926b

File tree

9 files changed

+381
-0
lines changed

9 files changed

+381
-0
lines changed

core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,10 @@ public enum CoreError implements ScalarDbError {
700700
"The provided partition key order does not match the table schema. Required order: %s",
701701
"",
702702
""),
703+
DATA_LOADER_MISSING_NAMESPACE_OR_TABLE(
704+
Category.USER_ERROR, "0165", "Missing namespace or table: %s, %s", "", ""),
705+
DATA_LOADER_TABLE_METADATA_RETRIEVAL_FAILED(
706+
Category.USER_ERROR, "0166", "Failed to retrieve table metadata. Details: %s", "", ""),
703707

704708
//
705709
// Errors for the concurrency error category
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.scalar.db.dataloader.core.dataimport.controlfile;
2+
3+
import com.fasterxml.jackson.annotation.JsonCreator;
4+
import com.fasterxml.jackson.annotation.JsonProperty;
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import lombok.Getter;
8+
import lombok.Setter;
9+
10+
/**
11+
* Represents the configuration for a single table in the control file, including its namespace,
12+
* table name, and field mappings. This class is used to define how data from a control file maps to
13+
* a specific table in ScalarDB.
14+
*/
15+
@Getter
16+
@Setter
17+
public class ControlFileTable {
18+
19+
/** The namespace of the table in ScalarDB. */
20+
@JsonProperty("namespace")
21+
private String namespace;
22+
23+
/** The name of the table in ScalarDB. */
24+
@JsonProperty("table")
25+
private String table;
26+
27+
/**
28+
* A list of mappings defining the correspondence between control file fields and table columns.
29+
*/
30+
@JsonProperty("mappings")
31+
private final List<ControlFileTableFieldMapping> mappings;
32+
33+
/**
34+
* Creates a new {@code ControlFileTable} instance with the specified namespace and table name.
35+
* The mappings list is initialized as an empty list.
36+
*
37+
* @param namespace The namespace of the table in ScalarDB.
38+
* @param table The name of the table in ScalarDB.
39+
*/
40+
public ControlFileTable(String namespace, String table) {
41+
this.namespace = namespace;
42+
this.table = table;
43+
this.mappings = new ArrayList<>();
44+
}
45+
46+
/**
47+
* Constructs a {@code ControlFileTable} instance using data from a serialized JSON object. This
48+
* constructor is used for deserialization of API requests or control files.
49+
*
50+
* @param namespace The namespace of the table in ScalarDB.
51+
* @param table The name of the table in ScalarDB.
52+
* @param mappings A list of mappings that define the relationship between control file fields and
53+
* table columns.
54+
*/
55+
@JsonCreator
56+
public ControlFileTable(
57+
@JsonProperty("namespace") String namespace,
58+
@JsonProperty("table") String table,
59+
@JsonProperty("mappings") List<ControlFileTableFieldMapping> mappings) {
60+
this.namespace = namespace;
61+
this.table = table;
62+
this.mappings = mappings;
63+
}
64+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.scalar.db.dataloader.core.dataimport.controlfile;
2+
3+
import com.fasterxml.jackson.annotation.JsonCreator;
4+
import com.fasterxml.jackson.annotation.JsonProperty;
5+
import lombok.Getter;
6+
import lombok.Setter;
7+
8+
/**
9+
* Represents the mapping of a single field in the control file to a column in a ScalarDB table.
10+
* This class defines how data from a specific field in the input source should be mapped to the
11+
* corresponding column in the database.
12+
*/
13+
@Getter
14+
@Setter
15+
public class ControlFileTableFieldMapping {
16+
17+
/** The name of the field in the input source (e.g., JSON or CSV). */
18+
@JsonProperty("source_field")
19+
private String sourceField;
20+
21+
/** The name of the column in the ScalarDB table that the field maps to. */
22+
@JsonProperty("target_column")
23+
private String targetColumn;
24+
25+
/**
26+
* Constructs a {@code ControlFileTableFieldMapping} instance using data from a serialized JSON
27+
* object. This constructor is primarily used for deserialization of control file mappings.
28+
*
29+
* @param sourceField The name of the field in the input source (e.g., JSON or CSV).
30+
* @param targetColumn The name of the corresponding column in the ScalarDB table.
31+
*/
32+
@JsonCreator
33+
public ControlFileTableFieldMapping(
34+
@JsonProperty("source_field") String sourceField,
35+
@JsonProperty("target_column") String targetColumn) {
36+
this.sourceField = sourceField;
37+
this.targetColumn = targetColumn;
38+
}
39+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.scalar.db.dataloader.core.tablemetadata;
2+
3+
/** A custom exception that encapsulates errors thrown by the TableMetaDataService */
4+
public class TableMetadataException extends Exception {
5+
6+
/**
7+
* Class constructor
8+
*
9+
* @param message error message
10+
* @param cause reason for exception
11+
*/
12+
public TableMetadataException(String message, Throwable cause) {
13+
super(message, cause);
14+
}
15+
16+
/**
17+
* Class constructor
18+
*
19+
* @param message error message
20+
*/
21+
public TableMetadataException(String message) {
22+
super(message);
23+
}
24+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.scalar.db.dataloader.core.tablemetadata;
2+
3+
import lombok.Getter;
4+
5+
/** Represents the request for metadata for a single ScalarDB table */
6+
@Getter
7+
public class TableMetadataRequest {
8+
9+
private final String namespace;
10+
private final String table;
11+
12+
/**
13+
* Class constructor
14+
*
15+
* @param namespace ScalarDB namespace
16+
* @param table ScalarDB table name
17+
*/
18+
public TableMetadataRequest(String namespace, String table) {
19+
this.namespace = namespace;
20+
this.table = table;
21+
}
22+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.scalar.db.dataloader.core.tablemetadata;
2+
3+
import com.scalar.db.api.DistributedStorageAdmin;
4+
import com.scalar.db.api.TableMetadata;
5+
import com.scalar.db.common.error.CoreError;
6+
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
7+
import com.scalar.db.exception.storage.ExecutionException;
8+
import java.util.Collection;
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
import lombok.RequiredArgsConstructor;
12+
13+
/**
14+
* Service for retrieving {@link TableMetadata} from ScalarDB. Provides methods to fetch metadata
15+
* for individual tables or a collection of tables.
16+
*/
17+
@RequiredArgsConstructor
18+
public class TableMetadataService {
19+
20+
private final DistributedStorageAdmin storageAdmin;
21+
22+
/**
23+
* Retrieves the {@link TableMetadata} for a specific namespace and table name.
24+
*
25+
* @param namespace The ScalarDB namespace.
26+
* @param tableName The name of the table within the specified namespace.
27+
* @return The {@link TableMetadata} object containing schema details of the specified table.
28+
* @throws TableMetadataException If the table or namespace does not exist, or if an error occurs
29+
* while fetching the metadata.
30+
*/
31+
public TableMetadata getTableMetadata(String namespace, String tableName)
32+
throws TableMetadataException {
33+
try {
34+
TableMetadata tableMetadata = storageAdmin.getTableMetadata(namespace, tableName);
35+
if (tableMetadata == null) {
36+
throw new TableMetadataException(
37+
CoreError.DATA_LOADER_MISSING_NAMESPACE_OR_TABLE.buildMessage(namespace, tableName));
38+
}
39+
return tableMetadata;
40+
} catch (ExecutionException e) {
41+
throw new TableMetadataException(
42+
CoreError.DATA_LOADER_TABLE_METADATA_RETRIEVAL_FAILED.buildMessage(e.getMessage()), e);
43+
}
44+
}
45+
46+
/**
47+
* Retrieves the {@link TableMetadata} for a collection of table metadata requests.
48+
*
49+
* <p>Each request specifies a namespace and table name. The method consolidates the metadata into
50+
* a map keyed by a unique lookup key generated for each table.
51+
*
52+
* @param requests A collection of {@link TableMetadataRequest} objects specifying the tables to
53+
* retrieve metadata for.
54+
* @return A map where the keys are unique lookup keys (namespace + table name) and the values are
55+
* the corresponding {@link TableMetadata} objects.
56+
* @throws TableMetadataException If any of the requested tables or namespaces are missing, or if
57+
* an error occurs while fetching the metadata.
58+
*/
59+
public Map<String, TableMetadata> getTableMetadata(Collection<TableMetadataRequest> requests)
60+
throws TableMetadataException {
61+
Map<String, TableMetadata> metadataMap = new HashMap<>();
62+
63+
for (TableMetadataRequest request : requests) {
64+
String namespace = request.getNamespace();
65+
String tableName = request.getTable();
66+
TableMetadata tableMetadata = getTableMetadata(namespace, tableName);
67+
String key = TableMetadataUtil.getTableLookupKey(namespace, tableName);
68+
metadataMap.put(key, tableMetadata);
69+
}
70+
71+
return metadataMap;
72+
}
73+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.scalar.db.dataloader.core.util;
2+
3+
import com.scalar.db.api.TableMetadata;
4+
import com.scalar.db.dataloader.core.Constants;
5+
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
6+
import com.scalar.db.transaction.consensuscommit.Attribute;
7+
import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import lombok.AccessLevel;
11+
import lombok.NoArgsConstructor;
12+
13+
/** Utility class for handling ScalarDB table metadata operations. */
14+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
15+
public class TableMetadataUtil {
16+
17+
/**
18+
* Generates a unique lookup key for a table within a namespace.
19+
*
20+
* @param namespace The namespace of the table.
21+
* @param tableName The name of the table.
22+
* @return A formatted string representing the table lookup key.
23+
*/
24+
public static String getTableLookupKey(String namespace, String tableName) {
25+
return String.format(Constants.TABLE_LOOKUP_KEY_FORMAT, namespace, tableName);
26+
}
27+
28+
/**
29+
* Generates a unique lookup key for a table using control file table data.
30+
*
31+
* @param controlFileTable The control file table object containing namespace and table name.
32+
* @return A formatted string representing the table lookup key.
33+
*/
34+
public static String getTableLookupKey(ControlFileTable controlFileTable) {
35+
return String.format(
36+
Constants.TABLE_LOOKUP_KEY_FORMAT,
37+
controlFileTable.getNamespace(),
38+
controlFileTable.getTable());
39+
}
40+
41+
/**
42+
* Adds metadata columns to a list of projection columns for a ScalarDB table.
43+
*
44+
* @param tableMetadata The metadata of the ScalarDB table.
45+
* @param projections A list of projection column names.
46+
* @return A new list containing projection columns along with metadata columns.
47+
*/
48+
public static List<String> populateProjectionsWithMetadata(
49+
TableMetadata tableMetadata, List<String> projections) {
50+
List<String> projectionMetadata = new ArrayList<>();
51+
projections.forEach(
52+
projection -> {
53+
projectionMetadata.add(projection);
54+
if (!isKeyColumn(projection, tableMetadata)) {
55+
projectionMetadata.add(Attribute.BEFORE_PREFIX + projection);
56+
}
57+
});
58+
projectionMetadata.addAll(ConsensusCommitUtils.getTransactionMetaColumns().keySet());
59+
return projectionMetadata;
60+
}
61+
62+
/**
63+
* Checks whether a column is a key column (partition key or clustering key) in the table.
64+
*
65+
* @param column The name of the column to check.
66+
* @param tableMetadata The metadata of the ScalarDB table.
67+
* @return {@code true} if the column is a key column; {@code false} otherwise.
68+
*/
69+
private static boolean isKeyColumn(String column, TableMetadata tableMetadata) {
70+
return tableMetadata.getPartitionKeyNames().contains(column)
71+
|| tableMetadata.getClusteringKeyNames().contains(column);
72+
}
73+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.scalar.db.dataloader.core.tablemetadata;
2+
3+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4+
5+
import com.scalar.db.api.DistributedStorageAdmin;
6+
import com.scalar.db.api.TableMetadata;
7+
import com.scalar.db.common.error.CoreError;
8+
import com.scalar.db.dataloader.core.UnitTestUtils;
9+
import com.scalar.db.exception.storage.ExecutionException;
10+
import java.util.Collections;
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
import org.junit.jupiter.api.Assertions;
14+
import org.junit.jupiter.api.BeforeEach;
15+
import org.junit.jupiter.api.Test;
16+
import org.mockito.Mockito;
17+
18+
class TableMetadataServiceTest {
19+
20+
DistributedStorageAdmin storageAdmin;
21+
TableMetadataService tableMetadataService;
22+
23+
@BeforeEach
24+
void setup() throws ExecutionException {
25+
storageAdmin = Mockito.mock(DistributedStorageAdmin.class);
26+
Mockito.when(storageAdmin.getTableMetadata("namespace", "table"))
27+
.thenReturn(UnitTestUtils.createTestTableMetadata());
28+
tableMetadataService = new TableMetadataService(storageAdmin);
29+
}
30+
31+
@Test
32+
void getTableMetadata_withValidNamespaceAndTable_shouldReturnTableMetadataMap()
33+
throws TableMetadataException {
34+
35+
Map<String, TableMetadata> expected = new HashMap<>();
36+
expected.put("namespace.table", UnitTestUtils.createTestTableMetadata());
37+
TableMetadataRequest tableMetadataRequest = new TableMetadataRequest("namespace", "table");
38+
Map<String, TableMetadata> output =
39+
tableMetadataService.getTableMetadata(Collections.singleton(tableMetadataRequest));
40+
Assertions.assertEquals(expected.get("namespace.table"), output.get("namespace.table"));
41+
}
42+
43+
@Test
44+
void getTableMetadata_withInvalidNamespaceAndTable_shouldThrowException() {
45+
TableMetadataRequest tableMetadataRequest = new TableMetadataRequest("namespace2", "table2");
46+
assertThatThrownBy(
47+
() ->
48+
tableMetadataService.getTableMetadata(Collections.singleton(tableMetadataRequest)))
49+
.isInstanceOf(TableMetadataException.class)
50+
.hasMessage(
51+
CoreError.DATA_LOADER_MISSING_NAMESPACE_OR_TABLE.buildMessage("namespace2", "table2"));
52+
}
53+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.scalar.db.dataloader.core.util;
2+
3+
import static com.scalar.db.dataloader.core.Constants.TABLE_LOOKUP_KEY_FORMAT;
4+
import static org.assertj.core.api.Assertions.assertThat;
5+
6+
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
7+
import org.junit.jupiter.api.Test;
8+
9+
/** Unit tests for TableMetadataUtils */
10+
class TableMetadataUtilTest {
11+
12+
private static final String NAMESPACE = "ns";
13+
private static final String TABLE_NAME = "table";
14+
15+
@Test
16+
void getTableLookupKey_ValidStringArgs_ShouldReturnLookupKey() {
17+
String actual = TableMetadataUtil.getTableLookupKey(NAMESPACE, TABLE_NAME);
18+
String expected = String.format(TABLE_LOOKUP_KEY_FORMAT, NAMESPACE, TABLE_NAME);
19+
assertThat(actual).isEqualTo(expected);
20+
}
21+
22+
@Test
23+
void getTableLookupKey_ValidControlFileArg_ShouldReturnLookupKey() {
24+
ControlFileTable controlFileTable = new ControlFileTable(NAMESPACE, TABLE_NAME);
25+
String actual = TableMetadataUtil.getTableLookupKey(controlFileTable);
26+
String expected = String.format(TABLE_LOOKUP_KEY_FORMAT, NAMESPACE, TABLE_NAME);
27+
assertThat(actual).isEqualTo(expected);
28+
}
29+
}

0 commit comments

Comments
 (0)