Commit 1a8ab17

Merge branch 'master' into msq-compact-task-failmsg

2 parents 60e80d8 + 51a7494

259 files changed, +10765 -4248 lines changed

.github/workflows/ci.yml

Lines changed: 7 additions & 3 deletions

@@ -29,7 +29,7 @@ jobs:
     uses: ./.github/workflows/worker.yml
     with:
       script: .github/scripts/run_unit-tests -Dtest=!QTest,'${{ matrix.pattern }}' -Dmaven.test.failure.ignore=true
-      artifact_prefix: "unit-test-reports"
+      artifact_prefix: "unit-test-reports-jdk${{ matrix.jdk }}"
       jdk: ${{ matrix.jdk }}
       key: "test-jdk${{ matrix.jdk }}-[${{ matrix.pattern }}]"

@@ -43,17 +43,21 @@ jobs:
     name: "test-report"
     needs: run-unit-tests
     runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        jdk: [ "17", "21" ]
     steps:
       - name: Download reports for all unit test jobs
         uses: actions/download-artifact@v4
         with:
-          pattern: "unit-test-reports-*"
+          pattern: "unit-test-reports-jdk${{ matrix.jdk }}*"
           path: target/surefire-reports

       - name: Publish Test Report
         uses: mikepenz/action-junit-report@v5
         with:
-          check_name: "Unit Test Report"
+          check_name: "Unit Test Report (JDK ${{ matrix.jdk }})"
           report_paths: '**/target/surefire-reports/TEST-*.xml'
           detailed_summary: true
           flaky_summary: true

.github/workflows/cron-job-its.yml

Lines changed: 1 addition & 1 deletion

@@ -88,7 +88,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        testing_group: [ query, query-retry, query-error, security ]
+        testing_group: [ query, security ]
     uses: ./.github/workflows/reusable-standard-its.yml
     needs: build
     with:

.github/workflows/standard-its.yml

Lines changed: 2 additions & 60 deletions

@@ -78,7 +78,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        testing_group: [query, query-retry, query-error, security, centralized-datasource-schema]
+        testing_group: [query, security, centralized-datasource-schema]
     uses: ./.github/workflows/reusable-standard-its.yml
     if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
     with:

@@ -112,62 +112,4 @@
     testing_groups: -Dgroups=custom-coordinator-duties
     use_indexer: middleManager
     override_config_path: ./environment-configs/test-groups/custom-coordinator-duties
-    group: custom coordinator duties
-
-  integration-k8s-leadership-tests:
-    needs: changes
-    name: (Compile=openjdk17, Run=openjdk17, Cluster Build On K8s) ITNestedQueryPushDownTest integration test
-    if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
-    runs-on: ubuntu-22.04
-    env:
-      MVN: mvn --no-snapshot-updates
-      MAVEN_SKIP: -P skip-static-checks -Dweb.console.skip=true -Dmaven.javadoc.skip=true
-      CONFIG_FILE: k8s_run_config_file.json
-      IT_TEST: -Dit.test=ITNestedQueryPushDownTest
-      POD_NAME: int-test
-      POD_NAMESPACE: default
-      BUILD_DRUID_CLUSTER: true
-    steps:
-      - name: Checkout branch
-        uses: actions/checkout@v4
-
-      - name: setup java
-        uses: actions/setup-java@v4
-        with:
-          java-version: '17'
-          distribution: 'zulu'
-
-      # the build step produces SNAPSHOT artifacts into the local maven repository,
-      # we include github.sha in the cache key to make it specific to that build/jdk
-      - name: Restore Maven repository
-        id: maven-restore
-        uses: actions/cache/restore@v4
-        with:
-          path: ~/.m2/repository
-          key: maven-${{ runner.os }}-17-${{ github.sha }}
-          restore-keys: setup-java-Linux-maven-${{ hashFiles('**/pom.xml') }}
-
-      - name: Maven build
-        if: steps.maven-restore.outputs.cache-hit != 'true'
-        run: |
-          ./it.sh ci
-
-      - name: Run IT
-        id: test
-        timeout-minutes: 90
-        run: |
-          set -x
-          mvn -B -ff install -pl '!web-console' -Pdist,bundle-contrib-exts -Pskip-static-checks,skip-tests -Dmaven.javadoc.skip=true -T1C
-          # Note: The above command relies on the correct version of the JARs being installed in the local m2 repository.
-          # For any changes, please rebuild it using the command from the previous step (./it.sh ci).
-
-          MAVEN_OPTS='-Xmx2048m' ${MVN} verify -pl integration-tests -P int-tests-config-file ${IT_TEST} ${MAVEN_SKIP} -Dpod.name=${POD_NAME} -Dpod.namespace=${POD_NAMESPACE} -Dbuild.druid.cluster=${BUILD_DRUID_CLUSTER}
-
-      - name: Debug on failure
-        if: ${{ failure() && steps.test.conclusion == 'failure' }}
-        run: |
-          for v in broker middlemanager router coordinator historical ; do
-            echo "------------------------druid-tiny-cluster-"$v"s-0-------------------------";
-            /usr/local/bin/kubectl logs --tail 1000 druid-tiny-cluster-"$v"s-0 ||:;
-            /usr/local/bin/kubectl get events | grep druid-tiny-cluster-"$v"s-0 ||:;
-          done
+    group: custom coordinator duties

docs/development/extensions-core/k8s-jobs.md

Lines changed: 40 additions & 9 deletions

@@ -33,6 +33,31 @@ Consider this an [EXPERIMENTAL](../experimental.md) feature mostly because it ha

 The K8s extension builds a pod spec for each task using the specified pod adapter. All jobs are natively restorable: they are decoupled from the Druid deployment, so restarting pods or doing upgrades has no effect on tasks in flight. They will continue to run, and when the Overlord comes back up, it will start tracking them again.

+## Kubernetes Client Mode
+
+### "Direct" K8s API Interaction per task *(Default)*
+
+Task lifecycle code in Druid talks directly to the Kubernetes API server for all operations that require interaction with the Kubernetes cluster.
+
+### `SharedInformer` "Caching" *(Experimental)*
+
+Enabled by setting `druid.indexer.runner.useK8sSharedInformers=true`, this mode uses Fabric8 `SharedInformer` objects to monitor state changes in the remote K8s cluster, reducing the number of direct calls to the Kubernetes API server. This can greatly reduce load on the API server, especially in environments with a high volume of tasks.
+
+This mode is experimental and should be used with caution in production until it has been vetted more thoroughly by the community.
+
+The core idea is to use two `SharedInformers`, one for jobs and one for pods, to watch for changes in the remote K8s cluster. These informers maintain a local cache of jobs and pods that tasks can query. The informers can also notify listeners when changes occur, allowing tasks to react to state changes without polling the API server or creating per-task watches on the K8s cluster.
+
+#### Architecture: Direct vs. Caching Mode
+
+**Key Differences:**
+
+- `DirectKubernetesPeonClient` (Default): Every read operation makes a direct HTTP call to the K8s API server. With 100 concurrent tasks, this results in 100+ active API connections with continuous polling.
+- `CachingKubernetesPeonClient` (Experimental): All read operations query an in-memory cache maintained by `SharedInformers`. With 100 concurrent tasks, only 2 persistent watch connections are used (one for Jobs, one for Pods), achieving a large reduction in API calls.
+
+**Shared Operations:**
+
+Both implementations share the same code for write operations (job creation and deletion) and log reads, which always use direct API calls.

 ## Configuration

@@ -48,9 +73,9 @@ Other configurations required are:
 Druid operators can dynamically tune certain features within this extension. You don't need to restart the Overlord
 service for these changes to take effect.

-Druid can dynamically tune [pod template selection](#pod-template-selection), which allows you to configure the pod
-template based on the task to be run. To enable dynamic pod template selection, first configure the
-[custom template pod adapter](#custom-template-pod-adapter).
+Druid can dynamically tune [pod template selection](#pod-template-selection) and [capacity](#properties), where capacity refers to `druid.indexer.runner.capacity`.
+
+Pod template selection allows you to configure the pod template based on the task to be run. To enable dynamic pod template selection, first configure the [custom template pod adapter](#custom-template-pod-adapter).

 Use the following APIs to view and update the dynamic configuration for the Kubernetes task runner.

@@ -126,7 +151,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
         "type": ["index_kafka"]
       }
     ]
-  }
+  },
+  "capacity": 12
 }
 ```
 </details>

@@ -135,6 +161,8 @@ Host: http://ROUTER_IP:ROUTER_PORT

 Updates the dynamic configuration for the Kubernetes Task Runner

+Note: Both `podTemplateSelectStrategy` and `capacity` are optional fields. A POST request may include either, both, or neither.
+
 ##### URL

 `POST` `/druid/indexer/v1/k8s/taskrunner/executionconfig`

@@ -193,7 +221,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/k8s/taskrunner/executionconf
         "type": ["index_kafka"]
       }
     ]
-  }
+  },
+  "capacity": 6
 }'
 ```

@@ -225,7 +254,8 @@ Content-Type: application/json
         "type": ["index_kafka"]
       }
     ]
-  }
+  },
+  "capacity": 6
 }
 ```

@@ -309,7 +339,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
     "comment": "",
     "ip": "127.0.0.1"
   },
-  "payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"}",
+  "payload": "{\"type\": \"default\",\"podTemplateSelectStrategy\":{\"type\": \"taskType\"},\"capacity\":6",
   "auditTime": "2024-06-13T20:59:51.622Z"
 }
 ]

@@ -790,10 +820,11 @@ Should you require the needed permissions for interacting across Kubernetes name
 | `druid.indexer.runner.annotations` | `JsonObject` | Additional annotations you want to add to the peon pod. | `{}` | No |
 | `druid.indexer.runner.peonMonitors` | `JsonArray` | Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord. | `[]` | No |
 | `druid.indexer.runner.graceTerminationPeriodSeconds` | `Long` | Number of seconds you want to wait after a SIGTERM for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. | `PT30S` (K8s default) | No |
-| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. | `2147483647` | No |
+| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. This value is overridden if a dynamic config value has been set. | `2147483647` | No |
 | `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU microcores for the task. | `1000` | No |
 | `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely, thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | No |
+| `druid.indexer.runner.useK8sSharedInformers` | `boolean` | Whether to use shared informers to watch for pod/job changes. This is more efficient on the Kubernetes API server, but may use more memory in the Overlord. | `false` | No |
+| `druid.indexer.runner.k8sSharedInformerResyncPeriod` | `Duration` | When using shared informers, controls how frequently the informers resync with the Kubernetes API server. This prevents change events from being missed, keeping the informer cache clean and accurate. | `PT300S` | No |

 ### Metrics added

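The `SharedInformer` caching mode described in the documentation diff above boils down to two long-lived informers whose local caches serve all reads. Below is a minimal, hypothetical sketch of that pattern using the Fabric8 Kubernetes client; it is not the extension's actual `CachingKubernetesPeonClient` code. It assumes the `io.fabric8:kubernetes-client` dependency, and the `druid` namespace, handler bodies, and class name are illustrative (the 300-second resync mirrors the `PT300S` default).

```java
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

public class InformerCacheSketch
{
  public static void main(String[] args)
  {
    final long resyncMillis = 300_000L; // mirrors the PT300S default resync period

    try (KubernetesClient client = new KubernetesClientBuilder().build()) {
      // One informer per resource type: these are the only two persistent
      // watch connections, regardless of how many tasks are running.
      SharedIndexInformer<Job> jobInformer = client.batch().v1().jobs()
          .inNamespace("druid")
          .inform(new ResourceEventHandler<Job>()
          {
            @Override
            public void onAdd(Job job)
            {
              System.out.println("job added: " + job.getMetadata().getName());
            }

            @Override
            public void onUpdate(Job oldJob, Job newJob)
            {
              // Listeners react to state changes here instead of polling the API server.
              System.out.println("job updated: " + newJob.getMetadata().getName());
            }

            @Override
            public void onDelete(Job job, boolean deletedFinalStateUnknown)
            {
              System.out.println("job deleted: " + job.getMetadata().getName());
            }
          }, resyncMillis);

      SharedIndexInformer<Pod> podInformer = client.pods()
          .inNamespace("druid")
          .inform(new ResourceEventHandler<Pod>()
          {
            @Override
            public void onAdd(Pod pod) {}

            @Override
            public void onUpdate(Pod oldPod, Pod newPod) {}

            @Override
            public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {}
          }, resyncMillis);

      // Reads are served from the informers' local caches, not the API server.
      System.out.println("cached jobs: " + jobInformer.getStore().list().size());
      System.out.println("cached pods: " + podInformer.getStore().list().size());

      jobInformer.stop();
      podInformer.stop();
    }
  }
}
```

The contrast with direct mode is the read path: here, lookups like `jobInformer.getStore().list()` cost no API round-trips, while writes (job creation and deletion) and log reads still go straight to the API server in both modes.
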
docs/multi-stage-query/reference.md

Lines changed: 2 additions & 1 deletion

@@ -395,7 +395,8 @@ The following table lists the context parameters for the MSQ task engine:
 | `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | `true` |
 | `arrayIngestMode` | INSERT, REPLACE<br /><br />Controls how ARRAY type values are stored in Druid segments. When set to `array` (recommended for SQL compliance), Druid will store all ARRAY typed values in [ARRAY typed columns](../querying/arrays.md), and supports storing both VARCHAR and numeric typed arrays. When set to `mvd` (the default, for backwards compatibility), Druid only supports VARCHAR typed arrays, and will store them as [multi-value string columns](../querying/multi-value-dimensions.md). See [`arrayIngestMode`] in the [Arrays](../querying/arrays.md) page for more details. | `mvd` (for backwards compatibility, recommended to use `array` for SQL compliance) |
 | `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. This is a hint to the MSQ engine and the actual joins in the query may proceed in a different way than specified. See [Joins](#joins) for more details. | `broadcast` |
-| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
+| `maxRowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
+| `rowsInMemory` | INSERT or REPLACE<br /><br />Alternate spelling of `maxRowsInMemory`. Ignored if `maxRowsInMemory` is set. | 100,000 |
 | `segmentSortOrder` | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid uses the order from this context parameter instead. Provide the column list as comma-separated values or as a JSON array in string form.<br /><br />For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city,country`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list |
 | `forceSegmentSortByTime` | INSERT or REPLACE<br /><br />When set to `true` (the default), Druid prepends `__time` to [CLUSTERED BY](#clustered-by) when determining the sort order for individual segments. Druid also requires that `segmentSortOrder`, if provided, starts with `__time`.<br /><br />When set to `false`, Druid uses the [CLUSTERED BY](#clustered-by) alone to determine the sort order for individual segments, and does not require that `segmentSortOrder` begin with `__time`. Setting this parameter to `false` is an experimental feature; see [Sorting](../ingestion/partitioning.md#sorting) for details. | `true` |
 | `maxParseExceptions` | SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1. | 0 |

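As a usage illustration for the context parameters in the diff above, here is a hedged sketch that submits an MSQ query with `maxRowsInMemory` set in the query context, via Druid's SQL task endpoint `/druid/v2/sql/task`. The `localhost:8888` router address, the table names, and the query itself are placeholders, and the wrapper class is not part of the Druid codebase.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MsqContextSketch
{
  public static void main(String[] args) throws Exception
  {
    // Query context: prefer maxRowsInMemory; rowsInMemory is only an
    // alternate spelling and is ignored when maxRowsInMemory is set.
    String body = "{\n"
        + "  \"query\": \"REPLACE INTO example_table OVERWRITE ALL"
        + " SELECT __time, page FROM source_table PARTITIONED BY DAY\",\n"
        + "  \"context\": {\n"
        + "    \"maxRowsInMemory\": 150000,\n"
        + "    \"finalizeAggregations\": false\n"
        + "  }\n"
        + "}";

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8888/druid/v2/sql/task"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();

    // The response carries the MSQ task ID, which can then be polled for status.
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
```
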
docs/querying/sql-metadata-tables.md

Lines changed: 2 additions & 0 deletions

@@ -238,6 +238,8 @@ Servers table lists all discovered servers in the cluster.
 |start_time|STRING|Timestamp in ISO8601 format when the server was announced in the cluster|
 |version|VARCHAR|Druid version running on the server|
 |labels|VARCHAR|Labels for the server configured using the property [`druid.labels`](../configuration/index.md)|
+|available_processors|BIGINT|Total number of CPU processors available to the server|
+|total_memory|BIGINT|Total memory in bytes available to the server|

 To retrieve information about all servers, use the query:

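To show the two new `sys.servers` columns in use, here is a small sketch (not taken from the docs) that queries them over Druid's Avatica JDBC endpoint. It assumes the Avatica remote driver (`org.apache.calcite.avatica:avatica-core`) is on the classpath and that a router is reachable at the quickstart address below.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SysServersSketch
{
  public static void main(String[] args) throws Exception
  {
    String url = "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/";

    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(
             "SELECT server, server_type, available_processors, total_memory FROM sys.servers"
         )) {
      while (rs.next()) {
        // available_processors and total_memory are the newly added columns.
        System.out.printf(
            "%s (%s): %d processors, %d bytes of memory%n",
            rs.getString("server"),
            rs.getString("server_type"),
            rs.getLong("available_processors"),
            rs.getLong("total_memory")
        );
      }
    }
  }
}
```
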
embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/AbstractAuthConfigurationTest.java

Lines changed: 4 additions & 3 deletions

@@ -1018,9 +1018,8 @@ protected String getBrokerUrl()

   /**
    * curr_size on historicals changes because cluster state is not isolated across
-   * different
-   * integration tests, zero it out for consistent test results
-   * version and start_time are not configurable therefore we zero them as well
+   * different integration tests, zero it out for consistent test results.
+   * version, start_time, available_processors, total_memory are not configurable therefore we zero them as well
    */
   protected static List<Map<String, Object>> getServersWithoutNonConfigurableFields(List<Map<String, Object>> servers)
   {

@@ -1031,6 +1030,8 @@ protected static List<Map<String, Object>> getServersWithoutNonConfigurableField
         newServer.put("curr_size", 0);
         newServer.put("start_time", "0");
         newServer.put("version", "0.0.0");
+        newServer.put("available_processors", 0);
+        newServer.put("total_memory", 0);
         return newServer;
       }
   );

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java

Lines changed: 5 additions & 12 deletions

@@ -20,13 +20,10 @@
 package org.apache.druid.testing.embedded.indexing;

 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.NoopTask;

@@ -40,7 +37,6 @@
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.http.SqlTaskStatus;

@@ -243,7 +239,7 @@ public void test_runIndexParallelTask_andCompactData()
         .dynamicPartitionWithMaxRows(5000)
         .withId(compactTaskId);
     cluster.callApi().onLeaderOverlord(o -> o.runTask(compactTaskId, compactionTask));
-    cluster.callApi().waitForTaskToSucceed(taskId, eventCollector.latchableEmitter());
+    cluster.callApi().waitForTaskToSucceed(compactTaskId, eventCollector.latchableEmitter());

     // Verify the compacted data
     final int numCompactedSegments = 5;

@@ -308,13 +304,10 @@ public void test_runKafkaSupervisor()
     Assertions.assertEquals("RUNNING", supervisorStatus.getState());
     Assertions.assertEquals(topic, supervisorStatus.getSource());

-    // Get the task statuses
-    List<TaskStatusPlus> taskStatuses = ImmutableList.copyOf(
-        (CloseableIterator<TaskStatusPlus>)
-            cluster.callApi().onLeaderOverlord(o -> o.taskStatuses(null, dataSource, 1))
-    );
-    Assertions.assertFalse(taskStatuses.isEmpty());
-    Assertions.assertEquals(TaskState.RUNNING, taskStatuses.get(0).getStatusCode());
+    // Confirm tasks are being created and running
+    int runningTasks = cluster.callApi().getTaskCount("running", dataSource);
+    int completedTasks = cluster.callApi().getTaskCount("complete", dataSource);
+    Assertions.assertTrue(runningTasks + completedTasks > 0);

     // Suspend the supervisor and verify the state
     cluster.callApi().onLeaderOverlord(

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java

Lines changed: 2 additions & 2 deletions

@@ -51,7 +51,6 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;

-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;

@@ -154,7 +153,8 @@ public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
     // Wait for segments to be handed off
     indexer.latchableEmitter().waitForEventAggregate(
         event -> event.hasMetricName("ingest/handoff/count")
-                      .hasDimension(DruidMetrics.DATASOURCE, List.of(dataSource)),
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+                      .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorId),
         agg -> agg.hasSumAtLeast(expectedSegmentsHandedOff)
     );
