Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132064.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132064
summary: Only Allow Enabling Streams If No Conflicting Indices Exist
area: Data streams
type: enhancement
issues: []
2 changes: 1 addition & 1 deletion modules/streams/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ esplugin {

restResources {
restApi {
include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search"
include '_common', 'streams', 'bulk', 'index', 'ingest', 'indices', 'delete_by_query', 'search'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
Expand All @@ -22,12 +23,15 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.StreamsMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.streams.StreamType;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -75,19 +79,45 @@ protected void masterOperation(
LogsStreamsActivationToggleAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
) {
ProjectId projectId = projectResolver.getProjectId();
StreamsMetadata streamsState = state.metadata().getProject(projectId).custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
ProjectMetadata projectMetadata = state.metadata().getProject(projectId);
StreamsMetadata streamsState = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
boolean currentlyEnabled = streamsState.isLogsEnabled();
boolean shouldEnable = request.shouldEnable();
if (shouldEnable != currentlyEnabled) {
StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable);
String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable");
taskQueue.submitTask(taskName, updateTask, updateTask.timeout());
} else {

if (shouldEnable == currentlyEnabled) {
logger.debug("Logs streams are already in the requested state: {}", shouldEnable);
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}

if (shouldEnable && logsIndexExists(projectMetadata)) {
listener.onFailure(
new ElasticsearchStatusException(
"Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist.",
RestStatus.CONFLICT
)
);
return;
}

StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable);
String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable");
taskQueue.submitTask(taskName, updateTask, updateTask.timeout());
}

private boolean logsIndexExists(ProjectMetadata projectMetadata) {
String logsStreamName = StreamType.LOGS.getStreamName();
String logsStreamPrefix = logsStreamName + ".";

for (String name : projectMetadata.getConcreteAllIndices()) {
if (name.equals(logsStreamName) || name.startsWith(logsStreamPrefix)) {
return true;
}
}

return false;
}

@Override
Expand All @@ -111,7 +141,7 @@ static class StreamsMetadataUpdateTask extends AckedClusterStateUpdateTask {
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState execute(ClusterState currentState) {
return currentState.copyAndUpdateProject(
projectId,
builder -> builder.putCustom(StreamsMetadata.TYPE, new StreamsMetadata(enabled))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
---
teardown:
- do:
streams.logs_disable: { }

---
"Basic toggle of logs state enable to disable and back":
- do:
Expand Down Expand Up @@ -41,3 +46,31 @@
- do:
streams.status: { }
- is_true: logs.enabled

---
"Check streams can't be enabled with existing logs indices":
- do:
indices.create:
index: logs

- do:
catch: conflict
streams.logs_enable: { }
- match: { error.reason: "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." }

- do:
indices.delete:
index: logs

- do:
indices.create:
index: logs.test

- do:
catch: conflict
streams.logs_enable: { }
- match: { error.reason: "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." }

- do:
indices.delete:
index: logs.test
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
exports org.elasticsearch.common.regex;
exports org.elasticsearch.common.scheduler;
exports org.elasticsearch.common.settings;
exports org.elasticsearch.common.streams;
exports org.elasticsearch.common.text;
exports org.elasticsearch.common.time;
exports org.elasticsearch.common.transport;
Expand Down