Skip to content

Commit 4b76a5e

Browse files
committed
wip
1 parent 9308dfb commit 4b76a5e

File tree

4 files changed

+114
-3
lines changed

4 files changed

+114
-3
lines changed

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexOperationConfig.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,19 @@ public static Builder newBuilder() {
186186
return new Builder();
187187
}
188188

189+
190+
/**
191+
* Not used anymore.
192+
* @return always false;
193+
* @deprecated see {@link Builder#setUseSynchronizedSession(boolean)}
194+
*/
195+
@API(API.Status.DEPRECATED)
196+
@SuppressWarnings("PMD.AvoidUsingHardCodedIP") // version is not IP
197+
@Deprecated(since = "4.4.3.0", forRemoval = true)
198+
public boolean shouldUseSynchronizedSession() {
199+
return false;
200+
}
201+
189202
public long getLeaseLengthMillis() {
190203
return leaseLengthMillis;
191204
}
@@ -496,8 +509,9 @@ public Builder setUseSynchronizedSession(boolean useSynchronizedSession) {
496509
}
497510

498511
/**
499-
* Set the lease length in milliseconds if the synchronized session is used. By default this is {@link #DEFAULT_LEASE_LENGTH_MILLIS}.
500-
* @see com.apple.foundationdb.synchronizedsession.SynchronizedSession
512+
* If the indexing session is not expected to be mutual, abort indexing if another session is active. This function
513+
* defines the maximum age of another session's heartbeat to be considered an "active session".
514+
* The default value is {@link #DEFAULT_LEASE_LENGTH_MILLIS}.
501515
* @param leaseLengthMillis length between last access and lease's end time in milliseconds
502516
* @return this builder
503517
*/

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerIndexFromIndexTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.apple.foundationdb.record.metadata.expressions.EmptyKeyExpression;
3030
import com.apple.foundationdb.record.metadata.expressions.GroupingKeyExpression;
3131
import com.apple.foundationdb.record.metadata.expressions.KeyExpression;
32+
import com.apple.foundationdb.synchronizedsession.SynchronizedSessionLockedException;
3233
import com.apple.test.BooleanSource;
3334
import com.google.common.collect.Comparators;
3435
import org.junit.jupiter.api.Test;
@@ -40,6 +41,9 @@
4041
import javax.annotation.Nullable;
4142
import java.util.List;
4243
import java.util.Map;
44+
import java.util.concurrent.Semaphore;
45+
import java.util.concurrent.TimeUnit;
46+
import java.util.concurrent.atomic.AtomicBoolean;
4347
import java.util.concurrent.atomic.AtomicLong;
4448
import java.util.stream.Collectors;
4549
import java.util.stream.IntStream;
@@ -1156,4 +1160,69 @@ void testIndexFromIndexBlock() {
11561160
assertReadable(tgtIndex);
11571161
scrubAndValidate(List.of(tgtIndex));
11581162
}
1163+
1164+
@Test
1165+
void testForbidConcurrentIndexFromIndexSessions() throws InterruptedException {
1166+
// Do not let a conversion of few indexes of an active multi-target session
1167+
final int numRecords = 59;
1168+
populateData(numRecords);
1169+
1170+
Index sourceIndex = new Index("src_index", field("num_value_2"), EmptyKeyExpression.EMPTY, IndexTypes.VALUE, IndexOptions.UNIQUE_OPTIONS);
1171+
openSimpleMetaData(metaDataBuilder -> metaDataBuilder.addIndex("MySimpleRecord", sourceIndex));
1172+
buildIndexClean(sourceIndex);
1173+
1174+
// Partly build index
1175+
Index tgtIndex = new Index("tgt_index", field("num_value_3_indexed"), IndexTypes.VALUE);
1176+
FDBRecordStoreTestBase.RecordMetaDataHook hook = myHook(sourceIndex, tgtIndex);
1177+
openSimpleMetaData(hook);
1178+
1179+
Semaphore pauseMutualBuildSemaphore = new Semaphore(1);
1180+
Semaphore startBuildingSemaphore = new Semaphore(1);
1181+
pauseMutualBuildSemaphore.acquire();
1182+
startBuildingSemaphore.acquire();
1183+
AtomicBoolean passed = new AtomicBoolean(false);
1184+
Thread t1 = new Thread(() -> {
1185+
// build index and pause halfway, allowing an active session test
1186+
try (OnlineIndexer indexBuilder = newIndexerBuilder(tgtIndex)
1187+
.setLeaseLengthMillis(TimeUnit.SECONDS.toMillis(20))
1188+
.setLimit(4)
1189+
.setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder()
1190+
.setSourceIndex("src_index")
1191+
.forbidRecordScan())
1192+
.setConfigLoader(old -> {
1193+
if (passed.get()) {
1194+
try {
1195+
startBuildingSemaphore.release();
1196+
pauseMutualBuildSemaphore.acquire(); // pause to try building indexes
1197+
} catch (InterruptedException e) {
1198+
throw new RuntimeException(e);
1199+
} finally {
1200+
pauseMutualBuildSemaphore.release();
1201+
}
1202+
} else {
1203+
passed.set(true);
1204+
}
1205+
return old;
1206+
})
1207+
.build()) {
1208+
indexBuilder.buildIndex();
1209+
}
1210+
});
1211+
t1.start();
1212+
startBuildingSemaphore.acquire();
1213+
startBuildingSemaphore.release();
1214+
// Try one index at a time
1215+
try (OnlineIndexer indexBuilder = newIndexerBuilder(tgtIndex)
1216+
.setIndexingPolicy(OnlineIndexer.IndexingPolicy.newBuilder()
1217+
.setSourceIndex("src_index")
1218+
.forbidRecordScan())
1219+
.build()) {
1220+
assertThrows(SynchronizedSessionLockedException.class, indexBuilder::buildIndex);
1221+
}
1222+
// let the other thread finish indexing
1223+
pauseMutualBuildSemaphore.release();
1224+
t1.join();
1225+
// happy indexes assertion
1226+
assertReadable(tgtIndex);
1227+
}
11591228
}

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerSimpleTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -887,4 +887,32 @@ public void runWithWeakReadSemantics() {
887887
fdb.setTrackLastSeenVersionOnRead(dbTracksReadVersionOnCommit);
888888
}
889889
}
890+
891+
@Test
892+
@SuppressWarnings("removal")
893+
void testDeprecatedSetUseSynchronizedSession() {
894+
List<TestRecords1Proto.MySimpleRecord> records = LongStream.range(0, 20).mapToObj(val ->
895+
TestRecords1Proto.MySimpleRecord.newBuilder().setRecNo(val).setNumValue2((int)val + 1).build()
896+
).collect(Collectors.toList());
897+
Index index = new Index("simple$value_2", field("num_value_2").ungrouped(), IndexTypes.SUM);
898+
FDBRecordStoreTestBase.RecordMetaDataHook hook = metaDataBuilder -> metaDataBuilder.addIndex("MySimpleRecord", index);
899+
900+
openSimpleMetaData();
901+
try (FDBRecordContext context = openContext()) {
902+
records.forEach(recordStore::saveRecord);
903+
context.commit();
904+
}
905+
906+
openSimpleMetaData(hook);
907+
disableAll(List.of(index));
908+
909+
openSimpleMetaData(hook);
910+
try (OnlineIndexer indexBuilder = newIndexerBuilder(index)
911+
.setUseSynchronizedSession(true)
912+
.build()) {
913+
indexBuilder.buildIndex();
914+
}
915+
916+
assertReadable(index);
917+
}
890918
}

fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexingHeartbeatTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void testHeartbeatLowLevel() {
7676
Map<UUID, IndexBuildProto.IndexBuildHeartbeat> queried = indexer.getIndexingHeartbeats(0);
7777
Assertions.assertThat(queried).hasSize(count);
7878
Assertions.assertThat(queried.keySet())
79-
.containsExactlyInAnyOrderElementsOf(Arrays.stream(heartbeats).map(ht -> ht.indexerId).collect(Collectors.toList()));
79+
.containsExactlyInAnyOrderElementsOf(Arrays.stream(heartbeats).map(heartbeat -> heartbeat.indexerId).collect(Collectors.toList()));
8080

8181
// Query, partial
8282
queried = indexer.getIndexingHeartbeats(5);

0 commit comments

Comments
 (0)