Skip to content

Commit 530add2

Browse files
authored
[Issue #1283] implement upsert mode for Retina indexing to handle missing keys (#1285)
1 parent 8f004fa commit 530add2

File tree

9 files changed

+232
-55
lines changed

9 files changed

+232
-55
lines changed

pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/LocalIndexService.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.pixelsdb.pixels.common.exception.RowIdException;
2525
import io.pixelsdb.pixels.common.exception.SinglePointIndexException;
2626
import io.pixelsdb.pixels.common.index.*;
27+
import io.pixelsdb.pixels.common.utils.ConfigFactory;
2728
import io.pixelsdb.pixels.index.IndexProto;
2829

2930
import java.util.ArrayList;
@@ -32,13 +33,16 @@
3233
public class LocalIndexService implements IndexService
3334
{
3435
private static final LocalIndexService defaultInstance = new LocalIndexService();
35-
36+
private static boolean upsertMode;
3637
public static LocalIndexService Instance()
3738
{
3839
return defaultInstance;
3940
}
4041

41-
private LocalIndexService() {}
42+
private LocalIndexService()
43+
{
44+
upsertMode = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("retina.upsert-mode.enabled"));
45+
}
4246

4347
@Override
4448
public IndexProto.RowIdBatch allocateRowIdBatch(long tableId, int numRowIds) throws IndexException
@@ -251,6 +255,10 @@ public IndexProto.RowLocation deletePrimaryIndexEntry(IndexProto.IndexKey key, I
251255
long prevRowId = singlePointIndex.deleteUniqueEntry(key);
252256
if (prevRowId < 0)
253257
{
258+
if (upsertMode)
259+
{
260+
return null;
261+
}
254262
throw new IndexException("Primary index entry not found for tableId=" + tableId + ", indexId=" + indexId);
255263
}
256264
IndexProto.RowLocation location = mainIndex.getLocation(prevRowId);
@@ -278,6 +286,10 @@ public List<IndexProto.RowLocation> deletePrimaryIndexEntries(
278286
List<Long> prevRowIds = singlePointIndex.deleteEntries(keys);
279287
if (prevRowIds == null || prevRowIds.isEmpty())
280288
{
289+
if (upsertMode)
290+
{
291+
return new ArrayList<>();
292+
}
281293
throw new IndexException("Primary index entries not found for tableId="
282294
+ tableId + ", indexId=" + indexId);
283295
}
@@ -348,7 +360,7 @@ public IndexProto.RowLocation updatePrimaryIndexEntry(IndexProto.PrimaryIndexEnt
348360
SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption);
349361
// update the entry in the single point index and get the previous row ID
350362
long prevRowId = singlePointIndex.updatePrimaryEntry(key, indexEntry.getRowId());
351-
IndexProto.RowLocation prevLocation;
363+
IndexProto.RowLocation prevLocation = null;
352364
if (prevRowId >= 0)
353365
{
354366
// retrieve the previous RowLocation from the main index
@@ -360,7 +372,10 @@ public IndexProto.RowLocation updatePrimaryIndexEntry(IndexProto.PrimaryIndexEnt
360372
}
361373
else
362374
{
363-
throw new IndexException("Failed to get previous row id for tableId=" + tableId + ", indexId=" + indexId);
375+
if (!upsertMode)
376+
{
377+
throw new IndexException("Failed to get previous row id for tableId=" + tableId + ", indexId=" + indexId);
378+
}
364379
}
365380
boolean mainSuccess = mainIndex.putEntry(indexEntry.getRowId(), indexEntry.getRowLocation());
366381
if (!mainSuccess)
@@ -393,13 +408,21 @@ public IndexProto.RowLocation updatePrimaryIndexEntry(IndexProto.PrimaryIndexEnt
393408
List<Long> prevRowIds = singlePointIndex.updatePrimaryEntries(indexEntries);
394409
if (prevRowIds == null || prevRowIds.isEmpty())
395410
{
396-
throw new IndexException("Failed to get previous row ids for tableId=" + tableId + ", indexId=" + indexId);
411+
if (!upsertMode)
412+
{
413+
throw new IndexException("Failed to get previous row ids for tableId=" + tableId + ", indexId=" + indexId);
414+
}
415+
prevRowIds = new ArrayList<>();
397416
}
398417
List<IndexProto.RowLocation> prevRowLocations = mainIndex.getLocations(prevRowIds);
399418
if (prevRowLocations == null || prevRowLocations.isEmpty())
400419
{
401-
throw new IndexException("Failed to get previous row locations for tableId=" +
402-
tableId + ", indexId=" + indexId);
420+
if (!(upsertMode && prevRowIds.isEmpty()))
421+
{
422+
throw new IndexException("Failed to get previous row locations for tableId=" +
423+
tableId + ", indexId=" + indexId);
424+
}
425+
403426
}
404427
for (Boolean mainSuccess : mainIndex.putEntries(indexEntries))
405428
{

pixels-common/src/main/resources/pixels.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,8 @@ retina.reader.prefetch.threads=8
293293
retina.service.init.threads=32
294294
# interval seconds, -1 means disabled,
295295
retina.metrics.log.interval=-1
296+
# Enable upsert mode to treat missing keys in UPDATE as INSERT and ignore missing keys in DELETE
297+
retina.upsert-mode.enabled=false
296298
# offloading threshold for long query in seconds
297299
pixels.transaction.offload.threshold=1800
298300
# snapshot storage directory

pixels-core/src/main/java/io/pixelsdb/pixels/core/utils/DatetimeUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ public static int sqlDateToDay (Date date)
119119
*/
120120
public static int stringDateToDay(String date)
121121
{
122+
if (date == null) return 0;
123+
if (date.contains(" ") || date.contains("T")) {
124+
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("[yyyy-MM-dd HH:mm:ss][yyyy-MM-dd'T'HH:mm:ss]");
125+
return (int) LocalDateTime.parse(date, formatter).toLocalDate().toEpochDay();
126+
}
122127
return (int) LocalDate.parse(date).toEpochDay();
123128
}
124129

pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/ByteColumnVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void add(byte value)
7575
@Override
7676
public void add(byte[] value)
7777
{
78-
if(checkBytesNull(value))
78+
if (checkBytesNull(value))
7979
{
8080
return;
8181
}

pixels-core/src/main/java/io/pixelsdb/pixels/core/vector/ColumnVector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -515,14 +515,14 @@ protected void deserializeBase(ColumnVectorBaseFlat base)
515515
*/
516516
protected boolean checkBytesNull(byte[] value)
517517
{
518-
if (value == null)
518+
if (value == null || value.length == 0)
519519
{
520520
if(writeIndex >= getLength())
521521
{
522522
ensureSize(writeIndex * 2, true);
523-
isNull[writeIndex++] = true;
524-
return true;
525523
}
524+
isNull[writeIndex++] = true;
525+
return true;
526526
}
527527
return false;
528528
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2025 PixelsDB.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.pixelsdb.pixels.daemon.index;
19+
20+
import com.google.protobuf.ByteString;
21+
import io.pixelsdb.pixels.common.index.IndexOption;
22+
import io.pixelsdb.pixels.common.index.service.LocalIndexService;
23+
import io.pixelsdb.pixels.common.utils.ConfigFactory;
24+
import io.pixelsdb.pixels.index.IndexProto;
25+
import org.junit.jupiter.api.*;
26+
27+
import java.util.Collections;
28+
import java.util.List;
29+
30+
import static org.junit.jupiter.api.Assertions.*;
31+
import static org.junit.jupiter.api.Assertions.assertNotNull;
32+
import static org.junit.jupiter.api.Assertions.assertNull;
33+
34+
public class TestIndexUpsert
35+
{
36+
private static LocalIndexService indexService;
37+
private static final long TABLE_ID = 3622L;
38+
private static final long PRIMARY_INDEX_ID = 3560L;
39+
private static IndexOption indexOption;
40+
private static IndexProto.PrimaryIndexEntry missingEntry;
41+
42+
@BeforeAll
43+
static void setup() throws Exception
44+
{
45+
ConfigFactory.Instance().addProperty("retina.upsert-mode.enabled", "true");
46+
indexService = LocalIndexService.Instance();
47+
indexOption = IndexOption.builder().vNodeId(0).build();
48+
// 1. Enable Upsert Mode (Ensure your config utility supports this)
49+
// PixelsConfig.set("retina.index.upsert-mode.enabled", "true");
50+
51+
indexService.openIndex(TABLE_ID, PRIMARY_INDEX_ID, true, indexOption);
52+
53+
// Define an entry that is NOT in the database yet
54+
missingEntry = IndexProto.PrimaryIndexEntry.newBuilder()
55+
.setRowId(9999L)
56+
.setIndexKey(IndexProto.IndexKey.newBuilder()
57+
.setTableId(TABLE_ID)
58+
.setIndexId(PRIMARY_INDEX_ID)
59+
.setKey(ByteString.copyFromUtf8("ghost_key"))
60+
.setTimestamp(System.currentTimeMillis()))
61+
.setRowLocation(IndexProto.RowLocation.newBuilder()
62+
.setFileId(10)
63+
.setRgId(1)
64+
.setRgRowOffset(500))
65+
.build();
66+
}
67+
68+
@Test
69+
@Order(1)
70+
@DisplayName("Test UPDATE on missing key (Should Insert)")
71+
void testUpdateMissingKey() throws Exception
72+
{
73+
// In standard mode, this would throw IndexException.
74+
// In Upsert mode, it returns null and inserts the data.
75+
IndexProto.RowLocation prevLocation = indexService.updatePrimaryIndexEntry(missingEntry, indexOption);
76+
77+
// Assertions
78+
assertNull(prevLocation, "Upsert should return null when no previous entry exists");
79+
80+
// Verify the data was actually inserted
81+
IndexProto.RowLocation currentLoc = indexService.lookupUniqueIndex(missingEntry.getIndexKey(), indexOption);
82+
assertNotNull(currentLoc, "Entry should have been inserted by the update call");
83+
assertEquals(10, currentLoc.getFileId());
84+
}
85+
86+
@Test
87+
@Order(2)
88+
@DisplayName("Test DELETE on missing key (Should Ignore)")
89+
void testDeleteMissingKey() throws Exception
90+
{
91+
// Create a key that definitely doesn't exist
92+
IndexProto.IndexKey nonExistentKey = missingEntry.getIndexKey().toBuilder()
93+
.setKey(ByteString.copyFromUtf8("never_existed"))
94+
.build();
95+
96+
// In Upsert mode, this should NOT throw exception
97+
IndexProto.RowLocation deletedLocation = indexService.deletePrimaryIndexEntry(nonExistentKey, indexOption);
98+
99+
assertNull(deletedLocation, "Delete on missing key should return null in upsert mode");
100+
}
101+
102+
@Test
103+
@Order(3)
104+
@DisplayName("Test Batch UPDATE with missing keys")
105+
void testBatchUpdateUpsert() throws Exception
106+
{
107+
List<IndexProto.PrimaryIndexEntry> entries = Collections.singletonList(
108+
missingEntry.toBuilder()
109+
.setRowId(8888L)
110+
.setIndexKey(missingEntry.getIndexKey().toBuilder().setKey(ByteString.copyFromUtf8("batch_ghost")))
111+
.build()
112+
);
113+
114+
// Should execute without throwing exception
115+
List<IndexProto.RowLocation> prevLocations = indexService.updatePrimaryIndexEntries(TABLE_ID, PRIMARY_INDEX_ID, entries, indexOption);
116+
117+
// Even if some or all were missing, the returned list can be empty or only contain found locations
118+
assertNotNull(prevLocations);
119+
}
120+
121+
@AfterAll
122+
static void tearDown() throws Exception
123+
{
124+
indexService.removeIndex(TABLE_ID, PRIMARY_INDEX_ID, true, indexOption);
125+
// PixelsConfig.set("retina.index.upsert-mode.enabled", "false");
126+
}
127+
}

pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name, Integer ke
188188
int fixedLengthPrefix = Integer.parseInt(config.getProperty("index.rocksdb.prefix.length"));
189189
if(keyLen != null)
190190
{
191-
fixedLengthPrefix = keyLen + Long.BYTES; // key buffer + index id
191+
fixedLengthPrefix = Long.BYTES + keyLen; // index id + key buffer
192192
}
193193
CompactionStyle compactionStyle = CompactionStyle.valueOf(config.getProperty("index.rocksdb.compaction.style"));
194194

pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBIndex.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ public long getUniqueRowIdInternal(IndexProto.IndexKey key) throws SinglePointIn
9898
throw new SinglePointIndexException("getUniqueRowId should only be called on unique index");
9999
}
100100
ReadOptions readOptions = RocksDBThreadResources.getReadOptions();
101-
readOptions.setPrefixSameAsStart(true);
101+
readOptions.setPrefixSameAsStart(true)
102+
.setTotalOrderSeek(false)
103+
.setVerifyChecksums(false);
102104
ByteBuffer keyBuffer = toKeyBuffer(key);
103105
long rowId = -1L;
104106
try (RocksIterator iterator = rocksDB.newIterator(columnFamilyHandle, readOptions))

0 commit comments

Comments
 (0)