Skip to content

Commit 80e1408

Browse files
added code for nativeclusterrunnerwithsidecar and testcase
1 parent 0729c1d commit 80e1408

File tree

4 files changed

+830
-1
lines changed

4 files changed

+830
-1
lines changed
Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,339 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package com.facebook.presto.nativeworker;
16+
17+
import com.facebook.presto.Session;
18+
import com.facebook.presto.common.QualifiedObjectName;
19+
import com.facebook.presto.common.type.Type;
20+
import com.facebook.presto.cost.StatsCalculator;
21+
import com.facebook.presto.metadata.Metadata;
22+
import com.facebook.presto.spi.Plugin;
23+
import com.facebook.presto.spi.eventlistener.EventListener;
24+
import com.facebook.presto.split.PageSourceManager;
25+
import com.facebook.presto.split.SplitManager;
26+
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
27+
import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager;
28+
import com.facebook.presto.sql.planner.NodePartitioningManager;
29+
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
30+
import com.facebook.presto.testing.MaterializedResult;
31+
import com.facebook.presto.testing.QueryRunner;
32+
import com.facebook.presto.testing.TestingAccessControlManager;
33+
import com.facebook.presto.transaction.TransactionManager;
34+
import org.testcontainers.containers.GenericContainer;
35+
import org.testcontainers.containers.Network;
36+
import org.testcontainers.containers.wait.strategy.Wait;
37+
import org.testcontainers.utility.MountableFile;
38+
39+
import java.io.IOException;
40+
import java.sql.Connection;
41+
import java.sql.ResultSet;
42+
import java.sql.SQLException;
43+
import java.sql.Statement;
44+
import java.time.Duration;
45+
import java.util.ArrayList;
46+
import java.util.List;
47+
import java.util.Map;
48+
import java.util.concurrent.TimeUnit;
49+
import java.util.concurrent.locks.Lock;
50+
import java.util.logging.Logger;
51+
52+
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
53+
import static java.sql.DriverManager.getConnection;
54+
55+
public class ContainerNativeQueryRunnerWithSidecar
56+
implements QueryRunner
57+
{
58+
private static final Network network = Network.newNetwork();
59+
private static final String PRESTO_COORDINATOR_IMAGE = System.getProperty("coordinatorImage", "presto-coordinator:latest");
60+
private static final String PRESTO_WORKER_IMAGE = System.getProperty("workerImage", "presto-worker:latest");
61+
private static final String CONTAINER_TIMEOUT = System.getProperty("containerTimeout", "120");
62+
private static final String CLUSTER_SHUTDOWN_TIMEOUT = System.getProperty("clusterShutDownTimeout", "10");
63+
private static final String BASE_DIR = System.getProperty("user.dir");
64+
private static final int DEFAULT_COORDINATOR_PORT = 8081;
65+
private static final String TPCH_CATALOG = "tpch";
66+
private static final String TINY_SCHEMA = "tiny";
67+
private static final int DEFAULT_NUMBER_OF_WORKERS = 4;
68+
private static final Logger logger = Logger.getLogger(ContainerQueryRunner.class.getName());
69+
private final GenericContainer<?> coordinator;
70+
private final List<GenericContainer<?>> workers = new ArrayList<>();
71+
private final GenericContainer<?> sidecar;
72+
private final int coordinatorPort;
73+
private final String catalog;
74+
private final String schema;
75+
private final int numberOfWorkers;
76+
private Connection connection;
77+
78+
public ContainerNativeQueryRunnerWithSidecar()
79+
throws InterruptedException, IOException
80+
{
81+
this(DEFAULT_COORDINATOR_PORT, TPCH_CATALOG, TINY_SCHEMA, DEFAULT_NUMBER_OF_WORKERS);
82+
}
83+
84+
public ContainerNativeQueryRunnerWithSidecar(int coordinatorPort, String catalog, String schema, int numberOfWorkers)
85+
throws InterruptedException, IOException
86+
{
87+
this.coordinatorPort = coordinatorPort;
88+
this.catalog = catalog;
89+
this.schema = schema;
90+
this.numberOfWorkers = numberOfWorkers;
91+
92+
// The container details can be added as properties in VM options for testing in IntelliJ.
93+
coordinator = createCoordinator();
94+
for (int i = 0; i < numberOfWorkers; i++) {
95+
workers.add(createNativeWorker(7777 + i, "native-worker-" + i));
96+
}
97+
sidecar = createSidecar(7777 + numberOfWorkers, "sidecar");
98+
99+
coordinator.start();
100+
workers.forEach(GenericContainer::start);
101+
sidecar.start();
102+
103+
TimeUnit.SECONDS.sleep(5);
104+
105+
String dockerHostIp = coordinator.getHost();
106+
logger.info("Presto UI is accessible at http://" + dockerHostIp + ":" + coordinator.getMappedPort(coordinatorPort));
107+
108+
String url = String.format("jdbc:presto://%s:%s/%s/%s?%s",
109+
dockerHostIp,
110+
coordinator.getMappedPort(coordinatorPort),
111+
catalog,
112+
schema,
113+
"timeZoneId=UTC");
114+
115+
try {
116+
connection = getConnection(url, "test", null);
117+
}
118+
catch (SQLException e) {
119+
throw new RuntimeException(e);
120+
}
121+
122+
// Delete the temporary files once the containers are started.
123+
ContainerNativeQueryRunnerWithSidecarUtils.deleteDirectory(BASE_DIR + "/testcontainers/coordinator");
124+
for (int i = 0; i < numberOfWorkers; i++) {
125+
ContainerNativeQueryRunnerWithSidecarUtils.deleteDirectory(BASE_DIR + "/testcontainers/native-worker-" + i);
126+
}
127+
ContainerNativeQueryRunnerWithSidecarUtils.deleteDirectory(BASE_DIR + "/testcontainers/sidecar");
128+
}
129+
130+
private GenericContainer<?> createCoordinator()
131+
throws IOException
132+
{
133+
ContainerNativeQueryRunnerWithSidecarUtils.createCoordinatorTpchProperties();
134+
ContainerNativeQueryRunnerWithSidecarUtils.createCoordinatorTpcdsProperties();
135+
ContainerNativeQueryRunnerWithSidecarUtils.createCoordinatorConfigProperties(coordinatorPort);
136+
ContainerNativeQueryRunnerWithSidecarUtils.createCoordinatorSidecarProperties();
137+
ContainerNativeQueryRunnerWithSidecarUtils.createCoordinatorJvmConfig();
138+
ContainerNativeQueryRunnerWithSidecarUtils.createCoordinatorLogProperties();
139+
ContainerNativeQueryRunnerWithSidecarUtils.createCoordinatorNodeProperties();
140+
ContainerNativeQueryRunnerWithSidecarUtils.createCoordinatorEntryPointScript();
141+
142+
return new GenericContainer<>(PRESTO_COORDINATOR_IMAGE)
143+
.withExposedPorts(coordinatorPort)
144+
.withNetwork(network)
145+
.withNetworkAliases("presto-coordinator")
146+
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/coordinator/etc"), "/opt/presto-server/etc")
147+
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/coordinator/entrypoint.sh"), "/opt/entrypoint.sh")
148+
.waitingFor(Wait.forLogMessage(".*======== SERVER STARTED ========.*", 1))
149+
.withStartupTimeout(Duration.ofSeconds(Long.parseLong(CONTAINER_TIMEOUT)));
150+
}
151+
152+
private GenericContainer<?> createSidecar(int port, String nodeId)
153+
throws IOException
154+
{
155+
ContainerNativeQueryRunnerWithSidecarUtils.createSidecarConfigProperties(coordinatorPort, nodeId);
156+
ContainerNativeQueryRunnerWithSidecarUtils.createNativeWorkerEntryPointScript(nodeId);
157+
ContainerNativeQueryRunnerWithSidecarUtils.createNativeWorkerNodeProperties(nodeId);
158+
ContainerNativeQueryRunnerWithSidecarUtils.createNativeWorkerVeloxProperties(nodeId);
159+
return new GenericContainer<>(PRESTO_WORKER_IMAGE)
160+
.withExposedPorts(port)
161+
.withNetwork(network)
162+
.withNetworkAliases(nodeId)
163+
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/" + nodeId + "/etc"), "/opt/presto-server/etc")
164+
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/" + nodeId + "/entrypoint.sh"), "/opt/entrypoint.sh")
165+
.waitingFor(Wait.forLogMessage(".*Announcement succeeded: HTTP 202.*", 1));
166+
}
167+
168+
private GenericContainer<?> createNativeWorker(int port, String nodeId)
169+
throws IOException
170+
{
171+
ContainerNativeQueryRunnerWithSidecarUtils.createNativeWorkerConfigProperties(coordinatorPort, nodeId);
172+
ContainerNativeQueryRunnerWithSidecarUtils.createNativeWorkerTpchProperties(nodeId);
173+
ContainerNativeQueryRunnerWithSidecarUtils.createNativeWorkerEntryPointScript(nodeId);
174+
ContainerNativeQueryRunnerWithSidecarUtils.createNativeWorkerNodeProperties(nodeId);
175+
ContainerNativeQueryRunnerWithSidecarUtils.createNativeWorkerVeloxProperties(nodeId);
176+
return new GenericContainer<>(PRESTO_WORKER_IMAGE)
177+
.withExposedPorts(port)
178+
.withNetwork(network)
179+
.withNetworkAliases(nodeId)
180+
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/" + nodeId + "/etc"), "/opt/presto-server/etc")
181+
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/" + nodeId + "/entrypoint.sh"), "/opt/entrypoint.sh")
182+
.waitingFor(Wait.forLogMessage(".*Announcement succeeded: HTTP 202.*", 1));
183+
}
184+
185+
@Override
186+
public void close()
187+
{
188+
try {
189+
TimeUnit.SECONDS.sleep(Long.parseLong(CLUSTER_SHUTDOWN_TIMEOUT));
190+
}
191+
catch (InterruptedException e) {
192+
throw new RuntimeException(e);
193+
}
194+
coordinator.stop();
195+
workers.forEach(GenericContainer::stop);
196+
}
197+
198+
@Override
199+
public TransactionManager getTransactionManager()
200+
{
201+
throw new UnsupportedOperationException();
202+
}
203+
204+
@Override
205+
public Metadata getMetadata()
206+
{
207+
throw new UnsupportedOperationException();
208+
}
209+
210+
@Override
211+
public SplitManager getSplitManager()
212+
{
213+
throw new UnsupportedOperationException();
214+
}
215+
216+
@Override
217+
public PageSourceManager getPageSourceManager()
218+
{
219+
throw new UnsupportedOperationException();
220+
}
221+
222+
@Override
223+
public NodePartitioningManager getNodePartitioningManager()
224+
{
225+
throw new UnsupportedOperationException();
226+
}
227+
228+
@Override
229+
public ConnectorPlanOptimizerManager getPlanOptimizerManager()
230+
{
231+
throw new UnsupportedOperationException();
232+
}
233+
234+
@Override
235+
public PlanCheckerProviderManager getPlanCheckerProviderManager()
236+
{
237+
throw new UnsupportedOperationException();
238+
}
239+
240+
@Override
241+
public StatsCalculator getStatsCalculator()
242+
{
243+
throw new UnsupportedOperationException();
244+
}
245+
246+
@Override
247+
public List<EventListener> getEventListeners()
248+
{
249+
throw new UnsupportedOperationException();
250+
}
251+
252+
@Override
253+
public TestingAccessControlManager getAccessControl()
254+
{
255+
throw new UnsupportedOperationException();
256+
}
257+
258+
@Override
259+
public ExpressionOptimizerManager getExpressionManager()
260+
{
261+
throw new UnsupportedOperationException();
262+
}
263+
264+
@Override
265+
public MaterializedResult execute(String sql)
266+
{
267+
throw new UnsupportedOperationException();
268+
}
269+
270+
@Override
271+
public MaterializedResult execute(Session session, String sql, List<? extends Type> resultTypes)
272+
{
273+
throw new UnsupportedOperationException();
274+
}
275+
276+
@Override
277+
public List<QualifiedObjectName> listTables(Session session, String catalog, String schema)
278+
{
279+
throw new UnsupportedOperationException();
280+
}
281+
282+
@Override
283+
public boolean tableExists(Session session, String table)
284+
{
285+
throw new UnsupportedOperationException();
286+
}
287+
288+
@Override
289+
public void installPlugin(Plugin plugin)
290+
{
291+
throw new UnsupportedOperationException();
292+
}
293+
294+
@Override
295+
public void createCatalog(String catalogName, String connectorName, Map<String, String> properties)
296+
{
297+
throw new UnsupportedOperationException();
298+
}
299+
300+
@Override
301+
public void loadFunctionNamespaceManager(String functionNamespaceManagerName, String catalogName, Map<String, String> properties)
302+
{
303+
throw new UnsupportedOperationException();
304+
}
305+
306+
@Override
307+
public Lock getExclusiveLock()
308+
{
309+
throw new UnsupportedOperationException();
310+
}
311+
312+
@Override
313+
public int getNodeCount()
314+
{
315+
throw new UnsupportedOperationException();
316+
}
317+
318+
@Override
319+
public Session getDefaultSession()
320+
{
321+
return testSessionBuilder()
322+
.setCatalog(catalog)
323+
.setSchema(schema)
324+
.build();
325+
}
326+
327+
@Override
328+
public MaterializedResult execute(Session session, String sql)
329+
{
330+
try {
331+
Statement statement = connection.createStatement();
332+
ResultSet resultSet = statement.executeQuery(sql);
333+
return ContainerNativeQueryRunnerWithSidecarUtils.toMaterializedResult(resultSet);
334+
}
335+
catch (SQLException e) {
336+
throw new RuntimeException("Error executing query: " + sql + " \n " + e.getMessage());
337+
}
338+
}
339+
}

0 commit comments

Comments
 (0)