4545import org .apache .druid .indexing .seekablestream .SeekableStreamIndexTaskTuningConfig ;
4646import org .apache .druid .indexing .seekablestream .SeekableStreamStartSequenceNumbers ;
4747import org .apache .druid .indexing .seekablestream .common .RecordSupplier ;
48+ import org .apache .druid .indexing .worker .executor .ExecutorLifecycleConfig ;
4849import org .apache .druid .java .util .common .DateTimes ;
4950import org .apache .druid .java .util .common .Intervals ;
5051import org .apache .druid .java .util .common .granularity .AllGranularity ;
7172import javax .annotation .Nullable ;
7273import java .io .File ;
7374import java .io .IOException ;
74- import java .lang .reflect .Field ;
7575import java .nio .charset .StandardCharsets ;
76- import java .util .ArrayList ;
7776import java .util .Collections ;
78- import java .util .List ;
7977import java .util .Map ;
8078import java .util .Properties ;
8179import java .util .Set ;
@@ -97,51 +95,43 @@ public class CliPeonTest
9795 @ Test
9896 public void testCliPeonK8sMode () throws IOException
9997 {
100- File file = temporaryFolder .newFile ("task.json" );
101- FileUtils .write (file , "{\" type\" :\" noop\" }" , StandardCharsets .UTF_8 );
102- GuiceRunnable runnable = new FakeCliPeon (file .getParent (), "k8s" );
103- final Injector injector = GuiceInjectors .makeStartupInjector ();
104- injector .injectMembers (runnable );
105- Assert .assertNotNull (runnable .makeInjector ());
98+ final Properties properties = new Properties ();
99+ properties .setProperty ("druid.indexer.runner.type" , "k8s" );
100+ final Injector peonInjector = makePeonInjector (NoopTask .create (), properties );
101+ final ExecutorLifecycleConfig executorLifecycleConfig = peonInjector .getInstance (ExecutorLifecycleConfig .class );
102+ Assert .assertFalse (executorLifecycleConfig .isParentStreamDefined ());
106103 }
107104
108105 @ Test
109106 public void testCliPeonNonK8sMode () throws IOException
110107 {
111- File file = temporaryFolder .newFile ("task.json" );
112- FileUtils .write (file , "{\" type\" :\" noop\" }" , StandardCharsets .UTF_8 );
113- GuiceRunnable runnable = new FakeCliPeon (file .getParent (), "httpRemote" );
114- final Injector injector = GuiceInjectors .makeStartupInjector ();
115- injector .injectMembers (runnable );
116- Assert .assertNotNull (runnable .makeInjector ());
108+ final Properties properties = new Properties ();
109+ properties .setProperty ("druid.indexer.runner.type" , "httpRemote" );
110+ final Injector peonInjector = makePeonInjector (NoopTask .create (), properties );
111+ final ExecutorLifecycleConfig executorLifecycleConfig = peonInjector .getInstance (ExecutorLifecycleConfig .class );
112+ Assert .assertTrue (executorLifecycleConfig .isParentStreamDefined ());
117113 }
118114
119115 @ Test
120- public void testCliPeonK8sANdWorkerIsK8sMode () throws IOException
116+ public void testCliPeonK8sAndWorkerIsK8sMode () throws IOException
121117 {
122- File file = temporaryFolder .newFile ("task.json" );
123- FileUtils .write (file , "{\" type\" :\" noop\" }" , StandardCharsets .UTF_8 );
124- GuiceRunnable runnable = new FakeCliPeon (file .getParent (), "k8sAndWorker" );
125- final Injector injector = GuiceInjectors .makeStartupInjector ();
126- injector .injectMembers (runnable );
127- Assert .assertNotNull (runnable .makeInjector ());
118+ final Properties properties = new Properties ();
119+ properties .setProperty ("druid.indexer.runner.type" , "k8sAndWorker" );
120+ final Injector peonInjector = makePeonInjector (NoopTask .create (), properties );
121+ final ExecutorLifecycleConfig executorLifecycleConfig = peonInjector .getInstance (ExecutorLifecycleConfig .class );
122+ Assert .assertFalse (executorLifecycleConfig .isParentStreamDefined ());
128123 }
129124
130125 @ Test
131126 public void testCliPeonPolicyEnforcerInToolbox () throws IOException
132127 {
133- CliPeon runnable = new CliPeon ();
134- File file = temporaryFolder .newFile ("task.json" );
135- FileUtils .write (file , "{\" type\" :\" noop\" }" , StandardCharsets .UTF_8 );
136- runnable .taskAndStatusFile = ImmutableList .of (file .getParent (), "1" );
137-
138- Properties properties = new Properties ();
128+ final Properties properties = new Properties ();
139129 properties .setProperty ("druid.policy.enforcer.type" , "restrictAllTables" );
140- runnable . configure ( properties );
141- runnable . configure ( properties , GuiceInjectors . makeStartupInjector ());
142-
143- Injector secondaryInjector = runnable . makeInjector ();
144- Assert . assertEquals ( new RestrictAllTablesPolicyEnforcer ( null ), secondaryInjector . getInstance ( PolicyEnforcer . class ) );
130+ final Injector peonInjector = makePeonInjector ( NoopTask . create (), properties );
131+ Assert . assertEquals (
132+ new RestrictAllTablesPolicyEnforcer ( null ),
133+ peonInjector . getInstance ( PolicyEnforcer . class )
134+ );
145135 }
146136
147137 @ Test
@@ -192,26 +182,20 @@ public void testCliPeonHeartbeatDimensions()
192182 @ Test
193183 public void testCliPeonLocalTmpStorage () throws IOException
194184 {
185+ final Properties properties = new Properties ();
195186 File file = temporaryFolder .newFile ("task.json" );
196- FileUtils .write (file , "{\" type\" :\" noop\" }" , StandardCharsets .UTF_8 );
197-
198- CliPeon runnable = new CliPeon ();
199- runnable .taskAndStatusFile = ImmutableList .of (file .getParent (), "1" );
200- Properties properties = new Properties ();
201- runnable .configure (properties );
202- runnable .configure (properties , GuiceInjectors .makeStartupInjector ());
203- Injector secondaryInjector = runnable .makeInjector ();
204- Assert .assertNotNull (secondaryInjector );
187+ FileUtils .write (file , mapper .writeValueAsString (NoopTask .create ()), StandardCharsets .UTF_8 );
188+ final Injector peonInjector = makePeonInjector (file , properties );
205189
206- LocalTmpStorageConfig localTmpStorageConfig = secondaryInjector .getInstance (LocalTmpStorageConfig .class );
190+ LocalTmpStorageConfig localTmpStorageConfig = peonInjector .getInstance (LocalTmpStorageConfig .class );
207191 Assert .assertEquals (new File (file .getParent (), "/tmp" ).getAbsolutePath (), localTmpStorageConfig .getTmpDir ().getAbsolutePath ());
208192 }
209193
210194 @ Test
211195 public void testTaskWithDefaultContext () throws IOException
212196 {
213197 final CompactionTask compactionTask = compactBuilder .build ();
214- final Injector peonInjector = makePeonInjector (new Properties (), compactionTask );
198+ final Injector peonInjector = makePeonInjector (compactionTask , new Properties ());
215199
216200 verifyLoadSpecHolder (peonInjector .getInstance (LoadSpecHolder .class ), compactionTask );
217201 verifyTaskHolder (peonInjector .getInstance (TaskHolder .class ), compactionTask );
@@ -227,7 +211,7 @@ public void testTaskWithContextOverrides() throws IOException
227211 ))
228212 .build ();
229213
230- final Injector peonInjector = makePeonInjector (new Properties (), compactionTask );
214+ final Injector peonInjector = makePeonInjector (compactionTask , new Properties ());
231215
232216 verifyLoadSpecHolder (peonInjector .getInstance (LoadSpecHolder .class ), compactionTask );
233217 verifyTaskHolder (peonInjector .getInstance (TaskHolder .class ), compactionTask );
@@ -243,7 +227,7 @@ public void testTaskWithMetricsSpecDoesNotCauseCyclicDependency() throws IOExcep
243227 })
244228 .build ();
245229
246- final Injector peonInjector = makePeonInjector (new Properties (), compactionTask );
230+ final Injector peonInjector = makePeonInjector (compactionTask , new Properties ());
247231
248232 verifyLoadSpecHolder (peonInjector .getInstance (LoadSpecHolder .class ), compactionTask );
249233 verifyTaskHolder (peonInjector .getInstance (TaskHolder .class ), compactionTask );
@@ -264,25 +248,29 @@ public void testTaskWithMonitorsAndMetricsSpecDoNotCauseCyclicDependency() throw
264248 "[\" org.apache.druid.server.metrics.ServiceStatusMonitor\" ,"
265249 + " \" org.apache.druid.java.util.metrics.JvmMonitor\" ]"
266250 );
267- final Injector peonInjector = makePeonInjector (properties , compactionTask );
251+ final Injector peonInjector = makePeonInjector (compactionTask , properties );
268252
269253 verifyLoadSpecHolder (peonInjector .getInstance (LoadSpecHolder .class ), compactionTask );
270254 verifyTaskHolder (peonInjector .getInstance (TaskHolder .class ), compactionTask );
271255 }
272256
273- private Injector makePeonInjector (Properties properties , Task task ) throws IOException
257+ private Injector makePeonInjector (File taskFile , Properties properties )
274258 {
275- File file = temporaryFolder .newFile ("task.json" );
276- FileUtils .write (file , mapper .writeValueAsString (task ), StandardCharsets .UTF_8 );
277-
278259 final CliPeon peon = new CliPeon ();
279- peon .taskAndStatusFile = ImmutableList .of (file .getParent (), "1" );
260+ peon .taskAndStatusFile = ImmutableList .of (taskFile .getParent (), "1" );
280261
281262 peon .configure (properties );
282263 peon .configure (properties , GuiceInjectors .makeStartupInjector ());
283264 return peon .makeInjector (Set .of (NodeRole .PEON ));
284265 }
285266
267+ private Injector makePeonInjector (Task task , Properties properties ) throws IOException
268+ {
269+ File taskFile = temporaryFolder .newFile ("task.json" );
270+ FileUtils .write (taskFile , mapper .writeValueAsString (task ), StandardCharsets .UTF_8 );
271+ return makePeonInjector (taskFile , properties );
272+ }
273+
286274 private static void verifyLoadSpecHolder (LoadSpecHolder observedLoadSpecHolder , Task task )
287275 {
288276 Assert .assertTrue (observedLoadSpecHolder instanceof PeonLoadSpecHolder );
@@ -300,30 +288,6 @@ private static void verifyTaskHolder(TaskHolder observedTaskHolder, Task task)
300288 Assert .assertEquals (task .getDataSource (), observedTaskHolder .getDataSource ());
301289 }
302290
303- private static class FakeCliPeon extends CliPeon
304- {
305- List <String > taskAndStatusFile = new ArrayList <>();
306-
307- FakeCliPeon (String taskDirectory , String runnerType )
308- {
309- try {
310- taskAndStatusFile .add (taskDirectory );
311- taskAndStatusFile .add ("1" );
312-
313- Field privateField = CliPeon .class
314- .getDeclaredField ("taskAndStatusFile" );
315- privateField .setAccessible (true );
316- privateField .set (this , taskAndStatusFile );
317-
318- System .setProperty ("druid.indexer.runner.type" , runnerType );
319- }
320- catch (Exception ex ) {
321- // do nothing
322- }
323-
324- }
325- }
326-
327291 private static class TestTask extends NoopTask
328292 {
329293
0 commit comments