Conversation
# Conflicts: # modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java # modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStepTests.java
There was a problem hiding this comment.
Pull request overview
This PR adds a clone step to the Data Lifecycle Management (DLM) system. The clone step creates a copy of an index with zero replicas to prepare it for force merging, optimizing resource usage during the force merge operation.
Changes:
- Implements
CloneStepthat clones indices with 0 replicas or marks indices that already have 0 replicas for force merging - Adds
MarkIndexToBeForceMergedActionto update cluster state metadata marking which index should be force merged - Adds comprehensive test coverage for the clone step functionality
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| CloneStep.java | Implements the core clone step logic including index cloning, metadata marking, and cleanup operations |
| MarkIndexToBeForceMergedAction.java | Defines the action and request/response structures for marking indices to be force merged |
| TransportMarkIndexToBeForceMergedAction.java | Implements the transport layer for the mark index action |
| CloneStepTests.java | Provides comprehensive unit tests for the clone step functionality |
| DataStreamsPlugin.java | Registers the new mark index action handler |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Pinging @elastic/es-storage-engine (Team:StorageEngine) |
|
@elasticsearchmachine test this |
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
| String cloneIndexName = generateCloneIndexName(indexName); | ||
| if (projectMetadata.indices().containsKey(cloneIndexName)) { | ||
| logger.info("DLM cleaning up clone index [{}] for index [{}] as it already exists.", cloneIndexName, indexName); | ||
| deleteCloneIndexIfExists(stepContext); |
There was a problem hiding this comment.
Could this cause an infinite loop?
If this step runs again while the clone is in progress (Which may happen as we keep re-running execute(..) until stepComplete(...) is true, relying on the action duplicators to prevent us actually making changes twice) won't it delete the currently cloning index then try and clone it again next cycle?
There was a problem hiding this comment.
Good catch, the deduplicator alone wouldn't fix this though if i understand correctly? Struggling to think of a solution here that allows us to clean up without a potential conflict
I also just realized i forgot to add the deduplicators on the other requests... let me fix that now!
There was a problem hiding this comment.
Leaving a note here that we discussed this offline and will be following up on this problem to discuss workarounds
There was a problem hiding this comment.
I added the timeout we discussed, figured the best thing to do if the timeout hasn't been reached is just return and let the next DLM run check again rather than have the step's thread waiting around and checking periodically or something
| public void onResponse(CreateIndexResponse createIndexResponse) { | ||
| logger.debug("DLM successfully cloned index [{}] to index [{}]", sourceIndexName, targetIndexName); | ||
| // on success, write the cloned index name to the custom metadata of the index metadata of original index | ||
| markIndexToBeForceMerged(sourceIndexName, targetIndexName, stepContext, listener); |
There was a problem hiding this comment.
I believe all actions need to be executed via the deduplicator. I'm also not sure what effect calling an action inside a listener vs callback would have on thread usage... Perhaps @dakrone can weigh in on this pattern?
There was a problem hiding this comment.
My mistake, i totally forgot to wrap the other actions in the deduplicator... will fix that
There was a problem hiding this comment.
I'm also not sure what effect calling an action inside a listener vs callback would have on thread usage
True, I can move this to a callback pattern
There was a problem hiding this comment.
We do have a SubscribableListener which can be used for this, where you can chain actions with the .andThen(…) method. I've seen it used multiple places, though I don't claim to be an expert in its use.
There was a problem hiding this comment.
I ended up adapting the way some other places in the codebase do this, e.g. TransportReindexAction, if I understand correctly the execute's should be async? The SubscribableListener didn't seem as common to me so I was a little apprehensive to use it. Maybe I need some more pointers on this though if there's problems with my approach, I'm finding elasticsearch's listeners to be a little confusing to work with to say the least...
dakrone
left a comment
There was a problem hiding this comment.
I haven't looked at the test cases yet, but I left some comments about the implementation so far, thanks for working on this Sean!
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
| if (indexRoutingTable == null) { | ||
| return false; | ||
| } | ||
| return indexRoutingTable.allPrimaryShardsActive(); |
There was a problem hiding this comment.
Do we have index metadata that links the two indices? Should we add a check for that here if we do?
There was a problem hiding this comment.
The first conditional in the chain (getIndexToBeForceMerged) should check for that, it checks whether there has been an index marked for force merge for the source index.
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...g/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java
Outdated
Show resolved
Hide resolved
...rg/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexToBeForceMergedAction.java
Outdated
Show resolved
Hide resolved
lukewhiting
left a comment
There was a problem hiding this comment.
Solid progress :-) Just a few bugs left to iron out.
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
| String cloneIndexName = getCloneIndexName(indexName); | ||
| IndexMetadata cloneIndexMetadata = stepContext.projectState().metadata().index(cloneIndexName); | ||
| long cloneCreationTime = cloneIndexMetadata.getCreationDate(); | ||
| long currentTime = System.currentTimeMillis(); |
There was a problem hiding this comment.
Generally I would avoid using System.currentTimeMillis and use a Clock instead as it will make testing these timeouts much easier.
There was a problem hiding this comment.
We should probably add a Clock to the step context then, so it's available to every step.
There was a problem hiding this comment.
We should probably add a Clock to the step context then, so it's available to every step.
Clock.systemUTC().millis() is a static method, would we still want to add a clock to the context?
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
.../src/test/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStepTests.java
Outdated
Show resolved
Hide resolved
dakrone
left a comment
There was a problem hiding this comment.
I'm only partway through, but I wanted to leave some overall feedback to unblock you from continuing to work on this while I continue the rest of the review.
...search/datastreams/lifecycle/transitions/steps/TransportMarkIndexForDLMForceMergeAction.java
Outdated
Show resolved
Hide resolved
| * Request to mark an index to be force merged. | ||
| */ | ||
| public static class Request extends MasterNodeRequest<Request> { | ||
| private final ProjectId projectId; |
There was a problem hiding this comment.
When I see ProjectId here, it fires a mental alarm that we're passing through too much state. I mentioned elsewhere that the service should be the one that handles cluster state updates, but here we're passing through too much info. I think moving the logic into the service would be better.
There was a problem hiding this comment.
Is there an easier way to do this without moving transport logic to the service class as you mentioned?Since the transport action will need that projectId to do the block check.
If we move the logic from the transport action to the service it'd be easy to avoid this kind of awkwardness, but if we don't rely on the transport action, my impression was that it would be difficult to do the throttling/deduplication. If that's the case, do you think it's warranted to just do this in a hacky way?
...g/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java
Outdated
Show resolved
Hide resolved
...g/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
dakrone
left a comment
There was a problem hiding this comment.
I left more comments, and I'm happy to discuss if it helps!
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
| stepContext.executeDeduplicatedRequest( | ||
| MarkIndexForDLMForceMergeAction.TYPE.name(), | ||
| markIndexForForceMergeRequest, | ||
| Strings.format( | ||
| "DLM service encountered an error when trying to mark index [%s] to be force merged for source index [%s]", | ||
| indexToBeForceMerged, | ||
| sourceIndex | ||
| ), | ||
| (req, unused) -> markIndexToBeForceMergedCallback(markIndexForForceMergeRequest, stepContext, listener) | ||
| ); |
There was a problem hiding this comment.
You know, now that I think about this more, I'm not sure we actually need to make the cluster state update a transport action. We're really only making it a separate transport action so that we can use the deduplicator (and thus avoid issuing multiple cluster state updates). Really what we want is something that does something like:
stepContext.executeOnce(
<some-object-that-we-use-as-a-key>,
"error message because failure",
(unused, unused2) -> taskQueue.submitTask("mark-" + indexName + "-ready-for-merge", new MarkIndexAsForceMergeReady(indexName)));I wonder if we could/should add something to the stepContext that could allow us to do these without having to add the transport bits. Is it going to happen enough as a pattern (where we really want to execute something that is not a transport action, like a CS update) that we'd want to do it?
What do you think @seanzatzdev @lukewhiting ?
There was a problem hiding this comment.
Although, we'd also have to thread some kind of task queue through the context so that we could submit things to it from steps.
There was a problem hiding this comment.
I think we will have to do a fair bit of cluster state updates in the latter steps so adding transport actions for everything could get tedious/messy. Curious to hear Luke's thoughts
There was a problem hiding this comment.
Maybe I've just been looking at this too long, but do we actually need this marker in the cluster state? Since the index for force merge will either be a deterministically named clone or the original (if it has 0 replicas already) is that something we can just write a function to check for in later steps and avoid this extra write to cluster state?
There was a problem hiding this comment.
I wonder if we could/should add something to the stepContext that could allow us to do these without having to add the transport bits. Is it going to happen enough as a pattern (where we really want to execute something that is not a transport action, like a CS update) that we'd want to do it?
Without the transport action serving as a wrapper (and making use of the transport action deduplicator), is there an easy way to do the throttling? My impression from our previous meetings was that it'd be difficult to implement the throttling without the transport action deduplicator.
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Outdated
Show resolved
Hide resolved
...g/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java
Show resolved
Hide resolved
...g/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java
Outdated
Show resolved
Hide resolved
...g/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java
Outdated
Show resolved
Hide resolved
...g/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java
Outdated
Show resolved
Hide resolved
...search/datastreams/lifecycle/transitions/steps/TransportMarkIndexForDLMForceMergeAction.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...g/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java
Show resolved
Hide resolved
...treams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
Show resolved
Hide resolved
...g/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...treams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
Outdated
Show resolved
Hide resolved
...reams/src/main/java/org/elasticsearch/datastreams/lifecycle/transitions/steps/CloneStep.java
Show resolved
Hide resolved
...g/elasticsearch/datastreams/lifecycle/transitions/steps/MarkIndexForDLMForceMergeAction.java
Show resolved
Hide resolved
.../test/java/org/elasticsearch/datastreams/lifecycle/MarkIndexForDLMForceMergeActionTests.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Adds clone step to DLM
closes https://github.com/orgs/elastic/projects/2139/views/1?pane=issue&itemId=141184511&issue=elastic%7Celasticsearch-team%7C2111