Skip to content

Commit a2971ad

Browse files
committed
Add restricted param check to single doc endpoint
1 parent 6c04589 commit a2971ad

File tree

4 files changed

+83
-18
lines changed

4 files changed

+83
-18
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ public class ActionModule extends AbstractModule {
459459
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
460460
private final ReservedClusterStateService reservedClusterStateService;
461461
private final RestExtension restExtension;
462+
private final ClusterService clusterService;
462463

463464
public ActionModule(
464465
Settings settings,
@@ -534,6 +535,7 @@ public ActionModule(
534535
reservedProjectStateHandlers
535536
);
536537
this.restExtension = restExtension;
538+
this.clusterService = clusterService;
537539
}
538540

539541
private static <T> T getRestServerComponent(
@@ -927,9 +929,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
927929
registerHandler.accept(new RestResolveClusterAction());
928930
registerHandler.accept(new RestResolveIndexAction());
929931

930-
registerHandler.accept(new RestIndexAction());
931-
registerHandler.accept(new CreateHandler());
932-
registerHandler.accept(new AutoIdHandler());
932+
registerHandler.accept(new RestIndexAction(clusterService, projectIdResolver));
933+
registerHandler.accept(new CreateHandler(clusterService, projectIdResolver));
934+
registerHandler.accept(new AutoIdHandler(clusterService, projectIdResolver));
933935
registerHandler.accept(new RestGetAction());
934936
registerHandler.accept(new RestGetSourceAction());
935937
registerHandler.accept(new RestMultiGetAction(settings));

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.indices.SystemIndices;
4343
import org.elasticsearch.ingest.IngestService;
4444
import org.elasticsearch.node.NodeClosedException;
45+
import org.elasticsearch.rest.action.document.RestBulkAction;
4546
import org.elasticsearch.tasks.Task;
4647
import org.elasticsearch.threadpool.ThreadPool;
4748
import org.elasticsearch.transport.TransportService;
@@ -412,14 +413,43 @@ private void applyPipelinesAndDoInternalExecute(
412413
while (bulkRequestModifier.hasNext()) {
413414
req = bulkRequestModifier.next();
414415
i++;
416+
doStreamsChecks(bulkRequest, projectMetadata, req, bulkRequestModifier, i);
417+
}
418+
419+
var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);
415420

416-
for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) {
417-
if (req instanceof IndexRequest ir && streamType.matchesStreamPrefix(req.index()) && ir.isPipelineResolved() == false) {
418-
IllegalArgumentException e = new IllegalArgumentException(
421+
if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) {
422+
doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos);
423+
}
424+
}
425+
426+
private void doStreamsChecks(
427+
BulkRequest bulkRequest,
428+
ProjectMetadata projectMetadata,
429+
DocWriteRequest<?> req,
430+
BulkRequestModifier bulkRequestModifier,
431+
int i
432+
) {
433+
for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) {
434+
if (req instanceof IndexRequest ir && ir.isPipelineResolved() == false) {
435+
IllegalArgumentException e = null;
436+
if (streamType.matchesStreamPrefix(req.index())) {
437+
e = new IllegalArgumentException(
419438
"Direct writes to child streams are prohibited. Index directly into the ["
420439
+ streamType.getStreamName()
421440
+ "] stream instead"
422441
);
442+
}
443+
444+
if (e == null && bulkRequest.streamsRestrictedParamsUsed() && req.index().equals(streamType.getStreamName())) {
445+
e = new IllegalArgumentException(
446+
"When writing to a stream, only the following parameters are allowed: ["
447+
+ String.join(",", RestBulkAction.STREAMS_ALLOWED_PARAMS)
448+
+ "]"
449+
);
450+
}
451+
452+
if (e != null) {
423453
Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
424454

425455
if (featureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)) {
@@ -438,12 +468,6 @@ private void applyPipelinesAndDoInternalExecute(
438468
}
439469
}
440470
}
441-
442-
var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);
443-
444-
if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) {
445-
doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos);
446-
}
447471
}
448472

449473
/**

server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
import org.elasticsearch.action.index.IndexRequest;
1616
import org.elasticsearch.action.support.ActiveShardCount;
1717
import org.elasticsearch.client.internal.node.NodeClient;
18+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
19+
import org.elasticsearch.cluster.project.ProjectIdResolver;
20+
import org.elasticsearch.cluster.service.ClusterService;
1821
import org.elasticsearch.common.bytes.ReleasableBytesReference;
22+
import org.elasticsearch.common.streams.StreamType;
23+
import org.elasticsearch.common.util.set.Sets;
1924
import org.elasticsearch.index.VersionType;
2025
import org.elasticsearch.rest.BaseRestHandler;
2126
import org.elasticsearch.rest.RestRequest;
@@ -40,6 +45,13 @@ public class RestIndexAction extends BaseRestHandler {
4045
+ "index requests is deprecated, use the typeless endpoints instead (/{index}/_doc/{id}, /{index}/_doc, "
4146
+ "or /{index}/_create/{id}).";
4247
private final Set<String> capabilities = Set.of(FAILURE_STORE_STATUS_CAPABILITY);
48+
private final ClusterService clusterService;
49+
private final ProjectIdResolver projectIdResolver;
50+
51+
public RestIndexAction(ClusterService clusterService, ProjectIdResolver projectIdResolver) {
52+
this.clusterService = clusterService;
53+
this.projectIdResolver = projectIdResolver;
54+
}
4355

4456
@Override
4557
public List<Route> routes() {
@@ -54,6 +66,10 @@ public String getName() {
5466
@ServerlessScope(Scope.PUBLIC)
5567
public static final class CreateHandler extends RestIndexAction {
5668

69+
public CreateHandler(ClusterService clusterService, ProjectIdResolver projectIdResolver) {
70+
super(clusterService, projectIdResolver);
71+
}
72+
5773
@Override
5874
public String getName() {
5975
return "document_create_action";
@@ -81,7 +97,9 @@ static void validateOpType(String opType) {
8197
@ServerlessScope(Scope.PUBLIC)
8298
public static final class AutoIdHandler extends RestIndexAction {
8399

84-
public AutoIdHandler() {}
100+
public AutoIdHandler(ClusterService clusterService, ProjectIdResolver projectIdResolver) {
101+
super(clusterService, projectIdResolver);
102+
}
85103

86104
@Override
87105
public String getName() {
@@ -105,7 +123,28 @@ public RestChannelConsumer prepareRequest(RestRequest request, final NodeClient
105123
@Override
106124
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
107125
ReleasableBytesReference source = request.requiredContent();
108-
IndexRequest indexRequest = new IndexRequest(request.param("index"));
126+
127+
String index = request.param("index");
128+
ProjectMetadata projectMetadata = null;
129+
130+
for (StreamType streamType : StreamType.values()) {
131+
if (index.equals(streamType.getStreamName())) {
132+
if (projectMetadata == null) {
133+
projectMetadata = clusterService.state().projectState(projectIdResolver.getProjectId()).metadata();
134+
}
135+
136+
if (streamType.streamTypeIsEnabled(projectMetadata)
137+
&& Sets.difference(request.params().keySet(), RestBulkAction.STREAMS_ALLOWED_PARAMS).isEmpty() == false) {
138+
throw new IllegalArgumentException(
139+
"When writing to a stream, only the following parameters are allowed: ["
140+
+ String.join(",", RestBulkAction.STREAMS_ALLOWED_PARAMS)
141+
+ "]"
142+
);
143+
}
144+
}
145+
}
146+
147+
IndexRequest indexRequest = new IndexRequest(index);
109148
indexRequest.id(request.param("id"));
110149
indexRequest.routing(request.param("routing"));
111150
indexRequest.setPipeline(request.param("pipeline"));

server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ public final class RestIndexActionTests extends RestActionTestCase {
3737

3838
@Before
3939
public void setUpAction() {
40-
controller().registerHandler(new RestIndexAction());
41-
controller().registerHandler(new CreateHandler());
42-
controller().registerHandler(new AutoIdHandler());
40+
controller().registerHandler(new RestIndexAction(null, null));
41+
controller().registerHandler(new CreateHandler(null, null));
42+
controller().registerHandler(new AutoIdHandler(null, null));
4343
}
4444

4545
public void testCreateOpTypeValidation() {
46-
RestIndexAction.CreateHandler create = new CreateHandler();
46+
RestIndexAction.CreateHandler create = new CreateHandler(null, null);
4747

4848
String opType = randomFrom("CREATE", null);
4949
CreateHandler.validateOpType(opType);

0 commit comments

Comments
 (0)