Skip to content

Commit c0f4351

Browse files
committed
Add Upsert Mode
1 parent 3b5a7ca commit c0f4351

File tree

4 files changed

+164
-7
lines changed

4 files changed

+164
-7
lines changed

pixels-common/src/main/java/io/pixelsdb/pixels/common/index/service/LocalIndexService.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.pixelsdb.pixels.common.exception.RowIdException;
2525
import io.pixelsdb.pixels.common.exception.SinglePointIndexException;
2626
import io.pixelsdb.pixels.common.index.*;
27+
import io.pixelsdb.pixels.common.utils.ConfigFactory;
2728
import io.pixelsdb.pixels.index.IndexProto;
2829

2930
import java.util.ArrayList;
@@ -32,13 +33,16 @@
3233
public class LocalIndexService implements IndexService
3334
{
3435
private static final LocalIndexService defaultInstance = new LocalIndexService();
35-
36+
private static boolean upsertMode;
3637
public static LocalIndexService Instance()
3738
{
3839
return defaultInstance;
3940
}
4041

41-
private LocalIndexService() {}
42+
private LocalIndexService()
43+
{
44+
upsertMode = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("retina.upsert-mode.enabled"));
45+
}
4246

4347
@Override
4448
public IndexProto.RowIdBatch allocateRowIdBatch(long tableId, int numRowIds) throws IndexException
@@ -251,6 +255,10 @@ public IndexProto.RowLocation deletePrimaryIndexEntry(IndexProto.IndexKey key, I
251255
long prevRowId = singlePointIndex.deleteUniqueEntry(key);
252256
if (prevRowId < 0)
253257
{
258+
if (upsertMode)
259+
{
260+
return null;
261+
}
254262
throw new IndexException("Primary index entry not found for tableId=" + tableId + ", indexId=" + indexId);
255263
}
256264
IndexProto.RowLocation location = mainIndex.getLocation(prevRowId);
@@ -278,6 +286,10 @@ public List<IndexProto.RowLocation> deletePrimaryIndexEntries(
278286
List<Long> prevRowIds = singlePointIndex.deleteEntries(keys);
279287
if (prevRowIds == null || prevRowIds.isEmpty())
280288
{
289+
if (upsertMode)
290+
{
291+
return new ArrayList<>();
292+
}
281293
throw new IndexException("Primary index entries not found for tableId="
282294
+ tableId + ", indexId=" + indexId);
283295
}
@@ -348,7 +360,7 @@ public IndexProto.RowLocation updatePrimaryIndexEntry(IndexProto.PrimaryIndexEnt
348360
SinglePointIndex singlePointIndex = SinglePointIndexFactory.Instance().getSinglePointIndex(tableId, indexId, indexOption);
349361
// update the entry in the single point index and get the previous row ID
350362
long prevRowId = singlePointIndex.updatePrimaryEntry(key, indexEntry.getRowId());
351-
IndexProto.RowLocation prevLocation;
363+
IndexProto.RowLocation prevLocation = null;
352364
if (prevRowId >= 0)
353365
{
354366
// retrieve the previous RowLocation from the main index
@@ -360,7 +372,10 @@ public IndexProto.RowLocation updatePrimaryIndexEntry(IndexProto.PrimaryIndexEnt
360372
}
361373
else
362374
{
363-
throw new IndexException("Failed to get previous row id for tableId=" + tableId + ", indexId=" + indexId);
375+
if (!upsertMode)
376+
{
377+
throw new IndexException("Failed to get previous row id for tableId=" + tableId + ", indexId=" + indexId);
378+
}
364379
}
365380
boolean mainSuccess = mainIndex.putEntry(indexEntry.getRowId(), indexEntry.getRowLocation());
366381
if (!mainSuccess)
@@ -393,13 +408,21 @@ public IndexProto.RowLocation updatePrimaryIndexEntry(IndexProto.PrimaryIndexEnt
393408
List<Long> prevRowIds = singlePointIndex.updatePrimaryEntries(indexEntries);
394409
if (prevRowIds == null || prevRowIds.isEmpty())
395410
{
396-
throw new IndexException("Failed to get previous row ids for tableId=" + tableId + ", indexId=" + indexId);
411+
if (!upsertMode)
412+
{
413+
throw new IndexException("Failed to get previous row ids for tableId=" + tableId + ", indexId=" + indexId);
414+
}
415+
prevRowIds = new ArrayList<>();
397416
}
398417
List<IndexProto.RowLocation> prevRowLocations = mainIndex.getLocations(prevRowIds);
399418
if (prevRowLocations == null || prevRowLocations.isEmpty())
400419
{
401-
throw new IndexException("Failed to get previous row locations for tableId=" +
402-
tableId + ", indexId=" + indexId);
420+
if (!(upsertMode && prevRowIds.isEmpty()))
421+
{
422+
throw new IndexException("Failed to get previous row locations for tableId=" +
423+
tableId + ", indexId=" + indexId);
424+
}
425+
403426
}
404427
for (Boolean mainSuccess : mainIndex.putEntries(indexEntries))
405428
{

pixels-common/src/main/resources/pixels.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,8 @@ retina.reader.prefetch.threads=8
293293
retina.service.init.threads=32
294294
# interval seconds, -1 means disabled,
295295
retina.metrics.log.interval=-1
296+
# Enable upsert mode to treat missing keys in UPDATE as INSERT and ignore missing keys in DELETE
297+
retina.upsert-mode.enabled=false
296298
# offloading threshold for long query in seconds
297299
pixels.transaction.offload.threshold=1800
298300
# snapshot storage directory

pixels-core/src/main/java/io/pixelsdb/pixels/core/utils/DatetimeUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ public static int sqlDateToDay (Date date)
119119
*/
120120
public static int stringDateToDay(String date)
121121
{
122+
if (date == null) return 0;
123+
if (date.contains(" ") || date.contains("T")) {
124+
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("[yyyy-MM-dd HH:mm:ss][yyyy-MM-dd'T'HH:mm:ss]");
125+
return (int) LocalDateTime.parse(date, formatter).toLocalDate().toEpochDay();
126+
}
122127
return (int) LocalDate.parse(date).toEpochDay();
123128
}
124129

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2025 PixelsDB.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.pixelsdb.pixels.daemon.index;
19+
20+
import com.google.protobuf.ByteString;
21+
import io.pixelsdb.pixels.common.index.IndexOption;
22+
import io.pixelsdb.pixels.common.index.service.LocalIndexService;
23+
import io.pixelsdb.pixels.common.utils.ConfigFactory;
24+
import io.pixelsdb.pixels.index.IndexProto;
25+
import org.junit.jupiter.api.*;
26+
27+
import java.util.Collections;
28+
import java.util.List;
29+
30+
import static org.junit.jupiter.api.Assertions.*;
31+
import static org.junit.jupiter.api.Assertions.assertNotNull;
32+
import static org.junit.jupiter.api.Assertions.assertNull;
33+
34+
public class TestIndexUpsert
35+
{
36+
private static LocalIndexService indexService;
37+
private static final long TABLE_ID = 3622L;
38+
private static final long PRIMARY_INDEX_ID = 3560L;
39+
private static IndexOption indexOption;
40+
private static IndexProto.PrimaryIndexEntry missingEntry;
41+
42+
@BeforeAll
43+
static void setup() throws Exception
44+
{
45+
ConfigFactory.Instance().addProperty("retina.upsert-mode.enabled", "true");
46+
indexService = LocalIndexService.Instance();
47+
indexOption = IndexOption.builder().vNodeId(0).build();
48+
// 1. Enable Upsert Mode (Ensure your config utility supports this)
49+
// PixelsConfig.set("retina.index.upsert-mode.enabled", "true");
50+
51+
indexService.openIndex(TABLE_ID, PRIMARY_INDEX_ID, true, indexOption);
52+
53+
// Define an entry that is NOT in the database yet
54+
missingEntry = IndexProto.PrimaryIndexEntry.newBuilder()
55+
.setRowId(9999L)
56+
.setIndexKey(IndexProto.IndexKey.newBuilder()
57+
.setTableId(TABLE_ID)
58+
.setIndexId(PRIMARY_INDEX_ID)
59+
.setKey(ByteString.copyFromUtf8("ghost_key"))
60+
.setTimestamp(System.currentTimeMillis()))
61+
.setRowLocation(IndexProto.RowLocation.newBuilder()
62+
.setFileId(10)
63+
.setRgId(1)
64+
.setRgRowOffset(500))
65+
.build();
66+
}
67+
68+
@Test
69+
@Order(1)
70+
@DisplayName("Test UPDATE on missing key (Should Insert)")
71+
void testUpdateMissingKey() throws Exception
72+
{
73+
// In standard mode, this would throw IndexException.
74+
// In Upsert mode, it returns null and inserts the data.
75+
IndexProto.RowLocation prevLocation = indexService.updatePrimaryIndexEntry(missingEntry, indexOption);
76+
77+
// Assertions
78+
assertNull(prevLocation, "Upsert should return null when no previous entry exists");
79+
80+
// Verify the data was actually inserted
81+
IndexProto.RowLocation currentLoc = indexService.lookupUniqueIndex(missingEntry.getIndexKey(), indexOption);
82+
assertNotNull(currentLoc, "Entry should have been inserted by the update call");
83+
assertEquals(10, currentLoc.getFileId());
84+
}
85+
86+
@Test
87+
@Order(2)
88+
@DisplayName("Test DELETE on missing key (Should Ignore)")
89+
void testDeleteMissingKey() throws Exception
90+
{
91+
// Create a key that definitely doesn't exist
92+
IndexProto.IndexKey nonExistentKey = missingEntry.getIndexKey().toBuilder()
93+
.setKey(ByteString.copyFromUtf8("never_existed"))
94+
.build();
95+
96+
// In Upsert mode, this should NOT throw exception
97+
IndexProto.RowLocation deletedLocation = indexService.deletePrimaryIndexEntry(nonExistentKey, indexOption);
98+
99+
assertNull(deletedLocation, "Delete on missing key should return null in upsert mode");
100+
}
101+
102+
@Test
103+
@Order(3)
104+
@DisplayName("Test Batch UPDATE with missing keys")
105+
void testBatchUpdateUpsert() throws Exception
106+
{
107+
List<IndexProto.PrimaryIndexEntry> entries = Collections.singletonList(
108+
missingEntry.toBuilder()
109+
.setRowId(8888L)
110+
.setIndexKey(missingEntry.getIndexKey().toBuilder().setKey(ByteString.copyFromUtf8("batch_ghost")))
111+
.build()
112+
);
113+
114+
// Should execute without throwing exception
115+
List<IndexProto.RowLocation> prevLocations = indexService.updatePrimaryIndexEntries(TABLE_ID, PRIMARY_INDEX_ID, entries, indexOption);
116+
117+
// Even if some or all were missing, the returned list can be empty or only contain found locations
118+
assertNotNull(prevLocations);
119+
}
120+
121+
@AfterAll
122+
static void tearDown() throws Exception
123+
{
124+
indexService.removeIndex(TABLE_ID, PRIMARY_INDEX_ID, true, indexOption);
125+
// PixelsConfig.set("retina.index.upsert-mode.enabled", "false");
126+
}
127+
}

0 commit comments

Comments
 (0)