Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ public ClusterState executeTask(

final var rolloverIndexName = rolloverResult.rolloverIndexName();
final var sourceIndexName = rolloverResult.sourceIndexName();
logger.info(
"rolling over data stream [{}] to index [{}] because it was marked for lazy rollover",
dataStream.getName(),
rolloverIndexName
);

final var waitForActiveShardsTimeout = rolloverRequest.masterNodeTimeout().millis() < 0
? null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,44 +212,17 @@ protected void masterOperation(
final String trialRolloverIndexName = trialRolloverNames.rolloverName();
MetadataCreateIndexService.validateIndexName(trialRolloverIndexName, metadata, clusterState.routingTable());

boolean isDataStream = metadata.dataStreams().containsKey(resolvedRolloverTarget.resource());
if (rolloverRequest.isLazy()) {
if (isDataStream == false || rolloverRequest.getConditions().hasConditions()) {
String message;
if (isDataStream) {
message = "Lazy rollover can be used only without any conditions."
+ " Please remove the conditions from the request body or the query parameter 'lazy'.";
} else if (rolloverRequest.getConditions().hasConditions() == false) {
message = "Lazy rollover can be applied only on a data stream." + " Please remove the query parameter 'lazy'.";
} else {
message = "Lazy rollover can be applied only on a data stream with no conditions."
+ " Please remove the query parameter 'lazy'.";
}
listener.onFailure(new IllegalArgumentException(message));
return;
}
if (rolloverRequest.isDryRun() == false) {
metadataDataStreamsService.setRolloverOnWrite(
resolvedRolloverTarget.resource(),
true,
targetFailureStore,
rolloverRequest.ackTimeout(),
rolloverRequest.masterNodeTimeout(),
listener.map(
response -> new RolloverResponse(
trialSourceIndexName,
trialRolloverIndexName,
Map.of(),
false,
false,
response.isAcknowledged(),
false,
response.isAcknowledged()
)
)
);
return;
}
markForLazyRollover(
rolloverRequest,
listener,
metadata,
resolvedRolloverTarget,
targetFailureStore,
trialSourceIndexName,
trialRolloverIndexName
);
return;
}

final IndexAbstraction rolloverTargetAbstraction = clusterState.metadata()
Expand Down Expand Up @@ -345,7 +318,7 @@ protected void masterOperation(
false,
false,
false,
rolloverRequest.isLazy()
false
);

// If this is a dry run, return with the results without invoking a cluster state update
Expand Down Expand Up @@ -373,6 +346,57 @@ protected void masterOperation(
);
}

private void markForLazyRollover(
RolloverRequest rolloverRequest,
ActionListener<RolloverResponse> listener,
Metadata metadata,
ResolvedExpression resolvedRolloverTarget,
boolean targetFailureStore,
String trialSourceIndexName,
String trialRolloverIndexName
) {
boolean isDataStream = metadata.dataStreams().containsKey(resolvedRolloverTarget.resource());
if (isDataStream == false || rolloverRequest.getConditions().hasConditions()) {
String message;
if (isDataStream) {
message = "Lazy rollover can be used only without any conditions."
+ " Please remove the conditions from the request body or the query parameter 'lazy'.";
} else if (rolloverRequest.getConditions().hasConditions() == false) {
message = "Lazy rollover can be applied only on a data stream. Please remove the query parameter 'lazy'.";
} else {
message = "Lazy rollover can be applied only on a data stream with no conditions."
+ " Please remove the query parameter 'lazy'.";
}
listener.onFailure(new IllegalArgumentException(message));
return;
}
if (rolloverRequest.isDryRun()) {
listener.onResponse(
new RolloverResponse(trialSourceIndexName, trialRolloverIndexName, Map.of(), true, false, false, false, true)
);
return;
}
metadataDataStreamsService.setRolloverOnWrite(
resolvedRolloverTarget.resource(),
true,
targetFailureStore,
rolloverRequest.ackTimeout(),
rolloverRequest.masterNodeTimeout(),
listener.map(
response -> new RolloverResponse(
trialSourceIndexName,
trialRolloverIndexName,
Map.of(),
false,
false,
response.isAcknowledged(),
false,
true
)
)
);
}

private void initializeFailureStore(
RolloverRequest rolloverRequest,
ActionListener<RolloverResponse> listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,19 +466,39 @@ public void testLazyRollover() throws Exception {
mockMetadataDataStreamService,
dataStreamAutoShardingService
);
final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
RolloverRequest rolloverRequest = new RolloverRequest("logs-ds", null);
rolloverRequest.lazy(true);
transportRolloverAction.masterOperation(mock(CancellableTask.class), rolloverRequest, stateBefore, future);
RolloverResponse rolloverResponse = future.actionGet();
assertThat(rolloverResponse.getOldIndex(), equalTo(".ds-logs-ds-000001"));
assertThat(rolloverResponse.getNewIndex(), Matchers.startsWith(".ds-logs-ds-"));
assertThat(rolloverResponse.getNewIndex(), Matchers.endsWith("-000002"));
assertThat(rolloverResponse.isLazy(), equalTo(true));
assertThat(rolloverResponse.isDryRun(), equalTo(false));
assertThat(rolloverResponse.isRolledOver(), equalTo(false));
assertThat(rolloverResponse.getConditionStatus().size(), equalTo(0));
assertThat(rolloverResponse.isAcknowledged(), is(true));
{
// Regular lazy rollover
final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
RolloverRequest rolloverRequest = new RolloverRequest("logs-ds", null);
rolloverRequest.lazy(true);
transportRolloverAction.masterOperation(mock(CancellableTask.class), rolloverRequest, stateBefore, future);
RolloverResponse rolloverResponse = future.actionGet();
assertThat(rolloverResponse.getOldIndex(), equalTo(".ds-logs-ds-000001"));
assertThat(rolloverResponse.getNewIndex(), Matchers.startsWith(".ds-logs-ds-"));
assertThat(rolloverResponse.getNewIndex(), Matchers.endsWith("-000002"));
assertThat(rolloverResponse.isLazy(), equalTo(true));
assertThat(rolloverResponse.isDryRun(), equalTo(false));
assertThat(rolloverResponse.isRolledOver(), equalTo(false));
assertThat(rolloverResponse.getConditionStatus().size(), equalTo(0));
assertThat(rolloverResponse.isAcknowledged(), is(true));
}
{
// Dry-run lazy rollover
final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
RolloverRequest rolloverRequest = new RolloverRequest("logs-ds", null);
rolloverRequest.lazy(true);
rolloverRequest.dryRun(true);
transportRolloverAction.masterOperation(mock(CancellableTask.class), rolloverRequest, stateBefore, future);
RolloverResponse rolloverResponse = future.actionGet();
assertThat(rolloverResponse.getOldIndex(), equalTo(".ds-logs-ds-000001"));
assertThat(rolloverResponse.getNewIndex(), Matchers.startsWith(".ds-logs-ds-"));
assertThat(rolloverResponse.getNewIndex(), Matchers.endsWith("-000002"));
assertThat(rolloverResponse.isLazy(), equalTo(true));
assertThat(rolloverResponse.isDryRun(), equalTo(true));
assertThat(rolloverResponse.isRolledOver(), equalTo(false));
assertThat(rolloverResponse.getConditionStatus().size(), equalTo(0));
assertThat(rolloverResponse.isAcknowledged(), is(false));
}
}

public void testLazyRolloverFails() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -840,9 +841,8 @@ public void onFailure(Exception e) {

void onRolloversBulkResponse(Collection<RolloverResponse> rolloverResponses) {
for (RolloverResponse rolloverResponse : rolloverResponses) {
if (rolloverResponse.isRolledOver() == false) {
logger.warn("rollover of the [{}] index [{}] failed", getOrigin(), rolloverResponse.getOldIndex());
}
assert rolloverResponse.isLazy() && rolloverResponse.isRolledOver() == false
: Strings.format("Expected rollover of the [%s] index [%s] to be lazy", getOrigin(), rolloverResponse.getOldIndex());
}
}

Expand Down