Commit b3be269
Refactor: Rename DataSourceTaskIdHolder and extract load spec holder to its own class (#18732)
Refactor: Remove named injections of DataSourceTaskIdHolder in favor of:
- A TaskHolder interface, implemented by PeonTaskHolder for peon processes and by NoopTaskHolder for all other servers, whose methods return null.
- A LoadSpecHolder interface, implemented by PeonLoadSpecHolder for peon processes and by DefaultLoadSpecHolder for all other servers.

This also removes some hacks that had been added to avoid cyclic dependencies during Guice initialization.

A new module, DefaultServerHolderModule, binds the default implementations above as part of the core injector module for all servers except CliPeon. For CliPeon, the bindings to PeonTaskHolder and PeonLoadSpecHolder are made explicitly in CliPeon itself.
1 parent: 2d7cd69
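For reference, the two holder interfaces described above have roughly the following shape. This is a minimal sketch inferred from the commit message and from the call sites visible in the diffs below (getTaskId()/getDataSource() on TaskHolder, and the two loading-spec getters on LoadSpecHolder); it is not the committed source, and in the real tree each interface would live in its own file under org.apache.druid.server.metrics.

import javax.annotation.Nullable;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;

// Sketch only: shapes inferred from this commit's call sites.
public interface TaskHolder
{
  // Null on all servers except peons (NoopTaskHolder returns null,
  // per the commit message).
  @Nullable
  String getTaskId();

  @Nullable
  String getDataSource();
}

public interface LoadSpecHolder
{
  LookupLoadingSpec getLookupLoadingSpec();

  BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec();
}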

36 files changed: +861 additions, −323 deletions

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java

Lines changed: 92 additions & 0 deletions
@@ -35,7 +35,12 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Injector;
+import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingCluster;
+import org.apache.druid.cli.CliPeon;
+import org.apache.druid.cli.PeonLoadSpecHolder;
+import org.apache.druid.cli.PeonTaskHolder;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
@@ -52,6 +57,8 @@
 import org.apache.druid.data.input.kafka.KafkaTopicPartition;
 import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
 import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -112,6 +119,10 @@
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
+import org.apache.druid.server.metrics.LoadSpecHolder;
+import org.apache.druid.server.metrics.TaskHolder;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
@@ -127,7 +138,9 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

@@ -141,6 +154,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -153,6 +168,9 @@
 @RunWith(Parameterized.class)
 public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
 {
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   private static final long POLL_RETRY_MS = 100;
   private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(new Header()
   {
@@ -3360,6 +3378,80 @@ public void testCompletionReportMultiplePartitionStats() throws Exception
     Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L)));
   }

+  @Test(timeout = 60_000L)
+  public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency()
+      throws IOException, ExecutionException, InterruptedException
+  {
+    insertData();
+
+    final KafkaIndexTask task = createTask(
+        "index_kafka_test_id1",
+        NEW_DATA_SCHEMA.withTransformSpec(
+            new TransformSpec(
+                new SelectorDimFilter("dim1", "b", null),
+                ImmutableList.of(
+                    new ExpressionTransform("dim1t", "concat(dim1,dim1)", ExprMacroTable.nil())
+                )
+            )
+        ),
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 0L), ImmutableSet.of()),
+            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L)),
+            kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            INPUT_FORMAT,
+            null,
+            Duration.standardHours(2).getStandardMinutes()
+        )
+    );
+
+    File file = temporaryFolder.newFile("task.json");
+
+    FileUtils.write(file, OBJECT_MAPPER.writeValueAsString(task), StandardCharsets.UTF_8);
+
+    final CliPeon peon = new CliPeon();
+    peon.taskAndStatusFile = ImmutableList.of(file.getParent(), "1");
+
+    final Properties properties = new Properties();
+    peon.configure(properties);
+    final Injector baseInjector = GuiceInjectors.makeStartupInjector();
+    peon.configure(properties, baseInjector);
+    final Injector peonInjector = peon.makeInjector(Set.of(NodeRole.PEON));
+
+    final LoadSpecHolder loadSpecHolder = peonInjector.getInstance(LoadSpecHolder.class);
+    Assert.assertTrue(loadSpecHolder instanceof PeonLoadSpecHolder);
+    Assert.assertEquals(LookupLoadingSpec.ALL, loadSpecHolder.getLookupLoadingSpec());
+    Assert.assertEquals(BroadcastDatasourceLoadingSpec.ALL, loadSpecHolder.getBroadcastDatasourceLoadingSpec());
+
+    final TaskHolder taskHolder = peonInjector.getInstance(TaskHolder.class);
+    Assert.assertTrue(taskHolder instanceof PeonTaskHolder);
+    Assert.assertEquals("index_kafka_test_id1", taskHolder.getTaskId());
+    Assert.assertEquals("test_ds", taskHolder.getDataSource());
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    // Wait for task to exit
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+    verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(4).totalProcessed(1));
+
+    // Check published metadata
+    final List<SegmentDescriptor> publishedDescriptors = publishedDescriptors();
+    assertEqualsExceptVersion(ImmutableList.of(sdd("2009/P1D", 0)), publishedDescriptors);
+    Assert.assertEquals(
+        new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L))),
+        newDataSchemaMetadata()
+    );
+
+    // Check segments in deep storage
+    Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", publishedDescriptors.get(0)));
+    Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t", publishedDescriptors.get(0)));
+  }
+
   public static class TestKafkaInputFormat implements InputFormat
   {
     final InputFormat baseInputFormat;

indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java

Lines changed: 9 additions & 1 deletion
@@ -1240,6 +1240,8 @@ public static class Builder
     private final String dataSource;
     private final SegmentCacheManagerFactory segmentCacheManagerFactory;

+    @Nullable
+    private String id;
     private CompactionIOConfig ioConfig;
     @Nullable
     private DimensionsSpec dimensionsSpec;
@@ -1268,6 +1270,12 @@ public Builder(
       this.segmentCacheManagerFactory = segmentCacheManagerFactory;
     }

+    public Builder id(String id)
+    {
+      this.id = id;
+      return this;
+    }
+
     public Builder interval(Interval interval)
     {
       return inputSpec(new CompactionIntervalSpec(interval, null));
@@ -1353,7 +1361,7 @@ public Builder projections(@Nullable List<AggregateProjectionSpec> projections)
     public CompactionTask build()
     {
       return new CompactionTask(
-          null,
+          id,
           null,
           dataSource,
           null,
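A hypothetical usage of the new Builder.id(...) setter; "builder" stands in for a fully constructed CompactionTask.Builder (its required constructor arguments are elided here), and the id and interval values are made up for illustration:

// Hypothetical: "builder" is a CompactionTask.Builder created elsewhere.
final CompactionTask task = builder
    .id("compact_wikipedia_2024-01-01")                // explicit task id; previously always null
    .interval(Intervals.of("2024-01-01/2024-01-02"))   // existing interval(...) setter
    .build();

Before this change, build() always passed null for the id, so the task id was generated; the setter lets callers pin a deterministic id.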

quidem-ut/src/main/java/org/apache/druid/quidem/ExposedAsBrokerQueryComponentSupplierWrapper.java

Lines changed: 2 additions & 0 deletions
@@ -41,6 +41,7 @@
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.guice.AnnouncerModule;
 import org.apache.druid.guice.BrokerServiceModule;
+import org.apache.druid.guice.DefaultServerHolderModule;
 import org.apache.druid.guice.ExpressionModule;
 import org.apache.druid.guice.ExtensionsModule;
 import org.apache.druid.guice.JacksonConfigManagerModule;
@@ -194,6 +195,7 @@ private List<Module> forServerModules()
         new ExternalStorageAccessSecurityModule(),
         new ServiceClientModule(),
         new StorageConnectorModule(),
+        new DefaultServerHolderModule(),
         ServerInjectorBuilder.registerNodeRoleModule(ImmutableSet.of())
     );
   }
server/src/main/java/org/apache/druid/guice/DefaultServerHolderModule.java

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.Binder;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.ExcludeScope;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.server.metrics.DefaultLoadSpecHolder;
+import org.apache.druid.server.metrics.LoadSpecHolder;
+import org.apache.druid.server.metrics.NoopTaskHolder;
+import org.apache.druid.server.metrics.TaskHolder;
+
+/**
+ * Binds the following holder configs for all servers except {@code CliPeon}:
+ * <ul>
+ * <li>{@link TaskHolder} to {@link NoopTaskHolder}</li>
+ * <li>{@link LoadSpecHolder} to {@link DefaultLoadSpecHolder}</li>
+ * </ul>
+ *
+ * <p>For {@code CliPeon}, these bindings are overridden by the peon-specific module.</p>
+ */
+@ExcludeScope(roles = {NodeRole.PEON_JSON_NAME})
+public class DefaultServerHolderModule implements DruidModule
+{
+  @Override
+  public void configure(Binder binder)
+  {
+    binder.bind(TaskHolder.class).to(NoopTaskHolder.class).in(LazySingleton.class);
+    binder.bind(LoadSpecHolder.class).to(DefaultLoadSpecHolder.class).in(LazySingleton.class);
+  }
+}
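Per the commit message, CliPeon installs the peon-side counterparts itself rather than relying on this module (which is excluded on peons via @ExcludeScope). A minimal sketch of what those bindings would look like; the actual CliPeon code is not part of this diff:

// Sketch of the peon-side bindings described in the commit message;
// the exact code lives in CliPeon and is not shown in this excerpt.
binder.bind(TaskHolder.class).to(PeonTaskHolder.class).in(LazySingleton.class);
binder.bind(LoadSpecHolder.class).to(PeonLoadSpecHolder.class).in(LazySingleton.class);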

server/src/main/java/org/apache/druid/guice/annotations/ExcludeScope.java

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@
 import java.lang.annotation.Target;

 /**
- * An annotation to exlcude specific node types that a {@link com.google.inject.Module} can be loaded on.
+ * An annotation to exclude specific node types that a {@link com.google.inject.Module} can be loaded on.
  * The {@link #roles()} should be the {@link org.apache.druid.discovery.NodeRole#jsonName}. If both {@link ExcludeScope}
  * and {@link LoadScope} are set, {@link ExcludeScope} takes precedence.
  * <p>
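DefaultServerHolderModule above is a live example of the annotation in use. To illustrate the documented precedence rule, here is a hypothetical module carrying both annotations; BROKER_JSON_NAME is assumed to exist on NodeRole alongside the PEON_JSON_NAME constant used in this commit:

// Hypothetical module: would load on brokers but never on peons, because
// ExcludeScope takes precedence over LoadScope when both are present.
@LoadScope(roles = {NodeRole.BROKER_JSON_NAME, NodeRole.PEON_JSON_NAME})  // assumed constant
@ExcludeScope(roles = {NodeRole.PEON_JSON_NAME})
public class ExampleScopedModule implements DruidModule
{
  @Override
  public void configure(Binder binder)
  {
    // bindings omitted; this class only illustrates annotation precedence
  }
}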

server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java

Lines changed: 3 additions & 1 deletion
@@ -26,6 +26,7 @@
 import org.apache.druid.guice.AnnouncerModule;
 import org.apache.druid.guice.BuiltInTypesModule;
 import org.apache.druid.guice.CatalogCoreModule;
+import org.apache.druid.guice.DefaultServerHolderModule;
 import org.apache.druid.guice.DruidInjectorBuilder;
 import org.apache.druid.guice.DruidSecondaryModule;
 import org.apache.druid.guice.ExpressionModule;
@@ -142,7 +143,8 @@ public CoreInjectorBuilder forServerWithoutJetty()
       new ExternalStorageAccessSecurityModule(),
       new ServiceClientModule(),
       new StorageConnectorModule(),
-      new CatalogCoreModule()
+      new CatalogCoreModule(),
+      new DefaultServerHolderModule()
   );
   return this;
 }

server/src/main/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfig.java

Lines changed: 12 additions & 6 deletions
@@ -25,23 +25,28 @@
 import com.google.common.base.Preconditions;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
-import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
+import org.apache.druid.server.metrics.LoadSpecHolder;
+import org.apache.druid.server.metrics.TaskHolder;

 class LookupListeningAnnouncerConfig
 {
   public static final String DEFAULT_TIER = "__default";
-  private final DataSourceTaskIdHolder dataSourceTaskIdHolder;
+  private final TaskHolder taskHolder;
+  private final LoadSpecHolder loadSpecHolder;
+
   @JsonProperty("lookupTier")
   private String lookupTier = null;
   @JsonProperty("lookupTierIsDatasource")
   private boolean lookupTierIsDatasource = false;

   @JsonCreator
   public LookupListeningAnnouncerConfig(
-      @JacksonInject DataSourceTaskIdHolder dataSourceTaskIdHolder
+      @JacksonInject TaskHolder taskHolder,
+      @JacksonInject LoadSpecHolder loadSpecHolder
   )
   {
-    this.dataSourceTaskIdHolder = dataSourceTaskIdHolder;
+    this.taskHolder = taskHolder;
+    this.loadSpecHolder = loadSpecHolder;
   }

   public String getLookupTier()
@@ -50,7 +55,8 @@ public String getLookupTier()
         !(lookupTierIsDatasource && null != lookupTier),
         "Cannot specify both `lookupTier` and `lookupTierIsDatasource`"
     );
-    final String lookupTier = lookupTierIsDatasource ? dataSourceTaskIdHolder.getDataSource() : this.lookupTier;
+
+    final String lookupTier = lookupTierIsDatasource ? taskHolder.getDataSource() : this.lookupTier;

     return Preconditions.checkNotNull(
         lookupTier == null ? DEFAULT_TIER : StringUtils.emptyToNullNonDruidDataString(lookupTier),
@@ -61,6 +67,6 @@ public String getLookupTier()

   public LookupLoadingSpec getLookupLoadingSpec()
   {
-    return dataSourceTaskIdHolder.getLookupLoadingSpec();
+    return loadSpecHolder.getLookupLoadingSpec();
   }
 }
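The practical effect of this change: when lookupTierIsDatasource is set, the tier now comes from TaskHolder.getDataSource(), which PeonTaskHolder populates on peons while NoopTaskHolder returns null everywhere else, falling back to __default. A hypothetical peon config snippet that exercises this path; the druid.lookup property prefix is assumed from the JSON property names and the documented lookup configs, so verify against the Druid lookup docs:

# Hypothetical peon runtime.properties: announce lookups under the task's
# datasource name instead of a fixed tier. Cannot be combined with
# druid.lookup.lookupTier (the config asserts mutual exclusion above).
druid.lookup.lookupTierIsDatasource=true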

server/src/main/java/org/apache/druid/query/lookup/LookupReferencesManager.java

Lines changed: 2 additions & 2 deletions
@@ -42,7 +42,7 @@
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.http.ServletResourceUtils;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
-import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
+import org.apache.druid.server.metrics.LoadSpecHolder;

 import javax.annotation.Nullable;
 import java.io.File;
@@ -380,7 +380,7 @@ private void takeSnapshot(Map<String, LookupExtractorFactoryContainer> lookupMap
   }

   /**
-   * Load a set of lookups based on the injected value in {@link DataSourceTaskIdHolder#getLookupLoadingSpec()}.
+   * Load a set of lookups based on the injected value in {@link LoadSpecHolder#getLookupLoadingSpec()}.
    */
   private void loadLookupsAndInitStateRef()
   {

server/src/main/java/org/apache/druid/segment/realtime/ChatHandlerResource.java

Lines changed: 5 additions & 3 deletions
@@ -25,8 +25,9 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.server.initialization.jetty.BadRequestException;
 import org.apache.druid.server.initialization.jetty.ServiceUnavailableException;
-import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
+import org.apache.druid.server.metrics.TaskHolder;

+import javax.annotation.Nullable;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.core.Context;
@@ -39,13 +40,14 @@ public class ChatHandlerResource
   public static final String TASK_ID_HEADER = "X-Druid-Task-Id";

   private final ChatHandlerProvider handlers;
+  @Nullable
   private final String taskId;

   @Inject
-  public ChatHandlerResource(final ChatHandlerProvider handlers, final DataSourceTaskIdHolder taskIdHolder)
+  public ChatHandlerResource(final ChatHandlerProvider handlers, final TaskHolder taskHolder)
   {
     this.handlers = handlers;
-    this.taskId = taskIdHolder.getTaskId();
+    this.taskId = taskHolder.getTaskId();
   }

   @Path("/{id}")
