Skip to content

Commit 689e95d

Browse files
committed
Reroute code complete
1 parent 7380179 commit 689e95d

File tree

7 files changed

+166
-6
lines changed

7 files changed

+166
-6
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
6565
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
6666
entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)),
6767
entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)),
68-
entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()),
68+
entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory(parameters.ingestService)),
6969
entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)),
7070
entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)),
7171
entry(SortProcessor.TYPE, new SortProcessor.Factory()),

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,14 @@
1010
package org.elasticsearch.ingest.common;
1111

1212
import org.elasticsearch.cluster.metadata.ProjectId;
13+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
14+
import org.elasticsearch.cluster.service.ClusterService;
15+
import org.elasticsearch.common.streams.StreamsPermissionsUtils;
1316
import org.elasticsearch.core.Nullable;
1417
import org.elasticsearch.ingest.AbstractProcessor;
1518
import org.elasticsearch.ingest.ConfigurationUtils;
1619
import org.elasticsearch.ingest.IngestDocument;
20+
import org.elasticsearch.ingest.IngestService;
1721
import org.elasticsearch.ingest.Processor;
1822

1923
import java.util.List;
@@ -45,14 +49,20 @@ public final class RerouteProcessor extends AbstractProcessor {
4549
private final List<DataStreamValueSource> dataset;
4650
private final List<DataStreamValueSource> namespace;
4751
private final String destination;
52+
private final ClusterService clusterService;
53+
private final ProjectId projectId;
54+
private final StreamsPermissionsUtils streamsPermissionsUtils;
4855

4956
RerouteProcessor(
5057
String tag,
5158
String description,
5259
List<DataStreamValueSource> type,
5360
List<DataStreamValueSource> dataset,
5461
List<DataStreamValueSource> namespace,
55-
String destination
62+
String destination,
63+
ClusterService clusterService,
64+
ProjectId projectId,
65+
StreamsPermissionsUtils streamsPermissionsUtils
5666
) {
5767
super(tag, description);
5868
if (type.isEmpty()) {
@@ -71,11 +81,16 @@ public final class RerouteProcessor extends AbstractProcessor {
7181
this.namespace = namespace;
7282
}
7383
this.destination = destination;
84+
this.clusterService = clusterService;
85+
this.projectId = projectId;
86+
this.streamsPermissionsUtils = streamsPermissionsUtils;
7487
}
7588

7689
@Override
7790
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
7891
if (destination != null) {
92+
ProjectMetadata projectMetadata = clusterService.state().projectState(projectId).metadata();
93+
streamsPermissionsUtils.throwIfRetrouteToSubstreamNotAllowed(projectMetadata, ingestDocument.getIndexHistory(), destination);
7994
ingestDocument.reroute(destination);
8095
return ingestDocument;
8196
}
@@ -171,6 +186,12 @@ String getDestination() {
171186

172187
public static final class Factory implements Processor.Factory {
173188

189+
private final IngestService ingestService;
190+
191+
public Factory(IngestService ingestService) {
192+
this.ingestService = ingestService;
193+
}
194+
174195
@Override
175196
public RerouteProcessor create(
176197
Map<String, Processor.Factory> processorFactories,
@@ -212,7 +233,17 @@ public RerouteProcessor create(
212233
throw newConfigurationException(TYPE, tag, "destination", "can only be set if type, dataset, and namespace are not set");
213234
}
214235

215-
return new RerouteProcessor(tag, description, type, dataset, namespace, destination);
236+
return new RerouteProcessor(
237+
tag,
238+
description,
239+
type,
240+
dataset,
241+
namespace,
242+
destination,
243+
ingestService.getClusterService(),
244+
projectId,
245+
StreamsPermissionsUtils.getInstance()
246+
);
216247
}
217248
}
218249

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
package org.elasticsearch.ingest.common;
1111

1212
import org.elasticsearch.ElasticsearchParseException;
13+
import org.elasticsearch.cluster.metadata.ProjectId;
14+
import org.elasticsearch.ingest.IngestService;
1315
import org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource;
1416
import org.elasticsearch.test.ESTestCase;
1517

@@ -18,6 +20,8 @@
1820
import java.util.Map;
1921

2022
import static org.hamcrest.Matchers.equalTo;
23+
import static org.mockito.Answers.RETURNS_SMART_NULLS;
24+
import static org.mockito.Mockito.mock;
2125

2226
public class RerouteProcessorFactoryTests extends ESTestCase {
2327

@@ -74,6 +78,7 @@ private static RerouteProcessor create(String dataset, String namespace) throws
7478
}
7579

7680
private static RerouteProcessor create(Map<String, Object> config) throws Exception {
77-
return new RerouteProcessor.Factory().create(null, null, null, new HashMap<>(config), null);
81+
IngestService ingestService = mock(IngestService.class, RETURNS_SMART_NULLS);
82+
return new RerouteProcessor.Factory(ingestService).create(null, null, null, new HashMap<>(config), ProjectId.DEFAULT);
7883
}
7984
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,45 @@
99

1010
package org.elasticsearch.ingest.common;
1111

12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.ProjectId;
14+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
15+
import org.elasticsearch.cluster.service.ClusterService;
16+
import org.elasticsearch.common.streams.StreamsPermissionsUtils;
1217
import org.elasticsearch.ingest.CompoundProcessor;
1318
import org.elasticsearch.ingest.IngestDocument;
1419
import org.elasticsearch.ingest.Processor;
1520
import org.elasticsearch.ingest.RandomDocumentPicks;
1621
import org.elasticsearch.ingest.TestProcessor;
1722
import org.elasticsearch.ingest.WrappingProcessor;
1823
import org.elasticsearch.test.ESTestCase;
24+
import org.junit.Before;
1925

2026
import java.util.List;
2127
import java.util.Map;
2228

2329
import static org.hamcrest.Matchers.equalTo;
30+
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
31+
import static org.mockito.Mockito.any;
32+
import static org.mockito.Mockito.anyString;
33+
import static org.mockito.Mockito.doNothing;
34+
import static org.mockito.Mockito.mock;
35+
import static org.mockito.Mockito.when;
2436

2537
public class RerouteProcessorTests extends ESTestCase {
2638

39+
private final StreamsPermissionsUtils streamsPermissionsUtilsMock = mock(StreamsPermissionsUtils.class);
40+
private final ClusterService clusterServiceMock = mock(ClusterService.class, RETURNS_DEEP_STUBS);
41+
42+
@Before
43+
public void setUpStreamsPermissionsUtils() {
44+
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
45+
.putProjectMetadata(ProjectMetadata.builder(ProjectId.DEFAULT).build())
46+
.build();
47+
when(clusterServiceMock.state()).thenReturn(clusterState);
48+
doNothing().when(streamsPermissionsUtilsMock).throwIfRetrouteToSubstreamNotAllowed(any(), any(), anyString());
49+
}
50+
2751
public void testDefaults() throws Exception {
2852
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
2953

@@ -291,12 +315,25 @@ private RerouteProcessor createRerouteProcessor(List<String> type, List<String>
291315
type.stream().map(RerouteProcessor.DataStreamValueSource::type).toList(),
292316
dataset.stream().map(RerouteProcessor.DataStreamValueSource::dataset).toList(),
293317
namespace.stream().map(RerouteProcessor.DataStreamValueSource::namespace).toList(),
294-
null
318+
null,
319+
clusterServiceMock,
320+
ProjectId.DEFAULT,
321+
streamsPermissionsUtilsMock
295322
);
296323
}
297324

298325
private RerouteProcessor createRerouteProcessor(String destination) {
299-
return new RerouteProcessor(null, null, List.of(), List.of(), List.of(), destination);
326+
return new RerouteProcessor(
327+
null,
328+
null,
329+
List.of(),
330+
List.of(),
331+
List.of(),
332+
destination,
333+
clusterServiceMock,
334+
ProjectId.DEFAULT,
335+
streamsPermissionsUtilsMock
336+
);
300337
}
301338

302339
private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) {

server/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@
213213
exports org.elasticsearch.common.regex;
214214
exports org.elasticsearch.common.scheduler;
215215
exports org.elasticsearch.common.settings;
216+
exports org.elasticsearch.common.streams;
216217
exports org.elasticsearch.common.text;
217218
exports org.elasticsearch.common.time;
218219
exports org.elasticsearch.common.transport;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.common.streams;
11+
12+
public enum StreamTypes {
13+
14+
LOGS("logs");
15+
16+
private final String streamName;
17+
18+
StreamTypes(String streamName) {
19+
this.streamName = streamName;
20+
}
21+
22+
public String getStreamName() {
23+
return streamName;
24+
}
25+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.common.streams;
11+
12+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
13+
import org.elasticsearch.cluster.metadata.StreamsMetadata;
14+
15+
import java.util.Set;
16+
17+
public class StreamsPermissionsUtils {
18+
19+
private static volatile StreamsPermissionsUtils INSTANCE = null;
20+
21+
// Visible for testing only
22+
StreamsPermissionsUtils() {}
23+
24+
public static StreamsPermissionsUtils getInstance() {
25+
if (INSTANCE == null) {
26+
synchronized (StreamsPermissionsUtils.class) {
27+
if (INSTANCE == null) {
28+
INSTANCE = new StreamsPermissionsUtils();
29+
}
30+
}
31+
}
32+
return INSTANCE;
33+
}
34+
35+
public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set<String> indexHistory, String destination)
36+
throws IllegalArgumentException {
37+
for (StreamTypes streamType : StreamTypes.values()) {
38+
String streamName = streamType.getStreamName();
39+
if (streamTypeIsEnabled(streamType, projectMetadata)
40+
&& destination.startsWith(streamName + ".")
41+
&& indexHistory.contains(streamName) == false) {
42+
throw new IllegalArgumentException(
43+
"Cannot reroute to substream ["
44+
+ destination
45+
+ "] as only the stream itself can reroute to substreams. "
46+
+ "Please reroute to the stream ["
47+
+ streamName
48+
+ "] instead."
49+
);
50+
}
51+
}
52+
}
53+
54+
public boolean streamTypeIsEnabled(StreamTypes streamType, ProjectMetadata projectMetadata) {
55+
StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
56+
return switch (streamType) {
57+
case LOGS -> metadata.isLogsEnabled();
58+
};
59+
}
60+
61+
}

0 commit comments

Comments
 (0)