Skip to content

Commit 4a76915

Browse files
authored
Implement dynamic capacity for kubernetes task runner (#18591)
* Implement dynamic capacity for kubernetes task runner * Update docs * Refactor to use getIfNull * Update docs wording * Fix based on comments * Update wording for docs * Update static config java doc * Undo removal of constructor * Initial config observer implementation * Use StringUtils.format * Add missing import * Update listener operations to be atomic * Fix config manager initialisation in tests * Add tests for effective config * Test syncCapacityWithDynamicConfig * Fix checkstyle * Update KubernetesOverlordModuleTest to set up ConfigManager * Update docs * Use AtomicInteger for currentCapacity
1 parent d91af35 commit 4a76915

27 files changed

+1255
-445
lines changed

docs/development/extensions-core/k8s-jobs.md

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ Other configurations required are:
4848
Druid operators can dynamically tune certain features within this extension. You don't need to restart the Overlord
4949
service for these changes to take effect.
5050

51-
Druid can dynamically tune [pod template selection](#pod-template-selection), which allows you to configure the pod
52-
template based on the task to be run. To enable dynamic pod template selection, first configure the
53-
[custom template pod adapter](#custom-template-pod-adapter).
51+
Druid can dynamically tune [pod template selection](#pod-template-selection) and [capacity](#properties), where capacity refers to `druid.indexer.runner.capacity`.
52+
53+
Pod template selection allows you to configure the pod template based on the task to be run. To enable dynamic pod template selection, first configure the [custom template pod adapter](#custom-template-pod-adapter).
5454

5555
Use the following APIs to view and update the dynamic configuration for the Kubernetes task runner.
5656

@@ -126,7 +126,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
126126
"type": ["index_kafka"]
127127
}
128128
]
129-
}
129+
},
130+
"capacity": 12
130131
}
131132
```
132133
</details>
@@ -135,6 +136,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
135136

136137
Updates the dynamic configuration for the Kubernetes Task Runner
137138

139+
Note: Both `podTemplateSelectStrategy` and `capacity` are optional fields. A POST request may include either, both, or neither.
140+
138141
##### URL
139142

140143
`POST` `/druid/indexer/v1/k8s/taskrunner/executionconfig`
@@ -193,7 +196,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskrunner/executionconf
193196
"type": ["index_kafka"]
194197
}
195198
]
196-
}
199+
},
200+
"capacity": 6
197201
}'
198202
```
199203

@@ -225,7 +229,8 @@ Content-Type: application/json
225229
"type": ["index_kafka"]
226230
}
227231
]
228-
}
232+
},
233+
"capacity": 6
229234
}
230235
```
231236

@@ -309,7 +314,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
309314
"comment": "",
310315
"ip": "127.0.0.1"
311316
},
312-
"payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"}",
317+
"payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"},\"capacity\":6}",
313318
"auditTime": "2024-06-13T20:59:51.622Z"
314319
}
315320
]
@@ -790,7 +795,7 @@ Should you require the needed permissions for interacting across Kubernetes name
790795
| `druid.indexer.runner.annotations` | `JsonObject` | Additional annotations you want to add to peon pod. | `{}` | No |
791796
| `druid.indexer.runner.peonMonitors` | `JsonArray` | Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord. | `[]` | No |
792797
| `druid.indexer.runner.graceTerminationPeriodSeconds` | `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. | `PT30S` (K8s default) | No |
793-
| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. | `2147483647` | No |
798+
| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. Value will be overridden if a dynamic config value has been set. | `2147483647` | No |
794799
| `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core for the task. | `1000` | No |
795800
| `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | No |
796801

extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public class KubernetesOverlordModule implements DruidModule
9595
public void configure(Binder binder)
9696
{
9797
// druid.indexer.runner.type=k8s
98-
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerConfig.class);
98+
JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerStaticConfig.class);
9999
JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX, KubernetesAndWorkerTaskRunnerConfig.class);
100100
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
101101
JacksonConfigProvider.bind(binder, KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class, null);
@@ -150,10 +150,20 @@ public void configure(Binder binder)
150150
JsonConfigProvider.bind(binder, JDK_HTTPCLIENT_PROPERITES_PREFIX, DruidKubernetesJdkHttpClientConfig.class);
151151
}
152152

153+
@Provides
154+
@LazySingleton
155+
public KubernetesTaskRunnerEffectiveConfig provideEffectiveConfig(
156+
KubernetesTaskRunnerStaticConfig staticConfig,
157+
Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigSupplier
158+
)
159+
{
160+
return new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigSupplier);
161+
}
162+
153163
@Provides
154164
@LazySingleton
155165
public DruidKubernetesClient makeKubernetesClient(
156-
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
166+
KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerConfig,
157167
DruidKubernetesHttpClientFactory httpClientFactory,
158168
Lifecycle lifecycle
159169
)
@@ -217,7 +227,7 @@ TaskRunnerFactory<? extends WorkerTaskRunner> provideWorkerTaskRunner(
217227
TaskAdapter provideTaskAdapter(
218228
DruidKubernetesClient client,
219229
Properties properties,
220-
KubernetesTaskRunnerConfig kubernetesTaskRunnerConfig,
230+
KubernetesTaskRunnerEffectiveConfig kubernetesTaskRunnerConfig,
221231
TaskConfig taskConfig,
222232
StartupLoggingConfig startupLoggingConfig,
223233
@Self DruidNode druidNode,
@@ -260,7 +270,7 @@ TaskAdapter provideTaskAdapter(
260270
druidNode,
261271
smileMapper,
262272
taskLogs,
263-
new DynamicConfigPodTemplateSelector(properties, dynamicConfigRef)
273+
new DynamicConfigPodTemplateSelector(properties, kubernetesTaskRunnerConfig)
264274
);
265275
} else {
266276
return new SingleContainerTaskAdapter(

extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.common.util.concurrent.ListeningExecutorService;
3030
import com.google.common.util.concurrent.MoreExecutors;
3131
import io.fabric8.kubernetes.api.model.batch.v1.Job;
32+
import org.apache.druid.common.config.ConfigManager;
3233
import org.apache.druid.common.guava.FutureUtils;
3334
import org.apache.druid.error.DruidException;
3435
import org.apache.druid.indexer.RunnerTaskState;
@@ -44,6 +45,7 @@
4445
import org.apache.druid.java.util.common.DateTimes;
4546
import org.apache.druid.java.util.common.ISE;
4647
import org.apache.druid.java.util.common.Pair;
48+
import org.apache.druid.java.util.common.StringUtils;
4749
import org.apache.druid.java.util.common.concurrent.Execs;
4850
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
4951
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -56,6 +58,7 @@
5658
import org.apache.druid.k8s.overlord.common.K8sTaskId;
5759
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
5860
import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
61+
import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
5962
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
6063
import org.apache.druid.tasklogs.TaskLogStreamer;
6164
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -76,8 +79,11 @@
7679
import java.util.concurrent.ExecutionException;
7780
import java.util.concurrent.Executor;
7881
import java.util.concurrent.Executors;
82+
import java.util.concurrent.LinkedBlockingQueue;
7983
import java.util.concurrent.ScheduledExecutorService;
84+
import java.util.concurrent.ThreadPoolExecutor;
8085
import java.util.concurrent.TimeUnit;
86+
import java.util.concurrent.atomic.AtomicInteger;
8187
import java.util.stream.Collectors;
8288

8389
/**
@@ -100,6 +106,7 @@
100106
public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
101107
{
102108
private static final EmittingLogger log = new EmittingLogger(KubernetesTaskRunner.class);
109+
private static final String OBSERVER_KEY = "k8s-task-runner-capacity-%s";
103110
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
104111

105112
// to cleanup old jobs that might not have been deleted.
@@ -111,19 +118,23 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
111118
private final KubernetesPeonClient client;
112119
private final KubernetesTaskRunnerConfig config;
113120
private final ListeningExecutorService exec;
121+
private final ThreadPoolExecutor tpe;
114122
private final HttpClient httpClient;
115123
private final PeonLifecycleFactory peonLifecycleFactory;
116124
private final ServiceEmitter emitter;
117125
// currently worker categories aren't supported, so it's hardcoded.
118126
protected static final String WORKER_CATEGORY = "_k8s_worker_category";
119127

128+
private final AtomicInteger currentCapacity;
129+
120130
public KubernetesTaskRunner(
121131
TaskAdapter adapter,
122132
KubernetesTaskRunnerConfig config,
123133
KubernetesPeonClient client,
124134
HttpClient httpClient,
125135
PeonLifecycleFactory peonLifecycleFactory,
126-
ServiceEmitter emitter
136+
ServiceEmitter emitter,
137+
ConfigManager configManager
127138
)
128139
{
129140
this.adapter = adapter;
@@ -132,10 +143,12 @@ public KubernetesTaskRunner(
132143
this.httpClient = httpClient;
133144
this.peonLifecycleFactory = peonLifecycleFactory;
134145
this.cleanupExecutor = Executors.newScheduledThreadPool(1);
135-
this.exec = MoreExecutors.listeningDecorator(
136-
Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
137-
);
138146
this.emitter = emitter;
147+
148+
this.currentCapacity = new AtomicInteger(config.getCapacity());
149+
this.tpe = new ThreadPoolExecutor(currentCapacity.get(), currentCapacity.get(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Execs.makeThreadFactory("k8s-task-runner-%d", null));
150+
this.exec = MoreExecutors.listeningDecorator(this.tpe);
151+
configManager.addListener(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, StringUtils.format(OBSERVER_KEY, Thread.currentThread().getId()), this::syncCapacityWithDynamicConfig);
139152
}
140153

141154
@Override
@@ -179,6 +192,24 @@ protected KubernetesWorkItem joinAsync(Task task)
179192
}
180193
}
181194

195+
private void syncCapacityWithDynamicConfig(KubernetesTaskRunnerDynamicConfig config)
196+
{
197+
int newCapacity = config.getCapacity();
198+
if (newCapacity == currentCapacity.get()) {
199+
return;
200+
}
201+
log.info("Adjusting k8s task runner capacity from [%d] to [%d]", currentCapacity.get(), newCapacity);
202+
// maximum pool size must always be greater than or equal to the core pool size
203+
if (newCapacity < currentCapacity.get()) {
204+
tpe.setCorePoolSize(newCapacity);
205+
tpe.setMaximumPoolSize(newCapacity);
206+
} else {
207+
tpe.setMaximumPoolSize(newCapacity);
208+
tpe.setCorePoolSize(newCapacity);
209+
}
210+
currentCapacity.set(newCapacity);
211+
}
212+
182213
private TaskStatus runTask(Task task)
183214
{
184215
return doTask(task, true);
@@ -294,7 +325,7 @@ public void shutdown(String taskid, String reason)
294325
synchronized (tasks) {
295326
tasks.remove(taskid);
296327
}
297-
328+
298329
}
299330

300331
@Override
@@ -420,7 +451,7 @@ public void stop()
420451
@Override
421452
public Map<String, Long> getTotalTaskSlotCount()
422453
{
423-
return ImmutableMap.of(WORKER_CATEGORY, (long) config.getCapacity());
454+
return ImmutableMap.of(WORKER_CATEGORY, (long) currentCapacity.get());
424455
}
425456

426457
@Override
@@ -438,13 +469,13 @@ public Optional<ScalingStats> getScalingStats()
438469
@Override
439470
public Map<String, Long> getIdleTaskSlotCount()
440471
{
441-
return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0, config.getCapacity() - tasks.size()));
472+
return ImmutableMap.of(WORKER_CATEGORY, (long) Math.max(0, currentCapacity.get() - tasks.size()));
442473
}
443474

444475
@Override
445476
public Map<String, Long> getUsedTaskSlotCount()
446477
{
447-
return ImmutableMap.of(WORKER_CATEGORY, (long) Math.min(config.getCapacity(), tasks.size()));
478+
return ImmutableMap.of(WORKER_CATEGORY, (long) Math.min(currentCapacity.get(), tasks.size()));
448479
}
449480

450481
@Override
@@ -535,7 +566,7 @@ public RunnerTaskState getRunnerTaskState(String taskId)
535566
@Override
536567
public int getTotalCapacity()
537568
{
538-
return config.getCapacity();
569+
return currentCapacity.get();
539570
}
540571

541572
@Override

0 commit comments

Comments
 (0)