diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/Main.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/Main.java index e113858589..e138325037 100644 --- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/Main.java +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/Main.java @@ -308,6 +308,9 @@ public static void main(String[] args) .help("specify the schema name"); argumentParser.addArgument("-t", "--table").required(true) .help("specify the table name"); + argumentParser.addArgument("-c", "--concurrency") + .setDefault("4").required(false) + .help("specify the number of threads used for data stat"); Namespace ns; try diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/executor/StatExecutor.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/executor/StatExecutor.java index 3d6ecb4026..9bd5544855 100644 --- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/executor/StatExecutor.java +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/executor/StatExecutor.java @@ -33,8 +33,16 @@ import io.trino.jdbc.TrinoDriver; import net.sourceforge.argparse4j.inf.Namespace; +import java.io.IOException; import java.sql.*; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; @@ -49,6 +57,7 @@ public void execute(Namespace ns, String command) throws Exception { String schemaName = ns.getString("schema"); String tableName = ns.getString("table"); + int concurrency = Integer.parseInt(ns.getString("concurrency")); boolean orderedEnabled = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("executor.ordered.layout.enabled")); boolean compactEnabled = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("executor.compact.layout.enabled")); @@ -81,7 +90,7 @@ public void execute(Namespace ns, String command) throws Exception List columns = metadataService.getColumns(schemaName, tableName, true); Map columnMap = new HashMap<>(columns.size()); - Map columnStatsMap = new HashMap<>(columns.size()); + Map columnStatsMap = new ConcurrentHashMap<>(columns.size()); for (Column column : columns) { @@ -90,51 +99,28 @@ public void execute(Namespace ns, String command) throws Exception columnMap.put(column.getName(), column); } - int rowGroupCount = 0; - long rowCount = 0; - for (String filePath : files) - { - Storage storage = StorageFactory.Instance().getStorage(filePath); - PixelsReader pixelsReader = PixelsReaderImpl.newBuilder() - .setPath(filePath).setStorage(storage).setEnableCache(false) - .setCacheOrder(ImmutableList.of()).setPixelsCacheReader(null) - .setPixelsFooterCache(new PixelsFooterCache()).build(); - PixelsProto.Footer fileFooter = pixelsReader.getFooter(); - int numRowGroup = pixelsReader.getRowGroupNum(); - rowGroupCount += numRowGroup; - rowCount += pixelsReader.getNumberOfRows(); - List types = fileFooter.getTypesList(); - for (int i = 0; i < numRowGroup; ++i) - { - PixelsProto.RowGroupFooter rowGroupFooter = pixelsReader.getRowGroupFooter(i); - List chunkIndices = - rowGroupFooter.getRowGroupIndexEntry().getColumnChunkIndexEntriesList(); - for (int j = 0; j < types.size(); ++j) + ExecutorService executor = Executors.newFixedThreadPool(concurrency); + AtomicInteger totalRowGroupCount = new AtomicInteger(0); + AtomicLong totalRowCount = new AtomicLong(0L); + System.out.println("Read File Count: " + files.size() + "\tConcurrency: "+ concurrency); + CompletableFuture.allOf(files.stream().map(filePath -> + CompletableFuture.runAsync(() -> { - Column column = columnMap.get(types.get(j).getName()); - long chunkLength = chunkIndices.get(j).getChunkLength(); - column.setSize(chunkLength + column.getSize()); - } - } - List fields = pixelsReader.getFileSchema().getChildren(); - checkArgument(fields.size() == types.size(), - "types.size and fields.size are not consistent"); - for (int i = 0; i < fields.size(); ++i) - { - TypeDescription field = fields.get(i); - PixelsProto.Type type = types.get(i); - StatsRecorder statsRecorder = columnStatsMap.get(type.getName()); - if (statsRecorder == null) - { - columnStatsMap.put(type.getName(), - StatsRecorder.create(field, fileFooter.getColumnStats(i))); - } - else - { - statsRecorder.merge(StatsRecorder.create(field, fileFooter.getColumnStats(i))); - } - } - pixelsReader.close(); + try + { + processFile(filePath, columnMap, columnStatsMap, totalRowGroupCount, totalRowCount); + } + catch (Exception e) + { + System.err.println("Error processing file: " + filePath); + e.printStackTrace(); + } + }, executor) + ).toArray(CompletableFuture[]::new)).join(); + executor.shutdown(); + { + long readFileEndTime = System.currentTimeMillis(); + System.out.println("Read File Elapsed time: " + (readFileEndTime - startTime) / 1000.0 + "s."); } ConfigFactory instance = ConfigFactory.Instance(); @@ -158,7 +144,7 @@ public void execute(Namespace ns, String command) throws Exception for (Column column : columns) { - column.setChunkSize(column.getSize() / rowGroupCount); + column.setChunkSize(column.getSize() / totalRowGroupCount.get()); column.setRecordStats(columnStatsMap.get(column.getName()) .serialize().build().toByteString().asReadOnlyByteBuffer()); column.getRecordStats().mark(); @@ -166,7 +152,7 @@ public void execute(Namespace ns, String command) throws Exception column.getRecordStats().reset(); } - metadataService.updateRowCount(schemaName, tableName, rowCount); + metadataService.updateRowCount(schemaName, tableName, totalRowCount.get()); /* Set cardinality and null_fraction after the chunk size and column size, * because chunk size and column size must exist in the metadata when calculating @@ -188,7 +174,7 @@ public void execute(Namespace ns, String command) throws Exception if (resultSet.next()) { long cardinality = resultSet.getLong("cardinality"); - double nullFraction = resultSet.getLong("null_count") / (double) rowCount; + double nullFraction = resultSet.getLong("null_count") / (double) totalRowCount.get(); System.out.println(column.getName() + " cardinality: " + cardinality + ", null fraction: " + nullFraction); column.setCardinality(cardinality); @@ -207,4 +193,60 @@ public void execute(Namespace ns, String command) throws Exception long endTime = System.currentTimeMillis(); System.out.println("Elapsed time: " + (endTime - startTime) / 1000.0 + "s."); } + + private void processFile(String filePath, + Map columnMap, + Map columnStatsMap, + AtomicInteger totalRowGroupCount, + AtomicLong totalRowCount) throws IOException + { + Storage storage = StorageFactory.Instance().getStorage(filePath); + try (PixelsReader pixelsReader = PixelsReaderImpl.newBuilder() + .setPath(filePath).setStorage(storage).setEnableCache(false) + .setCacheOrder(ImmutableList.of()).setPixelsCacheReader(null) + .setPixelsFooterCache(new PixelsFooterCache()).build()) + { + PixelsProto.Footer fileFooter = pixelsReader.getFooter(); + int numRowGroup = pixelsReader.getRowGroupNum(); + totalRowGroupCount.addAndGet(numRowGroup); + totalRowCount.addAndGet(pixelsReader.getNumberOfRows()); + List types = fileFooter.getTypesList(); + for (int i = 0; i < numRowGroup; ++i) + { + PixelsProto.RowGroupFooter rowGroupFooter = pixelsReader.getRowGroupFooter(i); + List chunkIndices = + rowGroupFooter.getRowGroupIndexEntry().getColumnChunkIndexEntriesList(); + for (int j = 0; j < types.size(); ++j) + { + Column column = columnMap.get(types.get(j).getName()); + synchronized (column) + { + long chunkLength = chunkIndices.get(j).getChunkLength(); + column.setSize(chunkLength + column.getSize()); + } + } + } + List fields = pixelsReader.getFileSchema().getChildren(); + checkArgument(fields.size() == types.size(), + "types.size and fields.size are not consistent"); + for (int i = 0; i < fields.size(); ++i) + { + TypeDescription field = fields.get(i); + PixelsProto.Type type = types.get(i); + + PixelsProto.ColumnStatistic currentStat = fileFooter.getColumnStats(i); + columnStatsMap.compute(type.getName(), (k, existingRecorder) -> + { + if (existingRecorder == null) + { + return StatsRecorder.create(field, currentStat); + } else + { + existingRecorder.merge(StatsRecorder.create(field, currentStat)); + return existingRecorder; + } + }); + } + } + } } diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/AbstractPixelsConsumer.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/AbstractPixelsConsumer.java index 0075c9766c..cb1d3c32f5 100644 --- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/AbstractPixelsConsumer.java +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/AbstractPixelsConsumer.java @@ -21,8 +21,8 @@ package io.pixelsdb.pixels.cli.load; import io.pixelsdb.pixels.common.exception.MetadataException; -import io.pixelsdb.pixels.common.index.IndexService; -import io.pixelsdb.pixels.common.index.IndexServiceProvider; +import io.pixelsdb.pixels.common.index.service.IndexService; +import io.pixelsdb.pixels.common.index.service.IndexServiceProvider; import io.pixelsdb.pixels.common.metadata.MetadataService; import io.pixelsdb.pixels.common.metadata.domain.File; import io.pixelsdb.pixels.common.metadata.domain.Path; diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java index f6d7182df3..36838d3d98 100644 --- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java @@ -23,12 +23,14 @@ import com.google.protobuf.ByteString; import io.pixelsdb.pixels.common.exception.IndexException; import io.pixelsdb.pixels.common.exception.MetadataException; - import io.pixelsdb.pixels.common.index.IndexService; - import io.pixelsdb.pixels.common.index.RPCIndexService; + import io.pixelsdb.pixels.common.index.IndexOption; + import io.pixelsdb.pixels.common.index.service.IndexService; + import io.pixelsdb.pixels.common.index.service.RPCIndexService; import io.pixelsdb.pixels.common.index.RowIdAllocator; import io.pixelsdb.pixels.common.metadata.domain.File; import io.pixelsdb.pixels.common.metadata.domain.Path; import io.pixelsdb.pixels.common.node.BucketCache; + import io.pixelsdb.pixels.common.node.VnodeIdentifier; import io.pixelsdb.pixels.common.physical.Storage; import io.pixelsdb.pixels.common.physical.StorageFactory; import io.pixelsdb.pixels.common.utils.ConfigFactory; @@ -60,8 +62,7 @@ public class IndexedPixelsConsumer extends AbstractPixelsConsumer { - // Map: Retina Host -> Writer state - private final Map retinaWriters = new ConcurrentHashMap<>(); + private final Map retinaWriters = new ConcurrentHashMap<>(); private final BucketCache bucketCache = BucketCache.getInstance(); private final Map indexServices = new ConcurrentHashMap<>(); private final int indexServerPort; @@ -99,9 +100,8 @@ protected void processSourceFile(String originalFilePath) throws IOException, Me ByteString pkByteString = calculatePrimaryKeyBytes(colsInLine); // Assume BucketCache has the necessary method and configuration int bucketId = RetinaUtils.getBucketIdFromByteBuffer(pkByteString); - String retinaName = RetinaUtils.getRetinaHostNameFromBucketId(bucketId); - // 2. Get/Initialize the Writer for this Bucket - PerRetinaNodeWriter retinaNodeWriter = retinaWriters.computeIfAbsent(retinaName, id -> + VnodeIdentifier vnodeIdentifier = RetinaUtils.getVnodeIdentifierFromBucketId(bucketId); + PerVirtualNodeWriter retinaNodeWriter = retinaWriters.computeIfAbsent(vnodeIdentifier, id -> { try { @@ -132,7 +132,7 @@ protected void processSourceFile(String originalFilePath) throws IOException, Me { closePixelsFile(retinaNodeWriter); // Remove writer to force re-initialization on next use - retinaWriters.remove(retinaName); + retinaWriters.remove(vnodeIdentifier); } } catch (IndexException e) { @@ -145,7 +145,7 @@ protected void processSourceFile(String originalFilePath) throws IOException, Me @Override protected void flushRemainingData() throws IOException, MetadataException { - for (PerRetinaNodeWriter bucketWriter : retinaWriters.values()) + for (PerVirtualNodeWriter bucketWriter : retinaWriters.values()) { if (bucketWriter.rowCounter > 0) { @@ -164,11 +164,10 @@ protected void flushRemainingData() throws IOException, MetadataException /** * Initializes a new PixelsWriter and associated File/Path for a given bucket ID. */ - private PerRetinaNodeWriter initializeRetinaWriter(int bucketId) throws IOException, MetadataException + private PerVirtualNodeWriter initializeRetinaWriter(int bucketId) throws IOException, MetadataException { // Use the Node Cache to find the responsible Retina Node NodeProto.NodeInfo targetNode = bucketCache.getRetinaNodeInfoByBucketId(bucketId); - // Target path selection logic (simple round-robin for the path, but the NodeInfo is bucket-specific) int targetPathId = GlobalTargetPathId.getAndIncrement() % targetPaths.size(); Path currTargetPath = targetPaths.get(targetPathId); @@ -187,7 +186,7 @@ private PerRetinaNodeWriter initializeRetinaWriter(int bucketId) throws IOExcept File currFile = openTmpFile(targetFileName, currTargetPath); tmpFiles.add(currFile); - return new PerRetinaNodeWriter(pixelsWriter, currFile, currTargetPath, targetNode); + return new PerVirtualNodeWriter(pixelsWriter, currFile, currTargetPath, targetNode, targetNode.getVirtualNodeId()); } // --- Private Helper Methods --- @@ -219,7 +218,7 @@ private ByteString calculatePrimaryKeyBytes(String[] colsInLine) return ByteString.copyFrom((ByteBuffer) indexKeyBuffer.rewind()); } - private void updateIndexEntry(PerRetinaNodeWriter bucketWriter, ByteString pkByteString) throws IndexException + private void updateIndexEntry(PerVirtualNodeWriter bucketWriter, ByteString pkByteString) throws IndexException { IndexProto.PrimaryIndexEntry.Builder builder = IndexProto.PrimaryIndexEntry.newBuilder(); builder.getIndexKeyBuilder() @@ -237,7 +236,7 @@ private void updateIndexEntry(PerRetinaNodeWriter bucketWriter, ByteString pkByt bucketWriter.indexEntries.add(builder.build()); } - private void flushRowBatch(PerRetinaNodeWriter bucketWriter) throws IOException, IndexException + private void flushRowBatch(PerVirtualNodeWriter bucketWriter) throws IOException, IndexException { bucketWriter.pixelsWriter.addRowBatch(bucketWriter.rowBatch); bucketWriter.rowBatch.reset(); @@ -250,12 +249,12 @@ private void flushRowBatch(PerRetinaNodeWriter bucketWriter) throws IOException, } // Push index entries to the corresponding IndexService (determined by targetNode address) - bucketWriter.indexService.putPrimaryIndexEntries(index.getTableId(), index.getId(), bucketWriter.indexEntries); - bucketWriter.indexService.flushIndexEntriesOfFile(index.getTableId(), index.getId(),bucketWriter.currFile.getId(), true); + bucketWriter.indexService.putPrimaryIndexEntries(index.getTableId(), index.getId(), bucketWriter.indexEntries, bucketWriter.option); + bucketWriter.indexService.flushIndexEntriesOfFile(index.getTableId(), index.getId(),bucketWriter.currFile.getId(), true, bucketWriter.option); bucketWriter.indexEntries.clear(); } - private void closePixelsFile(PerRetinaNodeWriter bucketWriter) throws IOException, IndexException + private void closePixelsFile(PerVirtualNodeWriter bucketWriter) throws IOException, IndexException { // Final flush of remaining rows/indexes if (bucketWriter.rowBatch.size != 0) @@ -266,7 +265,7 @@ private void closePixelsFile(PerRetinaNodeWriter bucketWriter) throws IOExceptio closeWriterAndAddFile(bucketWriter.pixelsWriter, bucketWriter.currFile, bucketWriter.currTargetPath, bucketWriter.targetNode); } - private class PerRetinaNodeWriter + private class PerVirtualNodeWriter { PixelsWriter pixelsWriter; File currFile; @@ -275,13 +274,15 @@ private class PerRetinaNodeWriter int rgRowOffset; int prevRgId; int rowCounter; + int vNodeId; + IndexOption option; NodeProto.NodeInfo targetNode; List indexEntries = new ArrayList<>(); VectorizedRowBatch rowBatch; IndexService indexService; RowIdAllocator rowIdAllocator; - public PerRetinaNodeWriter(PixelsWriter writer, File file, Path path, NodeProto.NodeInfo node) + public PerVirtualNodeWriter(PixelsWriter writer, File file, Path path, NodeProto.NodeInfo node, int vNodeId) { this.pixelsWriter = writer; this.currFile = file; @@ -292,9 +293,19 @@ public PerRetinaNodeWriter(PixelsWriter writer, File file, Path path, NodeProto. this.rgRowOffset = 0; this.rowCounter = 0; this.rowBatch = schema.createRowBatchWithHiddenColumn(pixelStride, TypeDescription.Mode.NONE); + this.vNodeId = vNodeId; this.indexService = indexServices.computeIfAbsent(node.getAddress(), nodeInfo -> RPCIndexService.CreateInstance(nodeInfo, indexServerPort)); this.rowIdAllocator = new RowIdAllocator(index.getTableId(), maxRowNum, this.indexService); + initIndexOption(); } + + private void initIndexOption() + { + this.option = IndexOption.builder() + .vNodeId(this.vNodeId) + .build(); + } + } } \ No newline at end of file diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/Parameters.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/Parameters.java index a12323e672..8fd625a977 100644 --- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/Parameters.java +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/Parameters.java @@ -20,8 +20,6 @@ package io.pixelsdb.pixels.cli.load; import io.pixelsdb.pixels.common.exception.MetadataException; -import io.pixelsdb.pixels.common.index.IndexServiceProvider; -import io.pixelsdb.pixels.common.index.RowIdAllocator; import io.pixelsdb.pixels.common.metadata.MetadataService; import io.pixelsdb.pixels.common.metadata.domain.*; import io.pixelsdb.pixels.core.TypeDescription; diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/IndexOption.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/IndexOption.java new file mode 100644 index 0000000000..3fe0f56325 --- /dev/null +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/IndexOption.java @@ -0,0 +1,74 @@ +/* + * Copyright 2026 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ +package io.pixelsdb.pixels.common.index; + +import io.pixelsdb.pixels.index.IndexProto; + +public class IndexOption +{ + private int vNodeId; + + public int getVNodeId() + { + return vNodeId; + } + + public void setVNodeId(int vNodeId) + { + this.vNodeId = vNodeId; + } + + public IndexOption() {} + + public IndexOption(IndexProto.IndexOption option) + { + this.vNodeId = option.getVirtualNodeId(); + } + + public IndexProto.IndexOption toProto() + { + return IndexProto.IndexOption.newBuilder() + .setVirtualNodeId(vNodeId) + .build(); + } + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private int vNodeId; + + public Builder vNodeId(int vNodeId) + { + this.vNodeId = vNodeId; + return this; + } + + public IndexOption build() + { + IndexOption option = new IndexOption(); + option.setVNodeId(this.vNodeId); + return option; + } + } +} diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/RowIdAllocator.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/RowIdAllocator.java index 579927386e..1f4d37f89b 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/RowIdAllocator.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/RowIdAllocator.java @@ -20,6 +20,8 @@ package io.pixelsdb.pixels.common.index; import io.pixelsdb.pixels.common.exception.IndexException; +import io.pixelsdb.pixels.common.index.service.IndexService; +import io.pixelsdb.pixels.common.index.service.IndexServiceProvider; import io.pixelsdb.pixels.index.IndexProto; import java.util.concurrent.locks.ReentrantLock; diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/SinglePointIndexFactory.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/SinglePointIndexFactory.java index bfba54689d..a9ea3dcfaa 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/SinglePointIndexFactory.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/SinglePointIndexFactory.java @@ -46,7 +46,7 @@ public class SinglePointIndexFactory { private static final Logger logger = LogManager.getLogger(SinglePointIndexFactory.class); - private final Map singlePointIndexImpls = new ConcurrentHashMap<>(); + private final Map> singlePointIndexImpls = new ConcurrentHashMap<>(); private final Set enabledSchemes = new ConcurrentSkipListSet<>(); private final Map indexIdToTableIndex = new ConcurrentHashMap<>(); private final Lock lock = new ReentrantLock(); @@ -133,7 +133,7 @@ public boolean isSchemeEnabled(SinglePointIndex.Scheme scheme) * @return the single point index instance * @throws SinglePointIndexException */ - public SinglePointIndex getSinglePointIndex(long tableId, long indexId) throws SinglePointIndexException + public SinglePointIndex getSinglePointIndex(long tableId, long indexId, IndexOption indexOption) throws SinglePointIndexException { TableIndex tableIndex = this.indexIdToTableIndex.get(indexId); if (tableIndex == null) @@ -163,7 +163,7 @@ public SinglePointIndex getSinglePointIndex(long tableId, long indexId) throws S this.lock.unlock(); } } - return getSinglePointIndex(tableIndex); + return getSinglePointIndex(tableIndex, indexOption); } /** @@ -172,7 +172,7 @@ public SinglePointIndex getSinglePointIndex(long tableId, long indexId) throws S * @return the single point index instance * @throws SinglePointIndexException */ - public SinglePointIndex getSinglePointIndex(TableIndex tableIndex) throws SinglePointIndexException + public SinglePointIndex getSinglePointIndex(TableIndex tableIndex, IndexOption indexOption) throws SinglePointIndexException { requireNonNull(tableIndex, "tableIndex is null"); checkArgument(this.enabledSchemes.contains(tableIndex.scheme), "single point index scheme '" + @@ -180,19 +180,26 @@ public SinglePointIndex getSinglePointIndex(TableIndex tableIndex) throws Single this.indexIdToTableIndex.putIfAbsent(tableIndex.indexId, tableIndex); - SinglePointIndex singlePointIndex = this.singlePointIndexImpls.get(tableIndex); + Map vNodeMap = this.singlePointIndexImpls.computeIfAbsent( + tableIndex, k -> new ConcurrentHashMap<>()); + + int vNodeId = indexOption.getVNodeId(); + SinglePointIndex singlePointIndex = vNodeMap.get(vNodeId); + if (singlePointIndex == null) { this.lock.lock(); try { // double check to avoid redundant creation of singlePointIndex - singlePointIndex = this.singlePointIndexImpls.get(tableIndex); + singlePointIndex = vNodeMap.get(vNodeId); if (singlePointIndex == null) { + logger.info("Creating SinglePointIndex instance for tableId: {}, indexId: {}, vNodeId: {}", + tableIndex.tableId, tableIndex.indexId, vNodeId); singlePointIndex = this.singlePointIndexProviders.get(tableIndex.getScheme()).createInstance( - tableIndex.tableId, tableIndex.indexId, tableIndex.scheme, tableIndex.isUnique()); - this.singlePointIndexImpls.put(tableIndex, singlePointIndex); + tableIndex.tableId, tableIndex.indexId, tableIndex.scheme, tableIndex.isUnique(), indexOption); + vNodeMap.put(vNodeId, singlePointIndex); } } finally @@ -212,21 +219,34 @@ public void closeAll() throws SinglePointIndexException this.lock.lock(); try { - for (TableIndex tableIndex : this.singlePointIndexImpls.keySet()) + for (Map.Entry> outerEntry : this.singlePointIndexImpls.entrySet()) { - try + TableIndex tableIndex = outerEntry.getKey(); + Map vNodeMap = outerEntry.getValue(); + + if (vNodeMap != null) { - SinglePointIndex removing = this.singlePointIndexImpls.get(tableIndex); - if (removing != null) + // Iterate through all SinglePointIndex instances for each vNodeId + for (SinglePointIndex indexImpl : vNodeMap.values()) { - removing.close(); + try + { + if (indexImpl != null) + { + indexImpl.close(); + } + } + catch (IOException e) + { + // Note: As per original logic, an exception here stops the closing process + throw new SinglePointIndexException( + "failed to close single point index with id " + tableIndex.indexId, e); + } } - } catch (IOException e) - { - throw new SinglePointIndexException( - "failed to close single point index with id " + tableIndex.indexId, e); } } + + // Clear all tracking maps after successful closing this.singlePointIndexImpls.clear(); this.indexIdToTableIndex.clear(); } @@ -243,58 +263,58 @@ public void closeAll() throws SinglePointIndexException * @param closeAndRemove remove the index storage after closing if true * @throws SinglePointIndexException */ - public void closeIndex(long tableId, long indexId, boolean closeAndRemove) throws SinglePointIndexException + public void closeIndex(long tableId, long indexId, boolean closeAndRemove, IndexOption indexOption) throws SinglePointIndexException { + // indexOption is ignored as per requirement to close all vNodes for this index this.lock.lock(); try { + // 1. Identify the TableIndex TableIndex tableIndex = this.indexIdToTableIndex.remove(indexId); - if (tableIndex != null) + if (tableIndex == null) { - SinglePointIndex removed = this.singlePointIndexImpls.remove(tableIndex); - if (removed != null) - { - try - { - if (closeAndRemove) - { - removed.closeAndRemove(); - } else - { - removed.close(); - } - } catch (IOException e) - { - throw new SinglePointIndexException( - "failed to close single point index with id " + tableIndex.indexId, e); - } - } else - { - logger.warn("index with id {} once opened but not found", indexId); - } - } else + // Fallback: create a dummy TableIndex if not found in the tracker + tableIndex = new TableIndex(tableId, indexId, null, false); + } + + // 2. Remove the entire inner map (all vNodes) from the implementation map + Map vNodeMap = this.singlePointIndexImpls.remove(tableIndex); + + if (vNodeMap != null) { - TableIndex tableIndex1 = new TableIndex(tableId, indexId, null, false); - SinglePointIndex removed = this.singlePointIndexImpls.remove(tableIndex1); - if (removed != null) + // 3. Iterate through all SinglePointIndex instances for all vNodes + for (Map.Entry entry : vNodeMap.entrySet()) { - try + int vNodeId = entry.getKey(); + SinglePointIndex indexImpl = entry.getValue(); + + if (indexImpl != null) { - if (closeAndRemove) + try { - removed.closeAndRemove(); - } else + if (closeAndRemove) + { + indexImpl.closeAndRemove(); + } + else + { + indexImpl.close(); + } + } + catch (IOException e) { - removed.close(); + // Note: If one vNode fails to close, we still throw to notify the caller, + // but the mappings have already been removed from the factory. + throw new SinglePointIndexException( + "failed to close single point index with id " + indexId + " for vNode " + vNodeId, e); } - } catch (IOException e) - { - throw new SinglePointIndexException( - "failed to close single point index with id " + tableIndex.indexId, e); } - logger.warn("index with id {} is found but not opened properly", indexId); } } + else + { + logger.warn("No active index instances found for indexId {} during close operation", indexId); + } } finally { diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/SinglePointIndexProvider.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/SinglePointIndexProvider.java index f80c48bf43..6283f2b61d 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/SinglePointIndexProvider.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/SinglePointIndexProvider.java @@ -32,7 +32,7 @@ public interface SinglePointIndexProvider /** * Create an instance of the single point index. */ - SinglePointIndex createInstance(long tableId, long indexId, @Nonnull SinglePointIndex.Scheme scheme, boolean unique) + SinglePointIndex createInstance(long tableId, long indexId, @Nonnull SinglePointIndex.Scheme scheme, boolean unique, IndexOption indexOption) throws SinglePointIndexException; /** diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/IndexService.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/IndexService.java similarity index 86% rename from pixels-common/src/main/java/io/pixelsdb/pixels/common/index/IndexService.java rename to pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/IndexService.java index 519569ea33..627f340207 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/IndexService.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/IndexService.java @@ -17,9 +17,10 @@ * License along with Pixels. If not, see * . */ -package io.pixelsdb.pixels.common.index; +package io.pixelsdb.pixels.common.index.service; import io.pixelsdb.pixels.common.exception.IndexException; +import io.pixelsdb.pixels.common.index.IndexOption; import io.pixelsdb.pixels.index.IndexProto; import java.util.List; @@ -41,28 +42,28 @@ public interface IndexService * @param key the index key * @return the row location or null if the index entry is not found */ - IndexProto.RowLocation lookupUniqueIndex(IndexProto.IndexKey key) throws IndexException; + IndexProto.RowLocation lookupUniqueIndex(IndexProto.IndexKey key, IndexOption indexOption) throws IndexException; /** * Lookup a non-unique index. * @param key the index key * @return the row locations or null if the index entry is not found */ - List lookupNonUniqueIndex(IndexProto.IndexKey key) throws IndexException; + List lookupNonUniqueIndex(IndexProto.IndexKey key, IndexOption indexOption) throws IndexException; /** * Put an index entry into the primary index. * @param entry the index entry * @return true on success */ - boolean putPrimaryIndexEntry(IndexProto.PrimaryIndexEntry entry) throws IndexException; + boolean putPrimaryIndexEntry(IndexProto.PrimaryIndexEntry entry, IndexOption indexOption) throws IndexException; /** * Put an index entry into the secondary index. * @param entry the index entry * @return true on success */ - boolean putSecondaryIndexEntry(IndexProto.SecondaryIndexEntry entry) throws IndexException; + boolean putSecondaryIndexEntry(IndexProto.SecondaryIndexEntry entry, IndexOption indexOption) throws IndexException; /** * Put a batch of index entries into the primary index. @@ -72,7 +73,7 @@ public interface IndexService * @return true on success */ boolean putPrimaryIndexEntries(long tableId, long indexId, - List entries) throws IndexException; + List entries, IndexOption indexOption) throws IndexException; /** * Put a batch of index entries into the secondary index. @@ -82,7 +83,7 @@ boolean putPrimaryIndexEntries(long tableId, long indexId, * @return true on success */ boolean putSecondaryIndexEntries(long tableId, long indexId, - List entries) throws IndexException; + List entries, IndexOption indexOption) throws IndexException; /** * Delete an entry from the primary index. The deleted index entry is marked as deleted using a tombstone. @@ -90,7 +91,7 @@ boolean putSecondaryIndexEntries(long tableId, long indexId, * @return the row location of the deleted index entry * @throws IndexException if no existing entry to delete */ - IndexProto.RowLocation deletePrimaryIndexEntry(IndexProto.IndexKey key) throws IndexException; + IndexProto.RowLocation deletePrimaryIndexEntry(IndexProto.IndexKey key, IndexOption indexOption) throws IndexException; /** * Delete entry(ies) from the secondary index. Each deleted index entry is marked as deleted using a tombstone. @@ -98,18 +99,19 @@ boolean putSecondaryIndexEntries(long tableId, long indexId, * @return the row id(s) of the deleted index entry(ies) * @throws IndexException if no existing entry(ies) to delete */ - List deleteSecondaryIndexEntry(IndexProto.IndexKey key) throws IndexException; + List deleteSecondaryIndexEntry(IndexProto.IndexKey key, IndexOption indexOption) throws IndexException; /** * Delete entries from the primary index. Each deleted index entry is marked as deleted using a tombstone. * @param tableId the table id of the index * @param indexId the index id of the index * @param keys the keys of the entries to delete + * @param indexOption the extra option * @return the row locations of the deleted index entries * @throws IndexException if no existing entry(ies) to delete */ List deletePrimaryIndexEntries(long tableId, long indexId, - List keys) throws IndexException; + List keys, IndexOption indexOption) throws IndexException; /** * Delete entries from the secondary index. Each deleted index entry is marked as deleted using a tombstone. @@ -120,7 +122,7 @@ List deletePrimaryIndexEntries(long tableId, long indexI * @throws IndexException if no existing entry(ies) to delete */ List deleteSecondaryIndexEntries(long tableId, long indexId, - List keys) throws IndexException; + List keys, IndexOption indexOption) throws IndexException; /** * Update the entry of a primary index. @@ -128,7 +130,7 @@ List deleteSecondaryIndexEntries(long tableId, long indexId, * @return the previous row location of the index entry * @throws IndexException if no existing entry to update */ - IndexProto.RowLocation updatePrimaryIndexEntry(IndexProto.PrimaryIndexEntry indexEntry) throws IndexException; + IndexProto.RowLocation updatePrimaryIndexEntry(IndexProto.PrimaryIndexEntry indexEntry, IndexOption indexOption) throws IndexException; /** * Update the entry of a secondary index. @@ -136,7 +138,7 @@ List deleteSecondaryIndexEntries(long tableId, long indexId, * @return the previous row id(s) of the index entry * @throws IndexException if no existing entry(ies) to update */ - List updateSecondaryIndexEntry(IndexProto.SecondaryIndexEntry indexEntry) throws IndexException; + List updateSecondaryIndexEntry(IndexProto.SecondaryIndexEntry indexEntry, IndexOption indexOption) throws IndexException; /** * Update the entries of a primary index. @@ -147,7 +149,7 @@ List deleteSecondaryIndexEntries(long tableId, long indexId, * @throws IndexException if no existing entry(ies) to update */ List updatePrimaryIndexEntries(long tableId, long indexId, - List indexEntries) throws IndexException; + List indexEntries, IndexOption indexOption) throws IndexException; /** * Update the entries of a secondary index. @@ -158,7 +160,7 @@ List updatePrimaryIndexEntries(long tableId, long indexI * @throws IndexException if no existing entry(ies) to update */ List updateSecondaryIndexEntries(long tableId, long indexId, - List indexEntries) throws IndexException; + List indexEntries, IndexOption indexOption) throws IndexException; /** * Purge (remove) the index entries of an index permanently. This should only be done asynchronously by the garbage @@ -170,7 +172,7 @@ List updateSecondaryIndexEntries(long tableId, long indexId, * @return true on success */ boolean purgeIndexEntries(long tableId, long indexId, - List indexKeys, boolean isPrimary) throws IndexException; + List indexKeys, boolean isPrimary, IndexOption indexOption) throws IndexException; /** * Flush the index entries of an index corresponding to a buffered Pixels data file. @@ -184,7 +186,7 @@ boolean purgeIndexEntries(long tableId, long indexId, * @return true on success */ boolean flushIndexEntriesOfFile(long tableId, long indexId, - long fileId, boolean isPrimary) throws IndexException; + long fileId, boolean isPrimary, IndexOption indexOption) throws IndexException; /** * Open an index in the index server. This method is optional and is used to pre-warm an index instance in @@ -194,7 +196,7 @@ boolean flushIndexEntriesOfFile(long tableId, long indexId, * @param isPrimary true if the index is a primary index * @return true on success */ - boolean openIndex(long tableId, long indexId, boolean isPrimary) throws IndexException; + boolean openIndex(long tableId, long indexId, boolean isPrimary, IndexOption indexOption) throws IndexException; /** * Close an index in the index server. @@ -203,7 +205,7 @@ boolean flushIndexEntriesOfFile(long tableId, long indexId, * @param isPrimary true if the index is a primary index * @return true on success */ - boolean closeIndex(long tableId, long indexId, boolean isPrimary) throws IndexException; + boolean closeIndex(long tableId, long indexId, boolean isPrimary, IndexOption option) throws IndexException; /** * Close an index and remove its persistent storage content in the index server. @@ -212,6 +214,6 @@ boolean flushIndexEntriesOfFile(long tableId, long indexId, * @param isPrimary true if the index is a primary index * @return true on success */ - boolean removeIndex(long tableId, long indexId, boolean isPrimary) throws IndexException; + boolean removeIndex(long tableId, long indexId, boolean isPrimary, IndexOption option) throws IndexException; } diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/IndexServiceProvider.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/IndexServiceProvider.java similarity index 97% rename from pixels-common/src/main/java/io/pixelsdb/pixels/common/index/IndexServiceProvider.java rename to pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/IndexServiceProvider.java index fc49846d4e..5356d1219c 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/IndexServiceProvider.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/IndexServiceProvider.java @@ -17,7 +17,7 @@ * License along with Pixels. If not, see * . */ -package io.pixelsdb.pixels.common.index; +package io.pixelsdb.pixels.common.index.service; /** * Factory class for creating IndexService instances. diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/LocalIndexService.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/LocalIndexService.java similarity index 88% rename from pixels-common/src/main/java/io/pixelsdb/pixels/common/index/LocalIndexService.java rename to pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/LocalIndexService.java index 6af6a82bef..ea9c32d4cc 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/LocalIndexService.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/LocalIndexService.java @@ -17,12 +17,13 @@ * License along with Pixels. If not, see * . */ -package io.pixelsdb.pixels.common.index; +package io.pixelsdb.pixels.common.index.service; import io.pixelsdb.pixels.common.exception.IndexException; import io.pixelsdb.pixels.common.exception.MainIndexException; import io.pixelsdb.pixels.common.exception.RowIdException; import io.pixelsdb.pixels.common.exception.SinglePointIndexException; +import io.pixelsdb.pixels.common.index.*; import io.pixelsdb.pixels.index.IndexProto; import java.util.ArrayList; @@ -53,14 +54,14 @@ public IndexProto.RowIdBatch allocateRowIdBatch(long tableId, int numRowIds) thr } @Override - public IndexProto.RowLocation lookupUniqueIndex(IndexProto.IndexKey key) throws IndexException + public IndexProto.RowLocation lookupUniqueIndex(IndexProto.IndexKey key, IndexOption indexOption) throws IndexException { try { long tableId = key.getTableId(); long indexId = key.getIndexId(); MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); long rowId = singlePointIndex.getUniqueRowId(key); if (rowId >= 0) { @@ -86,14 +87,14 @@ public IndexProto.RowLocation lookupUniqueIndex(IndexProto.IndexKey key) throws } @Override - public List lookupNonUniqueIndex(IndexProto.IndexKey key) throws IndexException + public List lookupNonUniqueIndex(IndexProto.IndexKey key, IndexOption indexOption) throws IndexException { try { long tableId = key.getTableId(); long indexId = key.getIndexId(); MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); // Get all row IDs for the given index key List rowIds = singlePointIndex.getRowIds(key); List rowLocations = new ArrayList<>(); @@ -127,7 +128,7 @@ public List lookupNonUniqueIndex(IndexProto.IndexKey key } @Override - public boolean putPrimaryIndexEntry(IndexProto.PrimaryIndexEntry entry) throws IndexException + public boolean putPrimaryIndexEntry(IndexProto.PrimaryIndexEntry entry, IndexOption indexOption) throws IndexException { try { @@ -135,7 +136,7 @@ public boolean putPrimaryIndexEntry(IndexProto.PrimaryIndexEntry entry) throws I long tableId = key.getTableId(); long indexId = key.getIndexId(); MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); // Insert into single point index boolean spSuccess = singlePointIndex.putEntry(entry.getIndexKey(), entry.getRowId()); if (!spSuccess) @@ -161,11 +162,11 @@ public boolean putPrimaryIndexEntry(IndexProto.PrimaryIndexEntry entry) throws I } @Override - public boolean putPrimaryIndexEntries(long tableId, long indexId, List entries) throws IndexException + public boolean putPrimaryIndexEntries(long tableId, long indexId, List entries, IndexOption indexOption) throws IndexException { try { - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); // Batch insert into single point index boolean success = singlePointIndex.putPrimaryEntries(entries); if (!success) @@ -197,14 +198,14 @@ public boolean putPrimaryIndexEntries(long tableId, long indexId, List entries) throws IndexException + public boolean putSecondaryIndexEntries(long tableId, long indexId, List entries, IndexOption indexOption) throws IndexException { try { - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); boolean success = singlePointIndex.putSecondaryEntries(entries); if (!success) { @@ -239,14 +240,14 @@ public boolean putSecondaryIndexEntries(long tableId, long indexId, List deletePrimaryIndexEntries( - long tableId, long indexId, List keys) throws IndexException + long tableId, long indexId, List keys, IndexOption indexOption) throws IndexException { try { MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List prevRowIds = singlePointIndex.deleteEntries(keys); if (prevRowIds == null || prevRowIds.isEmpty()) { @@ -295,13 +296,13 @@ public List deletePrimaryIndexEntries( } @Override - public List deleteSecondaryIndexEntry(IndexProto.IndexKey key) throws IndexException + public List deleteSecondaryIndexEntry(IndexProto.IndexKey key, IndexOption indexOption) throws IndexException { try { long tableId = key.getTableId(); long indexId = key.getIndexId(); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List prevRowIds = singlePointIndex.deleteEntry(key); if (prevRowIds == null || prevRowIds.isEmpty()) { @@ -316,11 +317,11 @@ public List deleteSecondaryIndexEntry(IndexProto.IndexKey key) throws Inde } @Override - public List deleteSecondaryIndexEntries(long tableId, long indexId, List keys) throws IndexException + public List deleteSecondaryIndexEntries(long tableId, long indexId, List keys, IndexOption indexOption) throws IndexException { try { - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List prevRowIds = singlePointIndex.deleteEntries(keys); if (prevRowIds == null || prevRowIds.isEmpty()) { @@ -336,7 +337,7 @@ public List deleteSecondaryIndexEntries(long tableId, long indexId, List updatePrimaryIndexEntries(long tableId, long indexId, List indexEntries) throws IndexException + public List updatePrimaryIndexEntries + (long tableId, long indexId, List indexEntries, IndexOption indexOption) throws IndexException { try { MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); // update multiple entries in the single point index, returning previous row IDs List prevRowIds = singlePointIndex.updatePrimaryEntries(indexEntries); if (prevRowIds == null || prevRowIds.isEmpty()) @@ -420,7 +422,7 @@ public List updatePrimaryIndexEntries(long tableId, long } @Override - public List updateSecondaryIndexEntry(IndexProto.SecondaryIndexEntry indexEntry) throws IndexException + public List updateSecondaryIndexEntry(IndexProto.SecondaryIndexEntry indexEntry, IndexOption indexOption) throws IndexException { IndexProto.IndexKey key = indexEntry.getIndexKey(); long tableId = key.getTableId(); @@ -428,7 +430,7 @@ public List updateSecondaryIndexEntry(IndexProto.SecondaryIndexEntry index try { // get the single point index for the table and index ID - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); // update the secondary index entry and return previous row IDs List prevRowIds = singlePointIndex.updateSecondaryEntry(key, indexEntry.getRowId()); if (prevRowIds == null || prevRowIds.isEmpty()) @@ -445,12 +447,13 @@ public List updateSecondaryIndexEntry(IndexProto.SecondaryIndexEntry index } @Override - public List updateSecondaryIndexEntries(long tableId, long indexId, List indexEntries) throws IndexException + public List updateSecondaryIndexEntries + (long tableId, long indexId, List indexEntries, IndexOption indexOption) throws IndexException { try { // get the single point index for the table and index ID - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); // update all secondary index entries and return previous row IDs List prevRowIds = singlePointIndex.updateSecondaryEntries(indexEntries); if (prevRowIds == null || prevRowIds.isEmpty()) @@ -467,12 +470,13 @@ public List updateSecondaryIndexEntries(long tableId, long indexId, List indexKeys, boolean isPrimary) throws IndexException + public boolean purgeIndexEntries + (long tableId, long indexId, List indexKeys, boolean isPrimary, IndexOption indexOption) throws IndexException { try { // get the single point index for the table and index - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); // purge the entries from the index List rowIds = singlePointIndex.purgeEntries(indexKeys); if (rowIds == null || rowIds.isEmpty()) @@ -514,7 +518,8 @@ public boolean purgeIndexEntries(long tableId, long indexId, List getSchemas() throws MetadataException { List schemas = new ArrayList<>(); diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java index 4c55177e21..9794974635 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java @@ -20,102 +20,100 @@ package io.pixelsdb.pixels.common.node; -import com.google.common.hash.Hashing; -import com.google.protobuf.ByteString; -import io.pixelsdb.pixels.common.utils.ConfigFactory; -import io.pixelsdb.pixels.daemon.NodeProto; + import io.pixelsdb.pixels.common.utils.ConfigFactory; + import io.pixelsdb.pixels.daemon.NodeProto; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; + import java.util.Iterator; + import java.util.Map; + import java.util.concurrent.ConcurrentHashMap; -/** - * Component responsible for managing the cache of bucketId to RetinaNodeInfo mappings. - * It uses the Singleton pattern and lazy initialization to ensure a single instance - * and deferred creation. - * * NOTE: The cache invalidation logic (when the hash ring changes) needs to be integrated - * with NodeServiceImpl, which is simplified in this example. - */ -public class BucketCache -{ - - // Lock object for thread-safe singleton initialization - private static final Object lock = new Object(); - // Lazy-loaded Singleton instance - private static volatile BucketCache instance; - // Thread-safe map cache: Key: bucketId (0 to bucketNum - 1), Value: RetinaNodeInfo - private final Map bucketToNodeMap; + /** + * Component responsible for managing the cache of bucketId to RetinaNodeInfo mappings. + * It uses the Singleton pattern and lazy initialization to ensure a single instance + * and deferred creation. + * * NOTE: The cache invalidation logic (when the hash ring changes) needs to be integrated + * with NodeServiceImpl, which is simplified in this example. + */ + public class BucketCache + { - // NodeService client stub (would be used for actual RPC calls in a real application) - // private final NodeServiceGrpc.NodeServiceBlockingStub nodeServiceStub; - // The total number of discrete hash points (M) loaded from configuration - private final int bucketNum; - private final NodeService nodeService; + private static final Object lock = new Object(); + // Threshold for triggering eviction + private static final int MAX_ENTRIES = 1024 * 1024; + private static volatile BucketCache instance; + // Using ConcurrentHashMap for thread-safe access + private final Map bucketToNodeMap; + private final NodeService nodeService; - /** - * Private constructor to enforce the Singleton pattern. - */ - private BucketCache() - { - // In a real application, bucketNum should be fetched from ConfigFactory - ConfigFactory config = ConfigFactory.Instance(); - this.bucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); + private BucketCache() + { + ConfigFactory config = ConfigFactory.Instance(); + int bucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); + // Initialize with initial capacity to reduce resizing + this.bucketToNodeMap = new ConcurrentHashMap<>(Math.min(MAX_ENTRIES, bucketNum)); + this.nodeService = NodeService.Instance(); + } - // Initialize the cache structure - this.bucketToNodeMap = new ConcurrentHashMap<>(bucketNum); - this.nodeService = NodeService.Instance(); - } + public static BucketCache getInstance() + { + if (instance == null) + { + synchronized (lock) + { + if (instance == null) + { + instance = new BucketCache(); + } + } + } + return instance; + } - /** - * Retrieves the singleton instance of BucketToNodeCache. Uses double-checked - * locking for thread-safe lazy initialization. - * * @return The BucketToNodeCache instance - */ - public static BucketCache getInstance() - { - if (instance == null) - { - synchronized (lock) - { - if (instance == null) - { - instance = new BucketCache(); - } - } - } - return instance; - } + /** + * Retrieves NodeInfo for a given bucketId. + * If the cache exceeds MAX_ENTRIES, a random entry is evicted. + */ + public NodeProto.NodeInfo getRetinaNodeInfoByBucketId(int bucketId) + { + NodeProto.NodeInfo nodeInfo = bucketToNodeMap.get(bucketId); + if (nodeInfo != null) + { + return nodeInfo; + } - /** - * Core lookup method: Retrieves the corresponding RetinaNodeInfo for a given bucketId. - * Uses a cache-aside strategy (lazy loading) to populate the cache upon miss. - * * @param bucketId The hash bucket ID of the data (range 0 to bucketNum - 1) - * - * @return The corresponding RetinaNodeInfo, or null if lookup fails - */ - public NodeProto.NodeInfo getRetinaNodeInfoByBucketId(int bucketId) - { - // 1. Try to get from cache - NodeProto.NodeInfo nodeInfo = bucketToNodeMap.get(bucketId); - if (nodeInfo != null) - { - return nodeInfo; - } + NodeProto.NodeInfo fetchedNodeInfo = fetchNodeInfoFromNodeService(bucketId); - // 2. Cache miss: Fetch from the authoritative source (NodeService RPC) - NodeProto.NodeInfo fetchedNodeInfo = fetchNodeInfoFromNodeService(bucketId); + if (fetchedNodeInfo != null) + { + if (bucketToNodeMap.size() >= MAX_ENTRIES) + { + evictRandomEntry(); + } + bucketToNodeMap.put(bucketId, fetchedNodeInfo); + return fetchedNodeInfo; + } - if (fetchedNodeInfo != null) - { - // 3. Put into cache - bucketToNodeMap.put(bucketId, fetchedNodeInfo); - return fetchedNodeInfo; - } + return null; + } - return null; - } + /** + * Performs a simple random eviction. + * In ConcurrentHashMap, the iterator returns elements in an arbitrary order + * based on hash bin distribution, effectively acting as random selection. + */ + private void evictRandomEntry() + { + // Use the iterator to pick the "first" available element in the current traversal + Iterator iterator = bucketToNodeMap.keySet().iterator(); + if (iterator.hasNext()) + { + Integer randomKey = iterator.next(); + bucketToNodeMap.remove(randomKey); + } + } - private NodeProto.NodeInfo fetchNodeInfoFromNodeService(int bucketId) - { - return nodeService.getRetinaByBucket(bucketId); - } -} + private NodeProto.NodeInfo fetchNodeInfoFromNodeService(int bucketId) + { + return nodeService.getRetinaByBucket(bucketId); + } + } \ No newline at end of file diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/VnodeIdentifier.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/VnodeIdentifier.java new file mode 100644 index 0000000000..d50408dc54 --- /dev/null +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/VnodeIdentifier.java @@ -0,0 +1,104 @@ +/* + * Copyright 2026 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ +package io.pixelsdb.pixels.common.node; + +import io.pixelsdb.pixels.daemon.NodeProto; + +import java.util.Objects; + +/** + * Identifier for a virtual node, used as a unique key in Maps. + * This class is immutable to ensure consistency when used as a key. + */ +public final class VnodeIdentifier +{ + private final String address; + private final int virtualNodeId; + + /** + * Constructs a new identifier. + * * @param address The network address of the node. + * @param virtualNodeId The specific virtual ID on that host. + */ + public VnodeIdentifier(String address, int virtualNodeId) + { + this.address = address; + this.virtualNodeId = virtualNodeId; + } + + /** + * Factory method to create an identifier from a Protobuf NodeInfo message. + * * @param nodeInfo The Protobuf message object. + * @return A new instance of VnodeIdentifier. + */ + public static VnodeIdentifier fromNodeInfo(NodeProto.NodeInfo nodeInfo) + { + return new VnodeIdentifier(nodeInfo.getAddress(), nodeInfo.getVirtualNodeId()); + } + + public String getAddress() + { + return address; + } + + public int getVirtualNodeId() + { + return virtualNodeId; + } + + /** + * Compares this identifier with another object for equality. + * Required for correct behavior in HashMaps. + */ + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + VnodeIdentifier that = (VnodeIdentifier) o; + return virtualNodeId == that.virtualNodeId && + Objects.equals(address, that.address); + } + + /** + * Generates a hash code for this identifier. + * Required for correct behavior in HashMaps. + */ + @Override + public int hashCode() + { + return Objects.hash(address, virtualNodeId); + } + + @Override + public String toString() + { + return "VnodeIdentifier{" + + "address='" + address + '\'' + + ", virtualNodeId=" + virtualNodeId + + '}'; + } +} \ No newline at end of file diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java index 71b1a3f1fd..7780f2e2db 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java @@ -115,7 +115,9 @@ private RetinaService(String host, int port, boolean enabled) assert (host != null); assert (port > 0 && port <= 65535); this.channel = ManagedChannelBuilder.forAddress(host, port). - usePlaintext().build(); + usePlaintext() + .maxInboundMessageSize(1024 * 1024 * 64) + .build(); this.stub = RetinaWorkerServiceGrpc.newBlockingStub(this.channel); this.asyncStub = RetinaWorkerServiceGrpc.newStub(this.channel); } else diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/IndexUtils.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/IndexUtils.java new file mode 100644 index 0000000000..f349f72888 --- /dev/null +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/IndexUtils.java @@ -0,0 +1,68 @@ + /* + * Copyright 2026 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + + package io.pixelsdb.pixels.common.utils; + + import io.pixelsdb.pixels.common.exception.MetadataException; + import io.pixelsdb.pixels.common.metadata.MetadataService; + import io.pixelsdb.pixels.common.metadata.domain.Column; + import io.pixelsdb.pixels.common.metadata.domain.Schema; + import io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex; + import io.pixelsdb.pixels.common.metadata.domain.Table; + + import java.util.LinkedList; + import java.util.List; + import java.util.stream.IntStream; + + public class IndexUtils +{ + private static final MetadataService metadataService = MetadataService.Instance(); + + public static List extractInfoFromIndex(long tableId, long indexId) throws MetadataException + { + Table table = metadataService.getTableById(tableId); + Schema schema = metadataService.getSchemaById(table.getSchemaId()); + return extractInfoFromIndex(schema.getName(), table.getName(), indexId); + } + + public static List extractInfoFromIndex(String dbName, String tableName, long indexId) throws MetadataException + { + List columns = metadataService.getColumns(dbName, tableName, false); + SinglePointIndex index = metadataService.getSinglePointIndex(indexId); + + int[] orderKeyColIds = new int[index.getKeyColumns().getKeyColumnIds().size()]; + List orderedKeyCols = new LinkedList<>(); + int keyColumnIdx = 0; + for (Integer keyColumnId : index.getKeyColumns().getKeyColumnIds()) { + int i = IntStream.range(0, columns.size()) + .filter(idx -> columns.get(idx).getId() == keyColumnId) + .findFirst() + .orElse(-1); + if(i == -1) + { + throw new MetadataException("Cant find key column id: " + keyColumnId + " in table " + + tableName + " schema id is " + dbName); + } + orderKeyColIds[keyColumnIdx++] = i; + orderedKeyCols.add(columns.get(i)); + } + return orderedKeyCols; + } +} diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java index 2459ea99ce..6dfb52d383 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java @@ -23,6 +23,7 @@ import com.google.common.hash.Hashing; import com.google.protobuf.ByteString; import io.pixelsdb.pixels.common.node.BucketCache; +import io.pixelsdb.pixels.common.node.VnodeIdentifier; import io.pixelsdb.pixels.common.retina.RetinaService; public class RetinaUtils @@ -83,6 +84,11 @@ public static String getRetinaHostNameFromBucketId(int bucketId) return BucketCache.getInstance().getRetinaNodeInfoByBucketId(bucketId).getAddress(); } + public static VnodeIdentifier getVnodeIdentifierFromBucketId(int bucketId) + { + return VnodeIdentifier.fromNodeInfo(BucketCache.getInstance().getRetinaNodeInfoByBucketId(bucketId)); + } + public static RetinaService getRetinaServiceFromBucketId(int bucketId) { String retinaHost = getRetinaHostNameFromBucketId(bucketId); diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties index ff89dde243..8b17d13a7c 100644 --- a/pixels-common/src/main/resources/pixels.properties +++ b/pixels-common/src/main/resources/pixels.properties @@ -289,7 +289,8 @@ retina.buffer.flush.interval=30 retina.gc.interval=300 # retina buffer reader prefetch threads num retina.reader.prefetch.threads=8 - +# retina service init threads num +retina.service.init.threads=32 # offloading threshold for long query in seconds pixels.transaction.offload.threshold=1800 # snapshot storage directory @@ -357,6 +358,8 @@ index.rocksdb.stats.enabled=false index.rocksdb.stats.path=/tmp/rocksDBStats # Time interval (in seconds) between statistics dumps index.rocksdb.stats.interval=60 +# Time interval (in seconds) between rocksdb memory usage log +index.rocksdb.log.interval=10 # Whether to enable the latest version cache for SinglePointIndex index.cache.enabled=false # The maximum number of entries in the cache diff --git a/pixels-common/src/test/java/io/pixelsdb/pixels/common/index/TestLocalIndexService.java b/pixels-common/src/test/java/io/pixelsdb/pixels/common/index/TestLocalIndexService.java index fb25d90e23..a76b723904 100644 --- a/pixels-common/src/test/java/io/pixelsdb/pixels/common/index/TestLocalIndexService.java +++ b/pixels-common/src/test/java/io/pixelsdb/pixels/common/index/TestLocalIndexService.java @@ -20,6 +20,7 @@ package io.pixelsdb.pixels.common.index; import com.google.protobuf.ByteString; +import io.pixelsdb.pixels.common.index.service.LocalIndexService; import io.pixelsdb.pixels.index.IndexProto; import org.junit.jupiter.api.*; @@ -36,7 +37,7 @@ class TestLocalIndexService private static final long TABLE_ID = 1L; private static final long PRIMARY_INDEX_ID = 100L; private static final long SECONDARY_INDEX_ID = 200L; - + private static IndexOption indexOption; private static IndexProto.PrimaryIndexEntry primaryEntry; private static IndexProto.SecondaryIndexEntry secondaryEntry; @@ -46,8 +47,8 @@ static void setup() throws Exception indexService = LocalIndexService.Instance(); // open index - assertTrue(indexService.openIndex(TABLE_ID, PRIMARY_INDEX_ID, true)); - assertTrue(indexService.openIndex(TABLE_ID, SECONDARY_INDEX_ID, false)); + assertTrue(indexService.openIndex(TABLE_ID, PRIMARY_INDEX_ID, true, indexOption)); + assertTrue(indexService.openIndex(TABLE_ID, SECONDARY_INDEX_ID, false, indexOption)); // delicate RowId IndexProto.RowIdBatch batch = indexService.allocateRowIdBatch(TABLE_ID, 1); @@ -77,14 +78,16 @@ static void setup() throws Exception .setKey(ByteString.copyFromUtf8("key1")) .setTimestamp(12345678)) .build(); + + indexOption = IndexOption.builder().vNodeId(0).build(); } @Test @Order(1) void testPutPrimaryAndSecondaryIndex() throws Exception { - assertTrue(indexService.putPrimaryIndexEntry(primaryEntry)); - assertTrue(indexService.putSecondaryIndexEntry(secondaryEntry)); + assertTrue(indexService.putPrimaryIndexEntry(primaryEntry, indexOption)); + assertTrue(indexService.putSecondaryIndexEntry(secondaryEntry, indexOption)); } @Test @@ -92,12 +95,12 @@ void testPutPrimaryAndSecondaryIndex() throws Exception void testLookupIndex() throws Exception { // lookup primary - IndexProto.RowLocation primaryLocation = indexService.lookupUniqueIndex(primaryEntry.getIndexKey()); + IndexProto.RowLocation primaryLocation = indexService.lookupUniqueIndex(primaryEntry.getIndexKey(), indexOption); assertNotNull(primaryLocation); assertEquals(1, primaryLocation.getFileId()); // lookup secondary - List secondaryLocations = indexService.lookupNonUniqueIndex(secondaryEntry.getIndexKey()); + List secondaryLocations = indexService.lookupNonUniqueIndex(secondaryEntry.getIndexKey(), indexOption); assertNotNull(secondaryLocations); assertEquals(1, secondaryLocations.size()); } @@ -110,10 +113,10 @@ void testUpdateIndex() throws Exception IndexProto.PrimaryIndexEntry updatedPrimary = primaryEntry.toBuilder() .setRowId(newRowId) .build(); - IndexProto.RowLocation prevLocation = indexService.updatePrimaryIndexEntry(updatedPrimary); + IndexProto.RowLocation prevLocation = indexService.updatePrimaryIndexEntry(updatedPrimary, indexOption); assertNotNull(prevLocation); - List prevSecondaryRowIds = indexService.updateSecondaryIndexEntry(secondaryEntry); + List prevSecondaryRowIds = indexService.updateSecondaryIndexEntry(secondaryEntry, indexOption); assertNotNull(prevSecondaryRowIds); } @@ -122,11 +125,11 @@ void testUpdateIndex() throws Exception void testDeleteIndex() throws Exception { // delete primary - IndexProto.RowLocation deletedPrimaryLocation = indexService.deletePrimaryIndexEntry(primaryEntry.getIndexKey()); + IndexProto.RowLocation deletedPrimaryLocation = indexService.deletePrimaryIndexEntry(primaryEntry.getIndexKey(), indexOption); assertNotNull(deletedPrimaryLocation); // delete secondary - List deletedSecondaryRowIds = indexService.deleteSecondaryIndexEntry(secondaryEntry.getIndexKey()); + List deletedSecondaryRowIds = indexService.deleteSecondaryIndexEntry(secondaryEntry.getIndexKey(), indexOption); assertEquals(1, deletedSecondaryRowIds.size()); } @@ -134,16 +137,16 @@ void testDeleteIndex() throws Exception @Order(5) void testPurgeAndFlush() throws Exception { - assertTrue(indexService.putPrimaryIndexEntry(primaryEntry)); - assertTrue(indexService.putSecondaryIndexEntry(secondaryEntry)); + assertTrue(indexService.putPrimaryIndexEntry(primaryEntry, indexOption)); + assertTrue(indexService.putSecondaryIndexEntry(secondaryEntry, indexOption)); // purge primary boolean purged = indexService.purgeIndexEntries(TABLE_ID, PRIMARY_INDEX_ID, - Collections.singletonList(primaryEntry.getIndexKey()), true); + Collections.singletonList(primaryEntry.getIndexKey()), true, indexOption); assertTrue(purged); // flush primary - assertTrue(indexService.flushIndexEntriesOfFile(TABLE_ID, PRIMARY_INDEX_ID, 1L, true)); + assertTrue(indexService.flushIndexEntriesOfFile(TABLE_ID, PRIMARY_INDEX_ID, 1L, true, indexOption)); } @Test @@ -151,11 +154,11 @@ void testPurgeAndFlush() throws Exception void testCloseAndRemoveIndex() throws Exception { // close - assertTrue(indexService.closeIndex(TABLE_ID, PRIMARY_INDEX_ID, true)); - assertTrue(indexService.closeIndex(TABLE_ID, SECONDARY_INDEX_ID, false)); + assertTrue(indexService.closeIndex(TABLE_ID, PRIMARY_INDEX_ID, true, indexOption)); + assertTrue(indexService.closeIndex(TABLE_ID, SECONDARY_INDEX_ID, false, indexOption)); // remove - assertTrue(indexService.removeIndex(TABLE_ID, PRIMARY_INDEX_ID, true)); - assertTrue(indexService.removeIndex(TABLE_ID, SECONDARY_INDEX_ID, false)); + assertTrue(indexService.removeIndex(TABLE_ID, PRIMARY_INDEX_ID, true, indexOption)); + assertTrue(indexService.removeIndex(TABLE_ID, SECONDARY_INDEX_ID, false, indexOption)); } } diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/TypeDescription.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/TypeDescription.java index f0e81b3a21..2774e7f79e 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/TypeDescription.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/TypeDescription.java @@ -20,6 +20,7 @@ package io.pixelsdb.pixels.core; import com.google.common.collect.ImmutableSet; +import io.pixelsdb.pixels.common.metadata.domain.Column; import io.pixelsdb.pixels.core.utils.Decimal; import io.pixelsdb.pixels.core.vector.*; import org.apache.logging.log4j.LogManager; @@ -36,6 +37,7 @@ import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static io.pixelsdb.pixels.core.utils.DatetimeUtils.*; @@ -454,6 +456,12 @@ public static TypeDescription createSchema(List types) return schema; } + public static TypeDescription createSchemaFromColumns(List columns) + { + List columnNames = columns.stream().map(Column::getName).collect(Collectors.toList()); + List columnTypes = columns.stream().map(Column::getType).collect(Collectors.toList()); + return createSchemaFromStrings(columnNames, columnTypes); + } /** * Based on the column type, create the corresponding schema. * Column types are represented as string types, e.g. "decimal(15, 2)", "varchar(10)". diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderBufferImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderBufferImpl.java index 16cae39480..7cb79b61d5 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderBufferImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderBufferImpl.java @@ -59,12 +59,12 @@ public class PixelsRecordReaderBufferImpl implements PixelsRecordReader private final TypeDescription typeDescription; private final int colNum; private final int typeMode = TypeDescription.Mode.CREATE_INT_VECTOR_FOR_INT; - private final ExecutorService prefetchExecutor; // Thread pool for I/O and deserialization + private static ExecutorService prefetchExecutor; // Thread pool for I/O and deserialization private final BlockingQueue prefetchQueue; // Queue for completed batches private final AtomicInteger pendingTasks = new AtomicInteger(0); // Counter for submitted but unfinished tasks private final AtomicBoolean initialMemtableSubmitted = new AtomicBoolean(false); // Flag for active memtable - private final int maxPrefetchTasks; // Max concurrent tasks - private final int prefetchQueueCapacity; // Queue capacity + private static int maxPrefetchTasks; + private static final int prefetchQueueCapacity = DEFAULT_QUEUE_CAPACITY; // Queue capacity private final boolean shouldReadHiddenColumn; private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final List visibilityBitmap; @@ -127,21 +127,29 @@ public PixelsRecordReaderBufferImpl(PixelsReaderOption option, this.includedColumnTypes = new ArrayList<>(); this.everRead = false; this.vNodeId = vNodeId; - this.maxPrefetchTasks = Integer.parseInt(configFactory.getProperty("retina.reader.prefetch.threads")); - this.prefetchQueueCapacity = DEFAULT_QUEUE_CAPACITY; - - this.prefetchExecutor = Executors.newFixedThreadPool(maxPrefetchTasks, r -> - { - Thread t = Executors.defaultThreadFactory().newThread(r); - t.setName("Pixels-Buffer-Reader-Prefetch"); - t.setDaemon(true); - return t; - }); this.prefetchQueue = new LinkedBlockingQueue<>(prefetchQueueCapacity); - + initInternalExecutor(); checkBeforeRead(); } + private static synchronized void initInternalExecutor() + { + if (prefetchExecutor == null) + { + ConfigFactory configFactory = ConfigFactory.Instance(); + String threadProp = configFactory.getProperty("retina.reader.prefetch.threads"); + maxPrefetchTasks = (threadProp != null) ? Integer.parseInt(threadProp) : DEFAULT_QUEUE_CAPACITY; + prefetchExecutor = Executors.newFixedThreadPool(maxPrefetchTasks, r -> + { + Thread t = new Thread(r); + t.setName("Pixels-Buffer-Reader-Prefetch-Shared"); + t.setDaemon(true); + return t; + }); + LOGGER.info("Initialized shared Pixels-Buffer-Reader-Prefetch pool with {} threads", maxPrefetchTasks); + } + } + private static boolean checkBit(RetinaProto.VisibilityBitmap bitmap, int k) { long bitmap_ = bitmap.getBitmap(k / 64); @@ -351,6 +359,7 @@ public VectorizedRowBatch readBatch() throws IOException { return createEmptyRowBatch(0); } + memoryUsage.addAndGet(-curRowBatch.getMemoryUsage()); LongColumnVector hiddenTimestampVector = (LongColumnVector) curRowBatch.cols[this.colNum - 1]; /** @@ -439,7 +448,12 @@ public long getMemoryUsage() @Override public void close() throws IOException { - prefetchExecutor.shutdownNow(); + List remaining = new ArrayList<>(); + prefetchQueue.drainTo(remaining); + for (VectorizedRowBatch b : remaining) + { + memoryUsage.addAndGet(-b.getMemoryUsage()); + } } private String getRetinaBufferStoragePathFromId(long entryId, int virtualId) diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/index/IndexServiceImpl.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/index/IndexServiceImpl.java index 3b6b32967f..ef2eb32b70 100644 --- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/index/IndexServiceImpl.java +++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/index/IndexServiceImpl.java @@ -78,7 +78,8 @@ public void lookupUniqueIndex(IndexProto.LookupUniqueIndexRequest request, long tableId = key.getTableId(); long indexId = key.getIndexId(); MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); long rowId = singlePointIndex.getUniqueRowId(key); if(rowId >= 0) { @@ -117,7 +118,8 @@ public void lookupNonUniqueIndex(IndexProto.LookupNonUniqueIndexRequest request, long tableId = key.getTableId(); long indexId = key.getIndexId(); MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List rowIds = singlePointIndex.getRowIds(key); List rowLocations = new ArrayList<>(); if(!rowIds.isEmpty()) @@ -164,7 +166,8 @@ public void putPrimaryIndexEntry(IndexProto.PutPrimaryIndexEntryRequest request, long tableId = key.getTableId(); long indexId = key.getIndexId(); MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); boolean success = singlePointIndex.putEntry(entry.getIndexKey(), entry.getRowId()); if (success) { @@ -206,7 +209,8 @@ public void putPrimaryIndexEntries(IndexProto.PutPrimaryIndexEntriesRequest requ { long tableId = request.getTableId(); long indexId = request.getIndexId(); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); boolean success = singlePointIndex.putPrimaryEntries(entries); if (success) { @@ -249,7 +253,8 @@ public void putSecondaryIndexEntry(IndexProto.PutSecondaryIndexEntryRequest requ IndexProto.IndexKey key = entry.getIndexKey(); long tableId = key.getTableId(); long indexId = key.getIndexId(); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); boolean success = singlePointIndex.putEntry(entry.getIndexKey(), entry.getRowId()); if (success) { @@ -278,7 +283,8 @@ public void putSecondaryIndexEntries(IndexProto.PutSecondaryIndexEntriesRequest { long tableId = request.getTableId(); long indexId = request.getIndexId(); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); boolean success = singlePointIndex.putSecondaryEntries(entries); if (success) { @@ -308,7 +314,8 @@ public void deletePrimaryIndexEntry(IndexProto.DeletePrimaryIndexEntryRequest re long tableId = key.getTableId(); long indexId = key.getIndexId(); MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); long rowId = singlePointIndex.deleteUniqueEntry(key); if (rowId > 0) { @@ -350,7 +357,8 @@ public void deletePrimaryIndexEntries(IndexProto.DeletePrimaryIndexEntriesReques long tableId = request.getTableId(); long indexId = request.getIndexId(); MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List rowIds = singlePointIndex.deleteEntries(keys); if (rowIds != null && !rowIds.isEmpty()) { @@ -392,7 +400,8 @@ public void deleteSecondaryIndexEntry(IndexProto.DeleteSecondaryIndexEntryReques { long tableId = key.getTableId(); long indexId = key.getIndexId(); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List rowIds = singlePointIndex.deleteEntry(key); builder.setErrorCode(ErrorCode.SUCCESS).addAllRowIds(rowIds); } @@ -414,7 +423,8 @@ public void deleteSecondaryIndexEntries(IndexProto.DeleteSecondaryIndexEntriesRe { long tableId = request.getTableId(); long indexId = request.getIndexId(); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List rowIds = singlePointIndex.deleteEntries(keys); if (rowIds != null && !rowIds.isEmpty()) { @@ -445,7 +455,8 @@ public void updatePrimaryIndexEntry(IndexProto.UpdatePrimaryIndexEntryRequest re long tableId = key.getTableId(); long indexId = key.getIndexId(); MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); long prevRowId = singlePointIndex.updatePrimaryEntry(entry.getIndexKey(), entry.getRowId()); if (prevRowId > 0) { @@ -491,7 +502,8 @@ public void updatePrimaryIndexEntries(IndexProto.UpdatePrimaryIndexEntriesReques long tableId = request.getTableId(); long indexId = request.getIndexId(); MainIndex mainIndex = MainIndexFactory.Instance().getMainIndex(tableId); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List prevRowIds = singlePointIndex.updatePrimaryEntries(entries); if (prevRowIds != null && !prevRowIds.isEmpty()) { @@ -543,7 +555,8 @@ public void updateSecondaryIndexEntry(IndexProto.UpdateSecondaryIndexEntryReques IndexProto.IndexKey key = entry.getIndexKey(); long tableId = key.getTableId(); long indexId = key.getIndexId(); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List rowIds = singlePointIndex.updateSecondaryEntry(entry.getIndexKey(), entry.getRowId()); builder.setErrorCode(ErrorCode.SUCCESS).addAllPrevRowIds(rowIds); } @@ -565,7 +578,8 @@ public void updateSecondaryIndexEntries(IndexProto.UpdateSecondaryIndexEntriesRe { long tableId = request.getTableId(); long indexId = request.getIndexId(); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List rowIds = singlePointIndex.updateSecondaryEntries(entries); if (rowIds != null && !rowIds.isEmpty()) { @@ -595,7 +609,8 @@ public void purgeIndexEntries(IndexProto.PurgeIndexEntriesRequest request, long tableId = request.getTableId(); long indexId = request.getIndexId(); boolean isPrimary = request.getIsPrimary(); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); List rowIds = singlePointIndex.purgeEntries(keys); if (rowIds != null && !rowIds.isEmpty()) { @@ -684,7 +699,8 @@ public void openIndex(IndexProto.OpenIndexRequest request, { long tableId = request.getTableId(); long indexId = request.getIndexId(); - SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId); + IndexOption indexOption = new IndexOption(request.getIndexOption()); + SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption); if (singlePointIndex != null) { if (request.getIsPrimary()) @@ -731,7 +747,9 @@ public void closeIndex(IndexProto.CloseIndexRequest request, { long tableId = request.getTableId(); long indexId = request.getIndexId(); - SinglePointIndexFactory.Instance().closeIndex(tableId, indexId, false); + IndexOption indexOption = IndexOption.builder() + .build(); + SinglePointIndexFactory.Instance().closeIndex(tableId, indexId, false, indexOption); if (request.getIsPrimary()) { MainIndexFactory.Instance().closeIndex(tableId, false); @@ -759,7 +777,8 @@ public void removeIndex(IndexProto.RemoveIndexRequest request, { long tableId = request.getTableId(); long indexId = request.getIndexId(); - SinglePointIndexFactory.Instance().closeIndex(tableId, indexId, true); + IndexOption indexOption = IndexOption.builder().build(); + SinglePointIndexFactory.Instance().closeIndex(tableId, indexId, true, indexOption); if (request.getIsPrimary()) { MainIndexFactory.Instance().closeIndex(tableId, true); diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/metadata/MetadataServiceImpl.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/metadata/MetadataServiceImpl.java index c0290f3b25..13324c781a 100644 --- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/metadata/MetadataServiceImpl.java +++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/metadata/MetadataServiceImpl.java @@ -245,6 +245,41 @@ public void existSchema(MetadataProto.ExistSchemaRequest request, responseObserver.onCompleted(); } + @Override + public void getSchemaById(MetadataProto.GetSchemaByIdRequest request, StreamObserver responseObserver) + { + MetadataProto.ResponseHeader.Builder headerBuilder = MetadataProto.ResponseHeader.newBuilder() + .setToken(request.getHeader().getToken()); + + MetadataProto.ResponseHeader header; + MetadataProto.GetSchemaByIdResponse response; + MetadataProto.Schema schema = schemaDao.getById(request.getSchemaId()); + if (schema == null) + { + header = headerBuilder + .setErrorCode(METADATA_SCHEMA_NOT_FOUND) // Constant representing schema error + .setErrorMsg("Metadata server failed to get schema: " + request.getSchemaId()) + .build(); + + response = MetadataProto.GetSchemaByIdResponse.newBuilder() + .setHeader(header) + .build(); + } + else + { + header = headerBuilder + .setErrorCode(0) + .setErrorMsg("") + .build(); + response = MetadataProto.GetSchemaByIdResponse.newBuilder() + .setHeader(header) + .setSchema(schema) + .build(); + } + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + @Override public void getSchemas(MetadataProto.GetSchemasRequest request, StreamObserver responseObserver) diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java index 8ea8fd0029..047ebdb52c 100644 --- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java +++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java @@ -197,8 +197,23 @@ private void addNodeInternal(NodeProto.NodeInfo.Builder node) { node.setVirtualNodeId(i); int hashPoint = hash(node.getAddress() + "#" + i) % bucketNum; - hashRing.put(hashPoint, node.build()); + NodeProto.NodeInfo oldNode = hashRing.get(hashPoint); + NodeProto.NodeInfo newNode = node.build(); + if(putNode(oldNode, newNode)) + { + hashRing.put(hashPoint, node.build()); + } + } + } + + private boolean putNode(NodeProto.NodeInfo oldNode, NodeProto.NodeInfo newNode) + { + if (oldNode == null) + { + return true; } + int cmp = oldNode.getAddress().compareTo(newNode.getAddress()); + return (cmp != 0) ? (cmp < 0) : (oldNode.getVirtualNodeId() < newNode.getVirtualNodeId()); } /** diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java index 3a8d36c820..5102b4413d 100644 --- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java +++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java @@ -20,15 +20,18 @@ package io.pixelsdb.pixels.daemon.retina; import com.google.common.base.Function; +import com.google.common.util.concurrent.Striped; import com.google.protobuf.ByteString; import io.grpc.stub.StreamObserver; import io.pixelsdb.pixels.common.exception.IndexException; import io.pixelsdb.pixels.common.exception.RetinaException; -import io.pixelsdb.pixels.common.index.IndexService; -import io.pixelsdb.pixels.common.index.IndexServiceProvider; +import io.pixelsdb.pixels.common.index.IndexOption; +import io.pixelsdb.pixels.common.index.service.IndexService; +import io.pixelsdb.pixels.common.index.service.IndexServiceProvider; import io.pixelsdb.pixels.common.metadata.MetadataService; import io.pixelsdb.pixels.common.metadata.domain.*; import io.pixelsdb.pixels.common.physical.Storage; +import io.pixelsdb.pixels.common.utils.ConfigFactory; import io.pixelsdb.pixels.index.IndexProto; import io.pixelsdb.pixels.retina.RetinaProto; import io.pixelsdb.pixels.retina.RetinaResourceManager; @@ -37,6 +40,12 @@ import org.apache.logging.log4j.Logger; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -53,6 +62,7 @@ public class RetinaServerImpl extends RetinaWorkerServiceGrpc.RetinaWorkerServic private final MetadataService metadataService; private final IndexService indexService; private final RetinaResourceManager retinaResourceManager; + private final Striped updateLocks = Striped.lock(1024); /** * Initialize the visibility management for all the records. @@ -99,21 +109,79 @@ public RetinaServerImpl() .collect(Collectors.toList())); } } - for (String filePath : files) + + int threadNum = Integer.parseInt + (ConfigFactory.Instance().getProperty("retina.service.init.threads")); + ExecutorService executorService = Executors.newFixedThreadPool(threadNum); + AtomicBoolean success = new AtomicBoolean(true); + AtomicReference e = new AtomicReference<>(); + try + { + for (String filePath : files) + { + executorService.submit(() -> + { + try + { + this.retinaResourceManager.addVisibility(filePath); + } catch (Exception ex) + { + success.set(false); + e.set(ex); + } + }); + } + } finally + { + executorService.shutdown(); + } + + if(success.get()) + { + executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } + + if(!success.get()) { - this.retinaResourceManager.addVisibility(filePath); + throw new RetinaException("Can't add visibility", e.get()); } this.retinaResourceManager.addWriteBuffer(schema.getName(), table.getName()); } } this.retinaResourceManager.finishRecovery(); + logger.info("Retina service is ready"); } catch (Exception e) { logger.error("Error while initializing RetinaServerImpl", e); } } + /** + * Check if the order or compact paths from pixels metadata is valid. + * + * @param paths the order or compact paths from pixels metadata. + */ + public static void validateOrderedOrCompactPaths(List paths) throws RetinaException + { + requireNonNull(paths, "paths is null"); + checkArgument(!paths.isEmpty(), "paths must contain at least one valid directory"); + try + { + Storage.Scheme firstScheme = Storage.Scheme.fromPath(paths.get(0).getUri()); + assert firstScheme != null; + for (int i = 1; i < paths.size(); ++i) + { + Storage.Scheme scheme = Storage.Scheme.fromPath(paths.get(i).getUri()); + checkArgument(firstScheme.equals(scheme), + "all the directories in the paths must have the same storage scheme"); + } + } catch (Throwable e) + { + throw new RetinaException("Failed to parse storage scheme from paths", e); + } + } + @Override public void updateRecord(RetinaProto.UpdateRecordRequest request, StreamObserver responseObserver) @@ -204,84 +272,16 @@ public void onCompleted() }; } - /** - * A memory-efficient, read-only view that represents the transposed version of a list of objects. - * This class implements the List interface but does not store the transposed data explicitly. - * Instead, it computes the transposed data on-the-fly when accessed. - */ - private static class TransposedIndexKeyView extends AbstractList> - { - private final List originalData; - private final Function> indexExtractor; - private final int columnCount; - - public TransposedIndexKeyView(List originalData, - Function> indexExtractor) - { - this.originalData = originalData; - this.indexExtractor = indexExtractor; - if (originalData == null || originalData.isEmpty()) - { - this.columnCount = 0; - } else - { - this.columnCount = indexExtractor.apply(originalData.get(0)).size(); - } - } - - @Override - public List get(int columnIndex) - { - if (columnIndex < 0 || columnIndex >= columnCount) - { - throw new IndexOutOfBoundsException("Column index out of bounds: " + columnIndex); - } - return new ColumnView(columnIndex); - } - - @Override - public int size() - { - return columnCount; - } - - private class ColumnView extends AbstractList - { - private final int columnIndex; - - public ColumnView(int columnIndex) - { - this.columnIndex = columnIndex; - } - - @Override - public IndexProto.IndexKey get(int rowIndex) - { - if (rowIndex < 0 || rowIndex >= originalData.size()) - { - throw new IndexOutOfBoundsException("Row index out of bounds: " + rowIndex); - } - return indexExtractor.apply(originalData.get(rowIndex)).get(columnIndex); - } - - @Override - public int size() - { - return originalData.size(); - } - } - } - - /** * Transpose the index keys from a row set to a column set. + * * @param dataList * @param indexExtractor - * @return * @param + * @return */ private List> transposeIndexKeys(List dataList, - Function> indexExtractor) + Function> indexExtractor) { if (dataList == null || dataList.isEmpty()) { @@ -302,6 +302,9 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw { String schemaName = request.getSchemaName(); List tableUpdateDataList = request.getTableUpdateDataList(); + int virtualNodeId = request.getVirtualNodeId(); + IndexOption indexOption = new IndexOption(); + indexOption.setVNodeId(virtualNodeId); if (!tableUpdateDataList.isEmpty()) { for (RetinaProto.TableUpdateData tableUpdateData : tableUpdateDataList) @@ -325,7 +328,7 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw boolean allRecordsValid = deleteDataList.stream().allMatch(deleteData -> deleteData.getIndexKeysCount() == indexNum && - deleteData.getIndexKeys(0).getIndexId() == primaryIndexId); + deleteData.getIndexKeys(0).getIndexId() == primaryIndexId); if (!allRecordsValid) { throw new RetinaException("Primary index id mismatch or inconsistent index key list size"); @@ -339,7 +342,7 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw List primaryIndexKeys = indexKeysList.get(0); long tableId = primaryIndexKeys.get(0).getTableId(); List rowLocations = indexService.deletePrimaryIndexEntries - (tableId, primaryIndexId, primaryIndexKeys); + (tableId, primaryIndexId, primaryIndexKeys, indexOption); // 1d. Delete the records for (IndexProto.RowLocation rowLocation : rowLocations) @@ -352,7 +355,7 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw { List indexKeys = indexKeysList.get(i); indexService.deleteSecondaryIndexEntries(indexKeys.get(0).getTableId(), - indexKeys.get(0).getIndexId(), indexKeys); + indexKeys.get(0).getIndexId(), indexKeys, indexOption); } } @@ -371,7 +374,7 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw boolean allRecordValid = insertDataList.stream().allMatch(insertData -> insertData.getIndexKeysCount() == indexNum && - insertData.getIndexKeys(0).getIndexId() == primaryIndexId); + insertData.getIndexKeys(0).getIndexId() == primaryIndexId); if (!allRecordValid) { throw new RetinaException("Primary index id mismatch or inconsistent index key list size"); @@ -393,7 +396,7 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw IndexProto.PrimaryIndexEntry.Builder builder = this.retinaResourceManager.insertRecord(schemaName, tableName, - colValuesByteArray, timestamp, request.getVirtualNodeId()); + colValuesByteArray, timestamp, virtualNodeId); builder.setIndexKey(insertData.getIndexKeys(0)); IndexProto.PrimaryIndexEntry entry = builder.build(); primaryIndexEntries.add(entry); @@ -402,7 +405,7 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw // 2d. Put the primary index entries long tableId = primaryIndexEntries.get(0).getIndexKey().getTableId(); - indexService.putPrimaryIndexEntries(tableId, primaryIndexId, primaryIndexEntries); + indexService.putPrimaryIndexEntries(tableId, primaryIndexId, primaryIndexEntries, indexOption); // 2e. Put the secondary index entries for (int i = 1; i < indexNum; ++i) @@ -416,7 +419,7 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw .build()) .collect(Collectors.toList()); indexService.putSecondaryIndexEntries(indexKeys.get(0).getTableId(), - indexKeys.get(0).getIndexId(), secondaryIndexEntries); + indexKeys.get(0).getIndexId(), secondaryIndexEntries, indexOption); } } @@ -435,7 +438,7 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw boolean allRecordsValid = updateDataList.stream().allMatch(updateData -> updateData.getIndexKeysCount() == indexNum && - updateData.getIndexKeys(0).getIndexId() == primaryIndexId); + updateData.getIndexKeys(0).getIndexId() == primaryIndexId); if (!allRecordsValid) { throw new RetinaException("Primary index id mismatch or inconsistent index key list size"); @@ -457,7 +460,7 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw IndexProto.PrimaryIndexEntry.Builder builder = this.retinaResourceManager.insertRecord(schemaName, tableName, - colValuesByteArray, timestamp, request.getVirtualNodeId()); + colValuesByteArray, timestamp, virtualNodeId); builder.setIndexKey(updateData.getIndexKeys(0)); IndexProto.PrimaryIndexEntry entry = builder.build(); @@ -467,8 +470,20 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw // 3d. Update the primary index entries and get the previous row locations long tableId = primaryIndexEntries.get(0).getIndexKey().getTableId(); - List previousRowLocations = indexService.updatePrimaryIndexEntries - (tableId, primaryIndexId, primaryIndexEntries); + String lockKey = "vnode_" + virtualNodeId + "_idx_" + primaryIndexId; + Lock lock = updateLocks.get(lockKey); + + + List previousRowLocations = null; + lock.lock(); + try + { + previousRowLocations = indexService.updatePrimaryIndexEntries + (tableId, primaryIndexId, primaryIndexEntries, indexOption); + } finally + { + lock.unlock(); + } // 3e. Delete the previous records for (IndexProto.RowLocation location : previousRowLocations) @@ -489,8 +504,9 @@ private void processUpdateRequest(RetinaProto.UpdateRecordRequest request) throw .collect(Collectors.toList()); indexService.updateSecondaryIndexEntries(indexKeys.get(0).getTableId(), - indexKeys.get(0).getIndexId(), secondaryIndexEntries); + indexKeys.get(0).getIndexId(), secondaryIndexEntries, indexOption); } + } } } @@ -593,7 +609,7 @@ public void reclaimVisibility(RetinaProto.ReclaimVisibilityRequest request, @Override public void addWriteBuffer(RetinaProto.AddWriteBufferRequest request, - StreamObserver responseObserver) + StreamObserver responseObserver) { RetinaProto.ResponseHeader.Builder headerBuilder = RetinaProto.ResponseHeader.newBuilder() .setToken(request.getHeader().getToken()); @@ -617,7 +633,7 @@ public void addWriteBuffer(RetinaProto.AddWriteBufferRequest request, @Override public void getWriteBuffer(RetinaProto.GetWriteBufferRequest request, - StreamObserver responseObserver) + StreamObserver responseObserver) { RetinaProto.ResponseHeader.Builder headerBuilder = RetinaProto.ResponseHeader.newBuilder() .setToken(request.getHeader().getToken()); @@ -691,27 +707,70 @@ public void unregisterOffload(RetinaProto.UnregisterOffloadRequest request, } /** - * Check if the order or compact paths from pixels metadata is valid. - * - * @param paths the order or compact paths from pixels metadata. + * A memory-efficient, read-only view that represents the transposed version of a list of objects. + * This class implements the List interface but does not store the transposed data explicitly. + * Instead, it computes the transposed data on-the-fly when accessed. */ - public static void validateOrderedOrCompactPaths(List paths) throws RetinaException + private static class TransposedIndexKeyView extends AbstractList> { - requireNonNull(paths, "paths is null"); - checkArgument(!paths.isEmpty(), "paths must contain at least one valid directory"); - try + private final List originalData; + private final Function> indexExtractor; + private final int columnCount; + + public TransposedIndexKeyView(List originalData, + Function> indexExtractor) { - Storage.Scheme firstScheme = Storage.Scheme.fromPath(paths.get(0).getUri()); - assert firstScheme != null; - for (int i = 1; i < paths.size(); ++i) + this.originalData = originalData; + this.indexExtractor = indexExtractor; + if (originalData == null || originalData.isEmpty()) { - Storage.Scheme scheme = Storage.Scheme.fromPath(paths.get(i).getUri()); - checkArgument(firstScheme.equals(scheme), - "all the directories in the paths must have the same storage scheme"); + this.columnCount = 0; + } else + { + this.columnCount = indexExtractor.apply(originalData.get(0)).size(); } - } catch (Throwable e) + } + + @Override + public List get(int columnIndex) { - throw new RetinaException("Failed to parse storage scheme from paths", e); + if (columnIndex < 0 || columnIndex >= columnCount) + { + throw new IndexOutOfBoundsException("Column index out of bounds: " + columnIndex); + } + return new ColumnView(columnIndex); + } + + @Override + public int size() + { + return columnCount; + } + + private class ColumnView extends AbstractList + { + private final int columnIndex; + + public ColumnView(int columnIndex) + { + this.columnIndex = columnIndex; + } + + @Override + public IndexProto.IndexKey get(int rowIndex) + { + if (rowIndex < 0 || rowIndex >= originalData.size()) + { + throw new IndexOutOfBoundsException("Row index out of bounds: " + rowIndex); + } + return indexExtractor.apply(originalData.get(rowIndex)).get(columnIndex); + } + + @Override + public int size() + { + return originalData.size(); + } } } diff --git a/pixels-daemon/src/test/java/io/pixelsdb/pixels/daemon/index/TestIndexServicePerf.java b/pixels-daemon/src/test/java/io/pixelsdb/pixels/daemon/index/TestIndexServicePerf.java index 25f0e4c6e7..38720ebe08 100644 --- a/pixels-daemon/src/test/java/io/pixelsdb/pixels/daemon/index/TestIndexServicePerf.java +++ b/pixels-daemon/src/test/java/io/pixelsdb/pixels/daemon/index/TestIndexServicePerf.java @@ -21,8 +21,9 @@ import com.google.protobuf.ByteString; import io.pixelsdb.pixels.common.exception.IndexException; import io.pixelsdb.pixels.common.exception.MetadataException; -import io.pixelsdb.pixels.common.index.IndexService; -import io.pixelsdb.pixels.common.index.IndexServiceProvider; +import io.pixelsdb.pixels.common.index.IndexOption; +import io.pixelsdb.pixels.common.index.service.IndexService; +import io.pixelsdb.pixels.common.index.service.IndexServiceProvider; import io.pixelsdb.pixels.common.index.RowIdAllocator; import io.pixelsdb.pixels.common.metadata.MetadataService; import io.pixelsdb.pixels.common.metadata.domain.*; @@ -61,7 +62,7 @@ public class TestIndexServicePerf private static final List rowIdAllocators = new ArrayList<>(); private static Config config; private static AtomicBoolean running = new AtomicBoolean(true); - + private static IndexOption indexOption; private static class Config { @@ -90,6 +91,7 @@ static void setup() throws MetadataException, InterruptedException { dropSchema(); config = new Config(); + indexOption = IndexOption.builder().vNodeId(0).build(); List schemas = metadataService.getSchemas(); boolean exists = schemas.stream() .anyMatch(schema -> schema.getName().equals(testSchemaName)); @@ -245,7 +247,7 @@ private void fillSequentialData() throws Exception try { currentRowWrite.incrementAndGet(); - indexService.putPrimaryIndexEntry(primaryIndexEntry); + indexService.putPrimaryIndexEntry(primaryIndexEntry, indexOption); } catch (IndexException e) { throw new RuntimeException(e); @@ -452,7 +454,7 @@ private void consumeBucket(int bucketId) try { currentRowWrite.incrementAndGet(); - indexService.updatePrimaryIndexEntry(primaryIndexEntry); + indexService.updatePrimaryIndexEntry(primaryIndexEntry, indexOption); } catch (IndexException e) { throw new RuntimeException(e); diff --git a/pixels-example/src/main/java/io/pixelsdb/pixels/example/core/TestCachingSinglePointIndex.java b/pixels-example/src/main/java/io/pixelsdb/pixels/example/core/TestCachingSinglePointIndex.java index ce177e738c..d9f3c71ac6 100644 --- a/pixels-example/src/main/java/io/pixelsdb/pixels/example/core/TestCachingSinglePointIndex.java +++ b/pixels-example/src/main/java/io/pixelsdb/pixels/example/core/TestCachingSinglePointIndex.java @@ -22,6 +22,7 @@ import com.google.protobuf.ByteString; import io.pixelsdb.pixels.common.exception.SinglePointIndexException; import io.pixelsdb.pixels.common.index.CachingSinglePointIndex; +import io.pixelsdb.pixels.common.index.IndexOption; import io.pixelsdb.pixels.index.IndexProto; import io.pixelsdb.pixels.index.rocksdb.RocksDBIndex; @@ -70,7 +71,10 @@ public void tearDown() throws IOException public void testRocksDBIndex() throws Exception { System.out.println("\n-- Starting benchmark for [RocksDBIndex] --"); - this.index = new RocksDBIndex(0, 0, true); + IndexOption option = IndexOption.builder() + .vNodeId(0) + .build(); + this.index = new RocksDBIndex(0, 0, true, option); loadData(); updateData(); } diff --git a/pixels-index/pixels-index-mapdb/src/main/java/io/pixelsdb/pixels/index/mapdb/MapDBIndexProvider.java b/pixels-index/pixels-index-mapdb/src/main/java/io/pixelsdb/pixels/index/mapdb/MapDBIndexProvider.java index 88e91e8a9a..3259eb2f0d 100644 --- a/pixels-index/pixels-index-mapdb/src/main/java/io/pixelsdb/pixels/index/mapdb/MapDBIndexProvider.java +++ b/pixels-index/pixels-index-mapdb/src/main/java/io/pixelsdb/pixels/index/mapdb/MapDBIndexProvider.java @@ -20,6 +20,7 @@ package io.pixelsdb.pixels.index.mapdb; import io.pixelsdb.pixels.common.exception.SinglePointIndexException; +import io.pixelsdb.pixels.common.index.IndexOption; import io.pixelsdb.pixels.common.index.SinglePointIndex; import io.pixelsdb.pixels.common.index.SinglePointIndexProvider; import io.pixelsdb.pixels.common.utils.ConfigFactory; @@ -57,7 +58,7 @@ public class MapDBIndexProvider implements SinglePointIndexProvider @Override public SinglePointIndex createInstance(long tableId, long indexId, @Nonnull SinglePointIndex.Scheme scheme, - boolean unique) throws SinglePointIndexException + boolean unique, IndexOption indexOption) throws SinglePointIndexException { if (scheme == SinglePointIndex.Scheme.mapdb) { diff --git a/pixels-index/pixels-index-memory/src/main/java/io/pixelsdb/pixels/index/memory/MemoryIndexProvider.java b/pixels-index/pixels-index-memory/src/main/java/io/pixelsdb/pixels/index/memory/MemoryIndexProvider.java index d2d12359e2..f0b8dc9ec9 100644 --- a/pixels-index/pixels-index-memory/src/main/java/io/pixelsdb/pixels/index/memory/MemoryIndexProvider.java +++ b/pixels-index/pixels-index-memory/src/main/java/io/pixelsdb/pixels/index/memory/MemoryIndexProvider.java @@ -20,6 +20,7 @@ package io.pixelsdb.pixels.index.memory; import io.pixelsdb.pixels.common.exception.SinglePointIndexException; +import io.pixelsdb.pixels.common.index.IndexOption; import io.pixelsdb.pixels.common.index.SinglePointIndex; import io.pixelsdb.pixels.common.index.SinglePointIndexProvider; @@ -33,7 +34,7 @@ public class MemoryIndexProvider implements SinglePointIndexProvider { @Override public SinglePointIndex createInstance(long tableId, long indexId, @Nonnull SinglePointIndex.Scheme scheme, - boolean unique) throws SinglePointIndexException + boolean unique, IndexOption indexOption) throws SinglePointIndexException { if (scheme == SinglePointIndex.Scheme.memory) { diff --git a/pixels-index/pixels-index-rocksdb/pom.xml b/pixels-index/pixels-index-rocksdb/pom.xml index 5376f5fc40..2e019d349f 100644 --- a/pixels-index/pixels-index-rocksdb/pom.xml +++ b/pixels-index/pixels-index-rocksdb/pom.xml @@ -24,6 +24,10 @@ io.pixelsdb pixels-common + + io.pixelsdb + pixels-core + org.rocksdb @@ -35,6 +39,12 @@ commons-io commons-io + + + org.slf4j + log4j-over-slf4j + true + io.pixelsdb diff --git a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java index ecaa729199..d70732acc5 100644 --- a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java +++ b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java @@ -16,17 +16,22 @@ */ package io.pixelsdb.pixels.index.rocksdb; +import io.pixelsdb.pixels.common.exception.MetadataException; +import io.pixelsdb.pixels.common.metadata.domain.Column; import io.pixelsdb.pixels.common.utils.ConfigFactory; +import io.pixelsdb.pixels.common.utils.IndexUtils; +import io.pixelsdb.pixels.core.TypeDescription; import org.rocksdb.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; /** * @package: io.pixelsdb.pixels.index.rocksdb @@ -36,6 +41,7 @@ */ public class RocksDBFactory { + private static final Logger logger = LoggerFactory.getLogger(RocksDBFactory.class); private static final String dbPath = ConfigFactory.Instance().getProperty("index.rocksdb.data.path"); private static final boolean multiCF = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("index.rocksdb.multicf")); private static RocksDB instance; @@ -50,23 +56,40 @@ public class RocksDBFactory private static final AtomicInteger reference = new AtomicInteger(0); private static final Map cfHandles = new ConcurrentHashMap<>(); private static final String defaultColumnFamily = new String(RocksDB.DEFAULT_COLUMN_FAMILY, StandardCharsets.UTF_8); + private static final Map indexKeyLenCache = new ConcurrentHashMap<>(); + private static final Integer VARIABLE_LEN_SENTINEL = -2; + private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> + { + Thread thread = new Thread(runnable, "RocksDB-Metrics-Logger"); + thread.setDaemon(true); + return thread; + }); private RocksDBFactory() { } - private static RocksDB createRocksDB() throws RocksDBException + static RocksDB createRocksDB(String rocksDBPath) throws RocksDBException { // 1. Get existing column families (returns empty list for new database) List existingColumnFamilies; try { - existingColumnFamilies = RocksDB.listColumnFamilies(new Options(), dbPath); + existingColumnFamilies = RocksDB.listColumnFamilies(new Options(), rocksDBPath); } catch (RocksDBException e) { // For new database, return list containing only default column family existingColumnFamilies = Collections.singletonList(RocksDB.DEFAULT_COLUMN_FAMILY); } // 2. Ensure default column family is included - if (!existingColumnFamilies.contains(RocksDB.DEFAULT_COLUMN_FAMILY)) + boolean existDefaultCF = false; + for (byte[] cf : existingColumnFamilies) + { + if (Arrays.equals(cf, RocksDB.DEFAULT_COLUMN_FAMILY)) + { + existDefaultCF = true; + break; + } + } + if (!existDefaultCF) { existingColumnFamilies = new ArrayList<>(existingColumnFamilies); existingColumnFamilies.add(RocksDB.DEFAULT_COLUMN_FAMILY); @@ -78,9 +101,25 @@ private static RocksDB createRocksDB() throws RocksDBException } // 3. Prepare column family descriptors - List descriptors = existingColumnFamilies.stream() - .map(RocksDBFactory::createCFDescriptor) - .collect(Collectors.toList()); + List descriptors = new ArrayList<>(); + for(byte[] existingColumnFamily: existingColumnFamilies) + { + long[] ids = parseTableAndIndexId(existingColumnFamily); + Integer keyLen = null; + if(ids != null) + { + long tableId = ids[0]; + long indexId = ids[1]; + try + { + keyLen = getIndexKeyLen(tableId, indexId); + } catch (MetadataException ignored) + { + + } + } + descriptors.add(createCFDescriptor(existingColumnFamily, keyLen)); + } // 4. Open DB List handles = new ArrayList<>(); @@ -100,16 +139,19 @@ private static RocksDB createRocksDB() throws RocksDBException if(enableRocksDBStats) { - String statsPath = ConfigFactory.Instance().getProperty("index.rocksdb.stats.path"); int statsInterval = Integer.parseInt(ConfigFactory.Instance().getProperty("index.rocksdb.stats.interval")); + String statsPath = ConfigFactory.Instance().getProperty("index.rocksdb.stats.path"); Statistics stats = new Statistics(); dbOptions.setStatistics(stats) .setStatsDumpPeriodSec(statsInterval) .setDbLogDir(statsPath); - } - - RocksDB db = RocksDB.open(dbOptions, dbPath, descriptors, handles); + } + RocksDB db = RocksDB.open(dbOptions, rocksDBPath, descriptors, handles); + if(enableRocksDBStats) + { + startRocksDBLogThread(db); + } // 5. Save handles for reuse for (int i = 0; i < descriptors.size(); i++) { @@ -119,7 +161,7 @@ private static RocksDB createRocksDB() throws RocksDBException return db; } - private static ColumnFamilyDescriptor createCFDescriptor(byte[] name) + private static ColumnFamilyDescriptor createCFDescriptor(byte[] name, Integer keyLen) { ConfigFactory config = ConfigFactory.Instance(); @@ -144,6 +186,10 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name) long targetFileSizeBase = Long.parseLong(config.getProperty("index.rocksdb.target.file.size.base")); int targetFileSizeMultiplier = Integer.parseInt(config.getProperty("index.rocksdb.target.file.size.multiplier")); int fixedLengthPrefix = Integer.parseInt(config.getProperty("index.rocksdb.prefix.length")); + if(keyLen != null) + { + fixedLengthPrefix = keyLen + Long.BYTES; // key buffer + index id + } CompactionStyle compactionStyle = CompactionStyle.valueOf(config.getProperty("index.rocksdb.compaction.style")); // Compression Options @@ -172,9 +218,9 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name) /** * Get or create a ColumnFamily for (tableId, indexId). */ - public static synchronized ColumnFamilyHandle getOrCreateColumnFamily(long tableId, long indexId) throws RocksDBException + public static synchronized ColumnFamilyHandle getOrCreateColumnFamily(long tableId, long indexId, int vNodeId) throws RocksDBException { - String cfName = getCFName(tableId, indexId); + String cfName = getCFName(tableId, indexId, vNodeId); // Return cached handle if exists if (cfHandles.containsKey(cfName)) @@ -182,18 +228,32 @@ public static synchronized ColumnFamilyHandle getOrCreateColumnFamily(long table return cfHandles.get(cfName); } + Integer keyLen = null; + try + { + keyLen = getIndexKeyLen(tableId, indexId); + } catch (MetadataException ignored) + { + + } + RocksDB db = getRocksDB(); - ColumnFamilyDescriptor newCF = createCFDescriptor(cfName.getBytes(StandardCharsets.UTF_8)); + ColumnFamilyDescriptor newCF = createCFDescriptor(cfName.getBytes(StandardCharsets.UTF_8), keyLen); ColumnFamilyHandle handle = db.createColumnFamily(newCF); cfHandles.put(cfName, handle); return handle; } - private static String getCFName(long tableId, long indexId) + protected static synchronized Map getAllCfHandles() + { + return cfHandles; + } + + private static String getCFName(long tableId, long indexId, int vNodeId) { if(multiCF) { - return "t" + tableId + "_i" + indexId; + return "t" + tableId + "_i" + indexId + "_v" + vNodeId; } else { @@ -201,11 +261,108 @@ private static String getCFName(long tableId, long indexId) } } + private static long[] parseTableAndIndexId(byte[] cfNameBytes) throws RocksDBException + { + if (cfNameBytes == null || Arrays.equals(cfNameBytes, RocksDB.DEFAULT_COLUMN_FAMILY)) + { + return null; + } + + String name = new String(cfNameBytes, StandardCharsets.UTF_8); + + try + { + // Expected format: "t{tableId}_i{indexId}_v{vNodeId}" + // Example: "t100_i200_v5" -> ["100", "200", "30"] + if (name.startsWith("t") && name.contains("_i") && name.contains("_v")) + { + // Remove the leading 't' + String content = name.substring(1); + + // Split using regex for multiple delimiters: _i and _v + String[] parts = content.split("_i|_v"); + + if (parts.length == 3) + { + long tableId = Long.parseLong(parts[0]); + long indexId = Long.parseLong(parts[1]); + long vNodeId = Long.parseLong(parts[2]); + + return new long[]{tableId, indexId, vNodeId}; + } + else + { + throw new RocksDBException("Failed to parse CF name (invalid segments): " + name); + } + } + } + catch (Exception e) + { + throw new RocksDBException("Failed to parse CF name: " + name); + } + + return null; + } + + private static Integer getIndexKeyLen(long tableId, long indexId) throws MetadataException + { + // Try to retrieve from cache using only indexId + Integer cachedLen = indexKeyLenCache.get(indexId); + if (cachedLen != null) + { + return cachedLen.equals(VARIABLE_LEN_SENTINEL) ? null : cachedLen; + } + + // Cache miss: Perform the metadata lookup + List keyColumns = IndexUtils.extractInfoFromIndex(tableId, indexId); + TypeDescription keySchema = TypeDescription.createSchemaFromColumns(keyColumns); + + int keyLen = 0; + Integer result = null; + + if (keySchema.getChildren() != null) + { + boolean isFixedLen = true; + for (TypeDescription typeDescription : keySchema.getChildren()) + { + int colLen = keyLengthOf(typeDescription.getCategory().getExternalJavaType()); + if (colLen == -1) + { + isFixedLen = false; + break; + } + keyLen += colLen; + } + + if (isFixedLen) + { + result = keyLen; + } + } + + // Update cache (store result or sentinel) + indexKeyLenCache.put(indexId, result == null ? VARIABLE_LEN_SENTINEL : result); + + return result; + } + + public static int keyLengthOf(Class clazz) { + if (clazz == boolean.class) return 1; + if (clazz == byte.class) return 1; + if (clazz == short.class) return 2; + if (clazz == int.class) return 4; + if (clazz == long.class) return 8; + if (clazz == float.class) return 4; + if (clazz == double.class) return 8; + + // Not Primitive + return -1; + } public static synchronized RocksDB getRocksDB() throws RocksDBException { if (instance == null || instance.isClosed()) { - instance = createRocksDB(); + instance = createRocksDB(dbPath); } reference.incrementAndGet(); return instance; @@ -229,4 +386,52 @@ public static synchronized String getDbPath() { return dbPath; } + + private static void startRocksDBLogThread(RocksDB db) + { + int logInterval = Integer.parseInt(ConfigFactory.Instance().getProperty("index.rocksdb.log.interval")); + List dbList = Collections.singletonList(db); + + scheduler.scheduleAtFixedRate(() -> + { + try + { + // 1. Get RocksDB Native Metrics + Map memoryUsage = MemoryUtil.getApproximateMemoryUsageByType(dbList, null); + long tableReaders = memoryUsage.getOrDefault(MemoryUsageType.kTableReadersTotal, 0L); + long memTable = memoryUsage.getOrDefault(MemoryUsageType.kMemTableTotal, 0L); + long blockCacheOnly = db.getLongProperty("rocksdb.block-cache-usage"); + long indexFilterOnly = Math.max(0, tableReaders - blockCacheOnly); + long totalNativeBytes = tableReaders + memTable; + + // 2. Get JVM Heap Metrics + Runtime runtime = Runtime.getRuntime(); + long heapMax = runtime.maxMemory(); + long heapCommitted = runtime.totalMemory(); + long heapUsed = heapCommitted - runtime.freeMemory(); + + // 3. Format string with both RocksDB and JVM data + // We use GiB for all units to keep the Shell script calculations simple + double GiB = 1024.0 * 1024.0 * 1024.0; + + String formattedMetrics = String.format( + "TotalNative≈%.4f GiB (MemTable=%.4f GiB, BlockCache=%.4f GiB, IndexFilter=%.4f GiB) " + + "JVMHeap (Used=%.4f GiB, Committed=%.4f GiB, Max=%.4f GiB)", + totalNativeBytes / GiB, + memTable / GiB, + blockCacheOnly / GiB, + indexFilterOnly / GiB, + heapUsed / GiB, + heapCommitted / GiB, + heapMax / GiB + ); + + logger.info("[RocksDB Metrics] {}", formattedMetrics); + } + catch (Exception e) + { + logger.error("Error occurred during RocksDB metrics collection", e); + } + }, 0, logInterval, TimeUnit.SECONDS); + } } diff --git a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBIndex.java b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBIndex.java index 787f5e7053..3d9ef58ae5 100644 --- a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBIndex.java +++ b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBIndex.java @@ -23,6 +23,7 @@ import com.google.protobuf.ByteString; import io.pixelsdb.pixels.common.exception.SinglePointIndexException; import io.pixelsdb.pixels.common.index.CachingSinglePointIndex; +import io.pixelsdb.pixels.common.index.IndexOption; import io.pixelsdb.pixels.index.IndexProto; import org.apache.commons.io.FileUtils; import org.rocksdb.*; @@ -58,7 +59,7 @@ public class RocksDBIndex extends CachingSinglePointIndex private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean removed = new AtomicBoolean(false); - public RocksDBIndex(long tableId, long indexId, boolean unique) throws RocksDBException + public RocksDBIndex(long tableId, long indexId, boolean unique, IndexOption indexOption) throws RocksDBException { super(); this.tableId = tableId; @@ -68,7 +69,7 @@ public RocksDBIndex(long tableId, long indexId, boolean unique) throws RocksDBEx this.rocksDB = RocksDBFactory.getRocksDB(); this.unique = unique; this.writeOptions = new WriteOptions(); - this.columnFamilyHandle = RocksDBFactory.getOrCreateColumnFamily(tableId, indexId); + this.columnFamilyHandle = RocksDBFactory.getOrCreateColumnFamily(tableId, indexId, indexOption.getVNodeId()); } @Override diff --git a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBIndexProvider.java b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBIndexProvider.java index 406dfa590c..683be75be7 100644 --- a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBIndexProvider.java +++ b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBIndexProvider.java @@ -20,6 +20,7 @@ package io.pixelsdb.pixels.index.rocksdb; import io.pixelsdb.pixels.common.exception.SinglePointIndexException; +import io.pixelsdb.pixels.common.index.IndexOption; import io.pixelsdb.pixels.common.index.SinglePointIndex; import io.pixelsdb.pixels.common.index.SinglePointIndexProvider; import org.rocksdb.RocksDBException; @@ -34,13 +35,13 @@ public class RocksDBIndexProvider implements SinglePointIndexProvider { @Override public SinglePointIndex createInstance(long tableId, long indexId, @Nonnull SinglePointIndex.Scheme scheme, - boolean unique) throws SinglePointIndexException + boolean unique, IndexOption indexOption) throws SinglePointIndexException { if (scheme == SinglePointIndex.Scheme.rocksdb) { try { - return new RocksDBIndex(tableId, indexId, unique); + return new RocksDBIndex(tableId, indexId, unique, indexOption); } catch (RocksDBException e) { diff --git a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRetinaTrace.java b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRetinaTrace.java index c3aaa090da..d996eaf4ed 100644 --- a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRetinaTrace.java +++ b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRetinaTrace.java @@ -21,8 +21,9 @@ import com.google.protobuf.ByteString; import io.pixelsdb.pixels.common.exception.IndexException; -import io.pixelsdb.pixels.common.index.IndexService; -import io.pixelsdb.pixels.common.index.IndexServiceProvider; +import io.pixelsdb.pixels.common.index.IndexOption; +import io.pixelsdb.pixels.common.index.service.IndexService; +import io.pixelsdb.pixels.common.index.service.IndexServiceProvider; import io.pixelsdb.pixels.index.IndexProto; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.BeforeAll; @@ -75,6 +76,7 @@ public class TestRetinaTrace private static final IndexService indexService = IndexServiceProvider.getService(IndexServiceProvider.ServiceMode.local); + private static IndexOption indexOption; /** * Load the initial data into the index */ @@ -82,6 +84,7 @@ public class TestRetinaTrace public static void prepare() { System.out.println("Preparing data from loadPath into index..."); + indexOption = IndexOption.builder().vNodeId(0).build(); long count = 0; try (BufferedReader reader = Files.newBufferedReader(Paths.get(loadPath))) { @@ -92,7 +95,7 @@ public static void prepare() String[] parts = line.split("\\t"); PutOperation putOperation = new PutOperation(parts); IndexProto.PrimaryIndexEntry entry = (IndexProto.PrimaryIndexEntry) putOperation.toProto(); - indexService.putPrimaryIndexEntry(entry); + indexService.putPrimaryIndexEntry(entry, indexOption); } } catch (IOException e) { @@ -223,10 +226,10 @@ public void run() { if (proto instanceof IndexProto.PrimaryIndexEntry) { - indexService.putPrimaryIndexEntry((IndexProto.PrimaryIndexEntry) proto); + indexService.putPrimaryIndexEntry((IndexProto.PrimaryIndexEntry) proto, indexOption); } else { - indexService.deletePrimaryIndexEntry((IndexProto.IndexKey) proto); + indexService.deletePrimaryIndexEntry((IndexProto.IndexKey) proto, indexOption); } } } catch (IndexException e) diff --git a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDB.java b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDB.java index 4ed938e0a5..30fdf447d6 100644 --- a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDB.java +++ b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDB.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; /** * @author hank @@ -343,4 +344,36 @@ public void testBasic() } } } + + @Test + public void testFullCompaction() throws RocksDBException + { + long openStart = System.currentTimeMillis(); + final String dbPath = "/home/ubuntu/disk6/collected_indexes/realtime-pixels-retina-2/rocksdb"; + System.out.println("Start Open RocksDB: " + dbPath); + try (RocksDB rocksDB = RocksDBFactory.createRocksDB(dbPath)) + { + long openEnd = System.currentTimeMillis(); + System.out.println("Open RocksDB Duration: " + (openEnd - openStart) + "ms" + "\tPath:" + dbPath); + long start = System.currentTimeMillis(); + System.out.println("Start Full Compaction"); + Map cfHandles = RocksDBFactory.getAllCfHandles(); + System.out.println("Column Family Count:" + cfHandles.size()); + // Iterate through each Column Family and trigger a manual compaction + for (Map.Entry entry : cfHandles.entrySet()) { + String cfName = entry.getKey(); + ColumnFamilyHandle handle = entry.getValue(); + System.out.println("Compacting Column Family: " + cfName); + /* * compactRange(handle) triggers a full compaction for the specific CF. + * It processes all levels from 0 to the maximum level. + * This is a synchronous/blocking call. + */ + rocksDB.compactRange(handle); + } + + long end = System.currentTimeMillis(); + System.out.println("Compaction Duration: " + (end - start) + "ms" + "\tPath:" + dbPath); + } + // Note: Ensure RocksDBFactory or a 'finally' block handles the closing of ColumnFamilyHandles + } } diff --git a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java index a0aa0b8fdd..ca3a35ca4a 100644 --- a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java +++ b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java @@ -23,6 +23,7 @@ import com.google.protobuf.ByteString; import io.pixelsdb.pixels.common.exception.MainIndexException; import io.pixelsdb.pixels.common.exception.SinglePointIndexException; +import io.pixelsdb.pixels.common.index.IndexOption; import io.pixelsdb.pixels.common.index.SinglePointIndex; import io.pixelsdb.pixels.index.IndexProto; import org.junit.jupiter.api.AfterEach; @@ -51,8 +52,11 @@ public class TestRocksDBIndex @BeforeEach public void setUp() throws RocksDBException { - uniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID, true); - nonUniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID + 1, false); + IndexOption option = IndexOption.builder() + .vNodeId(0) + .build(); + uniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID, true, option); + nonUniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID + 1, false, option); rocksDB = RocksDBFactory.getRocksDB(); } diff --git a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndexExtensive.java b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndexExtensive.java index 0b642a9eaa..904ecc482b 100644 --- a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndexExtensive.java +++ b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndexExtensive.java @@ -22,6 +22,7 @@ import com.google.protobuf.ByteString; import io.pixelsdb.pixels.common.exception.MainIndexException; import io.pixelsdb.pixels.common.exception.SinglePointIndexException; +import io.pixelsdb.pixels.common.index.IndexOption; import io.pixelsdb.pixels.index.IndexProto; import org.junit.After; import org.junit.Before; @@ -52,8 +53,11 @@ public class TestRocksDBIndexExtensive @Before public void setUp() throws RocksDBException { - uniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID, true); - nonUniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID + 1, false); + IndexOption option = IndexOption.builder() + .vNodeId(0) + .build(); + uniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID, true, option); + nonUniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID + 1, false, option); } @After diff --git a/pixels-index/pixels-index-rockset/src/main/java/io/pixelsdb/pixels/index/rockset/RocksetIndexProvider.java b/pixels-index/pixels-index-rockset/src/main/java/io/pixelsdb/pixels/index/rockset/RocksetIndexProvider.java index ca9682d6b2..76584d7e2c 100644 --- a/pixels-index/pixels-index-rockset/src/main/java/io/pixelsdb/pixels/index/rockset/RocksetIndexProvider.java +++ b/pixels-index/pixels-index-rockset/src/main/java/io/pixelsdb/pixels/index/rockset/RocksetIndexProvider.java @@ -20,6 +20,7 @@ package io.pixelsdb.pixels.index.rockset; import io.pixelsdb.pixels.common.exception.SinglePointIndexException; +import io.pixelsdb.pixels.common.index.IndexOption; import io.pixelsdb.pixels.common.index.SinglePointIndex; import io.pixelsdb.pixels.common.index.SinglePointIndexProvider; import io.pixelsdb.pixels.common.utils.ConfigFactory; @@ -41,7 +42,7 @@ public class RocksetIndexProvider implements SinglePointIndexProvider @Override public SinglePointIndex createInstance(long tableId, long indexId, @Nonnull SinglePointIndex.Scheme scheme, - boolean unique) throws SinglePointIndexException + boolean unique, IndexOption indexOption) throws SinglePointIndexException { if (scheme == SinglePointIndex.Scheme.rockset) { diff --git a/pixels-index/pixels-index-rockset/src/test/java/io/pixelsdb/pixels/index/rockset/TestRocksetIndex.java b/pixels-index/pixels-index-rockset/src/test/java/io/pixelsdb/pixels/index/rockset/TestRocksetIndex.java index b07de105b4..94300efd8a 100644 --- a/pixels-index/pixels-index-rockset/src/test/java/io/pixelsdb/pixels/index/rockset/TestRocksetIndex.java +++ b/pixels-index/pixels-index-rockset/src/test/java/io/pixelsdb/pixels/index/rockset/TestRocksetIndex.java @@ -31,7 +31,7 @@ public void test() throws SinglePointIndexException { RocksetIndex rocksetIndex = (RocksetIndex) SinglePointIndexFactory.Instance(). getSinglePointIndex(new SinglePointIndexFactory.TableIndex( - 1L, 1L, SinglePointIndex.Scheme.rockset, true)); + 1L, 1L, SinglePointIndex.Scheme.rockset, true), null); long dbHandle = 0; diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java index 963830ee7c..0b9b47c80f 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriteBuffer.java @@ -22,7 +22,8 @@ import io.pixelsdb.pixels.common.exception.IndexException; import io.pixelsdb.pixels.common.exception.MetadataException; import io.pixelsdb.pixels.common.exception.RetinaException; -import io.pixelsdb.pixels.common.index.IndexServiceProvider; +import io.pixelsdb.pixels.common.index.IndexOption; +import io.pixelsdb.pixels.common.index.service.IndexServiceProvider; import io.pixelsdb.pixels.common.index.RowIdAllocator; import io.pixelsdb.pixels.common.metadata.MetadataService; import io.pixelsdb.pixels.common.metadata.domain.Path; @@ -124,6 +125,7 @@ public class PixelsWriteBuffer private String retinaHostName; private SinglePointIndex index; private final int virtualNodeId; + private final IndexOption indexOption; public PixelsWriteBuffer(long tableId, TypeDescription schema, Path targetOrderedDirPath, Path targetCompactDirPath, String retinaHostName, int virtualNode) throws RetinaException @@ -131,6 +133,9 @@ public PixelsWriteBuffer(long tableId, TypeDescription schema, Path targetOrdere this.tableId = tableId; this.schema = schema; this.virtualNodeId = virtualNode; + this.indexOption = IndexOption.builder() + .vNodeId(virtualNodeId) + .build(); ConfigFactory configFactory = ConfigFactory.Instance(); this.memTableSize = Integer.parseInt(configFactory.getProperty("retina.buffer.memTable.size")); @@ -412,7 +417,7 @@ private void startFlushObjectToFileScheduler(long intervalSeconds) if(index != null) { IndexServiceProvider.getService(IndexServiceProvider.ServiceMode.local) - .flushIndexEntriesOfFile(tableId, index.getId(), fileWriterManager.getFileId(), true); + .flushIndexEntriesOfFile(tableId, index.getId(), fileWriterManager.getFileId(), true, indexOption); } for (ObjectEntry objectEntry : toRemove) { diff --git a/proto/index.proto b/proto/index.proto index ed0bc95ef5..d7ed239c87 100644 --- a/proto/index.proto +++ b/proto/index.proto @@ -90,6 +90,10 @@ message SecondaryIndexEntry { // bool unique = 3; // whether this secondary index is unique } +message IndexOption { + uint32 virtualNodeId = 1; +} + message AllocateRowIdBatchRequest { uint64 tableId = 1; uint32 numRowIds = 2; // the number of row ids to allocate @@ -102,6 +106,7 @@ message AllocateRowIdBatchResponse { message LookupUniqueIndexRequest { IndexKey indexKey = 1; + IndexOption indexOption = 2; } message LookupUniqueIndexResponse { @@ -111,6 +116,7 @@ message LookupUniqueIndexResponse { message LookupNonUniqueIndexRequest { IndexKey indexKey = 1; + IndexOption indexOption = 2; } message LookupNonUniqueIndexResponse { @@ -120,6 +126,7 @@ message LookupNonUniqueIndexResponse { message PutPrimaryIndexEntryRequest { PrimaryIndexEntry indexEntry = 1; + IndexOption indexOption = 2; } message PutPrimaryIndexEntryResponse { @@ -130,6 +137,7 @@ message PutPrimaryIndexEntriesRequest { uint64 tableId = 1; uint64 indexId = 2; repeated PrimaryIndexEntry indexEntries = 3; // all entries must from the same index + IndexOption indexOption = 4; } message PutPrimaryIndexEntriesResponse { @@ -138,6 +146,7 @@ message PutPrimaryIndexEntriesResponse { message PutSecondaryIndexEntryRequest { SecondaryIndexEntry indexEntry = 1; + IndexOption indexOption = 2; } message PutSecondaryIndexEntryResponse { @@ -148,6 +157,7 @@ message PutSecondaryIndexEntriesRequest { uint64 tableId = 1; uint64 indexId = 2; repeated SecondaryIndexEntry indexEntries = 3; // all entries must from the same index + IndexOption indexOption = 4; } message PutSecondaryIndexEntriesResponse { @@ -156,6 +166,7 @@ message PutSecondaryIndexEntriesResponse { message DeletePrimaryIndexEntryRequest { IndexKey indexKey = 1; + IndexOption indexOption = 2; } message DeletePrimaryIndexEntryResponse { @@ -167,6 +178,7 @@ message DeletePrimaryIndexEntriesRequest { uint64 tableId = 1; uint64 indexId = 2; repeated IndexKey indexKeys = 3; // all entries must from the same index + IndexOption indexOption = 4; } message DeletePrimaryIndexEntriesResponse { @@ -176,6 +188,7 @@ message DeletePrimaryIndexEntriesResponse { message DeleteSecondaryIndexEntryRequest { IndexKey indexKey = 1; + IndexOption indexOption = 2; } message DeleteSecondaryIndexEntryResponse { @@ -187,6 +200,7 @@ message DeleteSecondaryIndexEntriesRequest { uint64 tableId = 1; uint64 indexId = 2; repeated IndexKey indexKeys = 3; // all entries must from the same index + IndexOption indexOption = 4; } message DeleteSecondaryIndexEntriesResponse { @@ -196,6 +210,7 @@ message DeleteSecondaryIndexEntriesResponse { message UpdatePrimaryIndexEntryRequest { PrimaryIndexEntry indexEntry = 1; + IndexOption indexOption = 2; } message UpdatePrimaryIndexEntryResponse { @@ -207,6 +222,7 @@ message UpdatePrimaryIndexEntriesRequest { uint64 tableId = 1; uint64 indexId = 2; repeated PrimaryIndexEntry indexEntries = 3; // all entries must from the same index + IndexOption indexOption = 4; } message UpdatePrimaryIndexEntriesResponse { @@ -216,6 +232,7 @@ message UpdatePrimaryIndexEntriesResponse { message UpdateSecondaryIndexEntryRequest { SecondaryIndexEntry indexEntry = 1; + IndexOption indexOption = 2; } message UpdateSecondaryIndexEntryResponse { @@ -227,6 +244,7 @@ message UpdateSecondaryIndexEntriesRequest { uint64 tableId = 1; uint64 indexId = 2; repeated SecondaryIndexEntry indexEntries = 3; // all entries must from the same index + IndexOption indexOption = 4; } message UpdateSecondaryIndexEntriesResponse { @@ -239,6 +257,7 @@ message PurgeIndexEntriesRequest { uint64 indexId = 2; repeated IndexKey indexKeys = 3; // all entries must from the same index bool isPrimary = 4; + IndexOption indexOption = 5; } message PurgeIndexEntriesResponse { @@ -260,6 +279,7 @@ message OpenIndexRequest { uint64 tableId = 1; uint64 indexId = 2; bool isPrimary = 3; + IndexOption indexOption = 4; } message OpenIndexResponse { @@ -270,6 +290,7 @@ message CloseIndexRequest { uint64 tableId = 1; uint64 indexId = 2; bool isPrimary = 3; + IndexOption indexOption = 4; } message CloseIndexResponse { @@ -280,6 +301,7 @@ message RemoveIndexRequest { uint64 tableId = 1; uint64 indexId = 2; bool isPrimary = 3; + IndexOption indexOption = 4; } message RemoveIndexResponse { diff --git a/proto/metadata.proto b/proto/metadata.proto index e3ae4f6897..61c75c0b52 100644 --- a/proto/metadata.proto +++ b/proto/metadata.proto @@ -32,6 +32,7 @@ package metadata.proto; service MetadataService { rpc CreateSchema (CreateSchemaRequest) returns (CreateSchemaResponse); rpc ExistSchema (ExistSchemaRequest) returns (ExistSchemaResponse); + rpc GetSchemaById (GetSchemaByIdRequest) returns (GetSchemaByIdResponse); rpc GetSchemas (GetSchemasRequest) returns (GetSchemasResponse); rpc DropSchema (DropSchemaRequest) returns (DropSchemaResponse); rpc CreateTable (CreateTableRequest) returns (CreateTableResponse); @@ -248,6 +249,16 @@ message ResponseHeader { } // schema +message GetSchemaByIdRequest { + RequestHeader header = 1; + uint64 schemaId = 2; +} + +message GetSchemaByIdResponse { + ResponseHeader header = 1; + Schema schema = 2; +} + message GetSchemasRequest { RequestHeader header = 1; }