For the Cassandra backend, JanusGraph by default batches CQL mutation statements for graph additions/deletions within each transaction. The batching behavior is controlled by `storage.cql.batch-statement-size`. However, I found that if a transaction requires, say, 200 mutation statements, JanusGraph does not generate 10 batch CQL statements (with 20 mutations each). Instead, it produces many more batch statements with smaller batch sizes, some containing just a single statement. I think the reason is that the following batching code only batches statements that share the same key; batching never happens across keys: https://github.com/JanusGraph/janusgraph/blob/v0.5/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLStoreManager.java#L496-L529
```java
private void mutateManyUnlogged(final Map<String, Map<StaticBuffer, KCVMutation>> mutations, final StoreTransaction txh) throws BackendException {
    final MaskedTimestamp commitTime = new MaskedTimestamp(txh);

    final Future<Seq<ResultSet>> result = Future.sequence(this.executorService, Iterator.ofAll(mutations.entrySet()).flatMap(tableNameAndMutations -> {
        final String tableName = tableNameAndMutations.getKey();
        final Map<StaticBuffer, KCVMutation> tableMutations = tableNameAndMutations.getValue();

        final CQLKeyColumnValueStore columnValueStore = Option.of(this.openStores.get(tableName))
                .getOrElseThrow(() -> new IllegalStateException("Store cannot be found: " + tableName));
        return Iterator.ofAll(tableMutations.entrySet()).flatMap(keyAndMutations -> {
            final StaticBuffer key = keyAndMutations.getKey();
            final KCVMutation keyMutations = keyAndMutations.getValue();

            final Iterator<Statement> deletions = Iterator.of(commitTime.getDeletionTime(this.times))
                    .flatMap(deleteTime -> Iterator.ofAll(keyMutations.getDeletions()).map(deletion -> columnValueStore.deleteColumn(key, deletion, deleteTime)));
            final Iterator<Statement> additions = Iterator.of(commitTime.getAdditionTime(this.times))
                    .flatMap(addTime -> Iterator.ofAll(keyMutations.getAdditions()).map(addition -> columnValueStore.insertColumn(key, addition, addTime)));

            // grouped() is applied inside the per-key flatMap, so each group
            // only ever contains statements for a single key. A key with
            // fewer than batchSize mutations yields an undersized batch.
            return Iterator.concat(deletions, additions)
                    .grouped(this.batchSize)
                    .map(group -> Future.fromJavaFuture(this.executorService,
                            this.session.executeAsync(
                                    new BatchStatement(Type.UNLOGGED)
                                            .addAll(group)
                                            .setConsistencyLevel(getTransaction(txh).getWriteConsistencyLevel()))));
        });
    }));

    result.await();
    if (result.isFailure()) {
        throw EXCEPTION_MAPPER.apply(result.getCause().get());
    }
    sleepAfterWrite(txh, commitTime);
}
```
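To make the effect concrete, here is a small self-contained sketch (a hypothetical workload, not JanusGraph code, using the same vavr `Iterator.grouped` call as above): grouping inside the per-key `flatMap` produces one undersized batch per key, while grouping after concatenating across keys fills batches up to the limit.

```java
import io.vavr.collection.Iterator;
import io.vavr.collection.List;
import java.util.LinkedHashMap;
import java.util.Map;

public class GroupingDemo {
    public static void main(String[] args) {
        // Hypothetical workload: 10 keys with 3 mutations each, batch size 20.
        Map<String, List<String>> mutationsByKey = new LinkedHashMap<>();
        for (int k = 0; k < 10; k++) {
            mutationsByKey.put("key" + k, List.of("m1", "m2", "m3"));
        }
        int batchSize = 20;

        // Per-key grouping, as in mutateManyUnlogged: grouped() runs inside
        // the per-key flatMap, so no batch can span two keys.
        int perKeyBatches = Iterator.ofAll(mutationsByKey.entrySet())
                .flatMap(e -> Iterator.ofAll(e.getValue()).grouped(batchSize))
                .length();

        // Cross-key grouping, as in mutateManyLogged: statements are
        // concatenated across keys first, then grouped.
        int crossKeyBatches = Iterator.ofAll(mutationsByKey.entrySet())
                .flatMap(e -> Iterator.ofAll(e.getValue()))
                .grouped(batchSize)
                .length();

        System.out.println(perKeyBatches);   // 10 batches, 3 statements each
        System.out.println(crossKeyBatches); // 2 batches (20 + 10 statements)
    }
}
```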
In contrast, when atomic (logged) batching is turned on, batching is performed across keys. This confirms that batching across keys is not inherently wrong, and that the unlogged batching implementation is potentially incorrect. Here's the logged (atomic) batching logic:
```java
private void mutateManyLogged(final Map<String, Map<StaticBuffer, KCVMutation>> mutations, final StoreTransaction txh) throws BackendException {
    final MaskedTimestamp commitTime = new MaskedTimestamp(txh);

    final BatchStatement batchStatement = new BatchStatement(Type.LOGGED);
    batchStatement.setConsistencyLevel(getTransaction(txh).getWriteConsistencyLevel());

    // Statements for every table and every key are concatenated into one
    // flat stream and added to a single batch, i.e. batching spans keys here.
    batchStatement.addAll(Iterator.ofAll(mutations.entrySet()).flatMap(tableNameAndMutations -> {
        final String tableName = tableNameAndMutations.getKey();
        final Map<StaticBuffer, KCVMutation> tableMutations = tableNameAndMutations.getValue();

        final CQLKeyColumnValueStore columnValueStore = Option.of(this.openStores.get(tableName))
                .getOrElseThrow(() -> new IllegalStateException("Store cannot be found: " + tableName));
        return Iterator.ofAll(tableMutations.entrySet()).flatMap(keyAndMutations -> {
            final StaticBuffer key = keyAndMutations.getKey();
            final KCVMutation keyMutations = keyAndMutations.getValue();

            final Iterator<Statement> deletions = Iterator.of(commitTime.getDeletionTime(this.times))
                    .flatMap(deleteTime -> Iterator.ofAll(keyMutations.getDeletions()).map(deletion -> columnValueStore.deleteColumn(key, deletion, deleteTime)));
            final Iterator<Statement> additions = Iterator.of(commitTime.getAdditionTime(this.times))
                    .flatMap(addTime -> Iterator.ofAll(keyMutations.getAdditions()).map(addition -> columnValueStore.insertColumn(key, addition, addTime)));

            return Iterator.concat(deletions, additions);
        });
    }));

    final Future<ResultSet> result = Future.fromJavaFuture(this.executorService, this.session.executeAsync(batchStatement));
    result.await();
    if (result.isFailure()) {
        throw EXCEPTION_MAPPER.apply(result.getCause().get());
    }
    sleepAfterWrite(txh, commitTime);
}
```
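For what it's worth, the unlogged path could be restructured along the same lines: build the full statement stream first and group afterwards. The sketch below is my own and untested (the method name `mutateManyUnloggedCrossKey` is hypothetical); it reuses the fields and helpers of `CQLStoreManager` shown above and keeps the unlogged semantics, but fills batches across keys. Note that cross-key unlogged batches can span partitions, which shifts work onto the coordinator node, so whether this is actually desirable is a separate question.

```java
private void mutateManyUnloggedCrossKey(final Map<String, Map<StaticBuffer, KCVMutation>> mutations, final StoreTransaction txh) throws BackendException {
    final MaskedTimestamp commitTime = new MaskedTimestamp(txh);

    // Same traversal as mutateManyLogged: one flat stream of statements
    // across all tables and keys...
    final Iterator<Statement> statements = Iterator.ofAll(mutations.entrySet()).flatMap(tableNameAndMutations -> {
        final String tableName = tableNameAndMutations.getKey();
        final Map<StaticBuffer, KCVMutation> tableMutations = tableNameAndMutations.getValue();
        final CQLKeyColumnValueStore columnValueStore = Option.of(this.openStores.get(tableName))
                .getOrElseThrow(() -> new IllegalStateException("Store cannot be found: " + tableName));
        return Iterator.ofAll(tableMutations.entrySet()).flatMap(keyAndMutations -> {
            final StaticBuffer key = keyAndMutations.getKey();
            final KCVMutation keyMutations = keyAndMutations.getValue();
            final Iterator<Statement> deletions = Iterator.of(commitTime.getDeletionTime(this.times))
                    .flatMap(deleteTime -> Iterator.ofAll(keyMutations.getDeletions()).map(deletion -> columnValueStore.deleteColumn(key, deletion, deleteTime)));
            final Iterator<Statement> additions = Iterator.of(commitTime.getAdditionTime(this.times))
                    .flatMap(addTime -> Iterator.ofAll(keyMutations.getAdditions()).map(addition -> columnValueStore.insertColumn(key, addition, addTime)));
            return Iterator.concat(deletions, additions);
        });
    });

    // ...then grouped into batches only at the very end, so a transaction
    // with 200 statements yields ceil(200 / batchSize) batches.
    final Future<Seq<ResultSet>> result = Future.sequence(this.executorService, statements
            .grouped(this.batchSize)
            .map(group -> Future.fromJavaFuture(this.executorService,
                    this.session.executeAsync(
                            new BatchStatement(Type.UNLOGGED)
                                    .addAll(group)
                                    .setConsistencyLevel(getTransaction(txh).getWriteConsistencyLevel())))));

    result.await();
    if (result.isFailure()) {
        throw EXCEPTION_MAPPER.apply(result.getCause().get());
    }
    sleepAfterWrite(txh, commitTime);
}
```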