Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132064.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132064
summary: Only Allow Enabling Streams If No Conflicting Indices Exist
area: Data streams
type: enhancement
issues: []
2 changes: 1 addition & 1 deletion modules/streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ esplugin {

restResources {
restApi {
include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search"
include '_common', 'streams', 'bulk', 'index', 'ingest', 'indices', 'delete_by_query', 'search'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
Expand All @@ -22,12 +23,15 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.StreamsMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.streams.StreamType;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -75,19 +79,45 @@ protected void masterOperation(
LogsStreamsActivationToggleAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
) {
ProjectId projectId = projectResolver.getProjectId();
StreamsMetadata streamsState = state.metadata().getProject(projectId).custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
ProjectMetadata projectMetadata = state.metadata().getProject(projectId);
StreamsMetadata streamsState = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
boolean currentlyEnabled = streamsState.isLogsEnabled();
boolean shouldEnable = request.shouldEnable();
if (shouldEnable != currentlyEnabled) {
StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable);
String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable");
taskQueue.submitTask(taskName, updateTask, updateTask.timeout());
} else {

if (shouldEnable == currentlyEnabled) {
logger.debug("Logs streams are already in the requested state: {}", shouldEnable);
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}

if (shouldEnable && logsIndexExists(projectMetadata)) {
listener.onFailure(
new ElasticsearchStatusException(
"Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist.",
RestStatus.CONFLICT
)
);
return;
}

StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable);
String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable");
taskQueue.submitTask(taskName, updateTask, updateTask.timeout());
}

private boolean logsIndexExists(ProjectMetadata projectMetadata) {
String logsStreamName = StreamType.LOGS.getStreamName();
String logsStreamPrefix = logsStreamName + ".";

for (String name : projectMetadata.getConcreteAllIndices()) {
if (name.equals(logsStreamName) || name.startsWith(logsStreamPrefix)) {
return true;
}
}

return false;
}

@Override
Expand All @@ -111,7 +141,7 @@ static class StreamsMetadataUpdateTask extends AckedClusterStateUpdateTask {
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState execute(ClusterState currentState) {
return currentState.copyAndUpdateProject(
projectId,
builder -> builder.putCustom(StreamsMetadata.TYPE, new StreamsMetadata(enabled))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
---
teardown:
- do:
streams.logs_disable: { }

---
"Basic toggle of logs state enable to disable and back":
- do:
Expand All @@ -24,6 +29,10 @@
streams.status: { }
- is_true: logs.enabled

- do:
streams.logs_disable: { }
- is_true: acknowledged

---
"Check for repeated toggle to same state":
- do:
Expand All @@ -41,3 +50,35 @@
- do:
streams.status: { }
- is_true: logs.enabled

- do:
streams.logs_disable: { }
- is_true: acknowledged

---
"Check streams can't be enabled with existing logs indices":
- do:
indices.create:
index: logs

- do:
catch: conflict
streams.logs_enable: { }
- match: { error.reason: "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." }

- do:
indices.delete:
index: logs

- do:
indices.create:
index: logs.test

- do:
catch: conflict
streams.logs_enable: { }
- match: { error.reason: "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." }

- do:
indices.delete:
index: logs.test
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
---
teardown:
- do:
streams.logs_disable: { }

---
"Check User Can't Write To Substream Directly":
- do:
Expand Down Expand Up @@ -71,6 +76,10 @@
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Pipeline [reroute-to-logs-foo] can't change the target index (from [bad-index] to [logs] child stream [logs.foo]) History: [bad-index]" }

- do:
indices.delete:
index: bad-index

---
"Check Bulk Index With Script Processor To Substream Is Rejected":
- do:
Expand Down Expand Up @@ -104,6 +113,10 @@
- match: { items.0.index.error.type: "illegal_argument_exception" }
- match: { items.0.index.error.reason: "Pipeline [script-to-logs-foo] can't change the target index (from [bad-index-script] to [logs] child stream [logs.foo]) History: [bad-index-script]" }

- do:
indices.delete:
index: bad-index-script

---
"Check Delete By Query Directly On Substream After Reroute Succeeds":
- do:
Expand Down Expand Up @@ -154,3 +167,7 @@
query:
match_all: {}
- match: { hits.total.value: 0 }

- do:
indices.delete:
index: logs
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
exports org.elasticsearch.common.regex;
exports org.elasticsearch.common.scheduler;
exports org.elasticsearch.common.settings;
exports org.elasticsearch.common.streams;
exports org.elasticsearch.common.text;
exports org.elasticsearch.common.time;
exports org.elasticsearch.common.transport;
Expand Down