Skip to content

Commit 39c9a0c

Browse files
[native] FusionNext - Infrastructure testing
1 parent 4cd16b0 commit 39c9a0c

File tree

5 files changed

+325
-40
lines changed

5 files changed

+325
-40
lines changed

presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunner.java

Lines changed: 110 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public class ContainerQueryRunner
6868
private static final Logger logger = Logger.getLogger(ContainerQueryRunner.class.getName());
6969
private final GenericContainer<?> coordinator;
7070
private final List<GenericContainer<?>> workers = new ArrayList<>();
71+
private GenericContainer<?> sidecar = null;
7172
private final int coordinatorPort;
7273
private final String catalog;
7374
private final String schema;
@@ -123,12 +124,84 @@ public ContainerQueryRunner(int coordinatorPort, String catalog, String schema,
123124
}
124125
}
125126

127+
public ContainerQueryRunner(int numberOfWorkers, boolean isNativeCluster, boolean isSidecarEnabled, boolean isSidecarDelayed)
128+
throws IOException, InterruptedException
129+
{
130+
this.coordinatorPort = DEFAULT_COORDINATOR_PORT;
131+
this.catalog = TPCH_CATALOG;
132+
this.schema = TINY_SCHEMA;
133+
this.numberOfWorkers = numberOfWorkers;
134+
coordinator = createCoordinator(isNativeCluster, isSidecarEnabled);
135+
coordinator.start();
136+
// Delete the temporary files once the containers are started.
137+
ContainerQueryRunnerUtils.deleteDirectory(BASE_DIR + "/testcontainers/coordinator");
138+
139+
if (isNativeCluster) {
140+
for (int i = 0; i < numberOfWorkers; i++) {
141+
workers.add(createNativeWorker(7777 + i, "native-worker-" + i, isSidecarEnabled, false));
142+
}
143+
workers.forEach(GenericContainer::start);
144+
for (int i = 0; i < numberOfWorkers; i++) {
145+
ContainerQueryRunnerUtils.deleteDirectory(BASE_DIR + "/testcontainers/native-worker-" + i);
146+
}
147+
}
148+
else {
149+
for (int i = 0; i < numberOfWorkers; i++) {
150+
workers.add(createJavaWorker(7777 + i, "java-worker-" + i));
151+
}
152+
workers.forEach(GenericContainer::start);
153+
for (int i = 0; i < numberOfWorkers; i++) {
154+
ContainerQueryRunnerUtils.deleteDirectory(BASE_DIR + "/testcontainers/java-worker-" + i);
155+
}
156+
}
157+
158+
if (isSidecarEnabled) {
159+
sidecar = createSidecar(7777 + numberOfWorkers, "sidecar");
160+
if (isSidecarDelayed) {
161+
Thread.sleep(10000);
162+
}
163+
sidecar.start();
164+
ContainerQueryRunnerUtils.deleteDirectory(BASE_DIR + "/testcontainers/sidecar");
165+
}
166+
167+
TimeUnit.SECONDS.sleep(5);
168+
// Need some extra time for sidecar to register otherwise it throws sidecar not found error
169+
if (isSidecarEnabled && !isSidecarDelayed) {
170+
TimeUnit.SECONDS.sleep(60);
171+
}
172+
173+
String dockerHostIp = coordinator.getHost();
174+
logger.info("Presto UI is accessible at http://" + dockerHostIp + ":" + coordinator.getMappedPort(coordinatorPort));
175+
176+
String url = String.format("jdbc:presto://%s:%s/%s/%s?%s",
177+
dockerHostIp,
178+
coordinator.getMappedPort(coordinatorPort),
179+
catalog,
180+
schema,
181+
"timeZoneId=UTC");
182+
183+
try {
184+
connection = getConnection(url, "test", null);
185+
}
186+
catch (SQLException e) {
187+
throw new RuntimeException(e);
188+
}
189+
}
190+
126191
private GenericContainer<?> createCoordinator()
127192
throws IOException
193+
{
194+
return createCoordinator(true, false);
195+
}
196+
private GenericContainer<?> createCoordinator(boolean isNativeCluster, boolean isSidecarEnabled)
197+
throws IOException
128198
{
129199
ContainerQueryRunnerUtils.createCoordinatorTpchProperties();
130200
ContainerQueryRunnerUtils.createCoordinatorTpcdsProperties();
131-
ContainerQueryRunnerUtils.createCoordinatorConfigProperties(coordinatorPort);
201+
ContainerQueryRunnerUtils.createCoordinatorConfigProperties(coordinatorPort, isNativeCluster, isSidecarEnabled);
202+
if (isSidecarEnabled) {
203+
ContainerQueryRunnerUtils.createCoordinatorSidecarProperties();
204+
}
132205
ContainerQueryRunnerUtils.createCoordinatorJvmConfig();
133206
ContainerQueryRunnerUtils.createCoordinatorLogProperties();
134207
ContainerQueryRunnerUtils.createCoordinatorNodeProperties();
@@ -144,11 +217,40 @@ private GenericContainer<?> createCoordinator()
144217
.withStartupTimeout(Duration.ofSeconds(Long.parseLong(CONTAINER_TIMEOUT)));
145218
}
146219

147-
private GenericContainer<?> createNativeWorker(int port, String nodeId)
220+
private GenericContainer<?> createJavaWorker(int port, String nodeId)
148221
throws IOException
149222
{
150-
ContainerQueryRunnerUtils.createNativeWorkerConfigProperties(coordinatorPort, nodeId);
223+
ContainerQueryRunnerUtils.createJavaWorkerConfigProperties(port, coordinatorPort, nodeId);
151224
ContainerQueryRunnerUtils.createNativeWorkerTpchProperties(nodeId);
225+
ContainerQueryRunnerUtils.createJavaEntryPointScript(nodeId);
226+
ContainerQueryRunnerUtils.createNativeWorkerNodeProperties(nodeId);
227+
return new GenericContainer<>(PRESTO_COORDINATOR_IMAGE)
228+
.withExposedPorts(port)
229+
.withNetwork(network)
230+
.withNetworkAliases(nodeId)
231+
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/" + nodeId + "/etc"), "/opt/presto-server/etc")
232+
.withCopyFileToContainer(MountableFile.forHostPath(BASE_DIR + "/testcontainers/" + nodeId + "/entrypoint.sh"), "/opt/entrypoint.sh");
233+
}
234+
235+
private GenericContainer<?> createSidecar(int port, String nodeId)
236+
throws IOException
237+
{
238+
return createNativeWorker(port, nodeId, true, true);
239+
}
240+
241+
private GenericContainer<?> createNativeWorker(int port, String nodeId)
242+
throws IOException
243+
{
244+
return createNativeWorker(port, nodeId, false, false);
245+
}
246+
247+
private GenericContainer<?> createNativeWorker(int port, String nodeId, boolean isSidecarEnabled, boolean isSidecarNode)
248+
throws IOException
249+
{
250+
ContainerQueryRunnerUtils.createNativeWorkerConfigProperties(coordinatorPort, nodeId, isSidecarEnabled, isSidecarNode);
251+
if (!isSidecarEnabled) {
252+
ContainerQueryRunnerUtils.createNativeWorkerTpchProperties(nodeId);
253+
}
152254
ContainerQueryRunnerUtils.createNativeWorkerEntryPointScript(nodeId);
153255
ContainerQueryRunnerUtils.createNativeWorkerNodeProperties(nodeId);
154256
ContainerQueryRunnerUtils.createNativeWorkerVeloxProperties(nodeId);
@@ -172,6 +274,9 @@ public void close()
172274
}
173275
coordinator.stop();
174276
workers.forEach(GenericContainer::stop);
277+
if(sidecar != null) {
278+
sidecar.stop();
279+
}
175280
}
176281

177282
@Override
@@ -312,7 +417,8 @@ public MaterializedResult execute(Session session, String sql)
312417
return ContainerQueryRunnerUtils.toMaterializedResult(resultSet);
313418
}
314419
catch (SQLException e) {
315-
throw new RuntimeException("Error executing query: " + sql, e);
420+
e.printStackTrace();
421+
throw new RuntimeException("Error executing query: " + sql + "\n" + e.getMessage());
316422
}
317423
}
318424
}

presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerUtils.java

Lines changed: 112 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,7 @@
1313
*/
1414
package com.facebook.presto.nativeworker;
1515

16-
import com.facebook.presto.common.type.ArrayType;
17-
import com.facebook.presto.common.type.BigintType;
18-
import com.facebook.presto.common.type.BooleanType;
19-
import com.facebook.presto.common.type.CharType;
20-
import com.facebook.presto.common.type.DateType;
21-
import com.facebook.presto.common.type.DecimalType;
22-
import com.facebook.presto.common.type.DoubleType;
23-
import com.facebook.presto.common.type.IntegerType;
24-
import com.facebook.presto.common.type.RealType;
25-
import com.facebook.presto.common.type.SmallintType;
26-
import com.facebook.presto.common.type.TimeType;
27-
import com.facebook.presto.common.type.TimestampType;
28-
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
29-
import com.facebook.presto.common.type.TinyintType;
30-
import com.facebook.presto.common.type.Type;
31-
import com.facebook.presto.common.type.TypeSignature;
32-
import com.facebook.presto.common.type.TypeSignatureParameter;
33-
import com.facebook.presto.common.type.UnknownType;
34-
import com.facebook.presto.common.type.VarbinaryType;
35-
import com.facebook.presto.common.type.VarcharType;
16+
import com.facebook.presto.common.type.*;
3617
import com.facebook.presto.testing.MaterializedResult;
3718
import com.facebook.presto.testing.MaterializedRow;
3819

@@ -45,11 +26,7 @@
4526
import java.sql.ResultSet;
4627
import java.sql.ResultSetMetaData;
4728
import java.sql.SQLException;
48-
import java.util.ArrayList;
49-
import java.util.Arrays;
50-
import java.util.List;
51-
import java.util.Map;
52-
import java.util.Properties;
29+
import java.util.*;
5330

5431
public class ContainerQueryRunnerUtils
5532
{
@@ -82,20 +59,35 @@ public static void createNativeWorkerTpchProperties(String nodeId)
8259
createPropertiesFile("testcontainers/" + nodeId + "/etc/catalog/tpch.properties", properties);
8360
}
8461

85-
public static void createNativeWorkerConfigProperties(int coordinatorPort, String nodeId)
62+
public static void createNativeWorkerConfigProperties(
63+
int coordinatorPort,
64+
String nodeId,
65+
boolean isSidecarEnabled,
66+
boolean isSidecarNode)
8667
throws IOException
8768
{
8869
Properties properties = new Properties();
8970
properties.setProperty("presto.version", "testversion");
9071
properties.setProperty("http-server.http.port", "7777");
9172
properties.setProperty("discovery.uri", "http://presto-coordinator:" + coordinatorPort);
9273
properties.setProperty("system-memory-gb", "2");
93-
properties.setProperty("native.sidecar", "false");
74+
75+
if (isSidecarEnabled) {
76+
properties.setProperty("presto.default-namespace", "native.default");
77+
}
78+
79+
properties.setProperty("native-sidecar", String.valueOf(isSidecarNode));
9480
createPropertiesFile("testcontainers/" + nodeId + "/etc/config.properties", properties);
9581
}
9682

9783
public static void createCoordinatorConfigProperties(int port)
9884
throws IOException
85+
{
86+
createCoordinatorConfigProperties(port, true, false);
87+
}
88+
89+
public static void createCoordinatorConfigProperties(int port, boolean isNativeCluster, boolean isSidecarEnabled)
90+
throws IOException
9991
{
10092
Properties properties = new Properties();
10193
properties.setProperty("coordinator", "true");
@@ -104,16 +96,58 @@ public static void createCoordinatorConfigProperties(int port)
10496
properties.setProperty("http-server.http.port", Integer.toString(port));
10597
properties.setProperty("discovery-server.enabled", "true");
10698
properties.setProperty("discovery.uri", "http://presto-coordinator:" + port);
99+
if (isSidecarEnabled) {
100+
properties.setProperty("presto.default-namespace", "native.default");
101+
properties.setProperty("coordinator-sidecar-enabled", "true");
102+
properties.setProperty("exclude-invalid-worker-session-properties", "true");
103+
}
107104

108-
// Get native worker system properties and add them to the coordinator properties
109-
Map<String, String> nativeWorkerProperties = NativeQueryRunnerUtils.getNativeWorkerSystemProperties();
110-
for (Map.Entry<String, String> entry : nativeWorkerProperties.entrySet()) {
111-
properties.setProperty(entry.getKey(), entry.getValue());
105+
if (isNativeCluster) {
106+
// Get native worker system properties and add them to the coordinator properties
107+
Map<String, String> nativeWorkerProperties = NativeQueryRunnerUtils.getNativeWorkerSystemProperties();
108+
for (Map.Entry<String, String> entry : nativeWorkerProperties.entrySet()) {
109+
properties.setProperty(entry.getKey(), entry.getValue());
110+
}
112111
}
113112

114113
createPropertiesFile("testcontainers/coordinator/etc/config.properties", properties);
115114
}
116115

116+
public static void createJavaWorkerConfigProperties(int port, int coordinatorPort, String nodeId)
117+
throws IOException
118+
{
119+
Properties properties = new Properties();
120+
properties.setProperty("coordinator", "false");
121+
properties.setProperty("presto.version", "testversion");
122+
properties.setProperty("node-scheduler.include-coordinator", "false");
123+
properties.setProperty("http-server.http.port", Integer.toString(port));
124+
properties.setProperty("discovery.uri", "http://presto-coordinator:" + coordinatorPort);
125+
createPropertiesFile("testcontainers/" + nodeId + "/etc/config.properties", properties);
126+
}
127+
128+
public static void createCoordinatorSidecarProperties()
129+
throws IOException
130+
{
131+
Properties properties1 = new Properties();
132+
properties1.setProperty("function-namespace-manager.name", "native");
133+
properties1.setProperty("function-implementation-type", "CPP");
134+
properties1.setProperty("supported-function-languages", "CPP");
135+
136+
createPropertiesFile("testcontainers/coordinator/etc/function-namespace/native.properties", properties1);
137+
138+
Properties properties2 = new Properties();
139+
properties2.setProperty("session-property-provider.name", "native");
140+
createPropertiesFile("testcontainers/coordinator/etc/session-property-provider/native.properties", properties2);
141+
142+
Properties properties3 = new Properties();
143+
properties3.setProperty("type-manager.name", "native");
144+
createPropertiesFile("testcontainers/coordinator/etc/type-managers/native.properties", properties3);
145+
146+
Properties properties4 = new Properties();
147+
properties4.setProperty("plan-checker-provider.name", "native");
148+
createPropertiesFile("testcontainers/coordinator/etc/plan-checker-providers/native.properties", properties4);
149+
}
150+
117151
public static void createCoordinatorJvmConfig()
118152
throws IOException
119153

@@ -170,10 +204,7 @@ public static void createNativeWorkerVeloxProperties(String nodeId)
170204
public static void createCoordinatorEntryPointScript()
171205
throws IOException
172206
{
173-
String scriptContent = "#!/bin/sh\n" +
174-
"set -e\n" +
175-
"$PRESTO_HOME/bin/launcher run\n";
176-
createScriptFile("testcontainers/coordinator/entrypoint.sh", scriptContent);
207+
createJavaEntryPointScript("coordinator");
177208
}
178209

179210
public static void createNativeWorkerEntryPointScript(String nodeId)
@@ -185,6 +216,15 @@ public static void createNativeWorkerEntryPointScript(String nodeId)
185216
createScriptFile("testcontainers/" + nodeId + "/entrypoint.sh", scriptContent);
186217
}
187218

219+
public static void createJavaEntryPointScript(String nodeId)
220+
throws IOException
221+
{
222+
String scriptContent = "#!/bin/sh\n" +
223+
"set -e\n" +
224+
"$PRESTO_HOME/bin/launcher run\n";
225+
createScriptFile("testcontainers/" + nodeId + "/entrypoint.sh", scriptContent);
226+
}
227+
188228
public static void deleteDirectory(String directoryPath)
189229
{
190230
File directory = new File(directoryPath);
@@ -353,6 +393,9 @@ private static Type mapSqlTypeToType(int sqlType, String typeName)
353393
case java.sql.Types.VARBINARY:
354394
case java.sql.Types.LONGVARBINARY:
355395
return VarbinaryType.VARBINARY;
396+
case java.sql.Types.NULL:
397+
// This happens in select fail() or similar cases
398+
return VarcharType.createUnboundedVarcharType();
356399
case java.sql.Types.OTHER:
357400
// Attempt to map based on type name
358401
return mapSqlTypeNameToType(typeName);
@@ -363,6 +406,40 @@ private static Type mapSqlTypeToType(int sqlType, String typeName)
363406

364407
private static Type mapSqlTypeNameToType(String typeName)
365408
{
409+
String upperTypeName = typeName.toUpperCase(Locale.ENGLISH).trim();
410+
411+
// Handle flat ROW types like ROW(VARCHAR, INTEGER) or ROW(a VARCHAR, b INTEGER)
412+
if (upperTypeName.startsWith("ROW(") && upperTypeName.endsWith(")")) {
413+
String fieldsPart = upperTypeName.substring(4, upperTypeName.length() - 1).trim();
414+
415+
// Split fields by comma (no nested ROWs expected)
416+
String[] fieldTokens = fieldsPart.split(",");
417+
418+
List<RowType.Field> rowFields = new ArrayList<>();
419+
for (int i = 0; i < fieldTokens.length; i++) {
420+
String token = fieldTokens[i].trim();
421+
422+
// Handle both: "a VARCHAR" and "VARCHAR"
423+
String[] parts = token.split("\\s+", 2);
424+
String fieldName;
425+
String fieldType;
426+
427+
if (parts.length == 2) {
428+
fieldName = parts[0];
429+
fieldType = parts[1];
430+
}
431+
else {
432+
fieldName = "field" + i;
433+
fieldType = parts[0];
434+
}
435+
436+
Type innerType = mapSqlTypeNameToType(fieldType);
437+
rowFields.add(new RowType.Field(Optional.of(fieldName), innerType));
438+
}
439+
440+
return RowType.from(rowFields);
441+
}
442+
366443
switch (typeName.toUpperCase()) {
367444
case "INT":
368445
case "INTEGER":

0 commit comments

Comments
 (0)