Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
64b8b73
Move lookup and broadcast datasource load spec configs to its own cla…
abhishekrb19 Nov 10, 2025
eb3feb3
No final in strict compilation mode
abhishekrb19 Nov 10, 2025
51c5f85
Add indexing-logs to CI artifact
abhishekrb19 Nov 10, 2025
ccaefec
Consolidate into one TaskPropertiesHolder
abhishekrb19 Nov 13, 2025
2b08b0d
Works with named injections
abhishekrb19 Nov 13, 2025
ca5e6ae
Test works!
abhishekrb19 Nov 14, 2025
eb80524
Different approach
abhishekrb19 Nov 24, 2025
32e672e
Cleanup MetricsModule
abhishekrb19 Nov 24, 2025
a434eeb
Merge branch 'master' into refactor_ds_lookup_holder
abhishekrb19 Nov 24, 2025
5033413
Revert changes to EmbeddedKafkaSupervisorTest
abhishekrb19 Nov 24, 2025
f44be2f
Revert logging changes
abhishekrb19 Nov 24, 2025
2093cd1
Better exception
abhishekrb19 Nov 24, 2025
6fd60d6
Cleanup and add LoadSpecHolder
abhishekrb19 Nov 24, 2025
58185c0
Try ImplementedBy to avoid explicit bindings everywhere
abhishekrb19 Nov 25, 2025
519d536
Revert "Try ImplementedBy to avoid explicit bindings everywhere"
abhishekrb19 Nov 25, 2025
0ec0380
Add common DefaultServerHolderModule
abhishekrb19 Nov 25, 2025
89e3418
Cleanup CliPeonTest and rename
abhishekrb19 Nov 25, 2025
b04d09c
Fix test
abhishekrb19 Nov 25, 2025
98979dc
Fix quidem test module init
abhishekrb19 Nov 25, 2025
efa0b5e
Rename to mapOfTaskHolderDimensions()
abhishekrb19 Nov 25, 2025
b81a9b6
Add test for default non-peon servers
abhishekrb19 Nov 26, 2025
91acfd3
Return null from NoopTaskHolder to avoid callers from doing instanceo…
abhishekrb19 Nov 26, 2025
014dac2
Merge branch 'master' into refactor_ds_lookup_holder
abhishekrb19 Nov 26, 2025
9ed233a
Review suggestions & more tests
abhishekrb19 Nov 30, 2025
a810414
Merge branch 'master' into refactor_ds_lookup_holder
abhishekrb19 Nov 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,8 @@ public static class Builder
private final String dataSource;
private final SegmentCacheManagerFactory segmentCacheManagerFactory;

@Nullable
private String id;
private CompactionIOConfig ioConfig;
@Nullable
private DimensionsSpec dimensionsSpec;
Expand Down Expand Up @@ -1268,6 +1270,12 @@ public Builder(
this.segmentCacheManagerFactory = segmentCacheManagerFactory;
}

public Builder id(String id)
{
this.id = id;
return this;
}

public Builder interval(Interval interval)
{
return inputSpec(new CompactionIntervalSpec(interval, null));
Expand Down Expand Up @@ -1353,7 +1361,7 @@ public Builder projections(@Nullable List<AggregateProjectionSpec> projections)
public CompactionTask build()
{
return new CompactionTask(
null,
id,
null,
dataSource,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.AnnouncerModule;
import org.apache.druid.guice.BrokerServiceModule;
import org.apache.druid.guice.DefaultServerHolderModule;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.ExtensionsModule;
import org.apache.druid.guice.JacksonConfigManagerModule;
Expand Down Expand Up @@ -194,6 +195,7 @@ private List<Module> forServerModules()
new ExternalStorageAccessSecurityModule(),
new ServiceClientModule(),
new StorageConnectorModule(),
new DefaultServerHolderModule(),
ServerInjectorBuilder.registerNodeRoleModule(ImmutableSet.of())
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.druid.guice;

import com.google.inject.Binder;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.ExcludeScope;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.server.metrics.DefaultLoadSpecHolder;
import org.apache.druid.server.metrics.LoadSpecHolder;
import org.apache.druid.server.metrics.NoopTaskHolder;
import org.apache.druid.server.metrics.TaskHolder;

/**
* Binds the following holder configs for all servers except {@code CliPeon}:
* <ul>
* <li>{@link TaskHolder} to {@link NoopTaskHolder}</li>
* <li>{@link LoadSpecHolder} to {@link DefaultLoadSpecHolder}</li>
* </ul>
*
* <p>For {@code CliPeon}, these bindings are overridden by the peon-specific module.</p>
*/
@ExcludeScope(roles = {NodeRole.PEON_JSON_NAME})
public class DefaultServerHolderModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
binder.bind(TaskHolder.class).to(NoopTaskHolder.class).in(LazySingleton.class);
binder.bind(LoadSpecHolder.class).to(DefaultLoadSpecHolder.class).in(LazySingleton.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.lang.annotation.Target;

/**
* An annotation to exlcude specific node types that a {@link com.google.inject.Module} can be loaded on.
* An annotation to exclude specific node types that a {@link com.google.inject.Module} can be loaded on.
* The {@link #roles()} should be the {@link org.apache.druid.discovery.NodeRole#jsonName}. If both {@link ExcludeScope}
* and {@link LoadScope} are set, {@link ExcludeScope} takes precedence.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.guice.AnnouncerModule;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.guice.CatalogCoreModule;
import org.apache.druid.guice.DefaultServerHolderModule;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.guice.DruidSecondaryModule;
import org.apache.druid.guice.ExpressionModule;
Expand Down Expand Up @@ -142,7 +143,8 @@ public CoreInjectorBuilder forServerWithoutJetty()
new ExternalStorageAccessSecurityModule(),
new ServiceClientModule(),
new StorageConnectorModule(),
new CatalogCoreModule()
new CatalogCoreModule(),
new DefaultServerHolderModule()
);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,28 @@
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.LoadSpecHolder;
import org.apache.druid.server.metrics.TaskHolder;

class LookupListeningAnnouncerConfig
{
public static final String DEFAULT_TIER = "__default";
private final DataSourceTaskIdHolder dataSourceTaskIdHolder;
private final TaskHolder taskHolder;
private final LoadSpecHolder loadSpecHolder;

@JsonProperty("lookupTier")
private String lookupTier = null;
@JsonProperty("lookupTierIsDatasource")
private boolean lookupTierIsDatasource = false;

@JsonCreator
public LookupListeningAnnouncerConfig(
@JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder
@JacksonInject TaskHolder taskHolder,
@JacksonInject LoadSpecHolder loadSpecHolder
)
{
this.dataSourceTaskIdHolder = dataSourceTaskIdHolder;
this.taskHolder = taskHolder;
this.loadSpecHolder = loadSpecHolder;
}

public String getLookupTier()
Expand All @@ -50,7 +55,8 @@
!(lookupTierIsDatasource && null != lookupTier),
"Cannot specify both `lookupTier` and `lookupTierIsDatasource`"
);
final String lookupTier = lookupTierIsDatasource ? dataSourceTaskIdHolder.getDataSource() : this.lookupTier;

final String lookupTier = lookupTierIsDatasource ? taskHolder.getDataSource() : this.lookupTier;

Check notice

Code scanning / CodeQL

Possible confusion of local and field Note

Confusing name: method
getLookupTier
also refers to field
lookupTier
(without qualifying it with 'this').

Copilot Autofix

AI 2 months ago

The best fix is to rename the local variable lookupTier in the getLookupTier() method to a different name, such as tierToReturn or effectiveLookupTier. This removes any ambiguity between the field and the local variable. This change should only be performed in the method getLookupTier() within LookupListeningAnnouncerConfig.java, specifically where the local variable is declared and used. No imports or additional definitions are necessary, as it's a straightforward renaming.


Suggested changeset 1
server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java
--- a/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java
@@ -56,10 +56,10 @@
         "Cannot specify both `lookupTier` and `lookupTierIsDatasource`"
     );
 
-    final String lookupTier = lookupTierIsDatasource ? taskHolder.getDataSource() : this.lookupTier;
+    final String effectiveLookupTier = lookupTierIsDatasource ? taskHolder.getDataSource() : this.lookupTier;
 
     return Preconditions.checkNotNull(
-        lookupTier == null ? DEFAULT_TIER : StringUtils.emptyToNullNonDruidDataString(lookupTier),
+        effectiveLookupTier == null ? DEFAULT_TIER : StringUtils.emptyToNullNonDruidDataString(effectiveLookupTier),
         "Cannot have empty lookup tier from %s",
         lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE
     );
EOF
@@ -56,10 +56,10 @@
"Cannot specify both `lookupTier` and `lookupTierIsDatasource`"
);

final String lookupTier = lookupTierIsDatasource ? taskHolder.getDataSource() : this.lookupTier;
final String effectiveLookupTier = lookupTierIsDatasource ? taskHolder.getDataSource() : this.lookupTier;

return Preconditions.checkNotNull(
lookupTier == null ? DEFAULT_TIER : StringUtils.emptyToNullNonDruidDataString(lookupTier),
effectiveLookupTier == null ? DEFAULT_TIER : StringUtils.emptyToNullNonDruidDataString(effectiveLookupTier),
"Cannot have empty lookup tier from %s",
lookupTierIsDatasource ? "bound value" : LookupModule.PROPERTY_BASE
);
Copilot is powered by AI and may make mistakes. Always verify output.

return Preconditions.checkNotNull(
lookupTier == null ? DEFAULT_TIER : StringUtils.emptyToNullNonDruidDataString(lookupTier),
Expand All @@ -61,6 +67,6 @@

public LookupLoadingSpec getLookupLoadingSpec()
{
return dataSourceTaskIdHolder.getLookupLoadingSpec();
return loadSpecHolder.getLookupLoadingSpec();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.http.ServletResourceUtils;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;

import javax.annotation.Nullable;
import java.io.File;
Expand Down Expand Up @@ -380,7 +379,7 @@ private void takeSnapshot(Map<String, LookupExtractorFactoryContainer> lookupMap
}

/**
* Load a set of lookups based on the injected value in {@link DataSourceTaskIdHolder#getLookupLoadingSpec()}.
* Load a set of lookups based on the injected value in {@link TaskPropertiesHolder#getLookupLoadingSpec()}.
*/
private void loadLookupsAndInitStateRef()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.initialization.jetty.BadRequestException;
import org.apache.druid.server.initialization.jetty.ServiceUnavailableException;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.TaskHolder;

import javax.annotation.Nullable;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Context;
Expand All @@ -39,13 +40,14 @@ public class ChatHandlerResource
public static final String TASK_ID_HEADER = "X-Druid-Task-Id";

private final ChatHandlerProvider handlers;
@Nullable
private final String taskId;

@Inject
public ChatHandlerResource(final ChatHandlerProvider handlers, final DataSourceTaskIdHolder taskIdHolder)
public ChatHandlerResource(final ChatHandlerProvider handlers, final TaskHolder taskHolder)
{
this.handlers = handlers;
this.taskId = taskIdHolder.getTaskId();
this.taskId = taskHolder.getTaskId();
}

@Path("/{id}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.LoadSpecHolder;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -74,6 +74,7 @@ public class SegmentCacheBootstrapper
private final ServerTypeConfig serverTypeConfig;
private final CoordinatorClient coordinatorClient;
private final ServiceEmitter emitter;
private final LoadSpecHolder loadSpecHolder;

private volatile boolean isComplete = false;

Expand All @@ -82,8 +83,6 @@ public class SegmentCacheBootstrapper

private static final EmittingLogger log = new EmittingLogger(SegmentCacheBootstrapper.class);

private final DataSourceTaskIdHolder datasourceHolder;

@Inject
public SegmentCacheBootstrapper(
SegmentLoadDropHandler loadDropHandler,
Expand All @@ -94,7 +93,7 @@ public SegmentCacheBootstrapper(
ServerTypeConfig serverTypeConfig,
CoordinatorClient coordinatorClient,
ServiceEmitter emitter,
DataSourceTaskIdHolder datasourceHolder
LoadSpecHolder loadSpecHolder
)
{
this.loadDropHandler = loadDropHandler;
Expand All @@ -105,7 +104,7 @@ public SegmentCacheBootstrapper(
this.serverTypeConfig = serverTypeConfig;
this.coordinatorClient = coordinatorClient;
this.emitter = emitter;
this.datasourceHolder = datasourceHolder;
this.loadSpecHolder = loadSpecHolder;
}

@LifecycleStart
Expand Down Expand Up @@ -270,12 +269,12 @@ private void loadSegmentsOnStartup() throws IOException

/**
* @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned.
* The bootstrap segments returned are filtered by the broadcast datasources indicated by {@link DataSourceTaskIdHolder#getBroadcastDatasourceLoadingSpec()}
* if applicable.
* The bootstrap segments returned are filtered by the broadcast datasources indicated by
* {@link LoadSpecHolder#getBroadcastDatasourceLoadingSpec()} if applicable.
*/
private List<DataSegment> getBootstrapSegments()
{
final BroadcastDatasourceLoadingSpec.Mode mode = datasourceHolder.getBroadcastDatasourceLoadingSpec().getMode();
final BroadcastDatasourceLoadingSpec.Mode mode = loadSpecHolder.getBroadcastDatasourceLoadingSpec().getMode();
if (mode == BroadcastDatasourceLoadingSpec.Mode.NONE) {
log.info("Skipping fetch of bootstrap segments.");
return ImmutableList.of();
Expand All @@ -290,7 +289,7 @@ private List<DataSegment> getBootstrapSegments()
final BootstrapSegmentsResponse response =
FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true);
if (mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) {
final Set<String> broadcastDatasourcesToLoad = datasourceHolder.getBroadcastDatasourceLoadingSpec().getBroadcastDatasourcesToLoad();
final Set<String> broadcastDatasourcesToLoad = loadSpecHolder.getBroadcastDatasourceLoadingSpec().getBroadcastDatasourcesToLoad();
final List<DataSegment> filteredBroadcast = new ArrayList<>();
response.getIterator().forEachRemaining(segment -> {
if (broadcastDatasourcesToLoad.contains(segment.getDataSource())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.TaskHolder;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.ssl.SslContextFactory;
Expand Down Expand Up @@ -83,10 +83,10 @@ public void configure(Binder binder)
@Provides
@LazySingleton
public TaskIdResponseHeaderFilterHolder taskIdResponseHeaderFilterHolderBuilder(
final DataSourceTaskIdHolder taskIdHolder
final TaskHolder taskHolder
)
{
return new TaskIdResponseHeaderFilterHolder("/druid/worker/v1/chat/*", taskIdHolder.getTaskId());
return new TaskIdResponseHeaderFilterHolder("/druid/worker/v1/chat/*", taskHolder.getTaskId());
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.TaskHolder;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.ssl.SslContextFactory;
Expand Down Expand Up @@ -109,10 +109,10 @@ public void configure(Binder binder)
@Provides
@LazySingleton
public TaskIdResponseHeaderFilterHolder taskIdResponseHeaderFilterHolderBuilder(
final DataSourceTaskIdHolder taskIdHolder
final TaskHolder taskHolder
)
{
return new TaskIdResponseHeaderFilterHolder("/druid/worker/v1/chat/*", taskIdHolder.getTaskId());
return new TaskIdResponseHeaderFilterHolder("/druid/worker/v1/chat/*", taskHolder.getTaskId());
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
import org.apache.druid.server.StatusResource;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.MetricsModule;
import org.apache.druid.server.metrics.MonitorsConfig;
import org.apache.druid.server.metrics.TaskHolder;
import org.apache.druid.server.security.CustomCheckX509TrustManager;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.ConnectionFactory;
Expand Down Expand Up @@ -517,12 +517,10 @@ private static int getTCPAcceptQueueSize()

@Provides
@LazySingleton
public JettyMonitor getJettyMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder)
public JettyMonitor getJettyMonitor(TaskHolder taskHolder)
{
return new JettyMonitor(
MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(), dataSourceTaskIdHolder.getTaskId()
)
MonitorsConfig.mapOfTaskHolderDimensions(taskHolder)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.realtime.ChatHandlerResource;

import javax.annotation.Nullable;

public class TaskIdResponseHeaderFilterHolder extends ResponseHeaderFilterHolder
{
public TaskIdResponseHeaderFilterHolder(String path, String taskId)
public TaskIdResponseHeaderFilterHolder(String path, @Nullable String taskId)
{
super(path,
taskId == null
Expand Down
Loading
Loading