Skip to content

Commit 69505a3

Browse files
capistrantkfaraz
andauthored
Create a new KubernetesPeonClient that uses fabric8 informers to reduce load on an underlying k8s API (#18599)
* app code working but needs cleanup and testing * caching side cleaner. need to add back direct client * Implementation ready for deeper UT and ET writing * checkstyle cleanup * Remove the busy waiting. Overhaul caching client testing * some config and instantion cleanup along with basic docs * extnd the K8s task runner docker test to run with both direct and caching mode for the k8s client * fix spelling and add resync to dictionary * fix strict compile issues * fixup checkstyle * fix k8s overlord module setup * few small fixups * fix checkstyle * fix up some issues with wait for job completion * cleanup and fix some tests * Make DruidKubernetesClient defend against invalid use if caching is off * cleanup checkstyle * dont use deprecated method * doc update * fix spelling * fix checkstyle after merge with master * Improve reliability of the Caching K8s Peon Client code and associated embedded tests * fix checkstyle * Modifications to try and reduce caching client api impact even more * Fixup tests now that we have refactored log fetching * remove some whitespace from the diff. Can be corrected in a future formatting patch * one more whitespace cleanup * more diff cleanup * Make another api usage optimization for the caching client. Clean up code and javadocs * diff cleanup * Some better class javadocs for the k8s clients * logging and comment cleanup * DruidKubernetesClient tidy up * javadoc link add * Use background propagation policy when deleting jobs to lessen load on k8s api * fix an npe and add a test to caching client * Remove AbstractK8sClient, rename DirectClient * Remove formatting changes in KubernetesPeonClient * Remove more formatting changes * Address the more minor review comments * re-add log watch refactors to KubernetesPeonClient, they reduce API traffic * migrate timers to stopwatch in caching k8s client per review comments * Remove unused code * style fix * remove unneeded code * Extract Caching client code from DruidKubernetesClient per review * Make name for cache read methods more logical * Stop exposing the EventNotifier in DruidKubernetesCachingClient * Improve informer executor name per review * Simplify informer setup for caching client * cleanup caching client tests and add a lifecycle stop to the informers * Improve thread safety of KubernetesResourceEventNotifier * Simply the peon waiting code for the caching client * Fix the k8s overlord module for the caching client * fix configs for docker embedded test * fix broken embedded tests * use the indexer not informer for cache reads * Cleanup after another review round --------- Co-authored-by: Kashif Faraz <[email protected]>
1 parent 6275277 commit 69505a3

31 files changed

+2065
-192
lines changed

docs/development/extensions-core/k8s-jobs.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,31 @@ Consider this an [EXPERIMENTAL](../experimental.md) feature mostly because it ha
3333

3434
The K8s extension builds a pod spec for each task using the specified pod adapter. All jobs are natively restorable, they are decoupled from the Druid deployment, thus restarting pods or doing upgrades has no effect on tasks in flight. They will continue to run and when the overlord comes back up it will start tracking them again.
3535

36+
## Kubernetes Client Mode
37+
38+
### "Direct" K8s API Interaction per task *(Default)*
39+
40+
Task lifecycle code in Druid talks directly to the Kubernetes API server for all operations that require interaction with the Kubernetes cluster.
41+
42+
### `SharedInformer` "Caching" *(Experimental)*
43+
44+
Enabled by setting `druid.indexer.runner.useK8sSharedInformers=true`, this mode uses `Fabric8` `SharedInformer` objects for monitoring state changes in the remote K8s cluster, reducing the number of direct API calls to the Kubernetes API server. This can greatly reduce load on the API server, especially in environments with a high volume of tasks.
45+
46+
This mode is experimental and should be used with caution in production until it has been vetted more thoroughly by the community.
47+
48+
The core idea is to use two `SharedInformers`, one for jobs and one for pods, to watch for changes in the remote K8s cluster. These informers maintain a local cache of jobs and pods that tasks can query. The informers can also notify listeners when changes occur, allowing tasks to react to state changes without polling the API server or creating per-task watches on the K8s cluster.
49+
50+
#### Architecture: Direct vs. Caching Mode
51+
52+
**Key Differences:**
53+
54+
- `DirectKubernetesPeonClient` (Default): Every read operation makes a direct HTTP call to the K8s API server. With 100 concurrent tasks, this results in 100+ active API connections with continuous polling.
55+
56+
- `CachingKubernetesPeonClient` (Experimental): All read operations query an in-memory cache maintained by `SharedInformers`. With 100 concurrent tasks, only 2 persistent watch connections are used (one for Jobs, one for Pods), achieving a large reduction in API calls.
57+
58+
**Shared Operations**:
59+
60+
Both implementations share the same write (job creation, deletion) and log read operations code, which always use direct API calls.
3661

3762
## Configuration
3863

@@ -798,7 +823,8 @@ Should you require the needed permissions for interacting across Kubernetes name
798823
| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. Value will be overridden if a dynamic config value has been set. | `2147483647` | No |
799824
| `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core for the task. | `1000` | No |
800825
| `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | NO |
801-
826+
| `druid.indexer.runner.useK8sSharedInformers` | `boolean` | Whether to use shared informers to watch for pod/job changes. This is more efficient on the Kubernetes API server, but may use more memory in the Overlord. | `false` | No |
827+
| `druid.indexer.runner.k8sSharedInformerResyncPeriod` | `Duration` | When using shared informers, controls how frequently the informers resync with the Kubernetes API server. This prevents change events from being missed, keeping the informer cache clean and accurate. | `PT300S` | No |
802828

803829
### Metrics added
804830

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,10 @@
2020
package org.apache.druid.testing.embedded.indexing;
2121

2222
import com.google.common.base.Optional;
23-
import com.google.common.collect.ImmutableList;
2423
import org.apache.commons.io.IOUtils;
2524
import org.apache.druid.common.utils.IdUtils;
2625
import org.apache.druid.data.input.impl.CsvInputFormat;
2726
import org.apache.druid.data.input.impl.TimestampSpec;
28-
import org.apache.druid.indexer.TaskState;
29-
import org.apache.druid.indexer.TaskStatusPlus;
3027
import org.apache.druid.indexing.common.task.CompactionTask;
3128
import org.apache.druid.indexing.common.task.IndexTask;
3229
import org.apache.druid.indexing.common.task.NoopTask;
@@ -40,7 +37,6 @@
4037
import org.apache.druid.java.util.common.DateTimes;
4138
import org.apache.druid.java.util.common.Intervals;
4239
import org.apache.druid.java.util.common.StringUtils;
43-
import org.apache.druid.java.util.common.parsers.CloseableIterator;
4440
import org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
4541
import org.apache.druid.query.DruidMetrics;
4642
import org.apache.druid.query.http.SqlTaskStatus;
@@ -243,7 +239,7 @@ public void test_runIndexParallelTask_andCompactData()
243239
.dynamicPartitionWithMaxRows(5000)
244240
.withId(compactTaskId);
245241
cluster.callApi().onLeaderOverlord(o -> o.runTask(compactTaskId, compactionTask));
246-
cluster.callApi().waitForTaskToSucceed(taskId, eventCollector.latchableEmitter());
242+
cluster.callApi().waitForTaskToSucceed(compactTaskId, eventCollector.latchableEmitter());
247243

248244
// Verify the compacted data
249245
final int numCompactedSegments = 5;
@@ -308,13 +304,10 @@ public void test_runKafkaSupervisor()
308304
Assertions.assertEquals("RUNNING", supervisorStatus.getState());
309305
Assertions.assertEquals(topic, supervisorStatus.getSource());
310306

311-
// Get the task statuses
312-
List<TaskStatusPlus> taskStatuses = ImmutableList.copyOf(
313-
(CloseableIterator<TaskStatusPlus>)
314-
cluster.callApi().onLeaderOverlord(o -> o.taskStatuses(null, dataSource, 1))
315-
);
316-
Assertions.assertFalse(taskStatuses.isEmpty());
317-
Assertions.assertEquals(TaskState.RUNNING, taskStatuses.get(0).getStatusCode());
307+
// Confirm tasks are being created and running
308+
int runningTasks = cluster.callApi().getTaskCount("running", dataSource);
309+
int completedTasks = cluster.callApi().getTaskCount("complete", dataSource);
310+
Assertions.assertTrue(runningTasks + completedTasks > 0);
318311

319312
// Suspend the supervisor and verify the state
320313
cluster.callApi().onLeaderOverlord(

embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java renamed to embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/BaseKubernetesTaskRunnerDockerTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,17 @@
2727
import org.junit.jupiter.api.BeforeEach;
2828

2929
/**
30-
* Runs some basic ingestion tests against latest image Druid containers running
31-
* on a K3s cluster with druid-operator and using {@code k8s} task runner type.
30+
* Base class for Kubernetes task runner tests. Subclasses configure whether to use
31+
* SharedInformers for caching.
3232
*/
33-
public class KubernetesTaskRunnerDockerTest extends IngestionSmokeTest implements LatestImageDockerTest
33+
abstract class BaseKubernetesTaskRunnerDockerTest extends IngestionSmokeTest implements LatestImageDockerTest
3434
{
35-
private static final String MANIFEST_TEMPLATE = "manifests/druid-service-with-operator.yaml";
35+
protected static final String MANIFEST_TEMPLATE = "manifests/druid-service-with-operator.yaml";
36+
37+
/**
38+
* Subclasses override to enable/disable SharedInformer caching.
39+
*/
40+
protected abstract boolean useSharedInformers();
3641

3742
@Override
3843
protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster)
@@ -45,6 +50,8 @@ protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster)
4550
.addProperty("druid.indexer.runner.type", "k8s")
4651
.addProperty("druid.indexer.runner.namespace", "druid")
4752
.addProperty("druid.indexer.runner.capacity", "4")
53+
.addProperty("druid.indexer.runner.useK8sSharedInformers", String.valueOf(useSharedInformers()))
54+
.addProperty("druid.indexer.runner.k8sSharedInformerResyncPeriod", "PT1s")
4855
.usingPort(30090);
4956

5057
final K3sClusterResource k3sCluster = new K3sClusterWithOperatorResource()
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.testing.embedded.k8s;
21+
22+
/**
23+
* Runs ingestion tests using SharedInformer caching mode.
24+
* Uses Fabric8 SharedInformers to maintain a local cache of Jobs and Pods,
25+
* reducing load on the Kubernetes API server.
26+
*/
27+
public class KubernetesTaskRunnerCachingModeDockerTest extends BaseKubernetesTaskRunnerDockerTest
28+
{
29+
@Override
30+
protected boolean useSharedInformers()
31+
{
32+
return true;
33+
}
34+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.testing.embedded.k8s;
21+
22+
/**
23+
* Runs ingestion tests using direct K8s API interaction (default mode).
24+
* Each task makes direct API calls to the Kubernetes API server.
25+
*/
26+
public class KubernetesTaskRunnerDirectModeDockerTest extends BaseKubernetesTaskRunnerDockerTest
27+
{
28+
@Override
29+
protected boolean useSharedInformers()
30+
{
31+
return false;
32+
}
33+
}

extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.druid.java.util.common.StringUtils;
5454
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
5555
import org.apache.druid.java.util.common.logger.Logger;
56+
import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient;
5657
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
5758
import org.apache.druid.k8s.overlord.common.httpclient.DruidKubernetesHttpClientFactory;
5859
import org.apache.druid.k8s.overlord.common.httpclient.jdk.DruidKubernetesJdkHttpClientConfig;
@@ -73,6 +74,7 @@
7374
import org.apache.druid.server.log.StartupLoggingConfig;
7475
import org.apache.druid.tasklogs.TaskLogs;
7576

77+
import javax.annotation.Nullable;
7678
import java.util.Locale;
7779
import java.util.Properties;
7880

@@ -160,6 +162,10 @@ public KubernetesTaskRunnerEffectiveConfig provideEffectiveConfig(
160162
return new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigSupplier);
161163
}
162164

165+
/**
166+
* Provides the base Kubernetes client for direct API operations.
167+
* This is always created regardless of caching configuration.
168+
*/
163169
@Provides
164170
@LazySingleton
165171
public DruidKubernetesClient makeKubernetesClient(
@@ -168,15 +174,16 @@ public DruidKubernetesClient makeKubernetesClient(
168174
Lifecycle lifecycle
169175
)
170176
{
171-
final DruidKubernetesClient client;
172177
final Config config = new ConfigBuilder().build();
173178

174179
if (kubernetesTaskRunnerConfig.isDisableClientProxy()) {
175180
config.setHttpsProxy(null);
176181
config.setHttpProxy(null);
177182
}
178183

179-
client = new DruidKubernetesClient(httpClientFactory, config);
184+
config.setNamespace(kubernetesTaskRunnerConfig.getNamespace());
185+
186+
final DruidKubernetesClient client = new DruidKubernetesClient(httpClientFactory, config);
180187

181188
lifecycle.addHandler(
182189
new Lifecycle.Handler()
@@ -199,6 +206,58 @@ public void stop()
199206
return client;
200207
}
201208

209+
/**
210+
* Provides the caching Kubernetes client that uses informers for efficient resource watching.
211+
* Only created when caching is enabled via configuration.
212+
*/
213+
@Provides
214+
@LazySingleton
215+
@Nullable
216+
public DruidKubernetesCachingClient makeCachingKubernetesClient(
217+
KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerConfig,
218+
DruidKubernetesClient baseClient,
219+
Lifecycle lifecycle
220+
)
221+
{
222+
if (!kubernetesTaskRunnerConfig.isUseK8sSharedInformers()) {
223+
log.info("Kubernetes shared informers disabled, caching client will not be created");
224+
return null;
225+
}
226+
227+
String namespace = kubernetesTaskRunnerConfig.getNamespace();
228+
long resyncPeriodMillis = kubernetesTaskRunnerConfig
229+
.getK8sSharedInformerResyncPeriod()
230+
.toStandardDuration()
231+
.getMillis();
232+
233+
log.info("Creating Kubernetes caching client with informer resync period: %d ms", resyncPeriodMillis);
234+
final DruidKubernetesCachingClient cachingClient = new DruidKubernetesCachingClient(
235+
baseClient,
236+
namespace,
237+
resyncPeriodMillis
238+
);
239+
240+
lifecycle.addHandler(
241+
new Lifecycle.Handler()
242+
{
243+
@Override
244+
public void start()
245+
{
246+
247+
}
248+
249+
@Override
250+
public void stop()
251+
{
252+
log.info("Stopping Kubernetes caching client");
253+
cachingClient.stop();
254+
}
255+
}
256+
);
257+
258+
return cachingClient;
259+
}
260+
202261
/**
203262
* Provides a TaskRunnerFactory instance suitable for environments without Zookeeper.
204263
* In such environments, the standard RemoteTaskRunnerFactory may not be operational.

extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,25 @@ public interface KubernetesTaskRunnerConfig
7272

7373
Integer getCapacity();
7474

75+
/**
76+
* Whether to use caching for Kubernetes resources tied to indexing tasks.
77+
* <p>
78+
* Enabling shared informers can significantly reduce the number of API calls made to the Kubernetes API server,
79+
* improving performance and reducing load on the server. However, it also increases memory usage as informers
80+
* maintain local caches of resources.
81+
* </p>
82+
*/
83+
boolean isUseK8sSharedInformers();
84+
85+
/**
86+
* The resync period for the Kubernetes shared informers, if enabled.
87+
* <p>
88+
* Periodic resyncs ensure that the informer's local cache is kept up to date with the remote Kubernetes API server
89+
* state. This helps handle missed events or transient errors.
90+
* </p>
91+
*/
92+
Period getK8sSharedInformerResyncPeriod();
93+
7594
static Builder builder()
7695
{
7796
return new Builder();
@@ -100,6 +119,8 @@ public static class Builder
100119
private Integer capacity;
101120
private Period taskJoinTimeout;
102121
private Period logSaveTimeout;
122+
private boolean useK8sSharedInformers;
123+
private Period k8sSharedInformerResyncPeriod;
103124

104125
public Builder()
105126
{
@@ -232,6 +253,18 @@ public Builder withLogSaveTimeout(Period logSaveTimeout)
232253
return this;
233254
}
234255

256+
public Builder withUseK8sSharedInformers(boolean useK8sSharedInformers)
257+
{
258+
this.useK8sSharedInformers = useK8sSharedInformers;
259+
return this;
260+
}
261+
262+
public Builder withK8sSharedInformerResyncPeriod(Period k8sSharedInformerResyncPeriod)
263+
{
264+
this.k8sSharedInformerResyncPeriod = k8sSharedInformerResyncPeriod;
265+
return this;
266+
}
267+
235268
public KubernetesTaskRunnerStaticConfig build()
236269
{
237270
return new KubernetesTaskRunnerStaticConfig(
@@ -255,7 +288,9 @@ public KubernetesTaskRunnerStaticConfig build()
255288
this.labels,
256289
this.annotations,
257290
this.capacity,
258-
this.taskJoinTimeout
291+
this.taskJoinTimeout,
292+
this.useK8sSharedInformers,
293+
this.k8sSharedInformerResyncPeriod
259294
);
260295
}
261296
}

extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,18 @@ public Integer getCapacity()
177177
return dynamicConfigSupplier.get().getCapacity();
178178
}
179179

180+
@Override
181+
public boolean isUseK8sSharedInformers()
182+
{
183+
return staticConfig.isUseK8sSharedInformers();
184+
}
185+
186+
@Override
187+
public Period getK8sSharedInformerResyncPeriod()
188+
{
189+
return staticConfig.getK8sSharedInformerResyncPeriod();
190+
}
191+
180192
public PodTemplateSelectStrategy getPodTemplateSelectStrategy()
181193
{
182194
if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null || dynamicConfigSupplier.get().getPodTemplateSelectStrategy() == null) {

0 commit comments

Comments
 (0)