Skip to content

Commit e8b5d2d

Browse files
committed
Reindex subset of vertices
Signed-off-by: ntisseyre <[email protected]>
1 parent 3b8843f commit e8b5d2d

File tree

20 files changed

+521
-14
lines changed

20 files changed

+521
-14
lines changed

janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphBaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ public static void evaluateQuery(JanusGraphQuery query, ElementCategory resultTy
747747
}
748748

749749
protected ScanMetrics executeScanJob(VertexScanJob job) throws Exception {
750-
return executeScanJob(VertexJobConverter.convert(graph,job));
750+
return executeScanJob(VertexJobConverter.convert(graph, job, null));
751751
}
752752

753753
protected ScanMetrics executeScanJob(ScanJob job) throws Exception {

janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1451,6 +1451,88 @@ public void testCompositeVsMixedIndexing() {
14511451
assertTrue(tx.traversal().V().has("intId2", 234).hasNext());
14521452
}
14531453

1454+
@Test
1455+
public void testSubsetReindex() throws Exception {
1456+
1457+
clopen(option(FORCE_INDEX_USAGE), true);
1458+
1459+
mgmt.makeVertexLabel("cat").make();
1460+
mgmt.makeVertexLabel("dog").make();
1461+
1462+
makeKey("id", Integer.class);
1463+
makeKey("name", String.class);
1464+
final PropertyKey typeKey = makeKey("type", String.class);
1465+
1466+
String typeIndex = "searchByType";
1467+
mgmt.buildIndex(typeIndex, Vertex.class)
1468+
.addKey(typeKey)
1469+
.buildCompositeIndex();
1470+
mgmt.commit();
1471+
1472+
//Cats
1473+
int catsCount = 3;
1474+
for (int i = 0; i < catsCount; i++) {
1475+
Vertex v = tx.addVertex("cat");
1476+
v.property("id", i);
1477+
v.property("name", "cat_" + i);
1478+
v.property("type", "cat");
1479+
}
1480+
1481+
//Dogs
1482+
for (int i = 0; i < 5; i++) {
1483+
Vertex v = tx.addVertex("dog");
1484+
v.property("id", i);
1485+
v.property("name", "dog_" + i);
1486+
v.property("type", "dog");
1487+
}
1488+
1489+
tx.commit();
1490+
1491+
//Select a subset of vertices to index
1492+
clopen(option(FORCE_INDEX_USAGE), true);
1493+
List<Vertex> cats = tx.traversal().V().has("type", "cat").toList();
1494+
assertEquals(catsCount, cats.size());
1495+
1496+
List<Vertex> dogs = tx.traversal().V().has("type", "dog").toList();
1497+
assertEquals(5, dogs.size());
1498+
tx.rollback();
1499+
1500+
//Create new Index
1501+
graph.getOpenTransactions().forEach(JanusGraphTransaction::rollback);
1502+
mgmt = graph.openManagement();
1503+
mgmt.getOpenInstances().stream().filter(i -> !i.contains("current")).forEach(i -> mgmt.forceCloseInstance(i));
1504+
mgmt.commit();
1505+
1506+
String catsNameIndex = "searchByName_CatsOnly";
1507+
mgmt = graph.openManagement();
1508+
mgmt.buildIndex(catsNameIndex, Vertex.class)
1509+
.addKey(mgmt.getPropertyKey("name"))
1510+
.indexOnly(mgmt.getVertexLabel("cat"))
1511+
.buildCompositeIndex();
1512+
mgmt.commit();
1513+
1514+
//Make Index as REGISTERED
1515+
mgmt = graph.openManagement();
1516+
mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REGISTER_INDEX).get();
1517+
mgmt.commit();
1518+
ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.REGISTERED).call();
1519+
1520+
//Reindex a given subset
1521+
mgmt = graph.openManagement();
1522+
mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REINDEX,
1523+
cats.stream().map(Element::id).collect(Collectors.toList())).get();
1524+
mgmt.commit();
1525+
ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.ENABLED).call();
1526+
1527+
clopen(option(FORCE_INDEX_USAGE), true);
1528+
1529+
for (int i = 0; i < catsCount; i++) {
1530+
List<Vertex> catsByName = tx.traversal().V().hasLabel("cat").has("name", "cat_" + i).toList();
1531+
assertEquals(1, catsByName.size());
1532+
}
1533+
tx.rollback();
1534+
}
1535+
14541536
@Test
14551537
public void testIndexInlineProperties() throws NoSuchMethodException {
14561538

janusgraph-core/src/main/java/org/janusgraph/core/schema/JanusGraphManagement.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJobFuture;
2626

2727
import java.time.Duration;
28+
import java.util.List;
2829
import java.util.Set;
2930

3031
/**
@@ -341,6 +342,17 @@ interface IndexBuilder {
341342
*/
342343
ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads);
343344

345+
/**
346+
* Updates the provided index according to the given {@link SchemaAction} for
347+
* the given subset of vertices.
348+
*
349+
* @param index
350+
* @param updateAction
351+
* @param vertexOnly Set of vertexIds that only should be considered for index update
352+
* @return a future that completes when the index action is done
353+
*/
354+
ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly);
355+
344356
/**
345357
* If an index update job was triggered through {@link #updateIndex(Index, SchemaAction)} with schema actions
346358
* {@link org.janusgraph.core.schema.SchemaAction#REINDEX} or {@link org.janusgraph.core.schema.SchemaAction#DISCARD_INDEX}

janusgraph-core/src/main/java/org/janusgraph/diskstorage/common/DistributedStoreManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_PASSWORD;
2929
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_USERNAME;
3030
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CONNECTION_TIMEOUT;
31+
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.KEYS_SIZE;
3132
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PAGE_SIZE;
3233
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_HOSTS;
3334
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_PORT;
@@ -69,6 +70,7 @@ public enum Deployment {
6970
protected final int port;
7071
protected final Duration connectionTimeoutMS;
7172
protected final int pageSize;
73+
protected final int keysSize;
7274

7375
protected final String username;
7476
protected final String password;
@@ -83,6 +85,7 @@ public DistributedStoreManager(Configuration storageConfig, int portDefault) {
8385
else this.port = portDefault;
8486
this.connectionTimeoutMS = storageConfig.get(CONNECTION_TIMEOUT);
8587
this.pageSize = storageConfig.get(PAGE_SIZE);
88+
this.keysSize = storageConfig.get(KEYS_SIZE);
8689
this.times = storageConfig.get(TIMESTAMP_PROVIDER);
8790

8891
if (storageConfig.has(AUTH_USERNAME)) {
@@ -121,6 +124,15 @@ public int getPageSize() {
121124
return pageSize;
122125
}
123126

127+
/**
128+
* Returns the default configured keys size for this storage backend. The keys size is used to determine
129+
* how many keys/partitions to request from storage within single request.
130+
* @return
131+
*/
132+
public int getKeysSize() {
133+
return keysSize;
134+
}
135+
124136
/*
125137
* TODO this should go away once we have a JanusGraphConfig that encapsulates TimestampProvider
126138
*/

janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package org.janusgraph.diskstorage.keycolumnvalue;
1616

17+
import org.apache.commons.lang.NotImplementedException;
1718
import org.janusgraph.diskstorage.BackendException;
1819
import org.janusgraph.diskstorage.Entry;
1920
import org.janusgraph.diskstorage.EntryList;
@@ -181,6 +182,10 @@ default Map<SliceQuery, Map<StaticBuffer, EntryList>> getMultiSlices(MultiKeysQu
181182
*/
182183
void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException;
183184

185+
default KeyIterator getKeys(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {
186+
throw new NotImplementedException();
187+
}
188+
184189
/**
185190
* Returns a {@link KeyIterator} over all keys that fall within the key-range specified by the given query and have one or more columns matching the column-range.
186191
* Calling {@link KeyIterator#getEntries()} returns the list of all entries that match the column-range specified by the given query.

janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
6161
private final StoreTransaction storeTx;
6262
private final List<SliceQuery> queries;
6363
private final Predicate<StaticBuffer> keyFilter;
64+
private final List<StaticBuffer> keysToScan;
6465
private final Configuration graphConfiguration;
6566
private final DataPuller[] pullThreads;
6667
private final BlockingQueue<SliceResult>[] dataQueues;
@@ -72,6 +73,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
7273
StoreTransaction storeTx,
7374
List<SliceQuery> queries,
7475
Predicate<StaticBuffer> keyFilter,
76+
List<StaticBuffer> keysToScan,
7577
BlockingQueue<Row> rowQueue,
7678
Configuration graphConfiguration) throws BackendException {
7779

@@ -80,6 +82,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
8082
this.storeTx = storeTx;
8183
this.queries = queries;
8284
this.keyFilter = keyFilter;
85+
this.keysToScan = keysToScan;
8386
this.graphConfiguration = graphConfiguration;
8487

8588
this.dataQueues = new BlockingQueue[queries.size()];
@@ -189,8 +192,14 @@ private void addDataPuller(SliceQuery sq, StoreTransaction stx, int pos) throws
189192
this.graphConfiguration.get(GraphDatabaseConfiguration.PAGE_SIZE));
190193
dataQueues[pos] = queue;
191194

192-
DataPuller dp = new DataPuller(sq, queue,
193-
KCVSUtil.getKeys(store,sq,storeFeatures, MAX_KEY_LENGTH,stx), keyFilter);
195+
KeyIterator keyIterator;
196+
if (keysToScan != null) {
197+
keyIterator = store.getKeys(keysToScan, sq, stx);
198+
} else {
199+
keyIterator = KCVSUtil.getKeys(store, sq, storeFeatures, MAX_KEY_LENGTH, stx);
200+
}
201+
202+
DataPuller dp = new DataPuller(sq, queue, keyIterator, keyFilter);
194203
pullThreads[pos] = dp;
195204
dp.setName("data-puller-" + pos); // setting the name for thread dumps!
196205
dp.start();

janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,14 @@ default void workerIterationEnd(ScanMetrics metrics) {}
113113
*/
114114
List<SliceQuery> getQueries();
115115

116+
/**
117+
* Get keys to scan by the job. If stream is empty, all keys will be scanned.
118+
* @return
119+
*/
120+
default List<StaticBuffer> getKeysToScan() {
121+
return null;
122+
}
123+
116124
/**
117125
* A predicate that determines whether
118126
* {@link #process(org.janusgraph.diskstorage.StaticBuffer, java.util.Map, ScanMetrics)}

janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/StandardScannerExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ private RowsCollector buildScanner(BlockingQueue<Row> processorQueue, List<Slice
167167
job.getKeyFilter(), processorQueue);
168168
} else {
169169
return new MultiThreadsRowsCollector(store, storeFeatures, storeTx, queries,
170-
job.getKeyFilter(), processorQueue, graphConfiguration);
170+
job.getKeyFilter(), job.getKeysToScan(), processorQueue, graphConfiguration);
171171
}
172172
}
173173

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/MetricInstrumentedStore.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,18 @@ public void acquireLock(final StaticBuffer key,
165165
});
166166
}
167167

168+
@Override
169+
public KeyIterator getKeys(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {
170+
return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> {
171+
final KeyIterator ki = backend.getKeys(keys, query, txh);
172+
if (txh.getConfiguration().hasGroupName()) {
173+
return MetricInstrumentedIterator.of(ki, txh.getConfiguration().getGroupName(), metricsStoreName, M_GET_KEYS, M_ITERATOR);
174+
} else {
175+
return ki;
176+
}
177+
});
178+
}
179+
168180
@Override
169181
public KeyIterator getKeys(final KeyRangeQuery query, final StoreTransaction txh) throws BackendException {
170182
return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> {

janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,10 @@ public boolean apply(@Nullable String s) {
830830
"up to this many elements.",
831831
ConfigOption.Type.MASKABLE, 100);
832832

833+
public static final ConfigOption<Integer> KEYS_SIZE = new ConfigOption<>(STORAGE_NS,"keys-size",
834+
"The maximum amount of keys/partitions to retrieve from distributed storage system by JanusGraph in a single request.",
835+
ConfigOption.Type.MASKABLE, 100);
836+
833837
public static final ConfigOption<Boolean> DROP_ON_CLEAR = new ConfigOption<>(STORAGE_NS, "drop-on-clear",
834838
"Whether to drop the graph database (true) or delete rows (false) when clearing storage. " +
835839
"Note that some backends always drop the graph database when clearing storage. Also note that indices are " +

0 commit comments

Comments
 (0)