Skip to content

Commit 10e23b3

Browse files
authored
Explicitly pass project ID in simulate pipeline request (#124033)
Instead of implicitly relying on the `IngestService` to resolve the project ID, the simulate pipeline request should do this and pass the project ID explicitly.
1 parent 496c38e commit 10e23b3

File tree

3 files changed

+33
-5
lines changed

3 files changed

+33
-5
lines changed

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.action.ActionRequest;
1313
import org.elasticsearch.action.ActionRequestValidationException;
14+
import org.elasticsearch.cluster.metadata.ProjectId;
1415
import org.elasticsearch.common.bytes.BytesReference;
1516
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1617
import org.elasticsearch.common.io.stream.StreamInput;
@@ -130,6 +131,7 @@ record Parsed(Pipeline pipeline, List<IngestDocument> documents, boolean verbose
130131
static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";
131132

132133
static Parsed parseWithPipelineId(
134+
ProjectId projectId,
133135
String pipelineId,
134136
Map<String, Object> config,
135137
boolean verbose,
@@ -139,7 +141,7 @@ static Parsed parseWithPipelineId(
139141
if (pipelineId == null) {
140142
throw new IllegalArgumentException("param [pipeline] is null");
141143
}
142-
Pipeline pipeline = ingestService.getPipeline(pipelineId);
144+
Pipeline pipeline = ingestService.getPipeline(projectId, pipelineId);
143145
if (pipeline == null) {
144146
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
145147
}

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.support.HandledTransportAction;
1818
import org.elasticsearch.cluster.node.DiscoveryNode;
1919
import org.elasticsearch.cluster.node.DiscoveryNodes;
20+
import org.elasticsearch.cluster.project.ProjectResolver;
2021
import org.elasticsearch.common.Randomness;
2122
import org.elasticsearch.common.settings.Setting;
2223
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -49,6 +50,7 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
4950
private final IngestService ingestService;
5051
private final SimulateExecutionService executionService;
5152
private final TransportService transportService;
53+
private final ProjectResolver projectResolver;
5254
private volatile TimeValue ingestNodeTransportActionTimeout;
5355
// ThreadLocal because our unit testing framework does not like sharing Randoms across threads
5456
private final ThreadLocal<Random> random = ThreadLocal.withInitial(Randomness::get);
@@ -58,7 +60,8 @@ public SimulatePipelineTransportAction(
5860
ThreadPool threadPool,
5961
TransportService transportService,
6062
ActionFilters actionFilters,
61-
IngestService ingestService
63+
IngestService ingestService,
64+
ProjectResolver projectResolver
6265
) {
6366
super(
6467
SimulatePipelineAction.NAME,
@@ -70,6 +73,7 @@ public SimulatePipelineTransportAction(
7073
this.ingestService = ingestService;
7174
this.executionService = new SimulateExecutionService(threadPool);
7275
this.transportService = transportService;
76+
this.projectResolver = projectResolver;
7377
this.ingestNodeTransportActionTimeout = INGEST_NODE_TRANSPORT_ACTION_TIMEOUT.get(ingestService.getClusterService().getSettings());
7478
ingestService.getClusterService()
7579
.getClusterSettings()
@@ -96,9 +100,11 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
96100
}
97101
try {
98102
if (discoveryNodes.getLocalNode().isIngestNode()) {
103+
final var projectId = projectResolver.getProjectId();
99104
final SimulatePipelineRequest.Parsed simulateRequest;
100105
if (request.getId() != null) {
101106
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(
107+
projectId,
102108
request.getId(),
103109
source,
104110
request.isVerbose(),

server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import static org.hamcrest.Matchers.containsString;
4545
import static org.hamcrest.Matchers.equalTo;
4646
import static org.hamcrest.Matchers.nullValue;
47+
import static org.mockito.ArgumentMatchers.any;
48+
import static org.mockito.ArgumentMatchers.eq;
4749
import static org.mockito.Mockito.mock;
4850
import static org.mockito.Mockito.when;
4951

@@ -61,7 +63,7 @@ public void init() throws IOException {
6163
(factories, tag, description, config) -> processor
6264
);
6365
ingestService = mock(IngestService.class);
64-
when(ingestService.getPipeline(SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
66+
when(ingestService.getPipeline(any(), eq(SIMULATED_PIPELINE_ID))).thenReturn(pipeline);
6567
when(ingestService.getProcessorFactories()).thenReturn(registry);
6668
}
6769

@@ -89,7 +91,9 @@ public void testParseUsingPipelineStore() throws Exception {
8991
expectedDocs.add(expectedDoc);
9092
}
9193

94+
var projectId = randomProjectIdOrDefault();
9295
SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parseWithPipelineId(
96+
projectId,
9397
SIMULATED_PIPELINE_ID,
9498
requestContent,
9599
false,
@@ -213,24 +217,40 @@ public void testParseWithProvidedPipeline() throws Exception {
213217
}
214218

215219
public void testNullPipelineId() {
220+
var projectId = randomProjectIdOrDefault();
216221
Map<String, Object> requestContent = new HashMap<>();
217222
List<Map<String, Object>> docs = new ArrayList<>();
218223
requestContent.put(Fields.DOCS, docs);
219224
Exception e = expectThrows(
220225
IllegalArgumentException.class,
221-
() -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, ingestService, RestApiVersion.current())
226+
() -> SimulatePipelineRequest.parseWithPipelineId(
227+
projectId,
228+
null,
229+
requestContent,
230+
false,
231+
ingestService,
232+
RestApiVersion.current()
233+
)
222234
);
223235
assertThat(e.getMessage(), equalTo("param [pipeline] is null"));
224236
}
225237

226238
public void testNonExistentPipelineId() {
239+
var projectId = randomProjectIdOrDefault();
227240
String pipelineId = randomAlphaOfLengthBetween(1, 10);
228241
Map<String, Object> requestContent = new HashMap<>();
229242
List<Map<String, Object>> docs = new ArrayList<>();
230243
requestContent.put(Fields.DOCS, docs);
231244
Exception e = expectThrows(
232245
IllegalArgumentException.class,
233-
() -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, ingestService, RestApiVersion.current())
246+
() -> SimulatePipelineRequest.parseWithPipelineId(
247+
projectId,
248+
pipelineId,
249+
requestContent,
250+
false,
251+
ingestService,
252+
RestApiVersion.current()
253+
)
234254
);
235255
assertThat(e.getMessage(), equalTo("pipeline [" + pipelineId + "] does not exist"));
236256
}

0 commit comments

Comments
 (0)