4545import java .util .ArrayList ;
4646import java .util .List ;
4747import java .util .Map ;
48+ import java .util .Optional ;
4849import java .util .concurrent .TimeUnit ;
4950import java .util .concurrent .locks .Lock ;
5051import java .util .logging .Logger ;
@@ -68,7 +69,7 @@ public class ContainerQueryRunner
6869 private static final Logger logger = Logger .getLogger (ContainerQueryRunner .class .getName ());
6970 private final GenericContainer <?> coordinator ;
7071 private final List <GenericContainer <?>> workers = new ArrayList <>();
71- private GenericContainer <?> sidecar ;
72+ private final Optional < GenericContainer <?> > sidecar ;
7273 private final int coordinatorPort ;
7374 private final String catalog ;
7475 private final String schema ;
@@ -88,91 +89,92 @@ public ContainerQueryRunner(int coordinatorPort, String catalog, String schema,
8889 this .catalog = catalog ;
8990 this .schema = schema ;
9091 this .numberOfWorkers = numberOfWorkers ;
92+ this .sidecar = Optional .empty ();
9193
92- // The container details can be added as properties in VM options for testing in IntelliJ.
93- coordinator = createCoordinator ();
94- for (int i = 0 ; i < numberOfWorkers ; i ++) {
95- workers .add (createNativeWorker (7777 + i , "native-worker-" + i ));
96- }
94+ this .coordinator = createCoordinator ();
95+ startWorkers (numberOfWorkers , true , false );
9796
98- coordinator .start ();
99- workers .forEach (GenericContainer ::start );
97+ startCoordinatorAndLogUI ();
98+ initializeConnection ();
99+ cleanupDirectories (numberOfWorkers , true , false );
100+ }
100101
101- TimeUnit .SECONDS .sleep (5 );
102+ public ContainerQueryRunner (int numberOfWorkers , boolean isNativeCluster , boolean isSidecarEnabled , boolean isSidecarDelayed )
103+ throws IOException , InterruptedException
104+ {
105+ this .coordinatorPort = DEFAULT_COORDINATOR_PORT ;
106+ this .catalog = TPCH_CATALOG ;
107+ this .schema = TINY_SCHEMA ;
108+ this .numberOfWorkers = numberOfWorkers ;
102109
103- String dockerHostIp = coordinator . getHost ( );
104- logger . info ( "Presto UI is accessible at http://" + dockerHostIp + ":" + coordinator . getMappedPort ( coordinatorPort ) );
110+ this . coordinator = createCoordinator ( isNativeCluster , isSidecarEnabled );
111+ startWorkers ( numberOfWorkers , isNativeCluster , isSidecarEnabled );
105112
106- String url = String .format ("jdbc:presto://%s:%s/%s/%s?%s" ,
107- dockerHostIp ,
108- coordinator .getMappedPort (coordinatorPort ),
109- catalog ,
110- schema ,
111- "timeZoneId=UTC" );
112-
113- try {
114- connection = getConnection (url , "test" , null );
113+ if (isSidecarEnabled ) {
114+ GenericContainer <?> sidecarContainer = createSidecar (7777 + numberOfWorkers , "sidecar" );
115+ if (isSidecarDelayed ) {
116+ Thread .sleep (10000 );
117+ }
118+ sidecarContainer .start ();
119+ this .sidecar = Optional .of (sidecarContainer );
115120 }
116- catch ( SQLException e ) {
117- throw new RuntimeException ( e );
121+ else {
122+ this . sidecar = Optional . empty ( );
118123 }
119124
120- // Delete the temporary files once the containers are started.
121- ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/coordinator" );
122- for (int i = 0 ; i < numberOfWorkers ; i ++) {
123- ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/native-worker-" + i );
125+ // Need some extra time for sidecar to register otherwise it throws sidecar not found error
126+ if (isSidecarEnabled && !isSidecarDelayed ) {
127+ TimeUnit .SECONDS .sleep (60 );
124128 }
129+
130+ startCoordinatorAndLogUI ();
131+ initializeConnection ();
132+ cleanupDirectories (numberOfWorkers , isNativeCluster , isSidecarEnabled );
125133 }
126134
127- public ContainerQueryRunner (int numberOfWorkers , boolean isNativeCluster , boolean isSidecarEnabled , boolean isSidecarDelayed )
128- throws IOException , InterruptedException
135+ private void startWorkers (int numberOfWorkers , boolean isNativeCluster , boolean isSidecarEnabled )
136+ throws InterruptedException , IOException
129137 {
130- this .coordinatorPort = DEFAULT_COORDINATOR_PORT ;
131- this .catalog = TPCH_CATALOG ;
132- this .schema = TINY_SCHEMA ;
133- this .numberOfWorkers = numberOfWorkers ;
134- coordinator = createCoordinator (isNativeCluster , isSidecarEnabled );
135138 coordinator .start ();
136- // Delete the temporary files once the containers are started.
137139 ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/coordinator" );
138140
139141 if (isNativeCluster ) {
140142 for (int i = 0 ; i < numberOfWorkers ; i ++) {
141143 workers .add (createNativeWorker (7777 + i , "native-worker-" + i , isSidecarEnabled , false ));
142144 }
143- workers .forEach (GenericContainer ::start );
144- for (int i = 0 ; i < numberOfWorkers ; i ++) {
145- ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/native-worker-" + i );
146- }
147145 }
148146 else {
149147 for (int i = 0 ; i < numberOfWorkers ; i ++) {
150148 workers .add (createJavaWorker (7777 + i , "java-worker-" + i ));
151149 }
152- workers .forEach (GenericContainer ::start );
153- for (int i = 0 ; i < numberOfWorkers ; i ++) {
154- ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/java-worker-" + i );
155- }
156150 }
157151
158- if (isSidecarEnabled ) {
159- sidecar = createSidecar (7777 + numberOfWorkers , "sidecar" );
160- if (isSidecarDelayed ) {
161- Thread .sleep (10000 );
162- }
163- sidecar .start ();
164- ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/sidecar" );
165- }
152+ workers .forEach (GenericContainer ::start );
166153
167154 TimeUnit .SECONDS .sleep (5 );
168- // Need some extra time for sidecar to register otherwise it throws sidecar not found error
169- if (isSidecarEnabled && !isSidecarDelayed ) {
170- TimeUnit .SECONDS .sleep (60 );
155+ }
156+
157+ private void cleanupDirectories (int numberOfWorkers , boolean isNativeCluster , boolean isSidecarEnabled )
158+ {
159+ for (int i = 0 ; i < numberOfWorkers ; i ++) {
160+ String workerType = isNativeCluster ? "native-worker-" : "java-worker-" ;
161+ ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/" + workerType + i );
162+ }
163+
164+ if (isSidecarEnabled ) {
165+ ContainerQueryRunnerUtils .deleteDirectory (BASE_DIR + "/testcontainers/sidecar" );
171166 }
167+ }
172168
169+ private void startCoordinatorAndLogUI ()
170+ {
173171 String dockerHostIp = coordinator .getHost ();
174172 logger .info ("Presto UI is accessible at http://" + dockerHostIp + ":" + coordinator .getMappedPort (coordinatorPort ));
173+ }
175174
175+ private void initializeConnection ()
176+ {
177+ String dockerHostIp = coordinator .getHost ();
176178 String url = String .format ("jdbc:presto://%s:%s/%s/%s?%s" ,
177179 dockerHostIp ,
178180 coordinator .getMappedPort (coordinatorPort ),
@@ -199,7 +201,7 @@ private GenericContainer<?> createCoordinator(boolean isNativeCluster, boolean i
199201 ContainerQueryRunnerUtils .createCoordinatorTpchProperties ();
200202 ContainerQueryRunnerUtils .createCoordinatorTpcdsProperties ();
201203 ContainerQueryRunnerUtils .createCoordinatorConfigProperties (coordinatorPort , isNativeCluster , isSidecarEnabled );
202- if (isSidecarEnabled ) {
204+ if (isSidecarEnabled && isNativeCluster ) {
203205 ContainerQueryRunnerUtils .createCoordinatorSidecarProperties ();
204206 }
205207 ContainerQueryRunnerUtils .createCoordinatorJvmConfig ();
@@ -274,9 +276,7 @@ public void close()
274276 }
275277 coordinator .stop ();
276278 workers .forEach (GenericContainer ::stop );
277- if (sidecar != null ) {
278- sidecar .stop ();
279- }
279+ sidecar .ifPresent (GenericContainer ::stop );
280280 }
281281
282282 @ Override
0 commit comments