diff --git a/build-tools/src/main/java/org/elasticsearch/gradle/plugin/GenerateBundleManifestTask.java b/build-tools/src/main/java/org/elasticsearch/gradle/plugin/GenerateBundleManifestTask.java index 8513ac30e5134..201731e116f86 100644 --- a/build-tools/src/main/java/org/elasticsearch/gradle/plugin/GenerateBundleManifestTask.java +++ b/build-tools/src/main/java/org/elasticsearch/gradle/plugin/GenerateBundleManifestTask.java @@ -51,11 +51,12 @@ public GenerateBundleManifestTask(WorkerExecutor workerExecutor, ExecOperations @TaskAction public void scanPluginClasses() { File outputFile = projectLayout.getBuildDirectory().file(NAMED_COMPONENTS_PATH).get().getAsFile(); + File moduleDirectory = projectLayout.getProjectDirectory().getAsFile(); ExecResult execResult = LoggedExec.javaexec(execOperations, spec -> { spec.classpath(pluginScannerClasspath.plus(getClasspath()).getAsPath()); spec.getMainClass().set("org.elasticsearch.plugin.scanner.ManifestBuilder"); - spec.args(outputFile); + spec.args(outputFile, moduleDirectory); spec.setErrorOutput(System.err); spec.setStandardOutput(System.out); }); diff --git a/libs/plugin-api/src/main/java/org/elasticsearch/plugin/Extension.java b/libs/plugin-api/src/main/java/org/elasticsearch/plugin/Extension.java new file mode 100644 index 0000000000000..fe8288ab1191c --- /dev/null +++ b/libs/plugin-api/src/main/java/org/elasticsearch/plugin/Extension.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.plugin; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.TYPE; + +@Retention(RetentionPolicy.RUNTIME) +@Target(value = { TYPE, FIELD }) +public @interface Extension { +} diff --git a/libs/plugin-scanner/src/main/java/org/elasticsearch/plugin/scanner/ManifestBuilder.java b/libs/plugin-scanner/src/main/java/org/elasticsearch/plugin/scanner/ManifestBuilder.java index be9a3646c5286..2ff541ce91b0b 100644 --- a/libs/plugin-scanner/src/main/java/org/elasticsearch/plugin/scanner/ManifestBuilder.java +++ b/libs/plugin-scanner/src/main/java/org/elasticsearch/plugin/scanner/ManifestBuilder.java @@ -11,6 +11,7 @@ import org.elasticsearch.plugin.Component; import org.elasticsearch.plugin.Extensible; +import org.elasticsearch.plugin.Extension; import org.elasticsearch.plugin.MultipleRegistryEntries; import org.elasticsearch.plugin.NamedComponent; import org.elasticsearch.plugin.RegistryCtor; @@ -21,19 +22,24 @@ import org.objectweb.asm.AnnotationVisitor; import org.objectweb.asm.ClassReader; import org.objectweb.asm.ClassVisitor; +import org.objectweb.asm.FieldVisitor; import org.objectweb.asm.MethodVisitor; import org.objectweb.asm.Opcodes; import org.objectweb.asm.Type; import java.io.IOException; import java.io.OutputStream; +import java.lang.reflect.Modifier; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.BiFunction; +import java.util.stream.Stream; public class ManifestBuilder { @@ -41,14 +47,18 @@ public class ManifestBuilder { public static void main(String[] args) throws IOException { List classReaders = ClassReaders.ofClassPath(); - List components = findComponents(classReaders); + List componentsClasses = findComponents(classReaders); Map> registries = findRegistries(classReaders); Map> namedComponents = findNamedComponents(classReaders); + Map> extensionsFields = findExtensionsFields(ClassReaders.ofPaths(Stream.of(Path.of(args[1])))); + Path outputFile = Path.of(args[0]); - ManifestBuilder.writeToFile(components, registries, namedComponents, outputFile); + ManifestBuilder.writeToFile(componentsClasses, extensionsFields, registries, namedComponents, outputFile); } - public static void writeToFile(List components, Map> registries, Map> namedComponents, Path outputFile) throws IOException { + public static void writeToFile(List componentsClasses, Map> extensionsFields, + Map> registries, + Map> namedComponents, Path outputFile) throws IOException { Files.createDirectories(outputFile.getParent()); try (OutputStream outputStream = Files.newOutputStream(outputFile)) { @@ -57,7 +67,17 @@ public static void writeToFile(List components, Map> entry : extensionsFields.entrySet()) { + builder.startArray(entry.getKey()); + for (var value : entry.getValue()) { + builder.value(value); + } + builder.endArray(); + } + builder.endObject(); builder.startObject("registries"); for (var entry : registries.entrySet()) { @@ -242,6 +262,63 @@ public AnnotationVisitor visitAnnotation(String name, String descriptor) { return registries; } + private static Map> findExtensionsFields(List classReaders) { + Set extensibleClasses = new HashSet<>(); + Map> extensionFields = new HashMap<>(); + + // TODO: merge with component scanner? + for (ClassReader classReader : classReaders) { + classReader.accept(new ClassVisitor(Opcodes.ASM9) { + private String currentClassName; + + @Override + public void visit(int version, int access, String name, String signature, String superName, String[] interfaces) { + currentClassName = pathToClassName(name); + } + + @Override + public AnnotationVisitor visitAnnotation(String descriptor, boolean visible) { + if (descriptor.equals(Type.getDescriptor(Extensible.class))) { + extensibleClasses.add(currentClassName); + } + return super.visitAnnotation(descriptor, visible); + } + + @Override + public FieldVisitor visitField(int access, String fieldName, String descriptor, String signature, Object value) { + if (Modifier.isStatic(access) == false || Modifier.isFinal(access) == false) { + return super.visitField(access, fieldName, descriptor, signature, value); + } + return new FieldVisitor(Opcodes.ASM9) { + @Override + public AnnotationVisitor visitAnnotation(String annotationDescriptor, boolean visible) { + if (annotationDescriptor.equals(Type.getDescriptor(Extension.class))) { + Type type = Type.getType(descriptor); + extensionFields.compute(type.getClassName(), (k, set) -> { + if (set == null) { + set = new HashSet<>(); + } + set.add(currentClassName + "#" + fieldName); + return set; + }); + } + return super.visitAnnotation(annotationDescriptor, visible); + } + }; + } + }, ClassReader.SKIP_CODE); + } + + if (extensibleClasses.containsAll(extensionFields.keySet()) == false) { + System.out.println(extensibleClasses); + System.out.println(extensionFields); + extensionFields.keySet().removeAll(extensibleClasses); + // TODO: we don't scan dependencies now, so we can't check that class actually extends the right thing +// throw new RuntimeException("Some extension fields are not defined as extensible classes: " + extensionFields.values()); + } + return extensionFields; + } + private static String pathToClassName(String classWithSlashes) { return classWithSlashes.replace('/', '.'); } diff --git a/modules/repository-azure/src/main/java/module-info.java b/modules/repository-azure/src/main/java/module-info.java index 731f1e0a9986a..e1bc36b7390bb 100644 --- a/modules/repository-azure/src/main/java/module-info.java +++ b/modules/repository-azure/src/main/java/module-info.java @@ -29,4 +29,5 @@ requires reactor.netty.core; requires reactor.netty.http; requires com.azure.storage.blob.batch; + requires org.elasticsearch.plugin; } diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index 3b945c8118804..4e332f9ad540f 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -19,13 +19,13 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.plugin.Extension; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.ScalingExecutorBuilder; +import org.elasticsearch.threadpool.ScalingExecutorBuilderSpec; import org.elasticsearch.xcontent.NamedXContentRegistry; import java.security.AccessController; @@ -115,19 +115,13 @@ public List> getSettings() { ); } - @Override - public List> getExecutorBuilders(Settings settingsToUse) { - return List.of(executorBuilder(), nettyEventLoopExecutorBuilder(settingsToUse)); - } + @Extension + public static final ScalingExecutorBuilderSpec REPOSITORY_THREAD_POOL = new ScalingExecutorBuilderSpec(REPOSITORY_THREAD_POOL_NAME, + 0, 5, TimeValue.timeValueSeconds(30L), false); - public static ExecutorBuilder executorBuilder() { - return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 5, TimeValue.timeValueSeconds(30L), false); - } - - public static ExecutorBuilder nettyEventLoopExecutorBuilder(Settings settings) { - int eventLoopThreads = AzureClientProvider.eventLoopThreadsFromSettings(settings); - return new ScalingExecutorBuilder(NETTY_EVENT_LOOP_THREAD_POOL_NAME, 0, eventLoopThreads, TimeValue.timeValueSeconds(30L), false); - } + @Extension + public static final ScalingExecutorBuilderSpec NETTY_EVENT_LOOP_THREAD_POOL = new ScalingExecutorBuilderSpec(NETTY_EVENT_LOOP_THREAD_POOL_NAME, + 0, AzureClientProvider::eventLoopThreadsFromSettings, TimeValue.timeValueSeconds(30L), false); @Override public void reload(Settings settingsToLoad) { diff --git a/server/src/main/java/org/elasticsearch/injection/Injector.java b/server/src/main/java/org/elasticsearch/injection/Injector.java index f0be131bfd023..74781b79e2c96 100644 --- a/server/src/main/java/org/elasticsearch/injection/Injector.java +++ b/server/src/main/java/org/elasticsearch/injection/Injector.java @@ -12,6 +12,7 @@ import org.elasticsearch.injection.api.Inject; import org.elasticsearch.injection.spec.AmbiguousSpec; import org.elasticsearch.injection.spec.ExistingInstanceSpec; +import org.elasticsearch.injection.spec.ExistingMultipleInstancesSpec; import org.elasticsearch.injection.spec.InjectionSpec; import org.elasticsearch.injection.spec.MethodHandleSpec; import org.elasticsearch.injection.spec.ParameterModifier; @@ -26,6 +27,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.LinkedHashMap; @@ -120,6 +122,22 @@ public Injector addInstances(Collection objects) { return this; } + public Injector addExtensionInstances(Class type, Collection objects) { + UnambiguousSpec spec = seedSpecs.computeIfAbsent(type, i -> new ExistingMultipleInstancesSpec(type, new ArrayList<>())); + if (spec instanceof ExistingMultipleInstancesSpec == false) { + throw new IllegalStateException("There's already an object for " + type); + } + ((ExistingMultipleInstancesSpec) spec).instances().addAll(objects); + return this; + } + + public Injector addExtensionsInstances(Map, ? extends Collection> objects) { + for (Map.Entry, ? extends Collection> entry : objects.entrySet()) { + addExtensionInstances(entry.getKey(), entry.getValue()); + } + return this; + } + /** * Indicates that object is to be injected for parameters of type type. * The given object is treated as though it had been instantiated by the injector. @@ -156,6 +174,8 @@ private PlanInterpreter doInjection() { specMap.values().forEach((spec) -> { if (spec instanceof ExistingInstanceSpec e) { existingInstances.put(e.requestedType(), e.instance()); + } else if (spec instanceof ExistingMultipleInstancesSpec s) { + existingInstances.put(s.requestedType(), s.instances()); } }); PlanInterpreter interpreter = new PlanInterpreter(existingInstances, new ProxyPool()); @@ -200,7 +220,7 @@ private static Map, InjectionSpec> specClosure(Map, Unambiguou } InjectionSpec spec = seedMap.get(c); - if (spec instanceof ExistingInstanceSpec) { + if (spec instanceof ExistingInstanceSpec || spec instanceof ExistingMultipleInstancesSpec) { // simple! result.put(c, spec); continue; diff --git a/server/src/main/java/org/elasticsearch/injection/PlanInterpreter.java b/server/src/main/java/org/elasticsearch/injection/PlanInterpreter.java index 5a3000f77c2f3..71cff3c2dd55e 100644 --- a/server/src/main/java/org/elasticsearch/injection/PlanInterpreter.java +++ b/server/src/main/java/org/elasticsearch/injection/PlanInterpreter.java @@ -11,6 +11,7 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.injection.spec.MethodHandleSpec; +import org.elasticsearch.injection.spec.ParameterModifier; import org.elasticsearch.injection.spec.ParameterSpec; import org.elasticsearch.injection.step.CreateCollectionProxyStep; import org.elasticsearch.injection.step.CreateInstanceProxyStep; @@ -70,7 +71,15 @@ final class PlanInterpreter { PlanInterpreter(Map, Object> existingInstances, ProxyPool proxyPool) { this.proxyPool = proxyPool; - existingInstances.forEach(this::addInstance); + for (Map.Entry, Object> entry : existingInstances.entrySet()) { + Class type = entry.getKey(); + Object value = entry.getValue(); + if (value instanceof Collection values) { + addInstances(type, values); + } else { + addInstance(type, value); + } + } } /** @@ -181,7 +190,11 @@ private Object instantiate(MethodHandleSpec spec) { } private Object parameterValue(ParameterSpec parameterSpec) { - return theInstanceOf(parameterSpec.formalType()); + if (parameterSpec.modifiers().contains(ParameterModifier.COLLECTION)) { + return instancesOf(parameterSpec.injectableType()); + } else { + return theInstanceOf(parameterSpec.formalType()); + } } } diff --git a/server/src/main/java/org/elasticsearch/injection/Planner.java b/server/src/main/java/org/elasticsearch/injection/Planner.java index 839a6a0c312b2..3dbc99d76e042 100644 --- a/server/src/main/java/org/elasticsearch/injection/Planner.java +++ b/server/src/main/java/org/elasticsearch/injection/Planner.java @@ -11,6 +11,7 @@ import org.elasticsearch.injection.spec.AmbiguousSpec; import org.elasticsearch.injection.spec.ExistingInstanceSpec; +import org.elasticsearch.injection.spec.ExistingMultipleInstancesSpec; import org.elasticsearch.injection.spec.InjectionSpec; import org.elasticsearch.injection.spec.MethodHandleSpec; import org.elasticsearch.injection.spec.ParameterSpec; @@ -117,6 +118,10 @@ private void planForSpec(InjectionSpec spec, int depth) { // Nothing to do. The injector will already have the required object. logger.trace("{}- Plan {}", indent(depth), e); } + case ExistingMultipleInstancesSpec e -> { + // Nothing to do. The injector will already have the required objects. + logger.trace("{}- Plan {}", indent(depth), e); + } case MethodHandleSpec m -> { for (var p : m.parameters()) { planParameter(p, depth); diff --git a/server/src/main/java/org/elasticsearch/injection/spec/ExistingMultipleInstancesSpec.java b/server/src/main/java/org/elasticsearch/injection/spec/ExistingMultipleInstancesSpec.java new file mode 100644 index 0000000000000..979c34c1d7919 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/injection/spec/ExistingMultipleInstancesSpec.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.injection.spec; + +import java.util.Collection; + +/** + * Indicates that a type should be injected by passing a particular collection {@link #instances}. + */ +public record ExistingMultipleInstancesSpec(Class requestedType, Collection instances) implements UnambiguousSpec { + @Override + public String toString() { + // Don't call instance.toString; who knows what that will return + return "ExistingMultipleInstanceSpec[" + "requestedType=" + requestedType + ']'; + } + + public void addInstances(Collection instances) { + this.instances.addAll(instances); + } +} diff --git a/server/src/main/java/org/elasticsearch/injection/spec/UnambiguousSpec.java b/server/src/main/java/org/elasticsearch/injection/spec/UnambiguousSpec.java index 0d09c533174b9..03c29c96ffc3b 100644 --- a/server/src/main/java/org/elasticsearch/injection/spec/UnambiguousSpec.java +++ b/server/src/main/java/org/elasticsearch/injection/spec/UnambiguousSpec.java @@ -12,4 +12,5 @@ /** * Indicates that there is just one way to inject {@link #requestedType}. */ -public sealed interface UnambiguousSpec extends InjectionSpec permits ExistingInstanceSpec, MethodHandleSpec, SubtypeSpec {} +public sealed interface UnambiguousSpec extends InjectionSpec + permits ExistingInstanceSpec, ExistingMultipleInstancesSpec, MethodHandleSpec, SubtypeSpec {} diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 175a29f12e8b4..14599d852820d 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -159,6 +159,7 @@ import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.AnalysisPlugin; +import org.elasticsearch.plugins.BundleInfo; import org.elasticsearch.plugins.BundleManifest; import org.elasticsearch.plugins.CircuitBreakerPlugin; import org.elasticsearch.plugins.ClusterCoordinationPlugin; @@ -218,7 +219,10 @@ import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders; import org.elasticsearch.threadpool.ExecutorBuilder; +import org.elasticsearch.threadpool.FixedExecutorBuilderSpec; +import org.elasticsearch.threadpool.ScalingExecutorBuilderSpec; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolSpec; import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; @@ -236,9 +240,11 @@ import java.io.UncheckedIOException; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodType; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -285,8 +291,17 @@ static NodeConstruction prepareConstruction( Settings settings = constructor.createEnvironment(initialEnvironment, serviceProvider, pluginsLoader); constructor.loadLoggingDataProviders(); + + List pluginsInjectors = constructor.pluginsService.flatMapBundle(bundle -> { + Map, List> extensionsInstancesFromPlugin = loadExtensionsInstancesFromPlugin(bundle); + org.elasticsearch.injection.Injector pluginInjector = org.elasticsearch.injection.Injector.create(); + + pluginInjector.addExtensionsInstances(extensionsInstancesFromPlugin); + return Collections.singleton(pluginInjector); + }).toList(); + TelemetryProvider telemetryProvider = constructor.createTelemetryProvider(settings); - ThreadPool threadPool = constructor.createThreadPool(settings, telemetryProvider.getMeterRegistry()); + ThreadPool threadPool = constructor.createThreadPool(settings, telemetryProvider.getMeterRegistry(), pluginsInjectors); final SettingsModule settingsModule; try (var ignored = threadPool.getThreadContext().newStoredContext()) { @@ -417,6 +432,52 @@ private static Optional getSinglePlugin(Stream plugins, Class plugi return Optional.of(plugin); } + private static Map, List> loadExtensionsInstancesFromPlugin(BundleInfo bundle) { + Plugin plugin = bundle.instance(); + ClassLoader loader = plugin.getClass().getClassLoader(); + + Map> extensionsFields = bundle.manifest().extensionsFields(); + + Map, List> extensionsInstances = new HashMap<>(); + for (Map.Entry> entry : extensionsFields.entrySet()) { + String extensibleClassName = entry.getKey(); + Class extensibleClass; + try { + extensibleClass = loader.loadClass(extensibleClassName); + } catch (ClassNotFoundException e) { + throw new AssertionError(e); + } + List instances = entry.getValue().stream() + .map(extensionFieldName -> getExtensionInstanceFromField(extensionFieldName, loader)) + .toList(); + extensionsInstances.put(extensibleClass, instances); + } + + // TODO: we need to have all Extensible classes in the map, + // otherwise, Injector won't know anything about them and will fail attempting to find a suitable constructor + extensionsInstances.putIfAbsent(FixedExecutorBuilderSpec.class, Collections.emptyList()); + extensionsInstances.putIfAbsent(ScalingExecutorBuilderSpec.class, Collections.emptyList()); + + return extensionsInstances; + } + + private static Object getExtensionInstanceFromField(String extensionFieldName, ClassLoader loader) { + String[] parts = extensionFieldName.split("#"); + if (parts.length != 2) { + throw new IllegalArgumentException("Incorrect field name [" + extensionFieldName + "]"); + } + String className = parts[0]; + String fieldName = parts[1]; + try { + Class cls = loader.loadClass(className); + Field declaredField = cls.getDeclaredField(fieldName); + return declaredField.get(null); + } catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException e) { + // should not be possible, all classes were discovered within the bundle + throw new AssertionError(e); + } + } + private Settings createEnvironment(Environment initialEnvironment, NodeServiceProvider serviceProvider, PluginsLoader pluginsLoader) { // Pass the node settings to the DeprecationLogger class so that it can have the deprecation.skip_deprecated_settings setting: Settings envSettings = initialEnvironment.settings(); @@ -489,11 +550,19 @@ private TelemetryProvider createTelemetryProvider(Settings settings) { return getSinglePlugin(TelemetryPlugin.class).map(p -> p.getTelemetryProvider(settings)).orElse(TelemetryProvider.NOOP); } - private ThreadPool createThreadPool(Settings settings, MeterRegistry meterRegistry) throws IOException { + private ThreadPool createThreadPool(Settings settings, MeterRegistry meterRegistry, + List pluginsInjectors) throws IOException { + Collection> builderSpecs = pluginsInjectors.stream() + .map(injector -> injector.inject(List.of(RecordBuilderSpecs.class))) + .map(m -> (RecordBuilderSpecs) m.get(RecordBuilderSpecs.class)) + .flatMap(RecordBuilderSpecs::specsStream) + .toList(); + ThreadPool threadPool = new ThreadPool( settings, meterRegistry, pluginsService.loadSingletonServiceProvider(BuiltInExecutorBuilders.class, DefaultBuiltInExecutorBuilders::new), + builderSpecs, pluginsService.flatMap(p -> p.getExecutorBuilders(settings)).toArray(ExecutorBuilder[]::new) ); resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); @@ -1895,4 +1964,10 @@ private Module loadPersistentTasksService( private Set builtinIndexSettingProviders() { return Set.of(new IndexMode.IndexModeSettingsProvider()); } + + public record RecordBuilderSpecs(Collection fixedSpec, Collection scalingSpec) { + public Stream> specsStream() { + return Stream.concat(fixedSpec.stream(), scalingSpec.stream()); + } + } } diff --git a/server/src/main/java/org/elasticsearch/plugins/BundleManifest.java b/server/src/main/java/org/elasticsearch/plugins/BundleManifest.java index e9c1488d9cd65..6a842313f7d42 100644 --- a/server/src/main/java/org/elasticsearch/plugins/BundleManifest.java +++ b/server/src/main/java/org/elasticsearch/plugins/BundleManifest.java @@ -24,10 +24,11 @@ import static java.util.Map.entry; import static org.elasticsearch.xcontent.XContentParserConfiguration.EMPTY; -public record BundleManifest(List componentClasses, Map> registries, Map> namedComponents) { +public record BundleManifest(List componentClasses, Map> registries, + Map> namedComponents, Map> extensionsFields) { private static final String FILENAME = "bundle-manifest.json"; - public static final BundleManifest EMPTY = new BundleManifest(List.of(), Map.of(), Map.of()); + public static final BundleManifest EMPTY = new BundleManifest(List.of(), Map.of(), Map.of(), Map.of()); public record RegistryEntryInfo(String implementationClass, String categoryClass, String name, String factoryMethod) {} @@ -51,6 +52,8 @@ public static BundleManifest load(InputStream stream) { Map manifestMap = parser.map(); @SuppressWarnings("unchecked") List components = (List) manifestMap.get("components"); + @SuppressWarnings("unchecked") + Map> extensionsFields = (Map>) manifestMap.get("extensions_fields"); @SuppressWarnings("unchecked") Map>> untypedRegistries = (Map>>) manifestMap.get( @@ -83,7 +86,7 @@ public static BundleManifest load(InputStream stream) { System.out.println("Named components: " + namedComponents); } - return new BundleManifest(components, registries, namedComponents); + return new BundleManifest(components, registries, namedComponents, extensionsFields); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilderSpec.java b/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilderSpec.java new file mode 100644 index 0000000000000..cda2c365f71dc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilderSpec.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.plugin.Extensible; + +import java.util.function.ToIntFunction; + +@Extensible +public record FixedExecutorBuilderSpec( + String name, + ToIntFunction sizeFunction, + int queueSize, + String prefix, + EsExecutors.TaskTrackingConfig taskTrackingConfig +) implements ThreadPoolSpec { + public FixedExecutorBuilderSpec(String name, int size, int queueSize, String prefix) { + this(name, ignore -> size, queueSize, prefix); + } + + public FixedExecutorBuilderSpec(String name, ToIntFunction sizeFunction, int queueSize, String prefix) { + this(name, sizeFunction, queueSize, prefix, EsExecutors.TaskTrackingConfig.DO_NOT_TRACK); + } + + @Override + public FixedExecutorBuilder toBuilder(Settings settings) { + int size = sizeFunction().applyAsInt(settings); + return new FixedExecutorBuilder(settings, name, size, queueSize, prefix, taskTrackingConfig); + } +} diff --git a/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilderSpec.java b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilderSpec.java new file mode 100644 index 0000000000000..fa20dde0faad1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilderSpec.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugin.Extensible; + +import java.util.function.ToIntFunction; + +@Extensible +public record ScalingExecutorBuilderSpec( + String name, + int core, + ToIntFunction maxFunction, + TimeValue keepAlive, + boolean rejectAfterShutdown, + String prefix, + EsExecutors.TaskTrackingConfig trackingConfig +) implements ThreadPoolSpec { + public ScalingExecutorBuilderSpec(String name, int core, int max, TimeValue keepAlive, boolean rejectAfterShutdown) { + this(name, core, ignore -> max, keepAlive, rejectAfterShutdown); + } + + public ScalingExecutorBuilderSpec(String name, int core, int max, TimeValue keepAlive, boolean rejectAfterShutdown, String prefix) { + this(name, core, ignore -> max, keepAlive, rejectAfterShutdown, prefix, EsExecutors.TaskTrackingConfig.DO_NOT_TRACK); + } + + public ScalingExecutorBuilderSpec(String name, int core, ToIntFunction maxFunction, TimeValue keepAlive, boolean rejectAfterShutdown) { + this(name, core, maxFunction, keepAlive, rejectAfterShutdown, "thread_pool." + name); + } + + public ScalingExecutorBuilderSpec(String name, int core, ToIntFunction maxFunction, TimeValue keepAlive, boolean rejectAfterShutdown, String prefix) { + this(name, core, maxFunction, keepAlive, rejectAfterShutdown, prefix, EsExecutors.TaskTrackingConfig.DO_NOT_TRACK); + } + + @Override + public ScalingExecutorBuilder toBuilder(Settings settings) { + int max = maxFunction.applyAsInt(settings); + return new ScalingExecutorBuilder(name, core, max, keepAlive, rejectAfterShutdown, prefix, trackingConfig); + } +} diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 85ee02b6db856..66a3ba827b56f 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -274,11 +274,21 @@ public Collection builders() { * @param builtInExecutorBuilders used to construct builders for the built-in thread pools * @param customBuilders a list of additional thread pool builders that were defined elsewhere (like a Plugin). */ + public ThreadPool( + final Settings settings, + MeterRegistry meterRegistry, + BuiltInExecutorBuilders builtInExecutorBuilders, + final ExecutorBuilder... customBuilders + ) { + this(settings, meterRegistry, builtInExecutorBuilders, Collections.emptyList(), customBuilders); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) public ThreadPool( final Settings settings, MeterRegistry meterRegistry, BuiltInExecutorBuilders builtInExecutorBuilders, + Collection> executorBuilderSpecs, final ExecutorBuilder... customBuilders ) { assert Node.NODE_NAME_SETTING.exists(settings); @@ -294,6 +304,11 @@ public ThreadPool( } builders.put(builder.name(), builder); } + for (ThreadPoolSpec spec : executorBuilderSpecs) { + ExecutorBuilder builder = spec.toBuilder(settings); + builders.put(builder.name(), builder); + } + this.builders = Collections.unmodifiableMap(builders); threadContext = new ThreadContext(settings); @@ -489,6 +504,10 @@ public ExecutorService executor(String name) { return holder.executor(); } + public ExecutorService executor(FixedExecutorBuilderSpec spec) { + return executor(spec.name()); + } + /** * Schedules a one-shot command to run after a given delay. The command is run in the context of the calling thread. * diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPoolSpec.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPoolSpec.java new file mode 100644 index 0000000000000..2cfac66ed65d4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPoolSpec.java @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.settings.Settings; + +public interface ThreadPoolSpec { + ExecutorBuilder toBuilder(Settings settings); +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 0164afef75adb..67d52735e084b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -9,8 +9,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.RequestValidators; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -27,7 +25,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsModule; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.env.Environment; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.IndexModule; @@ -36,6 +33,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.plugin.Extension; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.EnginePlugin; @@ -46,8 +44,7 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.FixedExecutorBuilder; +import org.elasticsearch.threadpool.FixedExecutorBuilderSpec; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; @@ -148,6 +145,14 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final SetOnce ccrSettings = new SetOnce<>(); private Client client; + @Extension + public static final FixedExecutorBuilderSpec CCR_THREAD_POOL = new FixedExecutorBuilderSpec( + CCR_THREAD_POOL_NAME, + 32, + 100, + "xpack.ccr.ccr_thread_pool" + ); + /** * Construct an instance of the CCR container with the specified settings. * @@ -356,20 +361,6 @@ public Optional getEngineFactory(final IndexSettings indexSetting } } - @SuppressWarnings("HiddenField") - public List> getExecutorBuilders(Settings settings) { - return Collections.singletonList( - new FixedExecutorBuilder( - settings, - CCR_THREAD_POOL_NAME, - 32, - 100, - "xpack.ccr.ccr_thread_pool", - EsExecutors.TaskTrackingConfig.DO_NOT_TRACK - ) - ); - } - @Override public Map getInternalRepositories( Environment env, diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java index 24e12f3264a62..e2f24349db6de 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java @@ -14,16 +14,12 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; -import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.FixedExecutorBuilder; -import org.elasticsearch.threadpool.ThreadPool; import java.util.List; import java.util.function.Predicate; @@ -31,23 +27,6 @@ public class Downsample extends Plugin implements ActionPlugin, PersistentTaskPlugin { - public static final String DOWNSAMPLE_TASK_THREAD_POOL_NAME = "downsample_indexing"; - private static final int DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE = 256; - public static final String DOWNSAMPLE_MIN_NUMBER_OF_REPLICAS_NAME = "downsample.min_number_of_replicas"; - - @Override - public List> getExecutorBuilders(Settings settings) { - final FixedExecutorBuilder downsample = new FixedExecutorBuilder( - settings, - DOWNSAMPLE_TASK_THREAD_POOL_NAME, - ThreadPool.oneEighthAllocatedProcessors(EsExecutors.allocatedProcessors(settings)), - DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE, - "xpack.downsample.thread_pool", - EsExecutors.TaskTrackingConfig.DO_NOT_TRACK - ); - return List.of(downsample); - } - /*@Override public Collection getActions() { return List.of( diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java index b4d534ee6b133..4ae7d760a3b84 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java @@ -10,13 +10,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.AbstractActionRequest; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.downsample.DownsampleAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; @@ -40,11 +39,13 @@ import org.elasticsearch.persistent.InjectablePersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.plugin.Extension; import org.elasticsearch.plugin.NamedComponent; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.FixedExecutorBuilderSpec; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; @@ -57,15 +58,25 @@ import java.util.Objects; import java.util.concurrent.Executor; -import static org.elasticsearch.xpack.downsample.Downsample.DOWNSAMPLE_TASK_THREAD_POOL_NAME; - @NamedComponent(DownsampleShardTask.TASK_NAME) public class DownsampleShardPersistentTaskExecutor extends InjectablePersistentTasksExecutor { private static final Logger LOGGER = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class); + + private static final String DOWNSAMPLE_TASK_THREAD_POOL_NAME = "downsample_indexing"; + private static final int DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE = 256; + + @Extension + public static final FixedExecutorBuilderSpec DOWNSAMPLE_EXECUTOR = new FixedExecutorBuilderSpec( + DOWNSAMPLE_TASK_THREAD_POOL_NAME, + settings -> ThreadPool.oneEighthAllocatedProcessors(EsExecutors.allocatedProcessors(settings)), + DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE, + "xpack.downsample.thread_pool" + ); + private final Client client; public DownsampleShardPersistentTaskExecutor(final NodeClient client, final ThreadPool threadPool) { - super(threadPool.executor(DOWNSAMPLE_TASK_THREAD_POOL_NAME)); + super(threadPool.executor(DOWNSAMPLE_EXECUTOR)); this.client = Objects.requireNonNull(client); } @@ -201,7 +212,7 @@ static void realNodeOperation( DownsampleShardTaskParams params, BytesRef lastDownsampledTsid ) { - client.threadPool().executor(DOWNSAMPLE_TASK_THREAD_POOL_NAME).execute(new AbstractRunnable() { + client.threadPool().executor(DOWNSAMPLE_EXECUTOR).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { markAsFailed(task, e); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index a923815a77705..b40b11390398f 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -114,6 +114,8 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc private static final Logger logger = LogManager.getLogger(TransportDownsampleAction.class); + public static final String DOWNSAMPLE_MIN_NUMBER_OF_REPLICAS_NAME = "downsample.min_number_of_replicas"; + private final Client client; private final IndicesService indicesService; private final MasterServiceTaskQueue taskQueue; @@ -365,7 +367,7 @@ protected void masterOperation( * We should note that there is a risk of losing a node during the downsample process. In this * case downsample will fail. */ - int minNumReplicas = clusterService.getSettings().getAsInt(Downsample.DOWNSAMPLE_MIN_NUMBER_OF_REPLICAS_NAME, 0); + int minNumReplicas = clusterService.getSettings().getAsInt(DOWNSAMPLE_MIN_NUMBER_OF_REPLICAS_NAME, 0); // 3. Create downsample index createDownsampleIndex( diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/ConsumeProcessor.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/ConsumeProcessor.java index db18ff98e0bce..b7cfb694a71e5 100644 --- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/ConsumeProcessor.java +++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/ConsumeProcessor.java @@ -42,6 +42,7 @@ public Set getSupportedAnnotationTypes() { "org.elasticsearch.xpack.esql.expression.function.MapParam", "org.elasticsearch.rest.ServerlessScope", "org.elasticsearch.xcontent.ParserConstructor", + "org.elasticsearch.plugin.Extension", Fixed.class.getName() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 41b40a33e97aa..ec3ec37241520 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -6,8 +6,6 @@ */ package org.elasticsearch.xpack.esql.plugin; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -40,12 +38,12 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.plugin.Extension; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; -import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.FixedExecutorBuilder; +import org.elasticsearch.threadpool.FixedExecutorBuilderSpec; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; @@ -279,19 +277,14 @@ public List getNamedWriteables() { return entries; } - public List> getExecutorBuilders(Settings settings) { - final int allocatedProcessors = EsExecutors.allocatedProcessors(settings); - return List.of( - // TODO: Maybe have two types of threadpools for workers: one for CPU-bound and one for I/O-bound tasks. - // And we should also reduce the number of threads of the CPU-bound threadpool to allocatedProcessors. - new FixedExecutorBuilder( - settings, - ESQL_WORKER_THREAD_POOL_NAME, - ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors), - 1000, - ESQL_WORKER_THREAD_POOL_NAME, - EsExecutors.TaskTrackingConfig.DEFAULT - ) - ); - } + // TODO: Maybe have two types of threadpools for workers: one for CPU-bound and one for I/O-bound tasks. + // And we should also reduce the number of threads of the CPU-bound threadpool to allocatedProcessors. + @Extension + public static final FixedExecutorBuilderSpec ESQL_WORKER_THREAD_POOL = new FixedExecutorBuilderSpec( + ESQL_WORKER_THREAD_POOL_NAME, + settings -> ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(settings)), + 1000, + ESQL_WORKER_THREAD_POOL_NAME, + EsExecutors.TaskTrackingConfig.DEFAULT + ); } diff --git a/x-pack/plugin/inference/src/main/java/module-info.java b/x-pack/plugin/inference/src/main/java/module-info.java index 78f30e7da0670..3797d96db331a 100644 --- a/x-pack/plugin/inference/src/main/java/module-info.java +++ b/x-pack/plugin/inference/src/main/java/module-info.java @@ -35,6 +35,7 @@ requires org.reactivestreams; requires org.elasticsearch.logging; requires org.elasticsearch.sslconfig; + requires org.elasticsearch.plugin; exports org.elasticsearch.xpack.inference.action; exports org.elasticsearch.xpack.inference.registry; diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java index 02167561f2dd4..6689545bcec77 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java @@ -10,8 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.MappedActionFilter; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -35,6 +33,7 @@ import org.elasticsearch.license.LicensedFeature; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.node.PluginComponentBinding; +import org.elasticsearch.plugin.Extension; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.ExtensiblePlugin; @@ -50,8 +49,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.Highlighter; import org.elasticsearch.search.rank.RankBuilder; import org.elasticsearch.search.rank.RankDoc; -import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.ScalingExecutorBuilder; +import org.elasticsearch.threadpool.ScalingExecutorBuilderSpec; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; @@ -439,21 +437,14 @@ public Collection getSystemIndexDescriptors(Settings sett ); } - @Override - public List> getExecutorBuilders(Settings settingsToUse) { - return List.of(inferenceUtilityExecutor(settings)); - } - - public static ExecutorBuilder inferenceUtilityExecutor(Settings settings) { - return new ScalingExecutorBuilder( - UTILITY_THREAD_POOL_NAME, - 0, - 10, - TimeValue.timeValueMinutes(10), - false, - "xpack.inference.utility_thread_pool" - ); - } + @Extension + public static final ScalingExecutorBuilderSpec UTILITY_THREAD_POOL = new ScalingExecutorBuilderSpec( + UTILITY_THREAD_POOL_NAME, + 0, + 10, + TimeValue.timeValueMinutes(10), + false, + "xpack.inference.utility_thread_pool"); @Override public List> getSettings() { diff --git a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/MachineLearningPackageLoader.java b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/MachineLearningPackageLoader.java index 820c581b3e8f1..312ddcd1a71d9 100644 --- a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/MachineLearningPackageLoader.java +++ b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/MachineLearningPackageLoader.java @@ -7,21 +7,17 @@ package org.elasticsearch.xpack.ml.packageloader; import org.elasticsearch.Build; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.bootstrap.BootstrapContext; import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.plugin.Extension; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.FixedExecutorBuilder; +import org.elasticsearch.threadpool.FixedExecutorBuilderSpec; import org.elasticsearch.xpack.core.ml.packageloader.action.GetTrainedModelPackageConfigAction; import org.elasticsearch.xpack.core.ml.packageloader.action.LoadTrainedModelPackageAction; import org.elasticsearch.xpack.ml.packageloader.action.ModelDownloadTask; @@ -57,7 +53,7 @@ public class MachineLearningPackageLoader extends Plugin implements ActionPlugin Build.current().version().replaceFirst("^(\\d+\\.\\d+).*", "$1") ); - public static final String MODEL_DOWNLOAD_THREADPOOL_NAME = "model_download"; + private static final String MODEL_DOWNLOAD_THREADPOOL_NAME = "model_download"; public MachineLearningPackageLoader() {} @@ -86,23 +82,15 @@ public List getNamedWriteables() { ); } - @Override - public List> getExecutorBuilders(Settings settings) { - return List.of(modelDownloadExecutor(settings)); - } - - public static FixedExecutorBuilder modelDownloadExecutor(Settings settings) { - // Threadpool with a fixed number of threads for - // downloading the model definition files - return new FixedExecutorBuilder( - settings, - MODEL_DOWNLOAD_THREADPOOL_NAME, - ModelImporter.NUMBER_OF_STREAMS, - -1, // unbounded queue size - "xpack.ml.model_download_thread_pool", - EsExecutors.TaskTrackingConfig.DO_NOT_TRACK - ); - } + // Threadpool with a fixed number of threads for + // downloading the model definition files + @Extension + public static final FixedExecutorBuilderSpec MODEL_DOWNLOAD_THREADPOOL = new FixedExecutorBuilderSpec( + MODEL_DOWNLOAD_THREADPOOL_NAME, + ModelImporter.NUMBER_OF_STREAMS, + -1, // unbounded queue size + "xpack.ml.model_download_thread_pool" + ); @Override public List getBootstrapChecks() { diff --git a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/ModelImporter.java b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/ModelImporter.java index b155d6c73ccef..eda3391cfa5b8 100644 --- a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/ModelImporter.java +++ b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/ModelImporter.java @@ -77,7 +77,7 @@ public class ModelImporter { this.modelId = Objects.requireNonNull(modelId); this.config = Objects.requireNonNull(packageConfig); this.task = Objects.requireNonNull(task); - this.executorService = threadPool.executor(MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL_NAME); + this.executorService = threadPool.executor(MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL); this.uri = ModelLoaderUtils.resolvePackageLocation( config.getModelRepository(), config.getPackagedModelId() + ModelLoaderUtils.MODEL_FILE_EXTENSION @@ -90,10 +90,10 @@ public void doImport(ActionListener listener) { } private void doImportInternal(ActionListener finalListener) { - assert ThreadPool.assertCurrentThreadPool(MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL_NAME) + assert ThreadPool.assertCurrentThreadPool(MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL.name()) : format( "Model download must execute from [%s] but thread is [%s]", - MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL_NAME, + MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL.name(), Thread.currentThread().getName() ); @@ -175,10 +175,10 @@ private void downloadPartInRange( RefCountingListener countingListener, ActionListener rangeFullyDownloadedListener ) { - assert ThreadPool.assertCurrentThreadPool(MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL_NAME) + assert ThreadPool.assertCurrentThreadPool(MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL.name()) : format( "Model download must execute from [%s] but thread is [%s]", - MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL_NAME, + MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL.name(), Thread.currentThread().getName() ); @@ -220,10 +220,10 @@ private void downloadFinalPart( ModelLoaderUtils.HttpStreamChunker downloader, ActionListener lastPartWrittenListener ) { - assert ThreadPool.assertCurrentThreadPool(MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL_NAME) + assert ThreadPool.assertCurrentThreadPool(MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL.name()) : format( "Model download must execute from [%s] but thread is [%s]", - MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL_NAME, + MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL.name(), Thread.currentThread().getName() ); diff --git a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportGetTrainedModelPackageConfigAction.java b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportGetTrainedModelPackageConfigAction.java index a19c077977a78..aa1afa4ac7111 100644 --- a/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportGetTrainedModelPackageConfigAction.java +++ b/x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/action/TransportGetTrainedModelPackageConfigAction.java @@ -70,7 +70,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A String packagedModelId = request.getPackagedModelId(); logger.debug(() -> format("Fetch package manifest for [%s] from [%s]", packagedModelId, repository)); - threadPool.executor(MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL_NAME).execute(() -> { + threadPool.executor(MachineLearningPackageLoader.MODEL_DOWNLOAD_THREADPOOL).execute(() -> { try { URI uri = ModelLoaderUtils.resolvePackageLocation(repository, packagedModelId + ModelLoaderUtils.METADATA_FILE_EXTENSION); InputStream inputStream = ModelLoaderUtils.getInputStreamFromModelRepository(uri); diff --git a/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/ProfilingPlugin.java b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/ProfilingPlugin.java index a7f629bf59264..c1ddc933f6e86 100644 --- a/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/ProfilingPlugin.java +++ b/x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/ProfilingPlugin.java @@ -10,8 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -24,12 +22,12 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.core.TimeValue; import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.plugin.Extension; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; -import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.ScalingExecutorBuilder; +import org.elasticsearch.threadpool.ScalingExecutorBuilderSpec; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; @@ -169,19 +167,15 @@ public List> getSettings() { ); } - @Override - public List> getExecutorBuilders(Settings settings) { - return List.of(responseExecutorBuilder()); - } - /** - * @return

An ExecutorBuilder that creates an executor to offload internal query response processing from the + *

An ExecutorBuilder that creates an executor to offload internal query response processing from the * transport thread. The executor will occupy no thread by default to avoid using resources when the plugin is not needed but once used, * it will hold onto allocated pool threads for 30 minutes by default to keep response times low.

*/ - public static ExecutorBuilder responseExecutorBuilder() { - return new ScalingExecutorBuilder(PROFILING_THREAD_POOL_NAME, 0, 1, TimeValue.timeValueMinutes(30L), false); - } + @Extension + public static final ScalingExecutorBuilderSpec PROFILING_THREAD_POOL = new ScalingExecutorBuilderSpec( + PROFILING_THREAD_POOL_NAME, 0, 1, TimeValue.timeValueMinutes(30L), false + ); @Override public Collection getActions() { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index 02a0d04a48e21..ff52bb8acaaea 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.plugin.Extension; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; @@ -30,6 +31,7 @@ import org.elasticsearch.rest.RestHandler; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; +import org.elasticsearch.threadpool.FixedExecutorBuilderSpec; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; @@ -131,18 +133,13 @@ public Collection getActions() { ); } - @Override - public List> getExecutorBuilders(Settings settingsToUse) { - final FixedExecutorBuilder rollup = new FixedExecutorBuilder( - settingsToUse, - Rollup.TASK_THREAD_POOL_NAME, - 1, - -1, - "xpack.rollup.task_thread_pool", - EsExecutors.TaskTrackingConfig.DO_NOT_TRACK - ); - return List.of(rollup); - } + @Extension + public static final FixedExecutorBuilderSpec ROLLUP_TASK_THREAD_POOL = new FixedExecutorBuilderSpec( + Rollup.TASK_THREAD_POOL_NAME, + 1, + -1, + "xpack.rollup.task_thread_pool" + ); @Override public List> getPersistentTasksExecutor( diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/module-info.java b/x-pack/plugin/searchable-snapshots/src/main/java/module-info.java index 282a62f79dcb1..c5521ec1ef2bd 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/module-info.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/module-info.java @@ -16,6 +16,7 @@ requires org.apache.logging.log4j; requires org.apache.lucene.core; requires org.apache.lucene.analysis.common; + requires org.elasticsearch.plugin; exports org.elasticsearch.xpack.searchablesnapshots.action.cache to org.elasticsearch.server; exports org.elasticsearch.xpack.searchablesnapshots.action to org.elasticsearch.server; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index b0b4ffd6c9a13..7ccddfe17931f 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -10,8 +10,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.blobcache.BlobCacheMetrics; import org.elasticsearch.blobcache.shared.SharedBlobCacheService; import org.elasticsearch.client.internal.Client; @@ -54,6 +52,7 @@ import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.plugin.Extension; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.EnginePlugin; @@ -69,7 +68,7 @@ import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.snapshots.sourceonly.SourceOnlySnapshotRepository; import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.ScalingExecutorBuilder; +import org.elasticsearch.threadpool.ScalingExecutorBuilderSpec; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.XPackPlugin; @@ -539,10 +538,6 @@ public Collection createAllocationDeciders(Settings settingsT ); } - public List> getExecutorBuilders(Settings settings) { - return List.of(executorBuilders(settings)); - } - public static final String SNAPSHOT_RECOVERY_STATE_FACTORY_KEY = "snapshot_prewarm"; @Override @@ -555,30 +550,25 @@ public Map getRecoveryStateFactories() { public static final String CACHE_PREWARMING_THREAD_POOL_NAME = BlobStoreRepository.SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME; public static final String CACHE_PREWARMING_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_prewarming_thread_pool"; - public static ScalingExecutorBuilder[] executorBuilders(Settings settings) { - final int processors = EsExecutors.allocatedProcessors(settings); - // searchable snapshots cache thread pools should always reject tasks once they are shutting down, otherwise some threads might be - // waiting for some cache file regions to be populated but this will never happen once the thread pool is shutting down. In order to - // prevent these threads to be blocked the cache thread pools will reject after shutdown. - final boolean rejectAfterShutdown = true; - return new ScalingExecutorBuilder[] { - new ScalingExecutorBuilder( - CACHE_FETCH_ASYNC_THREAD_POOL_NAME, - 0, - Math.min(processors * 3, 50), - TimeValue.timeValueSeconds(30L), - rejectAfterShutdown, - CACHE_FETCH_ASYNC_THREAD_POOL_SETTING - ), - new ScalingExecutorBuilder( - CACHE_PREWARMING_THREAD_POOL_NAME, - 0, - 16, - TimeValue.timeValueSeconds(30L), - rejectAfterShutdown, - CACHE_PREWARMING_THREAD_POOL_SETTING - ) }; - } + @Extension + public static final ScalingExecutorBuilderSpec CACHE_FETCH_ASYNC_THREAD_POOL = new ScalingExecutorBuilderSpec( + CACHE_FETCH_ASYNC_THREAD_POOL_NAME, + 0, + settings -> Math.min(EsExecutors.allocatedProcessors(settings) * 3, 50), + TimeValue.timeValueSeconds(30L), + true, + CACHE_FETCH_ASYNC_THREAD_POOL_SETTING + ); + + @Extension + public static final ScalingExecutorBuilderSpec CACHE_PREWARMING_THREAD_POOL = new ScalingExecutorBuilderSpec( + CACHE_PREWARMING_THREAD_POOL_NAME, + 0, + 16, + TimeValue.timeValueSeconds(30L), + true, + CACHE_PREWARMING_THREAD_POOL_SETTING + ); private static Settings getIndexSettings() { return Settings.builder()