Skip to content

Commit 4042137

Browse files
lukewhitingCopilotmasseyke
authored
ES-11331 streams params restriction (#132967)
* Add support for passing down if restricted params are used # Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java # Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java * Add restricted param check to single doc endpoint * Fixup YAML test * Add YAML tests for new function * Switch to more descriptive catch check * Additional tests and fixes for those tests * Update docs/changelog/132967.yaml * Apply suggestions from code review Co-authored-by: Copilot <[email protected]> * Move check lower down the stack and preserve param names for better error message Also whitelists ID and pretty as allowed params * Fix NPE due to missing params set during serialization * Add additional allowed params following product feedback --------- Co-authored-by: Copilot <[email protected]> Co-authored-by: Keith Massey <[email protected]>
1 parent d55a91b commit 4042137

File tree

12 files changed

+349
-25
lines changed

12 files changed

+349
-25
lines changed

docs/changelog/132967.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132967
2+
summary: ES-11331 streams params restriction
3+
area: Data streams
4+
type: enhancement
5+
issues: []

modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,16 @@ teardown:
142142
index.default_pipeline: "reroute-to-logs-foo-success"
143143
- do:
144144
bulk:
145-
refresh: true
146145
body: |
147146
{ "index": { "_index": "logs" } }
148147
{ "foo": "bar", "baz": "qux" }
149148
- match: { errors: false }
150149
- match: { items.0.index.status: 201 }
151150

151+
- do:
152+
indices.refresh:
153+
index: logs.foo
154+
152155
- do:
153156
delete_by_query:
154157
index: logs.foo
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
---
2+
teardown:
3+
- do:
4+
streams.logs_disable: { }
5+
6+
---
7+
"Check User Can't Use Restricted Params On Logs Endpoint":
8+
- do:
9+
streams.logs_enable: { }
10+
- is_true: acknowledged
11+
12+
- do:
13+
streams.status: { }
14+
- is_true: logs.enabled
15+
16+
- do:
17+
bulk:
18+
list_executed_pipelines: true
19+
body: |
20+
{ "index": { "_index": "logs"} }
21+
{ "foo": "bar" }
22+
{ "index": { "_index": "not-logs" } }
23+
{ "foo": "bar" }
24+
- match: { errors: true }
25+
- match: { items.0.index.status: 400 }
26+
- match: { items.0.index.error.type: "illegal_argument_exception" }
27+
- match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' }
28+
- match: { items.1.index.status: 201 }
29+
30+
---
31+
"Check User Can't Use Restricted Params On Logs Endpoint - Single Doc":
32+
- do:
33+
streams.logs_enable: { }
34+
- is_true: acknowledged
35+
36+
- do:
37+
streams.status: { }
38+
- is_true: logs.enabled
39+
40+
- do:
41+
index:
42+
require_alias: true
43+
index: logs
44+
body: { "foo": "bar" }
45+
catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/'
46+
47+
---
48+
"Allowed Params Only - Bulk Should Succeed":
49+
- do:
50+
streams.logs_enable: { }
51+
- is_true: acknowledged
52+
53+
- do:
54+
bulk:
55+
error_trace: true
56+
timeout: "1m"
57+
body: |
58+
{ "index": { "_index": "logs"} }
59+
{ "foo": "bar" }
60+
- match: { errors: false }
61+
- match: { items.0.index.status: 201 }
62+
63+
---
64+
"Allowed Params Only - Single Doc Should Succeed":
65+
- do:
66+
streams.logs_enable: { }
67+
- is_true: acknowledged
68+
69+
- do:
70+
index:
71+
index: logs
72+
error_trace: true
73+
timeout: "1m"
74+
body: |
75+
{ "foo": "bar" }
76+
- match: { _index: "logs" }
77+
- match: { result: "created" }
78+
79+
---
80+
"Allowed Params Only - Single Doc with ID Should Succeed":
81+
- do:
82+
streams.logs_enable: { }
83+
- is_true: acknowledged
84+
85+
- do:
86+
index:
87+
index: logs
88+
id: 123456
89+
body: |
90+
{ "foo": "bar" }
91+
- match: { _index: "logs" }
92+
- match: { result: "created" }
93+
94+
---
95+
"No Params - Bulk Should Succeed":
96+
- do:
97+
streams.logs_enable: { }
98+
- is_true: acknowledged
99+
100+
- do:
101+
bulk:
102+
body: |
103+
{ "index": { "_index": "logs"} }
104+
{ "foo": "bar" }
105+
- match: { errors: false }
106+
- match: { items.0.index.status: 201 }
107+
108+
---
109+
"No Params - Single Doc Should Succeed":
110+
- do:
111+
streams.logs_enable: { }
112+
- is_true: acknowledged
113+
-
114+
do:
115+
index:
116+
index: logs
117+
body: { "foo": "bar" }
118+
- match: { _index: "logs" }
119+
- match: { result: "created" }
120+
121+
---
122+
"Mixed Allowed and Disallowed Params - Bulk Should Fail":
123+
- do:
124+
streams.logs_enable: { }
125+
- is_true: acknowledged
126+
127+
- do:
128+
bulk:
129+
error_trace: true
130+
timeout: "1m"
131+
routing: "custom-routing"
132+
refresh: true
133+
body: |
134+
{ "index": { "_index": "logs"} }
135+
{ "foo": "bar" }
136+
- match: { errors: true }
137+
- match: { items.0.index.status: 400 }
138+
- match: { items.0.index.error.type: "illegal_argument_exception" }
139+
- match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' }
140+
141+
---
142+
"Mixed Allowed and Disallowed Params - Single Doc Should Fail":
143+
- do:
144+
streams.logs_enable: { }
145+
- is_true: acknowledged
146+
147+
- do:
148+
index:
149+
index: logs
150+
error_trace: true
151+
timeout: "1m"
152+
routing: "custom-routing"
153+
refresh: true
154+
body: { "foo": "bar" }
155+
catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/'
156+
157+
---
158+
"Multiple Disallowed Params - Bulk Should Fail":
159+
- do:
160+
streams.logs_enable: { }
161+
- is_true: acknowledged
162+
163+
- do:
164+
bulk:
165+
routing: "custom-routing"
166+
wait_for_active_shards: "2"
167+
refresh: true
168+
body: |
169+
{ "index": { "_index": "logs"} }
170+
{ "foo": "bar" }
171+
- match: { errors: true }
172+
- match: { items.0.index.status: 400 }
173+
- match: { items.0.index.error.type: "illegal_argument_exception" }
174+
- match: { items.0.index.error.reason: '/When\ writing\ to\ a\ stream\,\ only\ the\ following\ parameters\ are\ allowed\:\ \[.+\]\ however\ the\ following\ were\ used\:\ \[.+\]/' }
175+
176+
---
177+
"Multiple Disallowed Params - Single Doc Should Fail":
178+
- do:
179+
streams.logs_enable: { }
180+
- is_true: acknowledged
181+
182+
- do:
183+
index:
184+
index: logs
185+
routing: "custom-routing"
186+
pipeline: "my-pipeline"
187+
wait_for_active_shards: "2"
188+
refresh: true
189+
body: { "foo": "bar" }
190+
catch: '/When writing to a stream, only the following parameters are allowed: \[.+\] however the following were used: \[.+\]/'

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ static TransportVersion def(int id) {
354354
public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00);
355355
public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS = def(9_127_0_00);
356356
public static final TransportVersion PROJECT_RESERVED_STATE_MOVE_TO_REGISTRY = def(9_147_0_00);
357+
public static final TransportVersion STREAMS_ENDPOINT_PARAM_RESTRICTIONS = def(9_148_00_00);
357358

358359
/*
359360
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ public class ActionModule extends AbstractModule {
459459
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
460460
private final ReservedClusterStateService reservedClusterStateService;
461461
private final RestExtension restExtension;
462+
private final ClusterService clusterService;
462463

463464
public ActionModule(
464465
Settings settings,
@@ -534,6 +535,7 @@ public ActionModule(
534535
reservedProjectStateHandlers
535536
);
536537
this.restExtension = restExtension;
538+
this.clusterService = clusterService;
537539
}
538540

539541
private static <T> T getRestServerComponent(
@@ -927,9 +929,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
927929
registerHandler.accept(new RestResolveClusterAction());
928930
registerHandler.accept(new RestResolveIndexAction());
929931

930-
registerHandler.accept(new RestIndexAction());
931-
registerHandler.accept(new CreateHandler());
932-
registerHandler.accept(new AutoIdHandler());
932+
registerHandler.accept(new RestIndexAction(clusterService, projectIdResolver));
933+
registerHandler.accept(new CreateHandler(clusterService, projectIdResolver));
934+
registerHandler.accept(new AutoIdHandler(clusterService, projectIdResolver));
933935
registerHandler.accept(new RestGetAction());
934936
registerHandler.accept(new RestGetSourceAction());
935937
registerHandler.accept(new RestMultiGetAction(settings));

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Objects;
4949
import java.util.Set;
5050

51+
import static java.util.Collections.emptySet;
5152
import static org.elasticsearch.action.ValidateActions.addValidationError;
5253

5354
/**
@@ -86,6 +87,7 @@ public class BulkRequest extends LegacyActionRequest
8687
private Boolean globalRequireAlias;
8788
private Boolean globalRequireDatsStream;
8889
private boolean includeSourceOnError = true;
90+
private Set<String> paramsUsed = emptySet();
8991

9092
private long sizeInBytes = 0;
9193

@@ -108,6 +110,9 @@ public BulkRequest(StreamInput in) throws IOException {
108110
if (in.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) {
109111
includeSourceOnError = in.readBoolean();
110112
} // else default value is true
113+
if (in.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
114+
paramsUsed = in.readCollectionAsImmutableSet(StreamInput::readString);
115+
}
111116
}
112117

113118
public BulkRequest(@Nullable String globalIndex) {
@@ -474,6 +479,9 @@ public void writeTo(StreamOutput out) throws IOException {
474479
if (out.getTransportVersion().onOrAfter(TransportVersions.INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR)) {
475480
out.writeBoolean(includeSourceOnError);
476481
}
482+
if (out.getTransportVersion().onOrAfter(TransportVersions.STREAMS_ENDPOINT_PARAM_RESTRICTIONS)) {
483+
out.writeCollection(paramsUsed, StreamOutput::writeString);
484+
}
477485
}
478486

479487
@Override
@@ -516,6 +524,14 @@ public boolean isSimulated() {
516524
return false; // Always false, but may be overridden by a subclass
517525
}
518526

527+
public Set<String> requestParamsUsed() {
528+
return paramsUsed;
529+
}
530+
531+
public void requestParamsUsed(Set<String> paramsUsed) {
532+
this.paramsUsed = paramsUsed;
533+
}
534+
519535
/*
520536
* Returns any component template substitutions that are to be used as part of this bulk request. We would likely only have
521537
* substitutions in the event of a simulated request.
@@ -554,6 +570,7 @@ public BulkRequest shallowClone() {
554570
bulkRequest.routing(routing());
555571
bulkRequest.requireAlias(requireAlias());
556572
bulkRequest.requireDataStream(requireDataStream());
573+
bulkRequest.requestParamsUsed(requestParamsUsed());
557574
return bulkRequest;
558575
}
559576
}

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
import java.util.Collections;
3030
import java.util.List;
3131
import java.util.Optional;
32+
import java.util.Set;
3233
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.function.Supplier;
3435

36+
import static java.util.Collections.emptySet;
3537
import static org.elasticsearch.common.settings.Setting.boolSetting;
3638

3739
public class IncrementalBulkService {
@@ -62,12 +64,17 @@ public IncrementalBulkService(Client client, IndexingPressure indexingPressure,
6264

6365
public Handler newBulkRequest() {
6466
ensureEnabled();
65-
return newBulkRequest(null, null, null);
67+
return newBulkRequest(null, null, null, emptySet());
6668
}
6769

68-
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
70+
public Handler newBulkRequest(
71+
@Nullable String waitForActiveShards,
72+
@Nullable TimeValue timeout,
73+
@Nullable String refresh,
74+
Set<String> paramsUsed
75+
) {
6976
ensureEnabled();
70-
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram);
77+
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh, chunkWaitTimeMillisHistogram, paramsUsed);
7178
}
7279

7380
private void ensureEnabled() {
@@ -105,6 +112,7 @@ public static class Handler implements Releasable {
105112
private final Client client;
106113
private final ActiveShardCount waitForActiveShards;
107114
private final TimeValue timeout;
115+
private final Set<String> paramsUsed;
108116
private final String refresh;
109117

110118
private final ArrayList<Releasable> releasables = new ArrayList<>(4);
@@ -125,12 +133,14 @@ protected Handler(
125133
@Nullable String waitForActiveShards,
126134
@Nullable TimeValue timeout,
127135
@Nullable String refresh,
128-
LongHistogram chunkWaitTimeMillisHistogram
136+
LongHistogram chunkWaitTimeMillisHistogram,
137+
Set<String> paramsUsed
129138
) {
130139
this.client = client;
131140
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
132141
this.timeout = timeout;
133142
this.refresh = refresh;
143+
this.paramsUsed = paramsUsed;
134144
this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false);
135145
this.chunkWaitTimeMillisHistogram = chunkWaitTimeMillisHistogram;
136146
createNewBulkRequest(EMPTY_STATE);
@@ -310,6 +320,7 @@ private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState)
310320
if (refresh != null) {
311321
bulkRequest.setRefreshPolicy(refresh);
312322
}
323+
bulkRequest.requestParamsUsed(paramsUsed);
313324
}
314325
}
315326
}

0 commit comments

Comments
 (0)