-
Notifications
You must be signed in to change notification settings - Fork 111
Online Indexer: replace the synchronized runner with a heartbeat #3530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
89d0263
to
f318f65
Compare
Each indexing session, for each index, will create a key-value heartbeat of the format: [prefix, xid] -> [indexing-type, genesis time, heartbeat time] Indexing session that are expected to be exclusive will throw an exception if another, active, session exists. Motivation: 1. Represent the heartbeat in every index during multi target indexing (currently - only the master index has a sync lock) 2. Keep a heartbeat during mutual indexing, which can allow better automatic decision making 3. Decide about exclusiveness according to the indexing method (currently - user input) Resolve FoundationDB#3529
a680857
to
83830c6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't get a chance to look at all of the code, we can discuss some of the things offline.
@@ -180,7 +178,6 @@ <M extends Message> void singleRebuild( | |||
if (!safeBuild) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does safeBuild
here mean in this context? Do we need to continue to prevent concurrency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The things that had left to non-safeBuild
, as it seems, are clearing and marking the index as write-only and setting constrains that, IIUIC, should not be happening after the previous clearing.
After so many changes in the code, maybe it is time to re-design (and possibly simplify) the singleRebuild
. Should that be a separate PR?
@@ -238,12 +235,6 @@ <M extends Message> void singleRebuild( | |||
}); | |||
} | |||
} | |||
if (safeBuild) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be replaced with checking that no heartbeats exist?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
I've added it unconditionally after any build.
private final long leaseLengthMillis; | ||
|
||
public static final long UNLIMITED_TIME = 0; | ||
|
||
OnlineIndexOperationConfig(int maxLimit, int initialLimit, int maxRetries, int recordsPerSecond, long progressLogIntervalMillis, int increaseLimitAfter, | ||
int maxWriteLimitBytes, long timeLimitMilliseconds, long transactionTimeLimitMilliseconds, | ||
boolean useSynchronizedSession, long leaseLengthMillis) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to leave the parameter around, and have it inform the heartbeat usage, such that what this actually means is "Do not allow two builds of this index at the same time"? Perhaps, only as a temporary change to reduce breaking changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not know the reasons for originally allowing the non-synchronized mode. I assume that it was either done to allow concurrent indexing of single ranges - which is irrelevant now.
Mutual indexing had been already ignoring this parameter for practical reasons, and concurrent "no sync lock" indexing methods that are optimized for exclusiveness will now throw a "sync exception" (deterministically on the newest indexer) instead of a vague "conflicts exception" (on a random indexer). FMHO, this is not something worth avoiding.
@@ -188,10 +186,6 @@ public static Builder newBuilder() { | |||
return new Builder(); | |||
} | |||
|
|||
public boolean shouldUseSynchronizedSession() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this stick around as a deprecated method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
(I assumed that this function is not really a part of the API. But it is probably harmless to keep it as a "just in case")
public Builder setUseSynchronizedSession(boolean useSynchronizedSession) { | ||
this.useSynchronizedSession = useSynchronizedSession; | ||
// no-op | ||
return this; | ||
} | ||
|
||
/** | ||
* Set the lease length in milliseconds if the synchronized session is used. By default this is {@link #DEFAULT_LEASE_LENGTH_MILLIS}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is no longer clear
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
if (!otherIndexerId.equals(this.indexerId)) { | ||
final IndexBuildProto.IndexBuildHeartbeat otherHeartbeat = IndexBuildProto.IndexBuildHeartbeat.parseFrom(kv.getValue()); | ||
final long age = now - otherHeartbeat.getHeartbeatTimeMilliseconds(); | ||
if (age > 0 && age < leaseLength) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this clear the heartbeat if it is expired?
Are you avoiding that to avoid conflicts by a bunch of workers discovering that one has expired?
Does the indexing process guarantee that they are cleared when it fails?
If the instance crashes does the heartbeat stick around until you complete an index build (successfully, or with an exception)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point I believe that every process clears its own heartbeat. Instance crashes should be too rare to cause significant DB space leak (or so I hope). If needed, a cleanup can be triggered by the user with the Indexer's new clearIndexingHeartbeats
API.
Saying that, we can clear all heartbeats in FDBRecordStore.clearReadableIndexBuildData
, which is triggered when the index becomes readable. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
Clearing all heartbeats with an index becomes readable.
} | ||
|
||
// Verify that the previous clear does not affect other index | ||
try (OnlineIndexer indexer = newIndexerBuilder(indexes.get(1)).build()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should be low-level tests of updating leases, and ensuring that the heartbeat stays non-expired.
Also, that it can steal an expired lease. And that the original heartbeat will fail it lost its heartbeat. I think it would only fail if the stealing process is still running so perhaps covering it trying to continue while the stealer is still active, and if that has completed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
.thenCompose(store -> IndexingHeartbeat.getIndexingHeartbeats(store, common.getPrimaryIndex(), maxCount))); | ||
} | ||
|
||
public CompletableFuture<Integer> clearIndexingHeartbeats(long minAgenMilliseconds, int maxIteration) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this use ThrottledRetryingIterator
, removing the need for maxIteration
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IDK. The maxIteration
is not expected to be used unless something went wrong. Very wrong. Using the ThrottledRetryingIterator
might be an overkill.
class OnlineIndexingHeartbeatTest extends OnlineIndexerTest { | ||
|
||
@Test | ||
void testHeartbeatLowLevel() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to look at SynchronizedSessionTest
and make sure you are covering the same features, probably at a low-level and at a high level.
At first glance, beyond what I've already suggested, I see that it could benefit from testing that two different indexes can be built simultaneously regardless of indexer type.
@@ -1237,7 +1267,7 @@ public static class Builder { | |||
private DesiredAction ifReadable = DesiredAction.CONTINUE; | |||
private boolean doAllowUniquePendingState = false; | |||
private Set<TakeoverTypes> allowedTakeoverSet = null; | |||
private long checkIndexingStampFrequency = 60_000; | |||
private long checkIndexingStampFrequency = 10_000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why check the type stamp more often?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To align the default stamp & heartbeat checking with the default lease time.
Each indexing session, for each index, will create a key-value heartbeat of the format:
[prefix, xid] -> [indexing-type, genesis time, heartbeat time]
Indexing session that are expected to be exclusive will throw an exception if another, active, session exists.
Motivation:
1. Represent the heartbeat in every index during multi target indexing (currently - only the master index has a sync lock)
2. Keep a heartbeat during mutual indexing, which can allow better automatic decision making
3. Decide about exclusiveness according to the indexing method (currently - user input)
Note that with this change, the equivalent of a sync lock will be determined by the indexing type and cannot be set by the users. The index configuration function
setUseSynchronizedSession
will have no effect on the indexing process.Resolve #3529