|
25 | 25 | import com.apple.foundationdb.ReadTransaction;
|
26 | 26 | import com.apple.foundationdb.StreamingMode;
|
27 | 27 | import com.apple.foundationdb.Transaction;
|
| 28 | +import com.apple.foundationdb.async.AsyncIterable; |
28 | 29 | import com.apple.foundationdb.async.AsyncUtil;
|
29 | 30 | import com.apple.foundationdb.subspace.Subspace;
|
| 31 | +import com.apple.foundationdb.tuple.ByteArrayUtil; |
30 | 32 | import com.apple.foundationdb.tuple.Tuple;
|
31 | 33 | import com.christianheina.langx.half4j.Half;
|
32 | 34 | import com.google.common.collect.ImmutableList;
|
33 | 35 |
|
34 | 36 | import javax.annotation.Nonnull;
|
| 37 | +import javax.annotation.Nullable; |
| 38 | +import java.util.List; |
35 | 39 | import java.util.concurrent.CompletableFuture;
|
36 | 40 |
|
37 | 41 | /**
|
@@ -66,27 +70,35 @@ protected CompletableFuture<Node<NodeReferenceWithVector>> fetchNodeInternal(@No
|
66 | 70 |
|
67 | 71 | return AsyncUtil.collect(readTransaction.getRange(Range.startsWith(rangeKey),
|
68 | 72 | ReadTransaction.ROW_LIMIT_UNLIMITED, false, StreamingMode.WANT_ALL), readTransaction.getExecutor())
|
69 |
| - .thenApply(keyValues -> { |
70 |
| - final OnReadListener onReadListener = getOnReadListener(); |
71 |
| - |
72 |
| - final ImmutableList.Builder<NodeReferenceWithVector> nodeReferencesWithVectorBuilder = ImmutableList.builder(); |
73 |
| - for (final KeyValue keyValue : keyValues) { |
74 |
| - final byte[] key = keyValue.getKey(); |
75 |
| - final byte[] value = keyValue.getValue(); |
76 |
| - onReadListener.onKeyValueRead(key, value); |
77 |
| - final Tuple neighborKeyTuple = getDataSubspace().unpack(key); |
78 |
| - final Tuple neighborValueTuple = Tuple.fromBytes(value); |
79 |
| - |
80 |
| - final Tuple neighborPrimaryKey = neighborKeyTuple.getNestedTuple(2); // neighbor primary key |
81 |
| - final Vector<Half> neighborVector = StorageAdapter.vectorFromTuple(neighborValueTuple); // the entire value is the vector |
82 |
| - nodeReferencesWithVectorBuilder.add(new NodeReferenceWithVector(neighborPrimaryKey, neighborVector)); |
83 |
| - } |
84 |
| - |
85 |
| - final Node<NodeReferenceWithVector> node = |
86 |
| - getNodeFactory().create(primaryKey, null, nodeReferencesWithVectorBuilder.build()); |
87 |
| - onReadListener.onNodeRead(node); |
88 |
| - return node; |
89 |
| - }); |
| 73 | + .thenApply(keyValues -> nodeFromRaw(primaryKey, keyValues)); |
| 74 | + } |
| 75 | + |
| 76 | + @Nonnull |
| 77 | + private Node<NodeReferenceWithVector> nodeFromRaw(final @Nonnull Tuple primaryKey, final List<KeyValue> keyValues) { |
| 78 | + final OnReadListener onReadListener = getOnReadListener(); |
| 79 | + |
| 80 | + final ImmutableList.Builder<NodeReferenceWithVector> nodeReferencesWithVectorBuilder = ImmutableList.builder(); |
| 81 | + for (final KeyValue keyValue : keyValues) { |
| 82 | + nodeReferencesWithVectorBuilder.add(neighborFromRaw(keyValue.getKey(), keyValue.getValue())); |
| 83 | + } |
| 84 | + |
| 85 | + final Node<NodeReferenceWithVector> node = |
| 86 | + getNodeFactory().create(primaryKey, null, nodeReferencesWithVectorBuilder.build()); |
| 87 | + onReadListener.onNodeRead(node); |
| 88 | + return node; |
| 89 | + } |
| 90 | + |
| 91 | + @Nonnull |
| 92 | + private NodeReferenceWithVector neighborFromRaw(final @Nonnull byte[] key, final byte[] value) { |
| 93 | + final OnReadListener onReadListener = getOnReadListener(); |
| 94 | + |
| 95 | + onReadListener.onKeyValueRead(key, value); |
| 96 | + final Tuple neighborKeyTuple = getDataSubspace().unpack(key); |
| 97 | + final Tuple neighborValueTuple = Tuple.fromBytes(value); |
| 98 | + |
| 99 | + final Tuple neighborPrimaryKey = neighborKeyTuple.getNestedTuple(2); // neighbor primary key |
| 100 | + final Vector<Half> neighborVector = StorageAdapter.vectorFromTuple(neighborValueTuple); // the entire value is the vector |
| 101 | + return new NodeReferenceWithVector(neighborPrimaryKey, neighborVector); |
90 | 102 | }
|
91 | 103 |
|
92 | 104 | @Override
|
@@ -122,4 +134,43 @@ private byte[] getNeighborKey(final int layer,
|
122 | 134 | @Nonnull final Tuple neighborPrimaryKey) {
|
123 | 135 | return getDataSubspace().pack(Tuple.from(layer, node.getPrimaryKey(), neighborPrimaryKey));
|
124 | 136 | }
|
| 137 | + |
| 138 | + @Override |
| 139 | + public Iterable<Node<NodeReferenceWithVector>> scanLayer(@Nonnull final ReadTransaction readTransaction, int layer, |
| 140 | + @Nullable final Tuple lastPrimaryKey, int maxNumRead) { |
| 141 | + final byte[] layerPrefix = getDataSubspace().pack(Tuple.from(layer)); |
| 142 | + final Range range = |
| 143 | + lastPrimaryKey == null |
| 144 | + ? Range.startsWith(layerPrefix) |
| 145 | + : new Range(ByteArrayUtil.strinc(getDataSubspace().pack(Tuple.from(layer, lastPrimaryKey))), |
| 146 | + ByteArrayUtil.strinc(layerPrefix)); |
| 147 | + final AsyncIterable<KeyValue> itemsIterable = |
| 148 | + readTransaction.getRange(range, |
| 149 | + maxNumRead, false, StreamingMode.ITERATOR); |
| 150 | + int numRead = 0; |
| 151 | + Tuple nodePrimaryKey = null; |
| 152 | + ImmutableList.Builder<Node<NodeReferenceWithVector>> nodeBuilder = ImmutableList.builder(); |
| 153 | + ImmutableList.Builder<NodeReferenceWithVector> neighborsBuilder = ImmutableList.builder(); |
| 154 | + for (final KeyValue item: itemsIterable) { |
| 155 | + final NodeReferenceWithVector neighbor = |
| 156 | + neighborFromRaw(item.getKey(), item.getValue()); |
| 157 | + final Tuple primaryKeyFromNodeReference = neighbor.getPrimaryKey(); |
| 158 | + if (nodePrimaryKey == null) { |
| 159 | + nodePrimaryKey = primaryKeyFromNodeReference; |
| 160 | + } else { |
| 161 | + if (!nodePrimaryKey.equals(primaryKeyFromNodeReference)) { |
| 162 | + nodeBuilder.add(getNodeFactory().create(nodePrimaryKey, null, neighborsBuilder.build())); |
| 163 | + } |
| 164 | + } |
| 165 | + neighborsBuilder.add(neighbor); |
| 166 | + numRead ++; |
| 167 | + } |
| 168 | + |
| 169 | + // there may be a rest |
| 170 | + if (numRead > 0 && numRead < maxNumRead) { |
| 171 | + nodeBuilder.add(getNodeFactory().create(nodePrimaryKey, null, neighborsBuilder.build())); |
| 172 | + } |
| 173 | + |
| 174 | + return nodeBuilder.build(); |
| 175 | + } |
125 | 176 | }
|
0 commit comments