Commit 3d4caea

YongGangFrankChen021 authored and committed
Support Dynamic Peon Pod Template Selection in K8s extension (#16510)
* initial commit
* add Javadocs
* refine JSON input config
* more test and fix build
* extract existing behavior as default strategy
* change template mapping fallback
* add docs
* update doc
* fix doc
* address comments
* define Matcher interface
* fix test coverage
* use lower case for endpoint path
* update Json name
* add more tests
* refactoring Selector class
1 parent 375043a commit 3d4caea

20 files changed, with 1485 additions and 40 deletions

docs/development/extensions-contrib/k8s-jobs.md

Lines changed: 53 additions & 0 deletions
@@ -214,6 +214,59 @@ data:
 druid.peon.mode=remote
 druid.indexer.task.encapsulatedTask=true
 ```
+#### Dynamic Pod Template Selection Config
+Dynamic Pod Template Selection lets the K8s extension choose a pod template for each task based on task properties. Selection is governed by the `PodTemplateSelectStrategy`, which has two implementations:
+
+|Strategy|Description|Default|
+|--------|-----------|-------|
+|`TaskTypePodTemplateSelectStrategy`| Selects the pod template by task type. This is the pre-existing behavior, which maps templates to specific task types. | true |
+|`SelectorBasedPodTemplateSelectStrategy`| Evaluates an ordered list of `selectors`, each of which matches against task properties. | false |
+
+`SelectorBasedPodTemplateSelectStrategy`, the strategy implementing this new feature, is based on conditional `selectors` that match against top-level keys from the task payload. Currently, it supports matching on task context tags, task type, and dataSource. Selectors are evaluated in the order they appear in the dynamic configuration, and the first selector that matches determines the task’s pod template. If no selector matches, the strategy falls back to the optional `defaultKey` if one is configured; otherwise it uses the `base` template.
+
+Example Configuration:
+
+We define two template keys in the configuration, `low-throughput` and `medium-throughput`, each associated with specific task conditions and arranged in priority order.
+
+- Low Throughput Template: This is the first template evaluated and has the highest priority. Tasks that have a context tag `billingCategory=streaming_ingestion` and a datasource of `wikipedia` are classified under the `low-throughput` template and run with the pod template configured for that key.
+
+- Medium Throughput Template: If a task does not meet the low-throughput criteria, it is evaluated against the next selector in order. In this example, a task of type `index_kafka` falls into the `medium-throughput` template.
+```
+{
+  "type": "default",
+  "podTemplateSelectStrategy":
+  {
+    "type": "selectorBased",
+    "selectors": [
+      {
+        "selectionKey": "low-throughput",
+        "context.tags":
+        {
+          "billingCategory": ["streaming_ingestion"]
+        },
+        "dataSource": ["wikipedia"]
+      },
+      {
+        "selectionKey": "medium-throughput",
+        "type": ["index_kafka"]
+      }
+    ],
+    "defaultKey": "base"
+  }
+}
+```
+Task specific pod templates can be specified as the runtime property `druid.indexer.runner.k8s.podTemplate.{template}: /path/to/taskSpecificPodSpec.yaml` where `{template}` is the matched `selectionKey` of the `podTemplateSelectStrategy`, for example `low-throughput`.
+
+Similar to Overlord dynamic configuration, the following API endpoints are defined to retrieve and manage the dynamic configuration for Pod Template Selection:
+
+- Get dynamic configuration:
+`GET` `/druid/indexer/v1/k8s/taskRunner/executionConfig`
+
+- Update dynamic configuration:
+`POST` `/druid/indexer/v1/k8s/taskRunner/executionConfig`
+
+- Get dynamic configuration history:
+`GET` `/druid/indexer/v1/k8s/taskRunner/executionConfig/history`

 ### Properties
 |Property| Possible Values | Description |Default|required|
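
The first-match evaluation described in this documentation change can be sketched in plain Java. This is a minimal illustration, not the extension's implementation: `SelectorSketch`, `Task`, `Selector`, `select`, and `exampleSelectors` are hypothetical names, and treating the conditions inside one selector as a logical AND (with an empty condition matching anything) is an assumption read from the `low-throughput` example.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: these are not the extension's real classes.
final class SelectorSketch
{
  // Stand-in for the task fields the documentation says can be matched.
  static final class Task
  {
    final String type;
    final String dataSource;
    final Map<String, String> contextTags;

    Task(String type, String dataSource, Map<String, String> contextTags)
    {
      this.type = type;
      this.dataSource = dataSource;
      this.contextTags = contextTags;
    }
  }

  // One selector; an empty condition is treated as "matches anything" (assumption).
  static final class Selector
  {
    final String selectionKey;
    final Map<String, Set<String>> contextTags;
    final Set<String> types;
    final Set<String> dataSources;

    Selector(
        String selectionKey,
        Map<String, Set<String>> contextTags,
        Set<String> types,
        Set<String> dataSources
    )
    {
      this.selectionKey = selectionKey;
      this.contextTags = contextTags;
      this.types = types;
      this.dataSources = dataSources;
    }

    // All configured conditions must hold (assumed AND semantics).
    boolean matches(Task task)
    {
      for (Map.Entry<String, Set<String>> e : contextTags.entrySet()) {
        String actual = task.contextTags.get(e.getKey());
        if (actual == null || !e.getValue().contains(actual)) {
          return false;
        }
      }
      return (types.isEmpty() || types.contains(task.type))
             && (dataSources.isEmpty() || dataSources.contains(task.dataSource));
    }
  }

  // Returns the key of the first matching selector, then defaultKey, then "base".
  static String select(List<Selector> selectors, String defaultKey, Task task)
  {
    for (Selector selector : selectors) {
      if (selector.matches(task)) {
        return selector.selectionKey;
      }
    }
    return defaultKey != null ? defaultKey : "base";
  }

  // Mirrors the two selectors from the example configuration.
  static List<Selector> exampleSelectors()
  {
    return List.of(
        new Selector(
            "low-throughput",
            Map.of("billingCategory", Set.of("streaming_ingestion")),
            Set.of(),
            Set.of("wikipedia")
        ),
        new Selector("medium-throughput", Map.of(), Set.of("index_kafka"), Set.of())
    );
  }

  public static void main(String[] args)
  {
    Task tagged = new Task("index_kafka", "wikipedia", Map.of("billingCategory", "streaming_ingestion"));
    Task kafka = new Task("index_kafka", "clicks", Map.of());
    Task batch = new Task("index_parallel", "clicks", Map.of());
    System.out.println(select(exampleSelectors(), null, tagged)); // low-throughput
    System.out.println(select(exampleSelectors(), null, kafka));  // medium-throughput
    System.out.println(select(exampleSelectors(), null, batch));  // base
  }
}
```

Note that the first task is also an `index_kafka` task, yet it resolves to `low-throughput` because that selector comes first; ordering, not specificity, decides the outcome in this sketch.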

extensions-contrib/kubernetes-overlord-extensions/pom.xml

Lines changed: 20 additions & 0 deletions
@@ -127,6 +127,26 @@
       <version>6.7.2</version>
       <scope>runtime</scope>
     </dependency>
+    <dependency>
+      <groupId>javax.ws.rs</groupId>
+      <artifactId>jsr311-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>javax.servlet-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-server</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>jakarta.inject</groupId>
+      <artifactId>jakarta.inject-api</artifactId>
+      <scope>provided</scope>
+    </dependency>

     <!-- Tests -->
     <dependency>

extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java

Lines changed: 7 additions & 0 deletions
@@ -32,6 +32,8 @@
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.Binders;
 import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.guice.JacksonConfigProvider;
+import org.apache.druid.guice.Jerseys;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.JsonConfigurator;
 import org.apache.druid.guice.LazySingleton;
@@ -49,6 +51,8 @@
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskExecutionConfigResource;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
 import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
 import org.apache.druid.tasklogs.NoopTaskLogs;
 import org.apache.druid.tasklogs.TaskLogKiller;
@@ -75,6 +79,7 @@ public void configure(Binder binder)
     JsonConfigProvider.bind(binder, IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, KubernetesTaskRunnerConfig.class);
     JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX, KubernetesAndWorkerTaskRunnerConfig.class);
     JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
+    JacksonConfigProvider.bind(binder, KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, KubernetesTaskRunnerDynamicConfig.class, null);
     PolyBind.createChoice(
         binder,
         "druid.indexer.runner.type",
@@ -98,6 +103,8 @@ public void configure(Binder binder)
         .toProvider(RunnerStrategyProvider.class)
         .in(LazySingleton.class);
     configureTaskLogs(binder);
+
+    Jerseys.addResource(binder, KubernetesTaskExecutionConfigResource.class);
   }

   @Provides
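
The resource registered through `Jerseys.addResource` in this hunk presumably serves the `executionConfig` endpoints listed in the documentation change. A minimal client-side sketch of building requests for them with the JDK's `java.net.http` API is given here. `ExecutionConfigRequests` and its methods are hypothetical names, the Overlord address `http://localhost:8090` is an assumed local default, and the sketch assumes the conventional mapping of `GET` for reads and `POST` for updates.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

// Hypothetical helper; only the endpoint paths come from the documentation change.
final class ExecutionConfigRequests
{
  static final String PATH = "/druid/indexer/v1/k8s/taskRunner/executionConfig";

  // Read the current dynamic configuration.
  static HttpRequest get(String overlordUrl)
  {
    return HttpRequest.newBuilder(URI.create(overlordUrl + PATH))
                      .header("Accept", "application/json")
                      .GET()
                      .build();
  }

  // Submit a new dynamic configuration as a JSON body.
  static HttpRequest update(String overlordUrl, String json)
  {
    return HttpRequest.newBuilder(URI.create(overlordUrl + PATH))
                      .header("Content-Type", "application/json")
                      .POST(HttpRequest.BodyPublishers.ofString(json))
                      .build();
  }

  // Read the change history of the dynamic configuration.
  static HttpRequest history(String overlordUrl)
  {
    return HttpRequest.newBuilder(URI.create(overlordUrl + PATH + "/history"))
                      .header("Accept", "application/json")
                      .GET()
                      .build();
  }

  public static void main(String[] args)
  {
    String overlord = "http://localhost:8090"; // assumed Overlord address
    String body = "{\"type\": \"default\", \"podTemplateSelectStrategy\": {\"type\": \"selectorBased\", \"selectors\": []}}";
    for (HttpRequest request : List.of(get(overlord), update(overlord, body), history(overlord))) {
      // Only prints the request line; nothing is sent over the network.
      System.out.println(request.method() + " " + request.uri());
    }
  }
}
```

To actually send one of these against a running Overlord, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`; authentication and error handling are left out of the sketch.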

extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java

Lines changed: 8 additions & 2 deletions
@@ -20,6 +20,7 @@
 package org.apache.druid.k8s.overlord;

 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
 import com.google.inject.Inject;
 import org.apache.druid.guice.IndexingServiceModuleHelper;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
@@ -32,6 +33,7 @@
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
 import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
 import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
 import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
@@ -56,6 +58,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
   private final Properties properties;
   private final DruidKubernetesClient druidKubernetesClient;
   private final ServiceEmitter emitter;
+  private final Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef;
   private KubernetesTaskRunner runner;

   @Inject
@@ -69,7 +72,8 @@ public KubernetesTaskRunnerFactory(
       TaskConfig taskConfig,
       Properties properties,
       DruidKubernetesClient druidKubernetesClient,
-      ServiceEmitter emitter
+      ServiceEmitter emitter,
+      Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigRef
   )
   {
     this.smileMapper = smileMapper;
@@ -82,6 +86,7 @@ public KubernetesTaskRunnerFactory(
     this.properties = properties;
     this.druidKubernetesClient = druidKubernetesClient;
     this.emitter = emitter;
+    this.dynamicConfigRef = dynamicConfigRef;
   }

   @Override
@@ -146,7 +151,8 @@ private TaskAdapter buildTaskAdapter(DruidKubernetesClient client)
           druidNode,
           smileMapper,
           properties,
-          taskLogs
+          taskLogs,
+          dynamicConfigRef
       );
     } else {
       return new SingleContainerTaskAdapter(
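
The factory passes a `Supplier` of the dynamic config to the task adapter rather than a config value. A plausible reason, stated here as an assumption and not confirmed by this diff, is that the adapter can then read the latest configuration each time it is needed instead of a copy captured at construction. The sketch below shows that pattern in isolation; `DynamicConfigSupplierSketch` and `Adapter` are hypothetical names, a `String` stands in for the config object, and it uses `java.util.function.Supplier` instead of the Guava `Supplier` imported in the commit so that it runs on the JDK alone.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of reading a config through a Supplier at call time.
final class DynamicConfigSupplierSketch
{
  // Stand-in for an adapter that needs the current strategy for each task.
  static final class Adapter
  {
    private final Supplier<String> strategyRef;

    Adapter(Supplier<String> strategyRef)
    {
      this.strategyRef = strategyRef;
    }

    // Resolves the strategy when called, not when the adapter was built.
    String strategyForNextTask()
    {
      return strategyRef.get();
    }
  }

  public static void main(String[] args)
  {
    // The reference plays the role of the store that a config update writes to.
    AtomicReference<String> current = new AtomicReference<>("taskType");
    Adapter adapter = new Adapter(current::get);

    System.out.println(adapter.strategyForNextTask()); // taskType
    current.set("selectorBased");                      // simulated config update
    System.out.println(adapter.strategyForNextTask()); // selectorBased
  }
}
```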
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.execution;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+public class DefaultKubernetesTaskRunnerDynamicConfig implements KubernetesTaskRunnerDynamicConfig
+{
+  private final PodTemplateSelectStrategy podTemplateSelectStrategy;
+
+  @JsonCreator
+  public DefaultKubernetesTaskRunnerDynamicConfig(
+      @JsonProperty("podTemplateSelectStrategy") PodTemplateSelectStrategy podTemplateSelectStrategy
+  )
+  {
+    Preconditions.checkNotNull(podTemplateSelectStrategy);
+    this.podTemplateSelectStrategy = podTemplateSelectStrategy;
+  }
+
+  @Override
+  @JsonProperty
+  public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
+  {
+    return podTemplateSelectStrategy;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DefaultKubernetesTaskRunnerDynamicConfig that = (DefaultKubernetesTaskRunnerDynamicConfig) o;
+    return Objects.equals(podTemplateSelectStrategy, that.podTemplateSelectStrategy);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hashCode(podTemplateSelectStrategy);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DefaultKubernetesTaskRunnerDynamicConfig{" +
+           "podTemplateSelectStrategy=" + podTemplateSelectStrategy +
+           '}';
+  }
+}
