Skip to content

Commit 6f4c73f

Browse files
committed
fix for bad bug that didn't wait for a future
1 parent 1170856 commit 6f4c73f

File tree

3 files changed

+52
-62
lines changed

3 files changed

+52
-62
lines changed

fdb-extensions/src/main/java/com/apple/foundationdb/async/MoreAsyncUtil.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@
4646
import java.util.concurrent.ThreadFactory;
4747
import java.util.concurrent.TimeUnit;
4848
import java.util.concurrent.atomic.AtomicInteger;
49+
import java.util.concurrent.atomic.AtomicReference;
4950
import java.util.function.BiConsumer;
5051
import java.util.function.BiFunction;
5152
import java.util.function.Function;
52-
import java.util.function.IntFunction;
5353
import java.util.function.IntPredicate;
5454
import java.util.function.IntUnaryOperator;
5555
import java.util.function.Predicate;
@@ -1058,22 +1058,25 @@ public static CompletableFuture<Void> swallowException(@Nonnull CompletableFutur
10581058
return result;
10591059
}
10601060

1061-
public static CompletableFuture<Void> forLoop(final int startI, @Nonnull final IntPredicate conditionPredicate,
1062-
@Nonnull final IntUnaryOperator stepFunction,
1063-
@Nonnull final IntFunction<CompletableFuture<Void>> body,
1064-
@Nonnull final Executor executor) {
1061+
public static <U> CompletableFuture<U> forLoop(final int startI, @Nullable final U startU,
1062+
@Nonnull final IntPredicate conditionPredicate,
1063+
@Nonnull final IntUnaryOperator stepFunction,
1064+
@Nonnull final BiFunction<Integer, U, CompletableFuture<U>> body,
1065+
@Nonnull final Executor executor) {
10651066
final AtomicInteger loopVariableAtomic = new AtomicInteger(startI);
1067+
final AtomicReference<U> lastResultAtomic = new AtomicReference<>(startU);
10661068
return AsyncUtil.whileTrue(() -> {
10671069
final int loopVariable = loopVariableAtomic.get();
10681070
if (!conditionPredicate.test(loopVariable)) {
10691071
return AsyncUtil.READY_FALSE;
10701072
}
1071-
return body.apply(loopVariable)
1072-
.thenApply(ignored -> {
1073+
return body.apply(loopVariable, lastResultAtomic.get())
1074+
.thenApply(result -> {
10731075
loopVariableAtomic.set(stepFunction.applyAsInt(loopVariable));
1076+
lastResultAtomic.set(result);
10741077
return true;
10751078
});
1076-
}, executor);
1079+
}, executor).thenApply(ignored -> lastResultAtomic.get());
10771080
}
10781081

10791082
@SuppressWarnings("unchecked")

fdb-extensions/src/main/java/com/apple/foundationdb/async/hnsw/HNSW.java

Lines changed: 40 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -426,23 +426,14 @@ public CompletableFuture<? extends List<? extends NodeReferenceAndNode<? extends
426426
return CompletableFuture.completedFuture(entryState);
427427
}
428428

429-
final AtomicReference<NodeReferenceWithDistance> nodeReferenceAtomic =
430-
new AtomicReference<>(entryState);
431-
432-
return MoreAsyncUtil.forLoop(entryLayer,
433-
layer -> layer > 0,
434-
layer -> layer - 1,
435-
layer -> {
436-
final var storageAdapter = getStorageAdapterForLayer(layer);
437-
final var greedyIn = nodeReferenceAtomic.get();
438-
return greedySearchLayer(storageAdapter, readTransaction, greedyIn, layer,
439-
queryVector)
440-
.thenApply(greedyState -> {
441-
nodeReferenceAtomic.set(greedyState);
442-
return null;
443-
});
444-
}, executor)
445-
.thenApply(ignored -> nodeReferenceAtomic.get());
429+
return MoreAsyncUtil.forLoop(entryLayer, entryState,
430+
layer -> layer > 0,
431+
layer -> layer - 1,
432+
(layer, previousNodeReference) -> {
433+
final var storageAdapter = getStorageAdapterForLayer(layer);
434+
return greedySearchLayer(storageAdapter, readTransaction, previousNodeReference,
435+
layer, queryVector);
436+
}, executor);
446437
}).thenCompose(nodeReference -> {
447438
if (nodeReference == null) {
448439
return CompletableFuture.completedFuture(null);
@@ -747,46 +738,42 @@ public CompletableFuture<Void> insert(@Nonnull final Transaction transaction, @N
747738
debug(l -> l.debug("entry node with key {} at layer {}", entryNodeReference.getPrimaryKey(),
748739
lMax));
749740

750-
final AtomicReference<NodeReferenceWithDistance> nodeReferenceAtomic =
751-
new AtomicReference<>(new NodeReferenceWithDistance(entryNodeReference.getPrimaryKey(),
741+
final NodeReferenceWithDistance initialNodeReference =
742+
new NodeReferenceWithDistance(entryNodeReference.getPrimaryKey(),
752743
entryNodeReference.getVector(),
753-
Vector.comparativeDistance(metric, entryNodeReference.getVector(), newVector)));
754-
MoreAsyncUtil.forLoop(lMax,
755-
layer -> layer > insertionLayer,
756-
layer -> layer - 1,
757-
layer -> {
758-
final StorageAdapter<? extends NodeReference> storageAdapter = getStorageAdapterForLayer(layer);
759-
return greedySearchLayer(storageAdapter, transaction,
760-
nodeReferenceAtomic.get(), layer, newVector)
761-
.thenApply(nodeReference -> {
762-
nodeReferenceAtomic.set(nodeReference);
763-
return null;
764-
});
765-
}, executor);
766-
767-
debug(l -> {
768-
final NodeReference nodeReference = nodeReferenceAtomic.get();
769-
l.debug("nearest entry point at lMax={} is at key={}", lMax, nodeReference.getPrimaryKey());
770-
});
771-
772-
final AtomicReference<List<NodeReferenceWithDistance>> nearestNeighborsAtomic =
773-
new AtomicReference<>(ImmutableList.of(nodeReferenceAtomic.get()));
774-
775-
return MoreAsyncUtil.forLoop(Math.min(lMax, insertionLayer),
776-
layer -> layer >= 0,
777-
layer -> layer - 1,
778-
layer -> {
779-
final StorageAdapter<? extends NodeReference> storageAdapter = getStorageAdapterForLayer(layer);
780-
return insertIntoLayer(storageAdapter, transaction,
781-
nearestNeighborsAtomic.get(), layer, newPrimaryKey, newVector)
782-
.thenCompose(nearestNeighbors -> {
783-
nearestNeighborsAtomic.set(nearestNeighbors);
784-
return AsyncUtil.DONE;
785-
});
786-
}, executor);
744+
Vector.comparativeDistance(metric, entryNodeReference.getVector(), newVector));
745+
return MoreAsyncUtil.forLoop(lMax, initialNodeReference,
746+
layer -> layer > insertionLayer,
747+
layer -> layer - 1,
748+
(layer, previousNodeReference) -> {
749+
final StorageAdapter<? extends NodeReference> storageAdapter = getStorageAdapterForLayer(layer);
750+
return greedySearchLayer(storageAdapter, transaction,
751+
previousNodeReference, layer, newVector);
752+
}, executor)
753+
.thenCompose(nodeReference ->
754+
insertIntoLayers(transaction, newPrimaryKey, newVector, nodeReference,
755+
lMax, insertionLayer));
787756
}).thenCompose(ignored -> AsyncUtil.DONE);
788757
}
789758

759+
@Nonnull
760+
private CompletableFuture<Void> insertIntoLayers(final @Nonnull Transaction transaction,
761+
final @Nonnull Tuple newPrimaryKey,
762+
final @Nonnull Vector<Half> newVector,
763+
final NodeReferenceWithDistance nodeReference, final int lMax, final int insertionLayer) {
764+
debug(l -> {
765+
l.debug("nearest entry point at lMax={} is at key={}", lMax, nodeReference.getPrimaryKey());
766+
});
767+
return MoreAsyncUtil.<List<NodeReferenceWithDistance>>forLoop(Math.min(lMax, insertionLayer), ImmutableList.of(nodeReference),
768+
layer -> layer >= 0,
769+
layer -> layer - 1,
770+
(layer, previousNodeReferences) -> {
771+
final StorageAdapter<? extends NodeReference> storageAdapter = getStorageAdapterForLayer(layer);
772+
return insertIntoLayer(storageAdapter, transaction,
773+
previousNodeReferences, layer, newPrimaryKey, newVector);
774+
}, executor).thenCompose(ignored -> AsyncUtil.DONE);
775+
}
776+
790777
@Nonnull
791778
private <N extends NodeReference> CompletableFuture<List<NodeReferenceWithDistance>> insertIntoLayer(@Nonnull final StorageAdapter<N> storageAdapter,
792779
@Nonnull final Transaction transaction,

fdb-extensions/src/test/java/com/apple/foundationdb/async/hnsw/HNSWModificationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public void testSIFTInsert10k() throws Exception {
237237
Comparator.comparing(NodeReferenceWithDistance::getDistance));
238238

239239
try (BufferedReader br = new BufferedReader(new FileReader(tsvFile))) {
240-
for (int i = 0; i < 10000;) {
240+
for (int i = 0; i < 1000;) {
241241
i += basicInsertBatch(100, nextNodeIdAtomic, onReadListener,
242242
tr -> {
243243
final String line;

0 commit comments

Comments
 (0)