Skip to content

Commit 28a622e

Browse files
committed
Inject extension into service
1 parent 423f224 commit 28a622e

File tree

7 files changed

+32
-11
lines changed

7 files changed

+32
-11
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,8 @@ public TransportSearchAction(
215215
this.searchResponseMetrics = searchResponseMetrics;
216216
this.client = client;
217217
this.usageService = usageService;
218+
// just showing injection
219+
logger.info("Refresh remote connections: {}", searchService.forceRefreshRemoteConnections());
218220
}
219221

220222
private Map<String, OriginalIndices> buildPerIndexOriginalIndices(

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.action.bulk.IncrementalBulkService;
2424
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
2525
import org.elasticsearch.action.ingest.ReservedPipelineAction;
26-
import org.elasticsearch.search.internal.CrossClusterSearchExtension;
2726
import org.elasticsearch.action.search.OnlinePrewarmingService;
2827
import org.elasticsearch.action.search.OnlinePrewarmingServiceProvider;
2928
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
@@ -204,6 +203,7 @@
204203
import org.elasticsearch.search.SearchService;
205204
import org.elasticsearch.search.SearchUtils;
206205
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
206+
import org.elasticsearch.search.internal.CrossClusterSearchExtension;
207207
import org.elasticsearch.shutdown.PluginShutdownService;
208208
import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
209209
import org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService;
@@ -1021,7 +1021,7 @@ public Map<String, String> queryFields() {
10211021
CrossClusterSearchExtension.Default::new
10221022
);
10231023

1024-
logger.info("Cross-cluster search force reconnect: [{}]", crossClusterSearchExtension.forceReconnectBehaviorSupplier().get());
1024+
logger.info("Cross-cluster search force reconnect: [{}]", crossClusterSearchExtension.forceRefreshRemoteConnections().get());
10251025

10261026
ActionModule actionModule = new ActionModule(
10271027
settings,
@@ -1213,7 +1213,8 @@ public Map<String, String> queryFields() {
12131213
circuitBreakerService,
12141214
systemIndices.getExecutorSelector(),
12151215
telemetryProvider.getTracer(),
1216-
onlinePrewarmingService
1216+
onlinePrewarmingService,
1217+
crossClusterSearchExtension
12171218
);
12181219

12191220
final ShutdownPrepareService shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, terminationHandler);

server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.script.ScriptService;
3838
import org.elasticsearch.search.SearchService;
3939
import org.elasticsearch.search.fetch.FetchPhase;
40+
import org.elasticsearch.search.internal.CrossClusterSearchExtension;
4041
import org.elasticsearch.tasks.TaskManager;
4142
import org.elasticsearch.telemetry.tracing.Tracer;
4243
import org.elasticsearch.threadpool.ThreadPool;
@@ -135,7 +136,8 @@ SearchService newSearchService(
135136
CircuitBreakerService circuitBreakerService,
136137
ExecutorSelector executorSelector,
137138
Tracer tracer,
138-
OnlinePrewarmingService onlinePrewarmingService
139+
OnlinePrewarmingService onlinePrewarmingService,
140+
CrossClusterSearchExtension crossClusterSearchExtension
139141
) {
140142
return new SearchService(
141143
clusterService,
@@ -147,7 +149,8 @@ SearchService newSearchService(
147149
circuitBreakerService,
148150
executorSelector,
149151
tracer,
150-
onlinePrewarmingService
152+
onlinePrewarmingService,
153+
crossClusterSearchExtension
151154
);
152155
}
153156

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.elasticsearch.search.fetch.subphase.ScriptFieldsContext.ScriptField;
105105
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
106106
import org.elasticsearch.search.internal.AliasFilter;
107+
import org.elasticsearch.search.internal.CrossClusterSearchExtension;
107108
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
108109
import org.elasticsearch.search.internal.LegacyReaderContext;
109110
import org.elasticsearch.search.internal.ReaderContext;
@@ -368,6 +369,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
368369

369370
private final Tracer tracer;
370371

372+
private final CrossClusterSearchExtension crossClusterSearchExtension;
373+
371374
public SearchService(
372375
ClusterService clusterService,
373376
IndicesService indicesService,
@@ -378,7 +381,8 @@ public SearchService(
378381
CircuitBreakerService circuitBreakerService,
379382
ExecutorSelector executorSelector,
380383
Tracer tracer,
381-
OnlinePrewarmingService onlinePrewarmingService
384+
OnlinePrewarmingService onlinePrewarmingService,
385+
CrossClusterSearchExtension crossClusterSearchExtension
382386
) {
383387
Settings settings = clusterService.getSettings();
384388
this.threadPool = threadPool;
@@ -444,6 +448,8 @@ public SearchService(
444448
clusterService.getClusterSettings()
445449
.addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, newValue -> this.memoryAccountingBufferSize = newValue.getBytes());
446450
prewarmingMaxPoolFactorThreshold = PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE.get(settings);
451+
452+
this.crossClusterSearchExtension = crossClusterSearchExtension;
447453
}
448454

449455
public CircuitBreaker getCircuitBreaker() {
@@ -458,6 +464,10 @@ private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
458464
}
459465
}
460466

467+
public boolean forceRefreshRemoteConnections() {
468+
return crossClusterSearchExtension.forceRefreshRemoteConnections().get();
469+
}
470+
461471
private void setEnableQueryPhaseParallelCollection(boolean enableQueryPhaseParallelCollection) {
462472
this.enableQueryPhaseParallelCollection = enableQueryPhaseParallelCollection;
463473
}

server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313

1414
public interface CrossClusterSearchExtension {
1515

16-
Supplier<Boolean> forceReconnectBehaviorSupplier();
16+
Supplier<Boolean> forceRefreshRemoteConnections();
1717

1818
class Default implements CrossClusterSearchExtension {
1919
public Default() {}
2020

2121
@Override
22-
public Supplier<Boolean> forceReconnectBehaviorSupplier() {
22+
public Supplier<Boolean> forceRefreshRemoteConnections() {
2323
return () -> false;
2424
}
2525
}

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@
186186
import org.elasticsearch.search.SearchService;
187187
import org.elasticsearch.search.builder.SearchSourceBuilder;
188188
import org.elasticsearch.search.fetch.FetchPhase;
189+
import org.elasticsearch.search.internal.CrossClusterSearchExtension;
189190
import org.elasticsearch.telemetry.TelemetryProvider;
190191
import org.elasticsearch.telemetry.tracing.Tracer;
191192
import org.elasticsearch.test.ClusterServiceUtils;
@@ -2557,7 +2558,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
25572558
new NoneCircuitBreakerService(),
25582559
EmptySystemIndices.INSTANCE.getExecutorSelector(),
25592560
Tracer.NOOP,
2560-
OnlinePrewarmingService.NOOP
2561+
OnlinePrewarmingService.NOOP,
2562+
new CrossClusterSearchExtension.Default()
25612563
);
25622564

25632565
final SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoriesService);

test/framework/src/main/java/org/elasticsearch/node/MockNode.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.search.MockSearchService;
4343
import org.elasticsearch.search.SearchService;
4444
import org.elasticsearch.search.fetch.FetchPhase;
45+
import org.elasticsearch.search.internal.CrossClusterSearchExtension;
4546
import org.elasticsearch.tasks.TaskManager;
4647
import org.elasticsearch.telemetry.tracing.Tracer;
4748
import org.elasticsearch.test.ESTestCase;
@@ -103,7 +104,8 @@ SearchService newSearchService(
103104
CircuitBreakerService circuitBreakerService,
104105
ExecutorSelector executorSelector,
105106
Tracer tracer,
106-
OnlinePrewarmingService onlinePrewarmingService
107+
OnlinePrewarmingService onlinePrewarmingService,
108+
CrossClusterSearchExtension crossClusterSearchExtension
107109
) {
108110
if (pluginsService.filterPlugins(MockSearchService.TestPlugin.class).findAny().isEmpty()) {
109111
return super.newSearchService(
@@ -117,7 +119,8 @@ SearchService newSearchService(
117119
circuitBreakerService,
118120
executorSelector,
119121
tracer,
120-
onlinePrewarmingService
122+
onlinePrewarmingService,
123+
crossClusterSearchExtension
121124
);
122125
}
123126

0 commit comments

Comments
 (0)