Skip to content

Commit 97d02b6

Browse files
committed
Apply Scott's requested changes
1 parent 0980757 commit 97d02b6

File tree

9 files changed

+711
-51
lines changed

9 files changed

+711
-51
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/FDBRecordStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4966,6 +4966,7 @@ public void removeFormerIndex(FormerIndex formerIndex) {
49664966

49674967
private void clearReadableIndexBuildData(Index index) {
49684968
IndexingRangeSet.forIndexBuild(this, index).clear();
4969+
IndexingHeartbeat.clearAllHeartbeats(this, index);
49694970
}
49704971

49714972
@SuppressWarnings("PMD.CloseResource")

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,11 @@ public void enforceStampOverwrite() {
361361
private CompletableFuture<Void> setIndexingTypeOrThrow(FDBRecordStore store, boolean continuedBuild) {
362362
// continuedBuild is set if this session isn't a continuation of a previous indexing
363363
IndexBuildProto.IndexBuildIndexingStamp indexingTypeStamp = getIndexingTypeStamp(store);
364-
heartbeat = new IndexingHeartbeat(common.getIndexerId(), indexingTypeStamp.getMethod(), common.config.getLeaseLengthMillis());
364+
final IndexBuildProto.IndexBuildIndexingStamp.Method method = indexingTypeStamp.getMethod();
365+
boolean allowMutual =
366+
method == IndexBuildProto.IndexBuildIndexingStamp.Method.MUTUAL_BY_RECORDS ||
367+
method == IndexBuildProto.IndexBuildIndexingStamp.Method.SCRUB_REPAIR;
368+
heartbeat = new IndexingHeartbeat(common.getIndexerId(), indexingTypeStamp.getMethod().toString(), common.config.getLeaseLengthMillis(), allowMutual);
365369

366370
return forEachTargetIndex(index -> setIndexingTypeOrThrow(store, continuedBuild, index, indexingTypeStamp)
367371
.thenCompose(ignore -> updateHeartbeat(true, store, index)));

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingHeartbeat.java

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,25 @@
3737
import java.util.concurrent.atomic.AtomicInteger;
3838

3939
public class IndexingHeartbeat {
40-
// [prefix, xid] -> [indexing-type, genesis time, heartbeat time]
40+
// [prefix, indexerId] -> [indexing-type, genesis time, heartbeat time]
4141
final UUID indexerId;
42-
final IndexBuildProto.IndexBuildIndexingStamp.Method indexingMethod;
42+
final String info;
4343
final long genesisTimeMilliseconds;
4444
final long leaseLength;
45+
final boolean allowMutual;
4546

46-
public IndexingHeartbeat(final UUID indexerId, IndexBuildProto.IndexBuildIndexingStamp.Method indexingMethod, long leaseLength) {
47+
public IndexingHeartbeat(final UUID indexerId, String info, long leaseLength, boolean allowMutual) {
4748
this.indexerId = indexerId;
48-
this.indexingMethod = indexingMethod;
49+
this.info = info;
4950
this.leaseLength = leaseLength;
51+
this.allowMutual = allowMutual;
5052
this.genesisTimeMilliseconds = nowMilliseconds();
5153
}
5254

5355
public void updateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
5456
byte[] key = IndexingSubspaces.indexheartbeatSubspace(store, index, indexerId).pack();
5557
byte[] value = IndexBuildProto.IndexBuildHeartbeat.newBuilder()
56-
.setMethod(indexingMethod)
58+
.setInfo(info)
5759
.setGenesisTimeMilliseconds(genesisTimeMilliseconds)
5860
.setHeartbeatTimeMilliseconds(nowMilliseconds())
5961
.build().toByteArray();
@@ -62,18 +64,14 @@ public void updateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index)
6264

6365
public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
6466
// complete exceptionally if non-mutual, other exists
65-
switch (indexingMethod) {
66-
case SCRUB_REPAIR:
67-
case MUTUAL_BY_RECORDS:
68-
updateHeartbeat(store, index);
69-
return AsyncUtil.DONE;
67+
if (allowMutual) {
68+
updateHeartbeat(store, index);
69+
return AsyncUtil.DONE;
70+
}
7071

71-
case BY_RECORDS:
72-
case MULTI_TARGET_BY_RECORDS:
73-
case BY_INDEX:
74-
final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
75-
final long now = nowMilliseconds();
76-
return AsyncUtil.whileTrue(() -> iterator.onHasNext()
72+
final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
73+
final long now = nowMilliseconds();
74+
return AsyncUtil.whileTrue(() -> iterator.onHasNext()
7775
.thenApply(hasNext -> {
7876
if (!hasNext) {
7977
return false;
@@ -85,6 +83,7 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
8583
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
8684
final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds();
8785
if (age > 0 && age < leaseLength) {
86+
// For practical reasons, this exception is backward compatible to the Synchronized Lock one
8887
throw new SynchronizedSessionLockedException("Failed to initialize the session because of an existing session in progress")
8988
.addLogInfo(LogMessageKeys.INDEXER_ID, indexerId)
9089
.addLogInfo(LogMessageKeys.EXISTING_INDEXER_ID, otherIndexerId)
@@ -97,21 +96,20 @@ public CompletableFuture<Void> checkAndUpdateHeartbeat(@Nonnull FDBRecordStore s
9796
}
9897
return true;
9998
}))
100-
.thenApply(ignore -> {
101-
updateHeartbeat(store, index);
102-
return null;
103-
});
104-
105-
default:
106-
throw new IndexingBase.ValidationException("invalid indexing method",
107-
LogMessageKeys.INDEXING_METHOD, indexingMethod);
108-
}
99+
.thenApply(ignore -> {
100+
updateHeartbeat(store, index);
101+
return null;
102+
});
109103
}
110104

111105
public void clearHeartbeat(@Nonnull FDBRecordStore store, @Nonnull Index index) {
112106
store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index, indexerId).pack());
113107
}
114108

109+
public static void clearAllHeartbeats(@Nonnull FDBRecordStore store, @Nonnull Index index) {
110+
store.ensureContextActive().clear(IndexingSubspaces.indexheartbeatSubspace(store, index).range());
111+
}
112+
115113
public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>> getIndexingHeartbeats(FDBRecordStore store, Index index, int maxCount) {
116114
final Map<UUID, IndexBuildProto.IndexBuildHeartbeat> ret = new HashMap<>();
117115
final AsyncIterator<KeyValue> iterator = heartbeatsIterator(store, index);
@@ -130,9 +128,9 @@ public static CompletableFuture<Map<UUID, IndexBuildProto.IndexBuildHeartbeat>>
130128
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue());
131129
ret.put(otherIndexerId, otherHeartbeat);
132130
} catch (InvalidProtocolBufferException e) {
133-
// put a NONE heartbeat to indicate an invalid item
131+
// Let the caller know about this invalid heartbeat.
134132
ret.put(otherIndexerId, IndexBuildProto.IndexBuildHeartbeat.newBuilder()
135-
.setMethod(IndexBuildProto.IndexBuildIndexingStamp.Method.NONE)
133+
.setInfo("<< Invalid Heartbeat >>")
136134
.build());
137135
}
138136
return true;

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationConfig.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,19 @@ public static Builder newBuilder() {
186186
return new Builder();
187187
}
188188

189+
190+
/**
191+
* Not used anymore.
192+
* @return always false;
193+
* @deprecated see {@link Builder#setUseSynchronizedSession(boolean)}
194+
*/
195+
@API(API.Status.DEPRECATED)
196+
@SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP
197+
@Deprecated(since = "4.4.3.0", forRemoval = true)
198+
public boolean shouldUseSynchronizedSession() {
199+
return false;
200+
}
201+
189202
public long getLeaseLengthMillis() {
190203
return leaseLengthMillis;
191204
}
@@ -496,8 +509,9 @@ public Builder setUseSynchronizedSession(boolean useSynchronizedSession) {
496509
}
497510

498511
/**
499-
* Set the lease length in milliseconds if the synchronized session is used. By default this is {@link #DEFAULT_LEASE_LENGTH_MILLIS}.
500-
* @see com.apple.foundationdb.synchronizedsession.SynchronizedSession
512+
* If the indexing session is not expected to be mutual, abort indexing if another session is active. This function
513+
* defines the maximum age of another session's heartbeat to be considered an "active session".
514+
* The default value is {@link #DEFAULT_LEASE_LENGTH_MILLIS}.
501515
* @param leaseLengthMillis length between last access and lease's end time in milliseconds
502516
* @return this builder
503517
*/

fdb-record-layer-core/src/main/proto/index_build.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ message IndexBuildIndexingStamp {
4747
}
4848

4949
message IndexBuildHeartbeat {
50-
required IndexBuildIndexingStamp.Method method = 1;
50+
required string info = 1;
5151
required int64 genesisTimeMilliseconds = 2;
5252
required int64 heartbeatTimeMilliseconds = 3;
5353
}

0 commit comments

Comments
 (0)