Skip to content

Commit 66547ca

Browse files
committed
started on the writing path
1 parent eb7e54d commit 66547ca

15 files changed

+511
-755
lines changed

fdb-extensions/src/main/java/com/apple/foundationdb/async/MoreAsyncUtil.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,13 @@
4242
import java.util.concurrent.ScheduledThreadPoolExecutor;
4343
import java.util.concurrent.ThreadFactory;
4444
import java.util.concurrent.TimeUnit;
45+
import java.util.concurrent.atomic.AtomicInteger;
4546
import java.util.function.BiConsumer;
4647
import java.util.function.BiFunction;
4748
import java.util.function.Function;
49+
import java.util.function.IntFunction;
50+
import java.util.function.IntPredicate;
51+
import java.util.function.IntUnaryOperator;
4852
import java.util.function.Predicate;
4953
import java.util.function.Supplier;
5054

@@ -1051,6 +1055,24 @@ public static CompletableFuture<Void> swallowException(@Nonnull CompletableFutur
10511055
return result;
10521056
}
10531057

1058+
public static CompletableFuture<Void> forLoop(int startI, @Nonnull final IntPredicate conditionPredicate,
1059+
@Nonnull final IntUnaryOperator stepFunction,
1060+
@Nonnull final IntFunction<CompletableFuture<Void>> body,
1061+
@Nonnull final Executor executor) {
1062+
final AtomicInteger loopVariableAtomic = new AtomicInteger(startI);
1063+
return AsyncUtil.whileTrue(() -> {
1064+
final int loopVariable = loopVariableAtomic.get();
1065+
if (!conditionPredicate.test(loopVariable)) {
1066+
return AsyncUtil.READY_FALSE;
1067+
}
1068+
return body.apply(loopVariable)
1069+
.thenApply(ignored -> {
1070+
loopVariableAtomic.set(stepFunction.applyAsInt(loopVariable));
1071+
return true;
1072+
});
1073+
}, executor);
1074+
}
1075+
10541076
/**
10551077
* A {@code Boolean} function that is always true.
10561078
* @param <T> the type of the (ignored) argument to the function

fdb-extensions/src/main/java/com/apple/foundationdb/async/hnsw/AbstractStorageAdapter.java

Lines changed: 3 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,12 @@
2121
package com.apple.foundationdb.async.hnsw;
2222

2323
import com.apple.foundationdb.ReadTransaction;
24-
import com.apple.foundationdb.Transaction;
2524
import com.apple.foundationdb.subspace.Subspace;
2625
import com.apple.foundationdb.tuple.Tuple;
2726

2827
import javax.annotation.Nonnull;
2928
import javax.annotation.Nullable;
30-
import java.math.BigInteger;
31-
import java.util.List;
32-
import java.util.Objects;
3329
import java.util.concurrent.CompletableFuture;
34-
import java.util.function.Function;
3530

3631
/**
3732
* Implementations and attributes common to all concrete implementations of {@link StorageAdapter}.
@@ -53,8 +48,6 @@ abstract class AbstractStorageAdapter implements StorageAdapter {
5348
private final Subspace dataSubspace;
5449

5550
protected AbstractStorageAdapter(@Nonnull final HNSW.Config config, @Nonnull final Subspace subspace,
56-
@Nonnull final Subspace nodeSlotIndexSubspace,
57-
@Nonnull final Function<HNSW.Point, BigInteger> hilbertValueFunction,
5851
@Nonnull final OnWriteListener onWriteListener,
5952
@Nonnull final OnReadListener onReadListener) {
6053
this.config = config;
@@ -108,79 +101,16 @@ public OnReadListener getOnReadListener() {
108101
return onReadListener;
109102
}
110103

111-
@Override
112-
public void writeNodes(@Nonnull final Transaction transaction, @Nonnull final List<? extends Node> nodes) {
113-
for (final Node node : nodes) {
114-
writeNode(transaction, node);
115-
}
116-
}
117-
118-
protected void writeNode(@Nonnull final Transaction transaction, @Nonnull final Node node) {
119-
final Node.ChangeSet changeSet = node.getChangeSet();
120-
if (changeSet == null) {
121-
return;
122-
}
123-
124-
changeSet.apply(transaction);
125-
getOnWriteListener().onNodeWritten(node);
126-
}
127-
128-
@Nonnull
129-
public byte[] packWithSubspace(final byte[] key) {
130-
return getSubspace().pack(key);
131-
}
132-
133-
@Nonnull
134-
public byte[] packWithSubspace(final Tuple tuple) {
135-
return getSubspace().pack(tuple);
136-
}
137-
138-
@Nonnull
139-
@Override
140-
public CompletableFuture<Node> scanNodeIndexAndFetchNode(@Nonnull final ReadTransaction transaction,
141-
final int level,
142-
@Nonnull final BigInteger hilbertValue,
143-
@Nonnull final Tuple key,
144-
final boolean isInsertUpdate) {
145-
Objects.requireNonNull(nodeSlotIndexAdapter);
146-
return nodeSlotIndexAdapter.scanIndexForNodeId(transaction, level, hilbertValue, key, isInsertUpdate)
147-
.thenCompose(nodeId -> nodeId == null
148-
? CompletableFuture.completedFuture(null)
149-
: fetchNode(transaction, nodeId));
150-
}
151-
152-
@Override
153-
public void insertIntoNodeIndexIfNecessary(@Nonnull final Transaction transaction, final int level,
154-
@Nonnull final NodeSlot nodeSlot) {
155-
if (!getConfig().isUseNodeSlotIndex() || !(nodeSlot instanceof ChildSlot)) {
156-
return;
157-
}
158-
159-
Objects.requireNonNull(nodeSlotIndexAdapter);
160-
nodeSlotIndexAdapter.writeChildSlot(transaction, level, (ChildSlot)nodeSlot);
161-
}
162-
163-
@Override
164-
public void deleteFromNodeIndexIfNecessary(@Nonnull final Transaction transaction, final int level,
165-
@Nonnull final NodeSlot nodeSlot) {
166-
if (!getConfig().isUseNodeSlotIndex() || !(nodeSlot instanceof ChildSlot)) {
167-
return;
168-
}
169-
170-
Objects.requireNonNull(nodeSlotIndexAdapter);
171-
nodeSlotIndexAdapter.clearChildSlot(transaction, level, (ChildSlot)nodeSlot);
172-
}
173-
174104
@Nonnull
175105
@Override
176-
public <N extends NodeReference> CompletableFuture<Node<N>> fetchNode(@Nonnull final Node.NodeCreator<N> creator,
106+
public <N extends NodeReference> CompletableFuture<Node<N>> fetchNode(@Nonnull final NodeFactory<N> nodeFactory,
177107
@Nonnull final ReadTransaction readTransaction,
178108
int layer, @Nonnull Tuple primaryKey) {
179-
return fetchNodeInternal(creator, readTransaction, layer, primaryKey).thenApply(this::checkNode);
109+
return fetchNodeInternal(nodeFactory, readTransaction, layer, primaryKey).thenApply(this::checkNode);
180110
}
181111

182112
@Nonnull
183-
protected abstract <N extends NodeReference> CompletableFuture<Node<N>> fetchNodeInternal(@Nonnull Node.NodeCreator<N> creator,
113+
protected abstract <N extends NodeReference> CompletableFuture<Node<N>> fetchNodeInternal(@Nonnull NodeFactory<N> nodeFactory,
184114
@Nonnull ReadTransaction readTransaction,
185115
int layer, @Nonnull Tuple primaryKey);
186116

@@ -197,16 +127,4 @@ protected abstract <N extends NodeReference> CompletableFuture<Node<N>> fetchNod
197127
private <N extends NodeReference> Node<N> checkNode(@Nullable final Node<N> node) {
198128
return node;
199129
}
200-
201-
@Nonnull
202-
abstract <S extends NodeSlot, N extends AbstractNode<S, N>> AbstractChangeSet<S, N>
203-
newInsertChangeSet(@Nonnull N node, int level, @Nonnull List<S> insertedSlots);
204-
205-
@Nonnull
206-
abstract <S extends NodeSlot, N extends AbstractNode<S, N>> AbstractChangeSet<S, N>
207-
newUpdateChangeSet(@Nonnull N node, int level, @Nonnull S originalSlot, @Nonnull S updatedSlot);
208-
209-
@Nonnull
210-
abstract <S extends NodeSlot, N extends AbstractNode<S, N>> AbstractChangeSet<S, N>
211-
newDeleteChangeSet(@Nonnull N node, int level, @Nonnull List<S> deletedSlots);
212130
}

0 commit comments

Comments
 (0)