Skip to content

Commit 7e6984e

Browse files
committed
Support CH
1 parent c0f4351 commit 7e6984e

File tree

5 files changed

+74
-52
lines changed

5 files changed

+74
-52
lines changed

pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/ByteColumnVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void add(byte value)
7575
@Override
7676
public void add(byte[] value)
7777
{
78-
if(checkBytesNull(value))
78+
if (checkBytesNull(value))
7979
{
8080
return;
8181
}

pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/ColumnVector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -515,14 +515,14 @@ protected void deserializeBase(ColumnVectorBaseFlat base)
515515
*/
516516
protected boolean checkBytesNull(byte[] value)
517517
{
518-
if (value == null)
518+
if (value == null || value.length == 0)
519519
{
520520
if(writeIndex >= getLength())
521521
{
522522
ensureSize(writeIndex * 2, true);
523-
isNull[writeIndex++] = true;
524-
return true;
525523
}
524+
isNull[writeIndex++] = true;
525+
return true;
526526
}
527527
return false;
528528
}

pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name, Integer ke
167167

168168
long blockSize = Long.parseLong(config.getProperty("index.rocksdb.block.size"));
169169
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
170-
.setFilterPolicy(new BloomFilter(10, false))
171-
.setWholeKeyFiltering(false)
170+
.setFilterPolicy(new BloomFilter(20, false))
171+
.setWholeKeyFiltering(true)
172172
.setBlockSize(blockSize)
173173
.setCacheIndexAndFilterBlocks(true)
174174
.setPinL0FilterAndIndexBlocksInCache(true)
@@ -188,7 +188,7 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name, Integer ke
188188
int fixedLengthPrefix = Integer.parseInt(config.getProperty("index.rocksdb.prefix.length"));
189189
if(keyLen != null)
190190
{
191-
fixedLengthPrefix = keyLen + Long.BYTES; // key buffer + index id
191+
fixedLengthPrefix = Long.BYTES + keyLen; // index id + key buffer
192192
}
193193
CompactionStyle compactionStyle = CompactionStyle.valueOf(config.getProperty("index.rocksdb.compaction.style"));
194194

@@ -200,7 +200,8 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name, Integer ke
200200
.setWriteBufferSize(writeBufferSize)
201201
.setMaxWriteBufferNumber(maxWriteBufferNumber)
202202
.setMinWriteBufferNumberToMerge(minWriteBufferNumberToMerge)
203-
.setMemtablePrefixBloomSizeRatio(0.1)
203+
.setMemtablePrefixBloomSizeRatio(0.2)
204+
.setOptimizeFiltersForHits(false)
204205
.setTableFormatConfig(tableConfig)
205206
.setLevel0FileNumCompactionTrigger(level0FileNumCompactionTrigger)
206207
.setMaxBytesForLevelBase(maxBytesForLevelBase)

pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBIndex.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ public long getUniqueRowIdInternal(IndexProto.IndexKey key) throws SinglePointIn
9898
throw new SinglePointIndexException("getUniqueRowId should only be called on unique index");
9999
}
100100
ReadOptions readOptions = RocksDBThreadResources.getReadOptions();
101-
readOptions.setPrefixSameAsStart(true);
101+
readOptions.setPrefixSameAsStart(true)
102+
.setTotalOrderSeek(false)
103+
.setVerifyChecksums(false);
102104
ByteBuffer keyBuffer = toKeyBuffer(key);
103105
long rowId = -1L;
104106
try (RocksIterator iterator = rocksDB.newIterator(columnFamilyHandle, readOptions))
@@ -131,7 +133,8 @@ public List<Long> getRowIds(IndexProto.IndexKey key) throws SinglePointIndexExce
131133
return ImmutableList.of(getUniqueRowId(key));
132134
}
133135
Set<Long> rowIds = new HashSet<>();
134-
ReadOptions readOptions = RocksDBThreadResources.getReadOptions();
136+
// ReadOptions readOptions = RocksDBThreadResources.getReadOptions();
137+
ReadOptions readOptions = new ReadOptions();
135138
readOptions.setPrefixSameAsStart(true);
136139
ByteBuffer keyBuffer = toKeyBuffer(key);
137140
// use RocksDB iterator for prefix search

pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDB.java

Lines changed: 60 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import java.util.*;
2727
import java.util.concurrent.CompletableFuture;
28-
import java.util.concurrent.ExecutionException;
2928
import java.util.concurrent.ExecutorService;
3029
import java.util.concurrent.Executors;
3130
import java.util.stream.Collectors;
@@ -350,50 +349,46 @@ public void testBasic()
350349
@Test
351350
public void testFullCompaction() throws RocksDBException
352351
{
353-
// Define the list of RocksDB paths
352+
// List of RocksDB storage paths
354353
List<String> dbPaths = new ArrayList<>();
355354
dbPaths.add("/home/ubuntu/disk6/collected_indexes/realtime-pixels-retina/" + "/rocksdb");
356-
for (int i = 2; i <= 8; i++)
357-
{
358-
dbPaths.add(
359-
"/home/ubuntu/disk6/collected_indexes/realtime-pixels-retina-" + i + "/rocksdb"
360-
);
361-
}
355+
// for (int i = 2; i <= 8; i++)
356+
// {
357+
// dbPaths.add(
358+
// "/home/ubuntu/disk6/collected_indexes/realtime-pixels-retina-" + i + "/rocksdb"
359+
// );
360+
// }
362361

363362
long totalStart = System.currentTimeMillis();
364363
System.out.println("Starting parallel compaction for " + dbPaths.size() + " databases.");
365-
// Define parallelism level (e.g., number of disks or cores)
366-
int parallelism = 4;
367-
ExecutorService executor = Executors.newFixedThreadPool(parallelism);
364+
365+
// Parallelism level for multiple databases (usually matches the number of disks)
366+
int dbParallelism = 4;
367+
ExecutorService dbExecutor = Executors.newFixedThreadPool(dbParallelism);
368+
368369
try
369370
{
371+
// Map each database path to an asynchronous compaction task
370372
List<CompletableFuture<Void>> futures = dbPaths.stream()
371373
.map(dbPath -> CompletableFuture.runAsync(() ->
372374
{
373375
try
374376
{
375377
executeSingleDbCompaction(dbPath);
376-
} catch (RocksDBException e)
378+
}
379+
catch (RocksDBException e)
377380
{
378-
throw new RuntimeException(e);
381+
throw new RuntimeException("Compaction failed for " + dbPath, e);
379382
}
380-
}, executor))
383+
}, dbExecutor))
381384
.collect(Collectors.toList());
382385

383-
// Block until all compaction tasks are complete
384-
for(CompletableFuture<Void> future: futures)
385-
{
386-
future.get();
387-
}
388-
} catch (ExecutionException e)
389-
{
390-
throw new RuntimeException(e);
391-
} catch (InterruptedException e)
392-
{
393-
throw new RuntimeException(e);
394-
} finally
386+
// Wait for all DB-level tasks to finish
387+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
388+
}
389+
finally
395390
{
396-
executor.shutdown();
391+
dbExecutor.shutdown();
397392
}
398393

399394
long totalEnd = System.currentTimeMillis();
@@ -402,36 +397,59 @@ public void testFullCompaction() throws RocksDBException
402397

403398
private void executeSingleDbCompaction(String dbPath) throws RocksDBException
404399
{
405-
long openStart = System.currentTimeMillis();
406-
System.out.println("Thread [" + Thread.currentThread().getName() + "] Start Open: " + dbPath);
400+
long startTime = System.currentTimeMillis();
401+
System.out.println("Thread [" + Thread.currentThread().getName() + "] Opening DB: " + dbPath);
402+
407403
RocksDB rocksDB;
408404
Map<String, ColumnFamilyHandle> cfHandles;
409405

410-
long start = System.currentTimeMillis();
406+
// Synchronized block to safely initialize the RocksDB instance and retrieve handles
411407
synchronized (this)
412408
{
413409
rocksDB = RocksDBFactory.createRocksDB(dbPath);
414-
long openEnd = System.currentTimeMillis();
415-
System.out.println("Open Duration: " + (openEnd - openStart) + "ms\tPath: " + dbPath);
416410
cfHandles = new HashMap<>(RocksDBFactory.getAllCfHandles());
411+
// Clear handles in the factory to prevent memory leaks or reuse conflicts
417412
RocksDBFactory.clearCFHandles();
418413
}
419414

420-
System.out.println("Path: " + dbPath + " | Column Family Count: " + cfHandles.size());
421-
// Iterate through each Column Family for manual compaction
422-
for (Map.Entry<String, ColumnFamilyHandle> entry : cfHandles.entrySet())
415+
System.out.println("Path: " + dbPath + " | CF Count: " + cfHandles.size());
416+
417+
// Use parallel stream to compact multiple Column Families simultaneously
418+
// This allows RocksDB background threads to work on different CFs at once
419+
cfHandles.entrySet().parallelStream().forEach(entry ->
423420
{
424421
String cfName = entry.getKey();
425422
ColumnFamilyHandle handle = entry.getValue();
426423

427-
System.out.println("Compacting CF [" + cfName + "] in " + dbPath);
424+
try
425+
{
426+
System.out.println("Compacting CF [" + cfName + "] in " + dbPath);
428427

429-
// This is a blocking call per Column Family
430-
rocksDB.compactRange(handle);
431-
handle.close();
432-
}
433-
long end = System.currentTimeMillis();
434-
System.out.println("SUCCESS: Compaction Duration: " + (end - start) + "ms\tPath: " + dbPath);
428+
// Configure CompactRangeOptions to force a rewrite of the bottommost level
429+
// This is required to apply the new 20-bits-per-key Bloom filter configuration to existing data
430+
CompactRangeOptions options = new CompactRangeOptions()
431+
.setBottommostLevelCompaction(CompactRangeOptions.BottommostLevelCompaction.kForce)
432+
.setMaxSubcompactions(4) // Parallel threads within a single CF compaction job
433+
.setAllowWriteStall(true);
434+
435+
// Synchronous call per CF, but parallelized by the parallelStream
436+
rocksDB.compactRange(handle, null, null, options);
437+
438+
System.out.println("Finished CF [" + cfName + "] in " + dbPath);
439+
}
440+
catch (RocksDBException e)
441+
{
442+
System.err.println("Error during compaction for CF " + cfName + ": " + e.getMessage());
443+
}
444+
finally
445+
{
446+
// Close handles to release native memory
447+
handle.close();
448+
}
449+
});
450+
451+
long endTime = System.currentTimeMillis();
452+
System.out.println("SUCCESS: Path: " + dbPath + " | Duration: " + (endTime - startTime) + "ms");
435453
rocksDB.close();
436454
}
437455
}

0 commit comments

Comments
 (0)