Skip to content

Fix bug of scan aggregate index returning empty non-end continuation #3397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ private IndexPrefetchRangeKeyValueCursor(@Nonnull final FDBRecordContext context
@Nonnull final AsyncIterator<MappedKeyValue> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
int valuesLimit,
@Nonnull SerializationMode serializationMode) {

super(context, iterator, prefixLength, limitManager, valuesLimit);
super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode);
}

/**
Expand All @@ -69,7 +70,7 @@ public IndexPrefetchRangeKeyValueCursor build() {
AsyncIterator<MappedKeyValue> iterator = getTransaction()
.getMappedRange(getBegin(), getEnd(), mapper, getLimit(), isReverse(), getStreamingMode())
.iterator();
return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit());
return new IndexPrefetchRangeKeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ private KeyValueCursor(@Nonnull final FDBRecordContext context,
@Nonnull final AsyncIterator<KeyValue> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
super(context, iterator, prefixLength, limitManager, valuesLimit);
int valuesLimit,
@Nonnull SerializationMode serializationMode) {
super(context, iterator, prefixLength, limitManager, valuesLimit, serializationMode);
}

/**
Expand Down Expand Up @@ -77,7 +78,7 @@ public KeyValueCursor build() {
final AsyncIterator<KeyValue> iterator = getTransaction()
.getRange(getBegin(), getEnd(), getLimit(), isReverse(), getStreamingMode())
.iterator();
return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit());
return new KeyValueCursor(getContext(), iterator, getPrefixLength(), getLimitManager(), getValuesLimit(), serializationMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.apple.foundationdb.record.KeyRange;
import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.RecordCursorContinuation;
import com.apple.foundationdb.record.RecordCursorProto;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.ScanProperties;
import com.apple.foundationdb.record.TupleRange;
Expand All @@ -42,11 +43,13 @@
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ZeroCopyByteString;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand All @@ -61,22 +64,26 @@ public abstract class KeyValueCursorBase<K extends KeyValue> extends AsyncIterat
private final int prefixLength;
@Nonnull
private final CursorLimitManager limitManager;
private int valuesLimit;
private final int valuesLimit;
// the pointer may be mutated, but the actual array must never be mutated or continuations will break
@Nullable
private byte[] lastKey;
@Nonnull
private final SerializationMode serializationMode;

protected KeyValueCursorBase(@Nonnull final FDBRecordContext context,
@Nonnull final AsyncIterator<K> iterator,
int prefixLength,
@Nonnull final CursorLimitManager limitManager,
int valuesLimit) {
int valuesLimit,
@Nonnull final SerializationMode serializationMode) {
super(context.getExecutor(), iterator);

this.context = context;
this.prefixLength = prefixLength;
this.limitManager = limitManager;
this.valuesLimit = valuesLimit;
this.serializationMode = serializationMode;

context.instrument(FDBStoreTimer.DetailEvents.GET_SCAN_RANGE_RAW_FIRST_CHUNK, iterator.onHasNext());
}
Expand Down Expand Up @@ -131,21 +138,23 @@ public RecordCursorResult<K> getNext() {

@Nonnull
private RecordCursorContinuation continuationHelper() {
return new Continuation(lastKey, prefixLength);
return new Continuation(lastKey, prefixLength, serializationMode);
}

private static class Continuation implements RecordCursorContinuation {
public static class Continuation implements RecordCursorContinuation {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be public just so that the RecordQueryIndexPlan can update its continuation appropriately?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes RecordIndexPlan calls this method: KeyValueCursorBase.Continuation.fromRawBytes(continuation, serializationMode);

@Nullable
private final byte[] lastKey;
private final int prefixLength;
private final SerializationMode serializationMode;

public Continuation(@Nullable final byte[] lastKey, final int prefixLength) {
public Continuation(@Nullable final byte[] lastKey, final int prefixLength, final SerializationMode serializationMode) {
// Note that doing this without a full copy is dangerous if the array is ever mutated.
// Currently, this never happens and the only thing that changes is which array lastKey points to.
// However, if logic in KeyValueCursor or KeyValue changes, this could break continuations.
// To resolve it, we could resort to doing a full copy here, although that's somewhat expensive.
this.lastKey = lastKey;
this.prefixLength = prefixLength;
this.serializationMode = serializationMode;
}

@Override
Expand All @@ -156,21 +165,83 @@ public boolean isEnd() {
@Nonnull
@Override
public ByteString toByteString() {
if (lastKey == null) {
return ByteString.EMPTY;
if (serializationMode == SerializationMode.TO_OLD) {
// lastKey = null when source iterator hit limit that we passed down.
if (lastKey == null) {
return ByteString.EMPTY;
}
ByteString base = ZeroCopyByteString.wrap(lastKey);
// when prefixLength == lastKey.length, toByteString() also returns ByteString.EMPTY
return base.substring(prefixLength, lastKey.length);
} else {
return toProto().toByteString();
}
ByteString base = ZeroCopyByteString.wrap(lastKey);
return base.substring(prefixLength, lastKey.length);
}

@Nullable
@Override
public byte[] toBytes() {
if (lastKey == null) {
return null;
}
ByteString byteString = toByteString();
return byteString.isEmpty() ? new byte[0] : byteString.toByteArray();
}

@Nullable
public byte[] getInnerContinuationInBytes() {
if (lastKey == null) {
return null;
}
return Arrays.copyOfRange(lastKey, prefixLength, lastKey.length);
}

@Nonnull
public ByteString getInnerContinuationInByteString() {
if (lastKey == null) {
return ByteString.EMPTY;
}
ByteString base = ZeroCopyByteString.wrap(lastKey);
return base.substring(prefixLength, lastKey.length);
}

public static byte[] fromRawBytes(@Nullable byte[] rawBytes, SerializationMode serializationMode) {
if (rawBytes == null) {
return null;
}
if (serializationMode == SerializationMode.TO_OLD) {
return rawBytes;
}
try {
RecordCursorProto.KeyValueCursorContinuation continuationProto = RecordCursorProto.KeyValueCursorContinuation.parseFrom(rawBytes);
if (continuationProto.hasPrefixLength()) {
return continuationProto.getContinuation().toByteArray();
} else {
// parseFrom can parse an old serialization result as the new proto, wrong deserialization
return rawBytes;
}
} catch (InvalidProtocolBufferException ipbe) {
return rawBytes;
}
}

@Nonnull
private RecordCursorProto.KeyValueCursorContinuation toProto() {
RecordCursorProto.KeyValueCursorContinuation.Builder builder = RecordCursorProto.KeyValueCursorContinuation.newBuilder();
if (lastKey == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The null handling here, in general, seems kind of suspect to me. Looking things over, I actually think that lastKey should never be null. It seems to be marked as @Nullable mainly because the lastKey variable in the outer class is @Nullable, but I don't think it should ever still be null when it creates a continuation. That's because:

  1. The lastKey starts as null
  2. If there is no data at all, then lastKey remains null but we return RecordCursorResult.exhausted() which does not create one of these continuations
  3. If there is data, then lastKey will be set to a non-null value. All the scenarios where we create one of these continuations should come in this case

So, I don't think we'll ever hit this code path. I think it also points to the possibility that this isn't quite the right format for the continuation, though that's something that I said in a comment on record_cursor.proto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this! I removed the is_end from the proto, and removed the null handling here.

// proto.hasContinuation() = false when lastKey = null
return builder.setPrefixLength(prefixLength).build();
} else {
ByteString base = ZeroCopyByteString.wrap(Objects.requireNonNull(lastKey));
// proto.hasContinuation() = ByteString.EMPTY when prefixLength = lastKey.length
return builder.setContinuation(base.substring(prefixLength, lastKey.length)).setPrefixLength(prefixLength).build();
}
}
}

public enum SerializationMode {
TO_OLD,
TO_NEW
Comment on lines +243 to +244
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using "old" and "new" here may be fine (it's certainly what we did with #3254). The danger is that we leave it in for too long and then we're confused what's "new" about it. I think the goal is for this to be pretty temporary, and for us to move pretty quickly to a mode where we only have the new way, and if that's the case, then we're probably okay with this nomenclature

}

/**
Expand Down Expand Up @@ -208,9 +279,11 @@ public abstract static class Builder<T extends Builder<T>> {
private StreamingMode streamingMode;
private KeySelector begin;
private KeySelector end;
protected SerializationMode serializationMode;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like there's some confusion about the lifecycle of this value. It's a member variable of the builder, and there's a setter, but it also gets cribbed from the execute properties during prepare. I'm not sure it needs to be a member variable of the builder, at least as it is currently. (It's possible that it would make sense for it to stay here if we end up removing it from the ExecuteProperties)


protected Builder(@Nonnull Subspace subspace) {
this.subspace = subspace;
this.serializationMode = SerializationMode.TO_OLD;
}

/**
Expand Down Expand Up @@ -247,10 +320,12 @@ protected void prepare() {
prefixLength = calculatePrefixLength();

reverse = scanProperties.isReverse();

if (continuation != null) {
final byte[] continuationBytes = new byte[prefixLength + continuation.length];
byte[] realContinuation = KeyValueCursorBase.Continuation.fromRawBytes(continuation, serializationMode);
final byte[] continuationBytes = new byte[prefixLength + realContinuation.length];
System.arraycopy(lowBytes, 0, continuationBytes, 0, prefixLength);
System.arraycopy(continuation, 0, continuationBytes, prefixLength, continuation.length);
System.arraycopy(realContinuation, 0, continuationBytes, prefixLength, realContinuation.length);
if (reverse) {
highBytes = continuationBytes;
highEndpoint = EndpointType.CONTINUATION;
Expand Down Expand Up @@ -334,6 +409,11 @@ public T setHigh(@Nonnull byte[] highBytes, @Nonnull EndpointType highEndpoint)
return self();
}

public T setSerializationMode(@Nonnull final SerializationMode serializationMode) {
this.serializationMode = serializationMode;
return self();
}

/**
* Calculate the key prefix length for the returned values. This will be used to derive the primary key used in
* the calculated continuation.
Expand Down
Loading