Skip to content

Commit dae05d5

Browse files
committed
insert almost works
1 parent 871d92c commit dae05d5

12 files changed

+462
-97
lines changed

fdb-extensions/src/main/java/com/apple/foundationdb/async/MoreAsyncUtil.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.apple.foundationdb.annotation.API;
2424
import com.apple.foundationdb.util.LoggableException;
2525
import com.google.common.base.Suppliers;
26+
import com.google.common.collect.ImmutableList;
27+
import com.google.common.collect.Lists;
2628
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2729

2830
import javax.annotation.Nonnull;
@@ -33,6 +35,7 @@
3335
import java.util.Iterator;
3436
import java.util.List;
3537
import java.util.NoSuchElementException;
38+
import java.util.Objects;
3639
import java.util.Queue;
3740
import java.util.concurrent.CompletableFuture;
3841
import java.util.concurrent.CompletionException;
@@ -1055,7 +1058,7 @@ public static CompletableFuture<Void> swallowException(@Nonnull CompletableFutur
10551058
return result;
10561059
}
10571060

1058-
public static CompletableFuture<Void> forLoop(int startI, @Nonnull final IntPredicate conditionPredicate,
1061+
public static CompletableFuture<Void> forLoop(final int startI, @Nonnull final IntPredicate conditionPredicate,
10591062
@Nonnull final IntUnaryOperator stepFunction,
10601063
@Nonnull final IntFunction<CompletableFuture<Void>> body,
10611064
@Nonnull final Executor executor) {
@@ -1073,6 +1076,51 @@ public static CompletableFuture<Void> forLoop(int startI, @Nonnull final IntPred
10731076
}, executor);
10741077
}
10751078

1079+
@SuppressWarnings("unchecked")
1080+
public static <T, U> CompletableFuture<List<U>> forEach(@Nonnull final Iterable<T> items,
1081+
@Nonnull final Function<T, CompletableFuture<U>> body,
1082+
final int parallelism,
1083+
@Nonnull final Executor executor) {
1084+
// this deque is only modified by once upon creation
1085+
final ArrayDeque<T> toBeProcessed = new ArrayDeque<>();
1086+
for (final T item : items) {
1087+
toBeProcessed.addLast(item);
1088+
}
1089+
1090+
final List<CompletableFuture<Void>> working = Lists.newArrayList();
1091+
final AtomicInteger indexAtomic = new AtomicInteger(0);
1092+
final Object[] resultArray = new Object[toBeProcessed.size()];
1093+
1094+
return AsyncUtil.whileTrue(() -> {
1095+
working.removeIf(CompletableFuture::isDone);
1096+
1097+
while (working.size() <= parallelism) {
1098+
final T currentItem = toBeProcessed.pollFirst();
1099+
if (currentItem == null) {
1100+
break;
1101+
}
1102+
1103+
final int index = indexAtomic.getAndIncrement();
1104+
working.add(body.apply(currentItem)
1105+
.thenAccept(resultNode -> {
1106+
Objects.requireNonNull(resultNode);
1107+
resultArray[index] = resultNode;
1108+
}));
1109+
}
1110+
1111+
if (working.isEmpty()) {
1112+
return AsyncUtil.READY_FALSE;
1113+
}
1114+
return AsyncUtil.whenAny(working).thenApply(ignored -> true);
1115+
}, executor).thenApply(ignored -> {
1116+
final ImmutableList.Builder<U> resultBuilder = ImmutableList.builder();
1117+
for (final Object o : resultArray) {
1118+
resultBuilder.add((U)o);
1119+
}
1120+
return resultBuilder.build();
1121+
});
1122+
}
1123+
10761124
/**
10771125
* A {@code Boolean} function that is always true.
10781126
* @param <T> the type of the (ignored) argument to the function

fdb-extensions/src/main/java/com/apple/foundationdb/async/hnsw/AbstractNode.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@
2828

2929
/**
3030
* TODO.
31-
* @param <R> node type class.
31+
* @param <N> node type class.
3232
*/
33-
abstract class AbstractNode<R extends NodeReference> implements Node<R> {
33+
abstract class AbstractNode<N extends NodeReference> implements Node<N> {
3434
@Nonnull
3535
private final Tuple primaryKey;
3636

3737
@Nonnull
38-
private final List<R> neighbors;
38+
private final List<N> neighbors;
3939

4040
protected AbstractNode(@Nonnull final Tuple primaryKey,
41-
@Nonnull final List<R> neighbors) {
41+
@Nonnull final List<N> neighbors) {
4242
this.primaryKey = primaryKey;
4343
this.neighbors = ImmutableList.copyOf(neighbors);
4444
}
@@ -51,13 +51,13 @@ public Tuple getPrimaryKey() {
5151

5252
@Nonnull
5353
@Override
54-
public List<R> getNeighbors() {
54+
public List<N> getNeighbors() {
5555
return neighbors;
5656
}
5757

5858
@Nonnull
5959
@Override
60-
public R getNeighbor(final int index) {
60+
public N getNeighbor(final int index) {
6161
return neighbors.get(index);
6262
}
6363
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* InliningNode.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2023 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.async.hnsw;
22+
23+
import com.apple.foundationdb.Transaction;
24+
25+
import javax.annotation.Nonnull;
26+
import javax.annotation.Nullable;
27+
import java.util.List;
28+
29+
/**
30+
* TODO.
31+
*/
32+
class BaseNeighborsChangeSet<N extends NodeReference> implements NeighborsChangeSet<N> {
33+
@Nonnull
34+
private final NodeReferenceAndNode<N> baseNode;
35+
36+
public BaseNeighborsChangeSet(@Nonnull final NodeReferenceAndNode<N> baseNode) {
37+
this.baseNode = baseNode;
38+
}
39+
40+
@Nullable
41+
public BaseNeighborsChangeSet<N> getParent() {
42+
return null;
43+
}
44+
45+
@Nonnull
46+
public List<N> merge() {
47+
return baseNode.getNode().getNeighbors();
48+
}
49+
50+
@Override
51+
public void writeDelta(@Nonnull final Transaction transaction) {
52+
}
53+
}

fdb-extensions/src/main/java/com/apple/foundationdb/async/hnsw/CompactNode.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import com.apple.foundationdb.tuple.Tuple;
2424
import com.christianheina.langx.half4j.Half;
25-
import com.google.common.base.Verify;
2625
import com.google.common.collect.Lists;
2726

2827
import javax.annotation.Nonnull;
@@ -39,8 +38,8 @@ public class CompactNode extends AbstractNode<NodeReference> {
3938
@SuppressWarnings("unchecked")
4039
@Nonnull
4140
@Override
42-
public Node<NodeReference> create(@Nonnull final NodeKind nodeKind, @Nonnull final Tuple primaryKey, @Nullable final Vector<Half> vector, @Nonnull final List<? extends NodeReference> neighbors) {
43-
Verify.verify(nodeKind == NodeKind.COMPACT);
41+
public Node<NodeReference> create(@Nonnull final Tuple primaryKey, @Nullable final Vector<Half> vector,
42+
@Nonnull final List<? extends NodeReference> neighbors) {
4443
return new CompactNode(primaryKey, Objects.requireNonNull(vector), (List<NodeReference>)neighbors);
4544
}
4645

@@ -60,6 +59,12 @@ public CompactNode(@Nonnull final Tuple primaryKey, @Nonnull final Vector<Half>
6059
this.vector = vector;
6160
}
6261

62+
@Nonnull
63+
@Override
64+
public NodeReference getSelfReference(@Nullable final Vector<Half> vector) {
65+
return new NodeReference(getPrimaryKey());
66+
}
67+
6368
@Nonnull
6469
@Override
6570
public NodeKind getKind() {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* InliningNode.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2023 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.async.hnsw;
22+
23+
import com.apple.foundationdb.Transaction;
24+
import com.apple.foundationdb.tuple.Tuple;
25+
import com.google.common.collect.ImmutableSet;
26+
import com.google.common.collect.Iterables;
27+
28+
import javax.annotation.Nonnull;
29+
import java.util.Collection;
30+
import java.util.Set;
31+
32+
/**
33+
* TODO.
34+
*/
35+
class DeleteNeighborsChangeSet<N extends NodeReference> implements NeighborsChangeSet<N> {
36+
@Nonnull
37+
private final NeighborsChangeSet<N> parent;
38+
39+
@Nonnull
40+
private final Set<Tuple> deletedNeighborsTuples;
41+
42+
public DeleteNeighborsChangeSet(@Nonnull final NeighborsChangeSet<N> parent,
43+
@Nonnull final Collection<Tuple> deletedNeighborsTuples) {
44+
this.parent = parent;
45+
this.deletedNeighborsTuples = ImmutableSet.copyOf(deletedNeighborsTuples);
46+
}
47+
48+
@Nonnull
49+
public NeighborsChangeSet<N> getParent() {
50+
return parent;
51+
}
52+
53+
@Nonnull
54+
public Iterable<N> merge() {
55+
return Iterables.filter(getParent().merge(),
56+
current -> !deletedNeighborsTuples.contains(current.getPrimaryKey()));
57+
}
58+
59+
@Override
60+
public void writeDelta(@Nonnull final Transaction transaction) {
61+
throw new UnsupportedOperationException("not implemented yet");
62+
}
63+
}

0 commit comments

Comments
 (0)