1717 * under the License.
1818 */
1919
20- package org .apache .druid .indexing . common . task ;
20+ package org .apache .druid .indexer ;
2121
2222import com .fasterxml .jackson .annotation .JacksonInject ;
2323import com .fasterxml .jackson .annotation .JsonCreator ;
3232import com .google .common .collect .ImmutableMap ;
3333import com .google .common .collect .Iterables ;
3434import org .apache .commons .lang3 .BooleanUtils ;
35- import org .apache .druid .indexer .DataSegmentAndIndexZipFilePath ;
36- import org .apache .druid .indexer .HadoopDruidDetermineConfigurationJob ;
37- import org .apache .druid .indexer .HadoopDruidIndexerConfig ;
38- import org .apache .druid .indexer .HadoopDruidIndexerJob ;
39- import org .apache .druid .indexer .HadoopIngestionSpec ;
40- import org .apache .druid .indexer .IngestionState ;
41- import org .apache .druid .indexer .JobHelper ;
42- import org .apache .druid .indexer .TaskMetricsGetter ;
43- import org .apache .druid .indexer .TaskMetricsUtils ;
44- import org .apache .druid .indexer .TaskStatus ;
35+ import org .apache .druid .annotations .SuppressFBWarnings ;
4536import org .apache .druid .indexer .granularity .ArbitraryGranularitySpec ;
4637import org .apache .druid .indexer .granularity .GranularitySpec ;
4738import org .apache .druid .indexer .path .SegmentMetadataPublisher ;
5344import org .apache .druid .indexing .common .actions .TimeChunkLockAcquireAction ;
5445import org .apache .druid .indexing .common .actions .TimeChunkLockTryAcquireAction ;
5546import org .apache .druid .indexing .common .config .TaskConfig ;
56- import org .apache .druid .indexing .hadoop .OverlordActionBasedUsedSegmentsRetriever ;
47+ import org .apache .druid .indexing .common .task .AbstractTask ;
48+ import org .apache .druid .indexing .common .task .Tasks ;
5749import org .apache .druid .indexing .overlord .IndexerMetadataStorageCoordinator ;
5850import org .apache .druid .java .util .common .JodaUtils ;
5951import org .apache .druid .java .util .common .StringUtils ;
9082import java .util .Set ;
9183import java .util .stream .Collectors ;
9284
85+ @ SuppressFBWarnings ({"NP_NONNULL_PARAM_VIOLATION" , "NP_STORE_INTO_NONNULL_FIELD" })
9386public class HadoopIndexTask extends HadoopTask implements ChatHandler
9487{
9588 public static final String TYPE = "index_hadoop" ;
@@ -115,6 +108,9 @@ private static String getTheDataSource(HadoopIngestionSpec spec)
115108 @ JsonIgnore
116109 private final AuthorizerMapper authorizerMapper ;
117110
111+ @ JsonIgnore
112+ private final HadoopTaskConfig hadoopTaskConfig ;
113+
118114 @ JsonIgnore
119115 private final Optional <ChatHandlerProvider > chatHandlerProvider ;
120116
@@ -136,6 +132,7 @@ private static String getTheDataSource(HadoopIngestionSpec spec)
136132 @ JsonIgnore
137133 private String errorMsg ;
138134
135+
139136 /**
140137 * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
141138 * for creating Druid index segments. It may be modified.
@@ -156,16 +153,18 @@ public HadoopIndexTask(
156153 @ JacksonInject ObjectMapper jsonMapper ,
157154 @ JsonProperty ("context" ) Map <String , Object > context ,
158155 @ JacksonInject AuthorizerMapper authorizerMapper ,
159- @ JacksonInject ChatHandlerProvider chatHandlerProvider
156+ @ JacksonInject ChatHandlerProvider chatHandlerProvider ,
157+ @ JacksonInject HadoopTaskConfig hadoopTaskConfig
160158 )
161159 {
162160 super (
163- getOrMakeId (id , TYPE , getTheDataSource (spec )),
161+ AbstractTask . getOrMakeId (id , TYPE , getTheDataSource (spec )),
164162 getTheDataSource (spec ),
165163 hadoopDependencyCoordinates == null
166164 ? (hadoopCoordinates == null ? null : ImmutableList .of (hadoopCoordinates ))
167165 : hadoopDependencyCoordinates ,
168- context
166+ context ,
167+ hadoopTaskConfig
169168 );
170169 this .authorizerMapper = authorizerMapper ;
171170 this .chatHandlerProvider = Optional .fromNullable (chatHandlerProvider );
@@ -181,6 +180,7 @@ public HadoopIndexTask(
181180 this .spec .getIOConfig ().getMetadataUpdateSpec () == null ,
182181 "metadataUpdateSpec must be absent"
183182 );
183+ this .hadoopTaskConfig = hadoopTaskConfig ;
184184
185185 this .classpathPrefix = classpathPrefix ;
186186 this .jsonMapper = Preconditions .checkNotNull (jsonMapper , "null ObjectMappper" );
@@ -338,8 +338,8 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
338338 try {
339339 registerResourceCloserOnAbnormalExit (config -> killHadoopJob ());
340340 String hadoopJobIdFile = getHadoopJobIdFileName ();
341- logExtensionsConfig ();
342- final ClassLoader loader = buildClassLoader (toolbox );
341+ HadoopTask . logExtensionsConfig ();
342+ final ClassLoader loader = buildClassLoader ();
343343 boolean determineIntervals = spec .getDataSchema ().getGranularitySpec ().inputIntervals ().isEmpty ();
344344
345345 HadoopIngestionSpec .updateSegmentListIfDatasourcePathSpecIsUsed (
@@ -348,15 +348,15 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
348348 new OverlordActionBasedUsedSegmentsRetriever (toolbox )
349349 );
350350
351- Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject (
352- "org.apache.druid.indexing.common.task .HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner" ,
351+ Object determinePartitionsInnerProcessingRunner = HadoopTask . getForeignClassloaderObject (
352+ "org.apache.druid.indexer .HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner" ,
353353 loader
354354 );
355355 determinePartitionsStatsGetter = new InnerProcessingStatsGetter (determinePartitionsInnerProcessingRunner );
356356
357357 String [] determinePartitionsInput = new String []{
358358 toolbox .getJsonMapper ().writeValueAsString (spec ),
359- toolbox . getConfig () .getHadoopWorkingPath (),
359+ hadoopTaskConfig .getHadoopWorkingPath (),
360360 toolbox .getSegmentPusher ().getPathForHadoop (),
361361 hadoopJobIdFile
362362 };
@@ -418,7 +418,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
418418 lock .assertNotRevoked ();
419419 version = lock .getVersion ();
420420 } else {
421- Iterable <TaskLock > locks = getTaskLocks (toolbox .getTaskActionClient ());
421+ Iterable <TaskLock > locks = AbstractTask . getTaskLocks (toolbox .getTaskActionClient ());
422422 final TaskLock myLock = Iterables .getOnlyElement (locks );
423423 version = myLock .getVersion ();
424424 }
@@ -442,8 +442,8 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
442442
443443 log .info ("Setting version to: %s" , version );
444444
445- Object innerProcessingRunner = getForeignClassloaderObject (
446- "org.apache.druid.indexing.common.task .HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner" ,
445+ Object innerProcessingRunner = HadoopTask . getForeignClassloaderObject (
446+ "org.apache.druid.indexer .HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner" ,
447447 loader
448448 );
449449 buildSegmentsStatsGetter = new InnerProcessingStatsGetter (innerProcessingRunner );
@@ -535,11 +535,11 @@ private void killHadoopJob()
535535 try {
536536 ClassLoader loader = HadoopTask .buildClassLoader (
537537 getHadoopDependencyCoordinates (),
538- taskConfig .getDefaultHadoopCoordinates ()
538+ hadoopTaskConfig .getDefaultHadoopCoordinates ()
539539 );
540540
541- Object killMRJobInnerProcessingRunner = getForeignClassloaderObject (
542- "org.apache.druid.indexing.common.task .HadoopIndexTask$HadoopKillMRJobIdProcessingRunner" ,
541+ Object killMRJobInnerProcessingRunner = HadoopTask . getForeignClassloaderObject (
542+ "org.apache.druid.indexer .HadoopIndexTask$HadoopKillMRJobIdProcessingRunner" ,
543543 loader
544544 );
545545
@@ -576,7 +576,7 @@ private void renameSegmentIndexFilesJob(
576576 final ClassLoader loader = Thread .currentThread ().getContextClassLoader ();
577577 try {
578578 final Class <?> clazz = loader .loadClass (
579- "org.apache.druid.indexing.common.task .HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner"
579+ "org.apache.druid.indexer .HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner"
580580 );
581581 Object renameSegmentIndexFilesRunner = clazz .newInstance ();
582582
@@ -616,11 +616,11 @@ private void indexerGeneratorCleanupJob(
616616 try {
617617 ClassLoader loader = HadoopTask .buildClassLoader (
618618 getHadoopDependencyCoordinates (),
619- taskConfig .getDefaultHadoopCoordinates ()
619+ hadoopTaskConfig .getDefaultHadoopCoordinates ()
620620 );
621621
622- Object indexerGeneratorCleanupRunner = getForeignClassloaderObject (
623- "org.apache.druid.indexing.common.task .HadoopIndexTask$HadoopIndexerGeneratorCleanupRunner" ,
622+ Object indexerGeneratorCleanupRunner = HadoopTask . getForeignClassloaderObject (
623+ "org.apache.druid.indexer .HadoopIndexTask$HadoopIndexerGeneratorCleanupRunner" ,
624624 loader
625625 );
626626
0 commit comments