Skip to content

Commit 53a5332

Browse files
authored
Merge pull request #4726 from ntisseyre/reindex_subset
Reindex subset of vertices
2 parents 7f6ffcf + e5859c6 commit 53a5332

File tree

22 files changed

+541
-14
lines changed

22 files changed

+541
-14
lines changed

docs/configs/janusgraph-cfg.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ Configuration options for the storage backend. Some options are applicable only
422422
| storage.directory | Storage directory for those storage backends that require local storage. | String | (no default value) | LOCAL |
423423
| storage.drop-on-clear | Whether to drop the graph database (true) or delete rows (false) when clearing storage. Note that some backends always drop the graph database when clearing storage. Also note that indices are always dropped when clearing storage. | Boolean | true | MASKABLE |
424424
| storage.hostname | The hostname or comma-separated list of hostnames of storage backend servers. This is only applicable to some storage backends, such as cassandra and hbase. | String[] | 127.0.0.1 | LOCAL |
425+
| storage.keys-size | The maximum amount of keys/partitions to retrieve from distributed storage system by JanusGraph in a single request. | Integer | 100 | MASKABLE |
425426
| storage.num-mutations-parallel-threshold | This parameter determines the minimum number of mutations a transaction must have before parallel processing is applied during aggregation. Leveraging parallel processing can enhance the commit times for transactions involving a large number of mutations. However, it is advisable not to set the threshold too low (e.g., 0 or 1) due to the overhead associated with parallelism synchronization. This overhead is more efficiently offset in the context of larger transactions. | Integer | 100 | MASKABLE |
426427
| storage.page-size | JanusGraph break requests that may return many results from distributed storage backends into a series of requests for small chunks/pages of results, where each chunk contains up to this many elements. | Integer | 100 | MASKABLE |
427428
| storage.parallel-backend-ops | Whether JanusGraph should attempt to parallelize storage operations | Boolean | true | MASKABLE |

janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphBaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ public static void evaluateQuery(JanusGraphQuery query, ElementCategory resultTy
747747
}
748748

749749
protected ScanMetrics executeScanJob(VertexScanJob job) throws Exception {
750-
return executeScanJob(VertexJobConverter.convert(graph,job));
750+
return executeScanJob(VertexJobConverter.convert(graph, job, null));
751751
}
752752

753753
protected ScanMetrics executeScanJob(ScanJob job) throws Exception {

janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
import org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex;
103103
import org.janusgraph.graphdb.vertices.CacheVertex;
104104
import org.janusgraph.testutil.TestGraphConfigs;
105+
import org.javatuples.Pair;
105106
import org.junit.jupiter.api.Tag;
106107
import org.junit.jupiter.api.Test;
107108
import org.junit.jupiter.api.TestInfo;
@@ -1451,6 +1452,94 @@ public void testCompositeVsMixedIndexing() {
14511452
assertTrue(tx.traversal().V().has("intId2", 234).hasNext());
14521453
}
14531454

1455+
@Test
1456+
public void testSubsetReindex() throws Exception {
1457+
1458+
clopen(option(FORCE_INDEX_USAGE), true);
1459+
1460+
mgmt.makeVertexLabel("cat").make();
1461+
mgmt.makeVertexLabel("dog").make();
1462+
1463+
makeKey("id", Integer.class);
1464+
makeKey("name", String.class);
1465+
final PropertyKey typeKey = makeKey("type", String.class);
1466+
1467+
String typeIndex = "searchByType";
1468+
mgmt.buildIndex(typeIndex, Vertex.class)
1469+
.addKey(typeKey)
1470+
.buildCompositeIndex();
1471+
mgmt.commit();
1472+
1473+
//Cats
1474+
int catsCount = 3;
1475+
for (int i = 0; i < catsCount; i++) {
1476+
Vertex v = tx.addVertex("cat");
1477+
v.property("id", i);
1478+
v.property("name", "cat_" + i);
1479+
v.property("type", "cat");
1480+
}
1481+
1482+
//Dogs
1483+
for (int i = 0; i < 5; i++) {
1484+
Vertex v = tx.addVertex("dog");
1485+
v.property("id", i);
1486+
v.property("name", "dog_" + i);
1487+
v.property("type", "dog");
1488+
}
1489+
1490+
tx.commit();
1491+
1492+
//Select a subset of vertices to index
1493+
clopen(option(FORCE_INDEX_USAGE), true);
1494+
List<Vertex> cats = tx.traversal().V().has("type", "cat").toList();
1495+
assertEquals(catsCount, cats.size());
1496+
String excludedCat = cats.get(cats.size() - 1).value("name");
1497+
List<Pair<Object, String>> catsSubset = cats.subList(0, cats.size() - 1).stream()
1498+
.map(kitty -> new Pair<Object, String>(kitty.id(), kitty.value("name")))
1499+
.collect(Collectors.toList());
1500+
1501+
List<Vertex> dogs = tx.traversal().V().has("type", "dog").toList();
1502+
assertEquals(5, dogs.size());
1503+
tx.rollback();
1504+
1505+
//Create new Index
1506+
graph.getOpenTransactions().forEach(JanusGraphTransaction::rollback);
1507+
mgmt = graph.openManagement();
1508+
mgmt.getOpenInstances().stream().filter(i -> !i.contains("current")).forEach(i -> mgmt.forceCloseInstance(i));
1509+
mgmt.commit();
1510+
1511+
String catsNameIndex = "searchByName_CatsOnly";
1512+
mgmt = graph.openManagement();
1513+
mgmt.buildIndex(catsNameIndex, Vertex.class)
1514+
.addKey(mgmt.getPropertyKey("name"))
1515+
.indexOnly(mgmt.getVertexLabel("cat"))
1516+
.buildCompositeIndex();
1517+
mgmt.commit();
1518+
1519+
//Make Index as REGISTERED
1520+
mgmt = graph.openManagement();
1521+
mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REGISTER_INDEX).get();
1522+
mgmt.commit();
1523+
ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.REGISTERED).call();
1524+
1525+
//Reindex a given subset
1526+
List<Object> reIndexOnlyIds = catsSubset.stream().map(Pair::getValue0).collect(Collectors.toList());
1527+
mgmt = graph.openManagement();
1528+
mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REINDEX, reIndexOnlyIds).get();
1529+
mgmt.commit();
1530+
ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.ENABLED).call();
1531+
1532+
clopen(option(FORCE_INDEX_USAGE), true);
1533+
catsSubset.forEach(kitty -> {
1534+
List<Vertex> catsByName = tx.traversal().V().hasLabel("cat").has("name", kitty.getValue1()).toList();
1535+
assertEquals(1, catsByName.size());
1536+
});
1537+
1538+
List<Vertex> catsByName = tx.traversal().V().hasLabel("cat").has("name", excludedCat).toList();
1539+
assertEquals(0, catsByName.size());
1540+
tx.rollback();
1541+
}
1542+
14541543
@Test
14551544
public void testIndexInlineProperties() throws NoSuchMethodException {
14561545

janusgraph-core/src/main/java/org/janusgraph/core/schema/JanusGraphManagement.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJobFuture;
2626

2727
import java.time.Duration;
28+
import java.util.List;
2829
import java.util.Set;
2930

3031
/**
@@ -341,6 +342,17 @@ interface IndexBuilder {
341342
*/
342343
ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads);
343344

345+
/**
346+
* Updates the provided index according to the given {@link SchemaAction} for
347+
* the given subset of vertices.
348+
*
349+
* @param index
350+
* @param updateAction
351+
* @param vertexOnly Set of vertexIds that only should be considered for index update
352+
* @return a future that completes when the index action is done
353+
*/
354+
ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly);
355+
344356
/**
345357
* If an index update job was triggered through {@link #updateIndex(Index, SchemaAction)} with schema actions
346358
* {@link org.janusgraph.core.schema.SchemaAction#REINDEX} or {@link org.janusgraph.core.schema.SchemaAction#DISCARD_INDEX}

janusgraph-core/src/main/java/org/janusgraph/diskstorage/common/DistributedStoreManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_PASSWORD;
2929
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_USERNAME;
3030
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CONNECTION_TIMEOUT;
31+
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.KEYS_SIZE;
3132
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PAGE_SIZE;
3233
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_HOSTS;
3334
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_PORT;
@@ -69,6 +70,7 @@ public enum Deployment {
6970
protected final int port;
7071
protected final Duration connectionTimeoutMS;
7172
protected final int pageSize;
73+
protected final int keysSize;
7274

7375
protected final String username;
7476
protected final String password;
@@ -83,6 +85,7 @@ public DistributedStoreManager(Configuration storageConfig, int portDefault) {
8385
else this.port = portDefault;
8486
this.connectionTimeoutMS = storageConfig.get(CONNECTION_TIMEOUT);
8587
this.pageSize = storageConfig.get(PAGE_SIZE);
88+
this.keysSize = storageConfig.get(KEYS_SIZE);
8689
this.times = storageConfig.get(TIMESTAMP_PROVIDER);
8790

8891
if (storageConfig.has(AUTH_USERNAME)) {
@@ -121,6 +124,15 @@ public int getPageSize() {
121124
return pageSize;
122125
}
123126

127+
/**
128+
* Returns the default configured keys size for this storage backend. The keys size is used to determine
129+
* how many keys/partitions to request from storage within single request.
130+
* @return
131+
*/
132+
public int getKeysSize() {
133+
return keysSize;
134+
}
135+
124136
/*
125137
* TODO this should go away once we have a JanusGraphConfig that encapsulates TimestampProvider
126138
*/

janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package org.janusgraph.diskstorage.keycolumnvalue;
1616

17+
import org.apache.commons.lang.NotImplementedException;
1718
import org.janusgraph.diskstorage.BackendException;
1819
import org.janusgraph.diskstorage.Entry;
1920
import org.janusgraph.diskstorage.EntryList;
@@ -181,6 +182,10 @@ default Map<SliceQuery, Map<StaticBuffer, EntryList>> getMultiSlices(MultiKeysQu
181182
*/
182183
void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException;
183184

185+
default KeyIterator getKeys(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {
186+
throw new NotImplementedException();
187+
}
188+
184189
/**
185190
* Returns a {@link KeyIterator} over all keys that fall within the key-range specified by the given query and have one or more columns matching the column-range.
186191
* Calling {@link KeyIterator#getEntries()} returns the list of all entries that match the column-range specified by the given query.

janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
6161
private final StoreTransaction storeTx;
6262
private final List<SliceQuery> queries;
6363
private final Predicate<StaticBuffer> keyFilter;
64+
private final List<StaticBuffer> keysToScan;
6465
private final Configuration graphConfiguration;
6566
private final DataPuller[] pullThreads;
6667
private final BlockingQueue<SliceResult>[] dataQueues;
@@ -72,6 +73,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
7273
StoreTransaction storeTx,
7374
List<SliceQuery> queries,
7475
Predicate<StaticBuffer> keyFilter,
76+
List<StaticBuffer> keysToScan,
7577
BlockingQueue<Row> rowQueue,
7678
Configuration graphConfiguration) throws BackendException {
7779

@@ -80,6 +82,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
8082
this.storeTx = storeTx;
8183
this.queries = queries;
8284
this.keyFilter = keyFilter;
85+
this.keysToScan = keysToScan;
8386
this.graphConfiguration = graphConfiguration;
8487

8588
this.dataQueues = new BlockingQueue[queries.size()];
@@ -189,8 +192,14 @@ private void addDataPuller(SliceQuery sq, StoreTransaction stx, int pos) throws
189192
this.graphConfiguration.get(GraphDatabaseConfiguration.PAGE_SIZE));
190193
dataQueues[pos] = queue;
191194

192-
DataPuller dp = new DataPuller(sq, queue,
193-
KCVSUtil.getKeys(store,sq,storeFeatures, MAX_KEY_LENGTH,stx), keyFilter);
195+
KeyIterator keyIterator;
196+
if (keysToScan != null) {
197+
keyIterator = store.getKeys(keysToScan, sq, stx);
198+
} else {
199+
keyIterator = KCVSUtil.getKeys(store, sq, storeFeatures, MAX_KEY_LENGTH, stx);
200+
}
201+
202+
DataPuller dp = new DataPuller(sq, queue, keyIterator, keyFilter);
194203
pullThreads[pos] = dp;
195204
dp.setName("data-puller-" + pos); // setting the name for thread dumps!
196205
dp.start();

janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,14 @@ default void workerIterationEnd(ScanMetrics metrics) {}
113113
*/
114114
List<SliceQuery> getQueries();
115115

116+
/**
117+
* Get keys to scan by the job. If stream is empty, all keys will be scanned.
118+
* @return
119+
*/
120+
default List<StaticBuffer> getKeysToScan() {
121+
return null;
122+
}
123+
116124
/**
117125
* A predicate that determines whether
118126
* {@link #process(org.janusgraph.diskstorage.StaticBuffer, java.util.Map, ScanMetrics)}

janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/StandardScannerExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ private RowsCollector buildScanner(BlockingQueue<Row> processorQueue, List<Slice
167167
job.getKeyFilter(), processorQueue);
168168
} else {
169169
return new MultiThreadsRowsCollector(store, storeFeatures, storeTx, queries,
170-
job.getKeyFilter(), processorQueue, graphConfiguration);
170+
job.getKeyFilter(), job.getKeysToScan(), processorQueue, graphConfiguration);
171171
}
172172
}
173173

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/MetricInstrumentedStore.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,18 @@ public void acquireLock(final StaticBuffer key,
165165
});
166166
}
167167

168+
@Override
169+
public KeyIterator getKeys(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {
170+
return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> {
171+
final KeyIterator ki = backend.getKeys(keys, query, txh);
172+
if (txh.getConfiguration().hasGroupName()) {
173+
return MetricInstrumentedIterator.of(ki, txh.getConfiguration().getGroupName(), metricsStoreName, M_GET_KEYS, M_ITERATOR);
174+
} else {
175+
return ki;
176+
}
177+
});
178+
}
179+
168180
@Override
169181
public KeyIterator getKeys(final KeyRangeQuery query, final StoreTransaction txh) throws BackendException {
170182
return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> {

0 commit comments

Comments
 (0)