1111
1212import  org .apache .logging .log4j .LogManager ;
1313import  org .apache .logging .log4j .Logger ;
14+ import  org .elasticsearch .ElasticsearchStatusException ;
1415import  org .elasticsearch .action .ActionListener ;
1516import  org .elasticsearch .action .support .ActionFilters ;
1617import  org .elasticsearch .action .support .master .AcknowledgedRequest ;
2223import  org .elasticsearch .cluster .block .ClusterBlockException ;
2324import  org .elasticsearch .cluster .block .ClusterBlockLevel ;
2425import  org .elasticsearch .cluster .metadata .ProjectId ;
26+ import  org .elasticsearch .cluster .metadata .ProjectMetadata ;
2527import  org .elasticsearch .cluster .metadata .StreamsMetadata ;
2628import  org .elasticsearch .cluster .project .ProjectResolver ;
2729import  org .elasticsearch .cluster .service .ClusterService ;
2830import  org .elasticsearch .cluster .service .MasterServiceTaskQueue ;
2931import  org .elasticsearch .common .Priority ;
32+ import  org .elasticsearch .common .streams .StreamType ;
3033import  org .elasticsearch .injection .guice .Inject ;
34+ import  org .elasticsearch .rest .RestStatus ;
3135import  org .elasticsearch .tasks .Task ;
3236import  org .elasticsearch .threadpool .ThreadPool ;
3337import  org .elasticsearch .transport .TransportService ;
@@ -75,19 +79,45 @@ protected void masterOperation(
7579        LogsStreamsActivationToggleAction .Request  request ,
7680        ClusterState  state ,
7781        ActionListener <AcknowledgedResponse > listener 
78-     ) throws   Exception   {
82+     ) {
7983        ProjectId  projectId  = projectResolver .getProjectId ();
80-         StreamsMetadata  streamsState  = state .metadata ().getProject (projectId ).custom (StreamsMetadata .TYPE , StreamsMetadata .EMPTY );
84+         ProjectMetadata  projectMetadata  = state .metadata ().getProject (projectId );
85+         StreamsMetadata  streamsState  = projectMetadata .custom (StreamsMetadata .TYPE , StreamsMetadata .EMPTY );
8186        boolean  currentlyEnabled  = streamsState .isLogsEnabled ();
8287        boolean  shouldEnable  = request .shouldEnable ();
83-         if  (shouldEnable  != currentlyEnabled ) {
84-             StreamsMetadataUpdateTask  updateTask  = new  StreamsMetadataUpdateTask (request , listener , projectId , shouldEnable );
85-             String  taskName  = String .format (Locale .ROOT , "enable-streams-logs-[%s]" , shouldEnable  ? "enable"  : "disable" );
86-             taskQueue .submitTask (taskName , updateTask , updateTask .timeout ());
87-         } else  {
88+ 
89+         if  (shouldEnable  == currentlyEnabled ) {
8890            logger .debug ("Logs streams are already in the requested state: {}" , shouldEnable );
8991            listener .onResponse (AcknowledgedResponse .TRUE );
92+             return ;
93+         }
94+ 
95+         if  (shouldEnable  && logsIndexExists (projectMetadata )) {
96+             listener .onFailure (
97+                 new  ElasticsearchStatusException (
98+                     "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." ,
99+                     RestStatus .CONFLICT 
100+                 )
101+             );
102+             return ;
90103        }
104+ 
105+         StreamsMetadataUpdateTask  updateTask  = new  StreamsMetadataUpdateTask (request , listener , projectId , shouldEnable );
106+         String  taskName  = String .format (Locale .ROOT , "enable-streams-logs-[%s]" , shouldEnable  ? "enable"  : "disable" );
107+         taskQueue .submitTask (taskName , updateTask , updateTask .timeout ());
108+     }
109+ 
110+     private  boolean  logsIndexExists (ProjectMetadata  projectMetadata ) {
111+         String  logsStreamName  = StreamType .LOGS .getStreamName ();
112+         String  logsStreamPrefix  = logsStreamName  + "." ;
113+ 
114+         for  (String  name  : projectMetadata .getConcreteAllIndices ()) {
115+             if  (name .equals (logsStreamName ) || name .startsWith (logsStreamPrefix )) {
116+                 return  true ;
117+             }
118+         }
119+ 
120+         return  false ;
91121    }
92122
93123    @ Override 
@@ -111,7 +141,7 @@ static class StreamsMetadataUpdateTask extends AckedClusterStateUpdateTask {
111141        }
112142
113143        @ Override 
114-         public  ClusterState  execute (ClusterState  currentState ) throws   Exception   {
144+         public  ClusterState  execute (ClusterState  currentState ) {
115145            return  currentState .copyAndUpdateProject (
116146                projectId ,
117147                builder  -> builder .putCustom (StreamsMetadata .TYPE , new  StreamsMetadata (enabled ))
0 commit comments