Skip to content

Commit 8999b27

Browse files
authored
Add an example plugin for system generated search processors. (#19274)
* Support system generated search pipeline. Signed-off-by: Bo Zhang <[email protected]> * Add an example plugin for system generated search processors. Signed-off-by: Bo Zhang <[email protected]> --------- Signed-off-by: Bo Zhang <[email protected]>
1 parent 2f2a3a7 commit 8999b27

17 files changed

+1413
-0
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
* Modifications Copyright OpenSearch Contributors. See
9+
* GitHub history for details.
10+
*/
11+
apply plugin: 'opensearch.opensearchplugin'
12+
apply plugin: 'opensearch.yaml-rest-test'
13+
14+
opensearchplugin {
15+
name = 'example-system-search-processor'
16+
description = 'An example plugin implementing some system generated search processor.'
17+
classname = 'org.opensearch.example.systemsearchprocessor.ExampleSystemSearchProcessorPlugin'
18+
licenseFile = rootProject.file('licenses/APACHE-LICENSE-2.0.txt')
19+
noticeFile = rootProject.file('NOTICE.txt')
20+
}
21+
22+
dependencies {
23+
compileOnly project(':modules:lang-painless')
24+
}
25+
26+
testClusters {
27+
yamlRestTest {
28+
// add built-in modules to the cluster
29+
module ":modules:search-pipeline-common"
30+
module ":modules:lang-painless"
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.example.systemsearchprocessor;
10+
11+
import org.opensearch.action.search.SearchPhaseContext;
12+
import org.opensearch.action.search.SearchPhaseName;
13+
import org.opensearch.action.search.SearchPhaseResults;
14+
import org.opensearch.search.SearchPhaseResult;
15+
import org.opensearch.search.pipeline.Processor;
16+
import org.opensearch.search.pipeline.ProcessorGenerationContext;
17+
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
18+
import org.opensearch.search.pipeline.SystemGeneratedProcessor;
19+
20+
import java.util.Map;
21+
22+
/**
23+
* An example system generated search phase results processor that will be executed after the user defined processor
24+
*/
25+
public class ExampleSearchPhaseResultsProcessor implements SystemGeneratedProcessor, SearchPhaseResultsProcessor {
26+
private static final String TYPE = "example-search-phase-results-processor";
27+
private static final String DESCRIPTION = "This is a system generated search phase results processor which will be"
28+
+ "executed after the user defined search request. It will set the max score as 10.";
29+
private final String tag;
30+
private final boolean ignoreFailure;
31+
32+
/**
33+
* ExampleSearchPhaseResultsProcessor constructor
34+
* @param tag processor tag
35+
* @param ignoreFailure should processor ignore the failure
36+
*/
37+
public ExampleSearchPhaseResultsProcessor(String tag, boolean ignoreFailure) {
38+
this.tag = tag;
39+
this.ignoreFailure = ignoreFailure;
40+
}
41+
42+
@Override
43+
public <Result extends SearchPhaseResult> void process(
44+
SearchPhaseResults<Result> searchPhaseResult,
45+
SearchPhaseContext searchPhaseContext
46+
) {
47+
searchPhaseResult.getAtomicArray().asList().forEach(searchResult -> { searchResult.queryResult().topDocs().maxScore = 10; });
48+
}
49+
50+
@Override
51+
public SearchPhaseName getBeforePhase() {
52+
return SearchPhaseName.QUERY;
53+
}
54+
55+
@Override
56+
public SearchPhaseName getAfterPhase() {
57+
return SearchPhaseName.FETCH;
58+
}
59+
60+
@Override
61+
public String getType() {
62+
return TYPE;
63+
}
64+
65+
@Override
66+
public String getTag() {
67+
return this.tag;
68+
}
69+
70+
@Override
71+
public String getDescription() {
72+
return DESCRIPTION;
73+
}
74+
75+
@Override
76+
public boolean isIgnoreFailure() {
77+
return this.ignoreFailure;
78+
}
79+
80+
static class Factory implements SystemGeneratedProcessor.SystemGeneratedFactory<SearchPhaseResultsProcessor> {
81+
public static final String TYPE = "example-search-phase-results-processor-factory";
82+
83+
@Override
84+
public boolean shouldGenerate(ProcessorGenerationContext context) {
85+
return true;
86+
}
87+
88+
@Override
89+
public SearchPhaseResultsProcessor create(
90+
Map<String, Processor.Factory<SearchPhaseResultsProcessor>> processorFactories,
91+
String tag,
92+
String description,
93+
boolean ignoreFailure,
94+
Map<String, Object> config,
95+
PipelineContext pipelineContext
96+
) throws Exception {
97+
return new ExampleSearchPhaseResultsProcessor(tag, ignoreFailure);
98+
}
99+
}
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.example.systemsearchprocessor;
10+
11+
import org.opensearch.action.search.SearchRequest;
12+
import org.opensearch.search.pipeline.Processor;
13+
import org.opensearch.search.pipeline.ProcessorGenerationContext;
14+
import org.opensearch.search.pipeline.SearchRequestProcessor;
15+
import org.opensearch.search.pipeline.SystemGeneratedProcessor;
16+
17+
import java.util.Map;
18+
19+
/**
20+
* An example system generated search request processor that will be executed before the user defined processor
21+
*/
22+
public class ExampleSearchRequestPostProcessor implements SearchRequestProcessor, SystemGeneratedProcessor {
23+
/**
24+
* type of the processor
25+
*/
26+
public static final String TYPE = "example-search-request-post-processor";
27+
/**
28+
* description of the processor
29+
*/
30+
public static final String DESCRIPTION = "This is a system generated search request processor which will be"
31+
+ "executed after the user defined search request. It will increase the query size by 2.";
32+
private final String tag;
33+
private final boolean ignoreFailure;
34+
35+
/**
36+
* ExampleSearchRequestPostProcessor constructor
37+
* @param tag tag of the processor
38+
* @param ignoreFailure should processor ignore the failure
39+
*/
40+
public ExampleSearchRequestPostProcessor(String tag, boolean ignoreFailure) {
41+
this.tag = tag;
42+
this.ignoreFailure = ignoreFailure;
43+
}
44+
45+
@Override
46+
public SearchRequest processRequest(SearchRequest request) {
47+
if (request == null || request.source() == null) {
48+
return request;
49+
}
50+
int size = request.source().size();
51+
request.source().size(size + 2);
52+
return request;
53+
}
54+
55+
@Override
56+
public String getType() {
57+
return TYPE;
58+
}
59+
60+
@Override
61+
public String getTag() {
62+
return this.tag;
63+
}
64+
65+
@Override
66+
public String getDescription() {
67+
return DESCRIPTION;
68+
}
69+
70+
@Override
71+
public boolean isIgnoreFailure() {
72+
return this.ignoreFailure;
73+
}
74+
75+
@Override
76+
public ExecutionStage getExecutionStage() {
77+
// This processor will be executed after the user defined search request processor
78+
return ExecutionStage.POST_USER_DEFINED;
79+
}
80+
81+
static class Factory implements SystemGeneratedFactory<SearchRequestProcessor> {
82+
public static final String TYPE = "example-search-request-post-processor-factory";
83+
84+
// We auto generate the processor if the original query size is less than 5.
85+
@Override
86+
public boolean shouldGenerate(ProcessorGenerationContext context) {
87+
SearchRequest searchRequest = context.searchRequest();
88+
if (searchRequest == null || searchRequest.source() == null) {
89+
return false;
90+
}
91+
int size = searchRequest.source().size();
92+
return size < 5;
93+
}
94+
95+
@Override
96+
public SearchRequestProcessor create(
97+
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
98+
String tag,
99+
String description,
100+
boolean ignoreFailure,
101+
Map<String, Object> config,
102+
PipelineContext pipelineContext
103+
) throws Exception {
104+
return new ExampleSearchRequestPostProcessor(tag, ignoreFailure);
105+
}
106+
}
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.example.systemsearchprocessor;
10+
11+
import org.opensearch.action.search.SearchRequest;
12+
import org.opensearch.search.pipeline.PipelineProcessingContext;
13+
import org.opensearch.search.pipeline.Processor;
14+
import org.opensearch.search.pipeline.ProcessorGenerationContext;
15+
import org.opensearch.search.pipeline.SearchRequestProcessor;
16+
import org.opensearch.search.pipeline.SystemGeneratedProcessor;
17+
18+
import java.util.Locale;
19+
import java.util.Map;
20+
21+
/**
22+
* An example system generated search request processor that will be executed before the user defined processor
23+
*/
24+
public class ExampleSearchRequestPreProcessor implements SearchRequestProcessor, SystemGeneratedProcessor {
25+
/**
26+
* type of the processor
27+
*/
28+
public static final String TYPE = "example-search-request-pre-processor";
29+
/**
30+
* description of the processor
31+
*/
32+
public static final String DESCRIPTION = "This is a system generated search request processor which will be"
33+
+ "executed before the user defined search request. It will increase the query size by 1.";
34+
/**
35+
* original query size attribute key
36+
*/
37+
public static final String ORIGINAL_QUERY_SIZE_KEY = "example-original-query-size";
38+
private final String tag;
39+
private final boolean ignoreFailure;
40+
41+
/**
42+
* ExampleSearchRequestPreProcessor constructore
43+
* @param tag tag of the processor
44+
* @param ignoreFailure should processor ignore the failure
45+
*/
46+
public ExampleSearchRequestPreProcessor(String tag, boolean ignoreFailure) {
47+
this.tag = tag;
48+
this.ignoreFailure = ignoreFailure;
49+
}
50+
51+
@Override
52+
public SearchRequest processRequest(SearchRequest request) {
53+
throw new UnsupportedOperationException(
54+
String.format(Locale.ROOT, " [%s] should process the search request with PipelineProcessingContext.", TYPE)
55+
);
56+
}
57+
58+
@Override
59+
public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception {
60+
if (request == null || request.source() == null) {
61+
return request;
62+
}
63+
int size = request.source().size();
64+
// store the original query size so that later the response processor can use it
65+
requestContext.setAttribute(ORIGINAL_QUERY_SIZE_KEY, size);
66+
request.source().size(size + 1);
67+
return request;
68+
}
69+
70+
@Override
71+
public String getType() {
72+
return TYPE;
73+
}
74+
75+
@Override
76+
public String getTag() {
77+
return this.tag;
78+
}
79+
80+
@Override
81+
public String getDescription() {
82+
return DESCRIPTION;
83+
}
84+
85+
@Override
86+
public boolean isIgnoreFailure() {
87+
return this.ignoreFailure;
88+
}
89+
90+
@Override
91+
public SystemGeneratedProcessor.ExecutionStage getExecutionStage() {
92+
// This processor will be executed before the user defined search request processor
93+
return ExecutionStage.PRE_USER_DEFINED;
94+
}
95+
96+
static class Factory implements SystemGeneratedProcessor.SystemGeneratedFactory<SearchRequestProcessor> {
97+
public static final String TYPE = "example-search-request-pre-processor-factory";
98+
99+
// We auto generate the processor if the original query size is less than 5.
100+
@Override
101+
public boolean shouldGenerate(ProcessorGenerationContext context) {
102+
SearchRequest searchRequest = context.searchRequest();
103+
if (searchRequest == null || searchRequest.source() == null) {
104+
return false;
105+
}
106+
int size = searchRequest.source().size();
107+
return size < 5;
108+
}
109+
110+
@Override
111+
public SearchRequestProcessor create(
112+
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
113+
String tag,
114+
String description,
115+
boolean ignoreFailure,
116+
Map<String, Object> config,
117+
PipelineContext pipelineContext
118+
) throws Exception {
119+
return new ExampleSearchRequestPreProcessor(tag, ignoreFailure);
120+
}
121+
}
122+
}

0 commit comments

Comments
 (0)