2 changes: 2 additions & 0 deletions .github/workflows/pxf-ci.yml
@@ -119,6 +119,8 @@ jobs:
- s3
- features
- gpdb
- load
- pxf_extension
steps:
- name: Free disk space
run: |
1 change: 1 addition & 0 deletions .gitignore
@@ -16,3 +16,4 @@ server/tmp
/.vscode/settings.json
/automation/dataTempFolder/
/cli/go/pkg/
/automation/test_artifacts
2 changes: 1 addition & 1 deletion automation/pom.xml
@@ -59,7 +59,7 @@
<version>2.15</version>
<configuration>
<testFailureIgnore>true</testFailureIgnore>
<argLine>-Xmx2048m -XX:MaxPermSize=512m</argLine>
<argLine>-Xmx4096m</argLine>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
</configuration>
@@ -40,11 +40,11 @@ FROM pg_catalog.pg_extension AS e
INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid)
WHERE d.deptype = 'e' AND e.extname = 'pxf'
ORDER BY 1;
proname | prosrc | probin
--------------------+------------------------------+-------------
pxf_read | pxfprotocol_import | $libdir/pxf
pxf_validate | pxfprotocol_validate_urls | $libdir/pxf
pxf_write | pxfprotocol_export | $libdir/pxf
pxfwritable_export | gpdbwritableformatter_export | $libdir/pxf
pxfwritable_import | gpdbwritableformatter_import | $libdir/pxf
proname | prosrc | probin
--------------------+------------------------------+----------------------------------
pxf_read | pxfprotocol_import | $PXF_HOME/gpextable/pxf
pxf_validate | pxfprotocol_validate_urls | $PXF_HOME/gpextable/pxf
pxf_write | pxfprotocol_export | $PXF_HOME/gpextable/pxf
pxfwritable_export | gpdbwritableformatter_export | $PXF_HOME/gpextable/pxf
pxfwritable_import | gpdbwritableformatter_import | $PXF_HOME/gpextable/pxf
(5 rows)
@@ -40,13 +40,13 @@ FROM pg_catalog.pg_extension AS e
INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid)
WHERE d.deptype = 'e' AND e.extname = 'pxf'
ORDER BY 1;
proname | prosrc | probin
---------------------+------------------------------+-------------
pxf_read | pxfprotocol_import | $libdir/pxf
pxf_validate | pxfprotocol_validate_urls | $libdir/pxf
pxf_write | pxfprotocol_export | $libdir/pxf
pxfdelimited_import | pxfdelimited_import | $libdir/pxf
pxfwritable_export | gpdbwritableformatter_export | $libdir/pxf
pxfwritable_import | gpdbwritableformatter_import | $libdir/pxf
proname | prosrc | probin
---------------------+------------------------------+----------------------------------
pxf_read | pxfprotocol_import | $PXF_HOME/gpextable/pxf
pxf_validate | pxfprotocol_validate_urls | $PXF_HOME/gpextable/pxf
pxf_write | pxfprotocol_export | $PXF_HOME/gpextable/pxf
pxfdelimited_import | pxfdelimited_import | $PXF_HOME/gpextable/pxf
pxfwritable_export | gpdbwritableformatter_export | $PXF_HOME/gpextable/pxf
pxfwritable_import | gpdbwritableformatter_import | $PXF_HOME/gpextable/pxf
(6 rows)

@@ -7,4 +7,4 @@
-- end_matchsubs

INSERT INTO pxf_invalid_encoding_json_write SELECT * from gpdb_primitive_types;
ERROR: gpdbwritable formatter can only export UTF8 formatted data. Define the external table with ENCODING UTF8
ERROR: pxfwritable_export formatter can only export UTF8 formatted data. Define the external table with ENCODING UTF8
@@ -5,7 +5,7 @@
SELECT * FROM pxf_jdbc_read_view_no_params WHERE name='client_min_messages' OR name='default_statistics_target' ORDER BY name;
name | setting
---------------------------+---------
client_min_messages | error
client_min_messages | notice
default_statistics_target | 100
(2 rows)

@@ -570,16 +570,64 @@ public void writeTableToFile(String destPath, Table dataTable,
if (parent != null) {
fs.mkdirs(parent);
}
FSDataOutputStream out = fs.create(datapath, true,
bufferSize, replicationSize, blockSize);

DataOutputStream dos = out;
if (codec != null) {
dos = new DataOutputStream(codec.createOutputStream(out));
final int maxAttempts = 3;
try {
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
FSDataOutputStream out = null;
DataOutputStream dos = null;
try {
out = fs.create(datapath, true, bufferSize, replicationSize, blockSize);
dos = out;
if (codec != null) {
dos = new DataOutputStream(codec.createOutputStream(out));
}
writeTableToStream(dos, dataTable, delimiter, encoding, newLine);
return;
} catch (Exception e) {
if (attempt >= maxAttempts || !isRetryableWriteException(e)) {
throw e;
}

// Best-effort cleanup before retry (handles partially created files)
try {
if (dos != null) {
dos.close();
}
} catch (Exception ignored) {
}
try {
if (out != null) {
out.close();
}
} catch (Exception ignored) {
}
try {
fs.delete(datapath, false);
} catch (Exception ignored) {
}

ReportUtils.report(report, getClass(),
String.format("HDFS write failed (attempt %d/%d), retrying: %s", attempt, maxAttempts, e.getMessage()));
Thread.sleep(2000L * attempt);
}
}
} finally {
ReportUtils.stopLevel(report);
}
}

writeTableToStream(dos, dataTable, delimiter, encoding, newLine);
ReportUtils.stopLevel(report);
private boolean isRetryableWriteException(Exception e) {
if (e == null) {
return false;
}
String message = e.getMessage();
if (message == null) {
return false;
}
// Common transient failure on single-node HDFS when the only DataNode is briefly unavailable/blacklisted
return message.contains("could only be written to 0 of the 1 minReplication nodes")
|| message.contains("node(s) are excluded in this operation");
}

public void appendTableToFile(String pathToFile, Table dataTable, String delimiter) throws Exception {
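The retry added above follows a common pattern: attempt the write, treat only known-transient HDFS errors as retryable, clean up any partially written file, back off, and try again. A minimal standalone sketch of that pattern, with hypothetical names that are not part of the automation code itself:

// Hypothetical sketch of the bounded-retry pattern used above; names are illustrative.
import java.util.function.Predicate;

final class RetryingWriter {
    interface Write { void run() throws Exception; }
    interface Cleanup { void run(); }

    static void writeWithRetry(Write write, Cleanup cleanup,
                               Predicate<Exception> isRetryable,
                               int maxAttempts, long baseBackoffMs) throws Exception {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                write.run();          // e.g. create the HDFS file and stream the table out
                return;               // success: stop retrying
            } catch (Exception e) {
                if (attempt >= maxAttempts || !isRetryable.test(e)) {
                    throw e;          // permanent failure, or no attempts left
                }
                cleanup.run();        // best-effort removal of a partially written file
                Thread.sleep(baseBackoffMs * attempt); // linear backoff between attempts
            }
        }
    }
}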
@@ -14,7 +14,14 @@ public static ProtocolEnum getProtocol() {

ProtocolEnum result;
try {
result = ProtocolEnum.valueOf(System.getProperty(PROTOCOL_KEY, ProtocolEnum.HDFS.name()).toUpperCase());
String protocol = System.getProperty(PROTOCOL_KEY);
if (protocol == null) {
protocol = System.getenv(PROTOCOL_KEY);
}
if (protocol == null) {
protocol = ProtocolEnum.HDFS.name();
}
result = ProtocolEnum.valueOf(protocol.toUpperCase());
} catch (Exception e) {
result = ProtocolEnum.HDFS; // use HDFS as default mode
}
@@ -23,15 +30,19 @@ public static ProtocolEnum getProtocol() {
}

public static String getSecret() {
return System.getProperty(AWS_SECRET_ACCESS_KEY);
String secret = System.getProperty(AWS_SECRET_ACCESS_KEY);
return secret != null ? secret : System.getenv(AWS_SECRET_ACCESS_KEY);
}

public static String getAccess() {
return System.getProperty(AWS_ACCESS_KEY_ID);
String access = System.getProperty(AWS_ACCESS_KEY_ID);
return access != null ? access : System.getenv(AWS_ACCESS_KEY_ID);
}

public static String getPxfTestKeepData() {
return System.getProperty(PXF_TEST_KEEP_DATA, "false");
String keepData = System.getProperty(PXF_TEST_KEEP_DATA);
return keepData != null ? keepData : System.getenv().getOrDefault(PXF_TEST_KEEP_DATA, "false");
}


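The property-then-environment fallback now appears in several of these getters; a small shared helper along the following lines (hypothetical, not part of this change) could remove the duplication:

// Hypothetical helper, not in this PR: resolve a setting from a JVM system property
// first, then from an environment variable of the same name, then a default value.
final class SettingResolver {
    static String resolve(String key, String defaultValue) {
        String value = System.getProperty(key);
        if (value == null) {
            value = System.getenv(key);
        }
        return value != null ? value : defaultValue;
    }
}

// Example usage mirroring the getters above (constant names assumed from the class):
// getSecret()          -> SettingResolver.resolve(AWS_SECRET_ACCESS_KEY, null)
// getPxfTestKeepData() -> SettingResolver.resolve(PXF_TEST_KEEP_DATA, "false")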
@@ -7,6 +7,10 @@
import org.testng.annotations.Test;

import java.io.File;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;

/** Tests how failures are handled **/
@FailsWithFDW
@@ -28,14 +32,7 @@ protected void beforeClass() throws Exception {
@Override
protected void afterClass() throws Exception {
super.afterClass();
// We need to restore the service after it has been stopped
if (cluster != null) {
try {
cluster.start(PhdCluster.EnumClusterServices.pxf);
} catch (Exception e) {
// Ignore if service is already running
}
}
ensurePxfRunning();
}

/**
@@ -64,5 +61,95 @@ public void stopTomcatOnOutOfMemory() throws Exception {
gpdb.createTableAndVerify(pxfExternalTable);

runSqlTest("features/general/outOfMemory");

// The test intentionally kills the PXF JVM; restart it for subsequent tests.
ensurePxfRunning();
}

private void ensurePxfRunning() throws Exception {
Integer port = parsePxfPort();
if (cluster == null || port == null) {
return;
}

String host = getPxfHttpHost();
if (waitForPxfHealthy(host, port, 5_000)) {
return;
}

// Wait for the OOM kill hook to fully stop the old process to avoid false positives
// from jps/Bootstrap checks while the JVM is shutting down.
waitForPortClosed(host, port, 60_000);

for (int attempt = 1; attempt <= 3; attempt++) {
cluster.restart(PhdCluster.EnumClusterServices.pxf);
if (waitForPxfHealthy(host, port, 120_000)) {
return;
}
}
throw new RuntimeException("Failed to restart PXF after OutOfMemory test");
}

private Integer parsePxfPort() {
if (pxfPort == null) {
return null;
}
try {
return Integer.parseInt(pxfPort);
} catch (NumberFormatException ignored) {
return null;
}
}

private String getPxfHttpHost() {
if (pxfHost == null || pxfHost.trim().isEmpty() || "0.0.0.0".equals(pxfHost.trim())) {
return "localhost";
}
return pxfHost.trim();
}

private void waitForPortClosed(String host, int port, long timeoutMs) throws InterruptedException {
long deadline = System.currentTimeMillis() + timeoutMs;
while (System.currentTimeMillis() < deadline) {
if (!isPortOpen(host, port, 500)) {
return;
}
Thread.sleep(500);
}
}

private boolean waitForPxfHealthy(String host, int port, long timeoutMs) throws InterruptedException {
long deadline = System.currentTimeMillis() + timeoutMs;
while (System.currentTimeMillis() < deadline) {
if (isActuatorHealthy(host, port)) {
return true;
}
Thread.sleep(1000);
}
return false;
}

private boolean isPortOpen(String host, int port, int timeoutMs) {
try (Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(host, port), timeoutMs);
return true;
} catch (Exception ignored) {
return false;
}
}

private boolean isActuatorHealthy(String host, int port) {
try {
URL url = new URL(String.format("http://%s:%d/actuator/health", host, port));
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(2000);
connection.setReadTimeout(2000);
int code = connection.getResponseCode();
connection.disconnect();
return code >= 200 && code < 300;
} catch (Exception ignored) {
return false;
}
}
}
@@ -166,7 +166,7 @@ protected void prepareData(Hive hive, Hdfs hdfs, String hiveTypesFileName) throw
}

protected void createTables(Hive hive, String serverName, String gpdbTypesTableName, String gpdbQueryTableName) throws Exception {
String jdbcUrl = HIVE_JDBC_URL_PREFIX + hive.getHost() + ":10000/default;auth=noSasl";
String jdbcUrl = HIVE_JDBC_URL_PREFIX + hive.getHost() + ":10000/default";
String user = null;

// On a kerberized cluster, we need the hive/hiveserver2_hostname principal in the connection string.
@@ -219,7 +219,7 @@ protected void createTablesForWriteTest(Hive hive, String hiverServerName, Strin
hiveReadable = TableFactory.getPxfJdbcReadableTable(
hiveReadableName, GPDB_WRITE_TYPES_TABLE_FIELDS, targetHiveTable.getFullName(), serverName);
} else {
String jdbcUrl = String.format("%s%s:10000/default;auth=noSasl", HIVE_JDBC_URL_PREFIX, hive.getHost());
String jdbcUrl = String.format("%s%s:10000/default", HIVE_JDBC_URL_PREFIX, hive.getHost());
// create GPDB external table for writing data from GPDB to Hive with JDBC profile
hiveWritable = TableFactory.getPxfJdbcWritableTable(
hiveWritableName, GPDB_WRITE_TYPES_TABLE_FIELDS, targetHiveTable.getFullName(),
@@ -50,9 +50,6 @@ public class MultiServerTest extends BaseFeature {
*/
@Override
public void beforeClass() throws Exception {
if (ProtocolUtils.getProtocol() == ProtocolEnum.HDFS) {
return;
}
// Initialize an additional HDFS system object (optional system object)
hdfs2 = (Hdfs) systemManager.
getSystemObject("/sut", "hdfs2", -1, null, false, null, SutFactory.getInstance().getSutInstance());
@@ -71,6 +68,10 @@ public void beforeClass() throws Exception {
}

String hdfsWorkingDirectory = hdfs.getWorkingDirectory();
if (hdfsWorkingDirectory == null) {
// Fallback to the default automation working directory to avoid NPE when protocol is HDFS
hdfsWorkingDirectory = "/tmp/pxf_automation_data";
}
defaultPath = hdfsWorkingDirectory + "/" + fileName;

// Initialize server objects
@@ -207,7 +207,7 @@ public void orcWritePrimitivesReadWithHive() throws Exception {
hive.runQuery(ctasHiveQuery);

// use the Hive JDBC profile to avoid using the PXF ORC reader implementation
String jdbcUrl = HIVE_JDBC_URL_PREFIX + hive.getHost() + ":10000/default;auth=noSasl";
String jdbcUrl = HIVE_JDBC_URL_PREFIX + hive.getHost() + ":10000/default";
ExternalTable exHiveJdbcTable = TableFactory.getPxfJdbcReadableTable(
gpdbTableNamePrefix + "_readable", ORC_PRIMITIVE_TABLE_COLUMNS_READ_FROM_HIVE,
hiveTable.getName() + "_ctas", HIVE_JDBC_DRIVER_CLASS, jdbcUrl, null);
@@ -369,7 +369,7 @@ public void parquetWriteListsReadWithHive() throws Exception {
}

// use the Hive JDBC profile to avoid using the PXF Parquet reader implementation
String jdbcUrl = HIVE_JDBC_URL_PREFIX + hive.getHost() + ":10000/default;auth=noSasl";
String jdbcUrl = HIVE_JDBC_URL_PREFIX + hive.getHost() + ":10000/default";

ExternalTable exHiveJdbcTable = TableFactory.getPxfJdbcReadableTable(
readTableName, PARQUET_PRIMITIVE_ARRAYS_TABLE_COLUMNS_READ_FROM_HIVE,
@@ -170,9 +170,11 @@ private void prepareRcData() throws Exception {

private void prepareNativeGpdbData() throws Exception {
gpdbNativeTable = new Table("perf_test", getColumnTypeGpdb());
gpdbNativeTable.setDistributionFields(new String[] { "int0" });
gpdb.createTableAndVerify(gpdbNativeTable);

gpdb.insertData(gpdbTextHiveProfile, gpdbNativeTable);
gpdb.runQuery(String.format("INSERT INTO %s SELECT * FROM %s",
gpdbNativeTable.getName(), gpdbTextHiveProfile.getName()));
}

@Override
5 changes: 5 additions & 0 deletions concourse/docker/pxf-cbdb-dev/ubuntu/script/entrypoint.sh
@@ -389,10 +389,15 @@ start_hive_services() {
export PATH="${JAVA_HOME}/bin:${HIVE_ROOT}/bin:${HADOOP_ROOT}/bin:${PATH}"
export HIVE_HOME="${HIVE_ROOT}"
export HADOOP_HOME="${HADOOP_ROOT}"
local tez_root="${TEZ_ROOT:-${GPHD_ROOT}/tez}"
# bump HS2 heap to reduce Tez OOMs during tests
export HADOOP_HEAPSIZE=${HADOOP_HEAPSIZE:-1024}
export HADOOP_CLIENT_OPTS="-Xmx${HADOOP_HEAPSIZE}m -Xms512m ${HADOOP_CLIENT_OPTS:-}"

# ensure Tez libs are available on HDFS for hive.execution.engine=tez
"${HADOOP_ROOT}/bin/hadoop" fs -mkdir -p /apps/tez
"${HADOOP_ROOT}/bin/hadoop" fs -copyFromLocal -f "${tez_root}"/* /apps/tez

# ensure clean state
pkill -f HiveServer2 || true
pkill -f HiveMetaStore || true