Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
64b8b73
Move lookup and broadcast datasource load spec configs to its own cla…
abhishekrb19 Nov 10, 2025
eb3feb3
No final in strict compilation mode
abhishekrb19 Nov 10, 2025
51c5f85
Add indexing-logs to CI artifact
abhishekrb19 Nov 10, 2025
ccaefec
Consolidate into one TaskPropertiesHolder
abhishekrb19 Nov 13, 2025
2b08b0d
Works with named injections
abhishekrb19 Nov 13, 2025
ca5e6ae
Test works!
abhishekrb19 Nov 14, 2025
eb80524
Different approach
abhishekrb19 Nov 24, 2025
32e672e
Cleanup MetricsModule
abhishekrb19 Nov 24, 2025
a434eeb
Merge branch 'master' into refactor_ds_lookup_holder
abhishekrb19 Nov 24, 2025
5033413
Revert changes to EmbeddedKafkaSupervisorTest
abhishekrb19 Nov 24, 2025
f44be2f
Revert logging changes
abhishekrb19 Nov 24, 2025
2093cd1
Better exception
abhishekrb19 Nov 24, 2025
6fd60d6
Cleanup and add LoadSpecHolder
abhishekrb19 Nov 24, 2025
58185c0
Try ImplementedBy to avoid explicit bindings everywhere
abhishekrb19 Nov 25, 2025
519d536
Revert "Try ImplementedBy to avoid explicit bindings everywhere"
abhishekrb19 Nov 25, 2025
0ec0380
Add common DefaultServerHolderModule
abhishekrb19 Nov 25, 2025
89e3418
Cleanup CliPeonTest and rename
abhishekrb19 Nov 25, 2025
b04d09c
Fix test
abhishekrb19 Nov 25, 2025
98979dc
Fix quidem test module init
abhishekrb19 Nov 25, 2025
efa0b5e
Rename to mapOfTaskHolderDimensions()
abhishekrb19 Nov 25, 2025
b81a9b6
Add test for default non-peon servers
abhishekrb19 Nov 26, 2025
91acfd3
Return null from NoopTaskHolder to avoid callers from doing instanceo…
abhishekrb19 Nov 26, 2025
014dac2
Merge branch 'master' into refactor_ds_lookup_holder
abhishekrb19 Nov 26, 2025
9ed233a
Review suggestions & more tests
abhishekrb19 Nov 30, 2025
a810414
Merge branch 'master' into refactor_ds_lookup_holder
abhishekrb19 Nov 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,28 @@
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.LoadSpecHolder;
import org.apache.druid.server.metrics.TaskPropertiesHolder;

class LookupListeningAnnouncerConfig
{
public static final String DEFAULT_TIER = "__default";
private final DataSourceTaskIdHolder dataSourceTaskIdHolder;
private final LoadSpecHolder loadSpecHolder;
private final TaskPropertiesHolder taskPropsHolder;

@JsonProperty("lookupTier")
private String lookupTier = null;
@JsonProperty("lookupTierIsDatasource")
private boolean lookupTierIsDatasource = false;

@JsonCreator
public LookupListeningAnnouncerConfig(
@JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder
@JacksonInject LoadSpecHolder loadSpecHolder,
@JacksonInject TaskPropertiesHolder taskPropsHolder
)
{
this.dataSourceTaskIdHolder = dataSourceTaskIdHolder;
this.loadSpecHolder = loadSpecHolder;
this.taskPropsHolder = taskPropsHolder;
}

public String getLookupTier()
Expand All @@ -50,7 +55,7 @@
!(lookupTierIsDatasource && null != lookupTier),
"Cannot specify both `lookupTier` and `lookupTierIsDatasource`"
);
final String lookupTier = lookupTierIsDatasource ? dataSourceTaskIdHolder.getDataSource() : this.lookupTier;
final String lookupTier = lookupTierIsDatasource ? taskPropsHolder.getDataSource() : this.lookupTier;

return Preconditions.checkNotNull(
lookupTier == null ? DEFAULT_TIER : StringUtils.emptyToNullNonDruidDataString(lookupTier),
Expand All @@ -61,6 +66,6 @@

public LookupLoadingSpec getLookupLoadingSpec()
{
return dataSourceTaskIdHolder.getLookupLoadingSpec();
return loadSpecHolder.getLookupLoadingSpec();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.http.ServletResourceUtils;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.LoadSpecHolder;

import javax.annotation.Nullable;
import java.io.File;
Expand Down Expand Up @@ -380,7 +380,7 @@ private void takeSnapshot(Map<String, LookupExtractorFactoryContainer> lookupMap
}

/**
* Load a set of lookups based on the injected value in {@link DataSourceTaskIdHolder#getLookupLoadingSpec()}.
* Load a set of lookups based on the injected value in {@link LoadSpecHolder#getLookupLoadingSpec()}.
*/
private void loadLookupsAndInitStateRef()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.initialization.jetty.BadRequestException;
import org.apache.druid.server.initialization.jetty.ServiceUnavailableException;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.TaskPropertiesHolder;

import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -42,10 +42,10 @@ public class ChatHandlerResource
private final String taskId;

@Inject
public ChatHandlerResource(final ChatHandlerProvider handlers, final DataSourceTaskIdHolder taskIdHolder)
public ChatHandlerResource(final ChatHandlerProvider handlers, final TaskPropertiesHolder taskPropsHolder)
{
this.handlers = handlers;
this.taskId = taskIdHolder.getTaskId();
this.taskId = taskPropsHolder.getTaskId();
}

@Path("/{id}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.LoadSpecHolder;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -82,7 +82,7 @@ public class SegmentCacheBootstrapper

private static final EmittingLogger log = new EmittingLogger(SegmentCacheBootstrapper.class);

private final DataSourceTaskIdHolder datasourceHolder;
private final LoadSpecHolder loadSpecHolder;

@Inject
public SegmentCacheBootstrapper(
Expand All @@ -94,7 +94,7 @@ public SegmentCacheBootstrapper(
ServerTypeConfig serverTypeConfig,
CoordinatorClient coordinatorClient,
ServiceEmitter emitter,
DataSourceTaskIdHolder datasourceHolder
LoadSpecHolder loadSpecHolder
)
{
this.loadDropHandler = loadDropHandler;
Expand All @@ -105,7 +105,7 @@ public SegmentCacheBootstrapper(
this.serverTypeConfig = serverTypeConfig;
this.coordinatorClient = coordinatorClient;
this.emitter = emitter;
this.datasourceHolder = datasourceHolder;
this.loadSpecHolder = loadSpecHolder;
}

@LifecycleStart
Expand Down Expand Up @@ -270,12 +270,12 @@ private void loadSegmentsOnStartup() throws IOException

/**
* @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned.
* The bootstrap segments returned are filtered by the broadcast datasources indicated by {@link DataSourceTaskIdHolder#getBroadcastDatasourceLoadingSpec()}
* The bootstrap segments returned are filtered by the broadcast datasources indicated by {@link LoadSpecHolder#getBroadcastDatasourceLoadingSpec()}
* if applicable.
*/
private List<DataSegment> getBootstrapSegments()
{
final BroadcastDatasourceLoadingSpec.Mode mode = datasourceHolder.getBroadcastDatasourceLoadingSpec().getMode();
final BroadcastDatasourceLoadingSpec.Mode mode = loadSpecHolder.getBroadcastDatasourceLoadingSpec().getMode();
if (mode == BroadcastDatasourceLoadingSpec.Mode.NONE) {
log.info("Skipping fetch of bootstrap segments.");
return ImmutableList.of();
Expand All @@ -290,7 +290,7 @@ private List<DataSegment> getBootstrapSegments()
final BootstrapSegmentsResponse response =
FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true);
if (mode == BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED) {
final Set<String> broadcastDatasourcesToLoad = datasourceHolder.getBroadcastDatasourceLoadingSpec().getBroadcastDatasourcesToLoad();
final Set<String> broadcastDatasourcesToLoad = loadSpecHolder.getBroadcastDatasourceLoadingSpec().getBroadcastDatasourcesToLoad();
final List<DataSegment> filteredBroadcast = new ArrayList<>();
response.getIterator().forEachRemaining(segment -> {
if (broadcastDatasourcesToLoad.contains(segment.getDataSource())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.TaskPropertiesHolder;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.ssl.SslContextFactory;
Expand Down Expand Up @@ -83,10 +83,10 @@ public void configure(Binder binder)
@Provides
@LazySingleton
public TaskIdResponseHeaderFilterHolder taskIdResponseHeaderFilterHolderBuilder(
final DataSourceTaskIdHolder taskIdHolder
final TaskPropertiesHolder taskPropsHolder
)
{
return new TaskIdResponseHeaderFilterHolder("/druid/worker/v1/chat/*", taskIdHolder.getTaskId());
return new TaskIdResponseHeaderFilterHolder("/druid/worker/v1/chat/*", taskPropsHolder.getTaskId());
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.TaskPropertiesHolder;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.ssl.SslContextFactory;
Expand Down Expand Up @@ -109,7 +109,7 @@ public void configure(Binder binder)
@Provides
@LazySingleton
public TaskIdResponseHeaderFilterHolder taskIdResponseHeaderFilterHolderBuilder(
final DataSourceTaskIdHolder taskIdHolder
final TaskPropertiesHolder taskIdHolder
)
{
return new TaskIdResponseHeaderFilterHolder("/druid/worker/v1/chat/*", taskIdHolder.getTaskId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
import org.apache.druid.server.StatusResource;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.TLSServerConfig;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.apache.druid.server.metrics.MetricsModule;
import org.apache.druid.server.metrics.MonitorsConfig;
import org.apache.druid.server.metrics.TaskPropertiesHolder;
import org.apache.druid.server.security.CustomCheckX509TrustManager;
import org.apache.druid.server.security.TLSCertificateChecker;
import org.eclipse.jetty.server.ConnectionFactory;
Expand Down Expand Up @@ -517,11 +517,11 @@ private static int getTCPAcceptQueueSize()

@Provides
@LazySingleton
public JettyMonitor getJettyMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder)
public JettyMonitor getJettyMonitor(TaskPropertiesHolder taskPropsHolder)
{
return new JettyMonitor(
MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(), dataSourceTaskIdHolder.getTaskId()
taskPropsHolder.getDataSource(), taskPropsHolder.getTaskId()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ public class GroupByStatsMonitor extends AbstractMonitor
public GroupByStatsMonitor(
GroupByStatsProvider groupByStatsProvider,
@Merging BlockingPool<ByteBuffer> mergeBufferPool,
DataSourceTaskIdHolder dataSourceTaskIdHolder
TaskPropertiesHolder taskPropsHolder
)
{
this.groupByStatsProvider = groupByStatsProvider;
this.mergeBufferPool = mergeBufferPool;
this.dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(),
dataSourceTaskIdHolder.getTaskId()
taskPropsHolder.getDataSource(),
taskPropsHolder.getTaskId()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,15 @@
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;

import javax.annotation.Nullable;

/**
* This holder is only applicable to {@code CliPeon} servers.
* A holder applicable to all servers that contains load specifications such as {@link LookupLoadingSpec}
* and {@link BroadcastDatasourceLoadingSpec}.
*/
public class DataSourceTaskIdHolder
public class LoadSpecHolder
{
public static final String DATA_SOURCE_BINDING = "druidDataSource";
public static final String TASK_ID_BINDING = "druidTaskId";
public static final String LOOKUPS_TO_LOAD_FOR_TASK = "lookupsToLoadForTask";
public static final String BROADCAST_DATASOURCES_TO_LOAD_FOR_TASK = "broadcastDatasourcesToLoadForTask";

@Named(DATA_SOURCE_BINDING)
@Inject(optional = true)
String dataSource = null;

@Named(TASK_ID_BINDING)
@Inject(optional = true)
String taskId = null;

@Named(LOOKUPS_TO_LOAD_FOR_TASK)
@Inject(optional = true)
LookupLoadingSpec lookupLoadingSpec = LookupLoadingSpec.ALL;
Expand All @@ -52,24 +41,6 @@ public class DataSourceTaskIdHolder
@Inject(optional = true)
BroadcastDatasourceLoadingSpec broadcastDatasourceLoadingSpec = BroadcastDatasourceLoadingSpec.ALL;

/**
* @return the taskId for CliPeon servers; {@code null} for all other servers.
*/
@Nullable
public String getDataSource()
{
return dataSource;
}

/**
* @return the dataSource for CliPeon servers; {@code null} for all other servers.
*/
@Nullable
public String getTaskId()
{
return taskId;
}

public LookupLoadingSpec getLookupLoadingSpec()
{
return lookupLoadingSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void configure(Binder binder)

DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum.

binder.bind(DataSourceTaskIdHolder.class).in(LazySingleton.class);
binder.bind(TaskPropertiesHolder.class).in(LazySingleton.class);

binder.bind(ExecutorServiceMonitor.class).in(LazySingleton.class);

Expand All @@ -111,8 +111,8 @@ public MonitorScheduler getMonitorScheduler(
List<Monitor> monitors = new ArrayList<>();
// HACK: when ServiceStatusMonitor is the first to be loaded, it introduces a circular dependency between
// CliPeon.runTask and CliPeon.getDataSourceFromTask/CliPeon.getTaskIDFromTask. The reason for this is unclear
// but by injecting DataSourceTaskIdHolder early this cycle is avoided.
injector.getInstance(DataSourceTaskIdHolder.class);
// but by injecting TaskPropertiesHolder early this cycle is avoided.
injector.getInstance(TaskPropertiesHolder.class);
for (Class<? extends Monitor> monitorClass : Iterables.concat(monitorsConfig.getMonitors(), monitorSet)) {
if (shouldLoadMonitor(monitorClass, nodeRoles)) {
monitors.add(injector.getInstance(monitorClass));
Expand Down Expand Up @@ -150,50 +150,50 @@ public MonitorScheduler getMonitorScheduler(
@Provides
@ManageLifecycle
public JvmMonitor getJvmMonitor(
DataSourceTaskIdHolder dataSourceTaskIdHolder
TaskPropertiesHolder taskPropsHolder
)
{
Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(),
dataSourceTaskIdHolder.getTaskId()
taskPropsHolder.getDataSource(),
taskPropsHolder.getTaskId()
);
return new JvmMonitor(dimensions);
}

@Provides
@ManageLifecycle
public JvmCpuMonitor getJvmCpuMonitor(
DataSourceTaskIdHolder dataSourceTaskIdHolder
TaskPropertiesHolder taskPropsHolder
)
{
Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(),
dataSourceTaskIdHolder.getTaskId()
taskPropsHolder.getDataSource(),
taskPropsHolder.getTaskId()
);
return new JvmCpuMonitor(dimensions);
}

@Provides
@ManageLifecycle
public JvmThreadsMonitor getJvmThreadsMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder)
public JvmThreadsMonitor getJvmThreadsMonitor(TaskPropertiesHolder taskPropsHolder)
{
Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(),
dataSourceTaskIdHolder.getTaskId()
taskPropsHolder.getDataSource(),
taskPropsHolder.getTaskId()
);
return new JvmThreadsMonitor(dimensions);
}

@Provides
@ManageLifecycle
public SysMonitor getSysMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder, @Self Set<NodeRole> nodeRoles)
public SysMonitor getSysMonitor(TaskPropertiesHolder taskPropsHolder, @Self Set<NodeRole> nodeRoles)
{
if (nodeRoles.contains(NodeRole.PEON)) {
return new NoopSysMonitor();
} else {
Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(),
dataSourceTaskIdHolder.getTaskId()
taskPropsHolder.getDataSource(),
taskPropsHolder.getTaskId()
);
return new SysMonitor(dimensions);
}
Expand All @@ -202,7 +202,7 @@ public SysMonitor getSysMonitor(DataSourceTaskIdHolder dataSourceTaskIdHolder, @
@Provides
@ManageLifecycle
public OshiSysMonitor getOshiSysMonitor(
DataSourceTaskIdHolder dataSourceTaskIdHolder,
TaskPropertiesHolder taskPropsHolder,
@Self Set<NodeRole> nodeRoles,
OshiSysMonitorConfig oshiSysConfig
)
Expand All @@ -211,8 +211,8 @@ public OshiSysMonitor getOshiSysMonitor(
return new NoopOshiSysMonitor();
} else {
Map<String, String[]> dimensions = MonitorsConfig.mapOfDatasourceAndTaskID(
dataSourceTaskIdHolder.getDataSource(),
dataSourceTaskIdHolder.getTaskId()
taskPropsHolder.getDataSource(),
taskPropsHolder.getTaskId()
);
return new OshiSysMonitor(dimensions, oshiSysConfig);
}
Expand Down
Loading
Loading