Skip to content

Commit 8f884cf

Browse files
Adding undelete record to persistent index (#1366)
Added behavioral changes for PersistentIndex for Undeletes Added test for bad undelete, test infra for undelete in CuratedLogIndexState Added undelete/lifeVersion awareness to findEntriesSince method Co-authored-by: David Harju <david.a.harju@gmail.com>
1 parent 3fea6a1 commit 8f884cf

File tree

9 files changed

+958
-171
lines changed

9 files changed

+958
-171
lines changed

ambry-api/src/main/java/com.github.ambry/store/StoreErrorCodes.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,9 @@ public enum StoreErrorCodes {
3535
Already_Updated,
3636
Update_Not_Allowed,
3737
File_Not_Found,
38-
Channel_Closed
38+
Channel_Closed,
39+
Life_Version_Conflict,
40+
ID_Not_Deleted,
41+
ID_Undeleted,
42+
ID_Deleted_Permanently
3943
}

ambry-store/src/main/java/com.github.ambry.store/IndexSegment.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,10 +1010,9 @@ boolean getEntriesSince(StoreKey key, FindEntriesCondition findEntriesCondition,
10101010
getIndexEntriesSince(key, findEntriesCondition, indexEntries, currentTotalSizeOfEntriesInBytes, true);
10111011
for (IndexEntry indexEntry : indexEntries) {
10121012
IndexValue value = indexEntry.getValue();
1013-
MessageInfo info =
1014-
new MessageInfo(indexEntry.getKey(), value.getSize(), value.isFlagSet(IndexValue.Flags.Delete_Index),
1015-
value.isFlagSet(IndexValue.Flags.Ttl_Update_Index), value.getExpiresAtMs(), value.getAccountId(),
1016-
value.getContainerId(), value.getOperationTimeInMs());
1013+
MessageInfo info = new MessageInfo(indexEntry.getKey(), value.getSize(), value.isDelete(), value.isTTLUpdate(),
1014+
value.isUndelete(), value.getExpiresAtMs(), null, value.getAccountId(), value.getContainerId(),
1015+
value.getOperationTimeInMs(), value.getLifeVersion());
10171016
entries.add(info);
10181017
}
10191018
return areNewEntriesAdded;
@@ -1100,10 +1099,10 @@ boolean getIndexEntriesSince(StoreKey key, FindEntriesCondition findEntriesCondi
11001099
*/
11011100
private void eliminateDuplicates(List<IndexEntry> entries) {
11021101
Set<StoreKey> setToFindDuplicate = new HashSet<>();
1103-
// first choose PUTs over update entries (omitting DELETEs)
1104-
entries.removeIf(
1105-
entry -> !entry.getValue().isFlagSet(IndexValue.Flags.Delete_Index) && !setToFindDuplicate.add(entry.getKey()));
1106-
// then choose DELETEs over all other entries
1102+
// first choose PUTs over update entries (omitting DELETEs and UNDELETEs)
1103+
entries.removeIf(entry -> !entry.getValue().isDelete() && !entry.getValue().isUndelete() && !setToFindDuplicate.add(
1104+
entry.getKey()));
1105+
// then choose DELETEs/UNDELETEs over all other entries
11071106
setToFindDuplicate.clear();
11081107
ListIterator<IndexEntry> iterator = entries.listIterator(entries.size());
11091108
while (iterator.hasPrevious()) {

ambry-store/src/main/java/com.github.ambry.store/IndexValue.java

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ enum Flags {
5555

5656
final static byte FLAGS_DEFAULT_VALUE = (byte) 0;
5757
final static long UNKNOWN_ORIGINAL_MESSAGE_OFFSET = -1;
58+
// The life version when the operation is trigger by the requests from frontend.
59+
final static short LIFE_VERSION_FROM_FRONTEND = -1;
5860

5961
private final static int BLOB_SIZE_IN_BYTES = 8;
6062
private final static int OFFSET_SIZE_IN_BYTES = 8;
@@ -186,6 +188,15 @@ enum Flags {
186188
this(size, offset, flags, expiresAtMs, offset.getOffset(), operationTimeInMs, accountId, containerId, lifeVersion);
187189
}
188190

191+
/**
192+
* Constructor to copy all data from a given {@link IndexValue}.
193+
* @param other the given {@link IndexValue}.
194+
*/
195+
IndexValue(IndexValue other) {
196+
this(other.getSize(), other.getOffset(), other.getFlags(), other.getExpiresAtMs(), other.getOriginalMessageOffset(),
197+
other.getOperationTimeInMs(), other.getAccountId(), other.getContainerId(), other.getLifeVersion());
198+
}
199+
189200
/**
190201
* Constructs IndexValue based on the args passed
191202
* @param size the size of the blob that this index value refers to
@@ -243,6 +254,38 @@ boolean isFlagSet(Flags flag) {
243254
return ((getFlags() & (1 << flag.ordinal())) != 0);
244255
}
245256

257+
/**
258+
* Helper function for isFlagSet(Flags.Ttl_Update_Index).
259+
* @return true when the Ttl_Update_Index is set.
260+
*/
261+
boolean isTTLUpdate() {
262+
return isFlagSet(Flags.Ttl_Update_Index);
263+
}
264+
265+
/**
266+
* Helper function for isFlagSet(Flags.Delete_Index).
267+
* @return true when the Delete_Index is set.
268+
*/
269+
boolean isDelete() {
270+
return isFlagSet(Flags.Delete_Index);
271+
}
272+
273+
/**
274+
* Helper function for isFlagSet(Flags.Undelete_Index).
275+
* @return true when the Undelete_Index is set.
276+
*/
277+
boolean isUndelete() {
278+
return isFlagSet(Flags.Undelete_Index);
279+
}
280+
281+
/**
282+
* Helper function to decide if this value is a put value or not.
283+
* @return true when it's not a put record.
284+
*/
285+
boolean isPut() {
286+
return flags == FLAGS_DEFAULT_VALUE;
287+
}
288+
246289
/**
247290
* @return the expiration time of the index value in ms
248291
*/
@@ -279,6 +322,15 @@ short getContainerId() {
279322
return containerId;
280323
}
281324

325+
/**
326+
* True when the life version is not from frontend requests.
327+
* @param lifeVersion the given life version.
328+
* @return true when it's not from frontend requests.
329+
*/
330+
static boolean hasLifeVersion(short lifeVersion) {
331+
return lifeVersion > LIFE_VERSION_FROM_FRONTEND;
332+
}
333+
282334
/**
283335
* @return the lifeVersion of the {@link IndexValue}
284336
*/
@@ -385,12 +437,11 @@ ByteBuffer getBytes() {
385437

386438
@Override
387439
public String toString() {
388-
return "Offset: " + offset + ", Size: " + getSize() + ", Deleted: " + isFlagSet(Flags.Delete_Index)
389-
+ ", TTL Updated: " + isFlagSet(Flags.Ttl_Update_Index) + ", Undelete: " + isFlagSet(
390-
Flags.Undelete_Index) + ", ExpiresAtMs: " + getExpiresAtMs() + ", Original Message Offset: "
440+
return "Offset: " + offset + ", Size: " + getSize() + ", Deleted: " + isDelete() + ", TTL Updated: " + isTTLUpdate()
441+
+ ", Undelete: " + isUndelete() + ", ExpiresAtMs: " + getExpiresAtMs() + ", Original Message Offset: "
391442
+ getOriginalMessageOffset() + (formatVersion != PersistentIndex.VERSION_0 ? (", OperationTimeAtSecs "
392-
+ getOperationTimeInMs() + ", AccountId " + getAccountId() + ", ContainerId " + getContainerId())
393-
: "") + (formatVersion > PersistentIndex.VERSION_2 ? ", Life Version:" + lifeVersion : "");
443+
+ getOperationTimeInMs() + ", AccountId " + getAccountId() + ", ContainerId " + getContainerId()) : "") + (
444+
formatVersion > PersistentIndex.VERSION_2 ? ", Life Version:" + lifeVersion : "");
394445
}
395446

396447
/**

0 commit comments

Comments
 (0)