Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -71,17 +70,25 @@ public void testIssueSimpleDateFormat() throws Throwable {
String.format(
"CREATE OR REPLACE TABLE %s.%s.%s (" + "ID int, " + "C1 timestamp)",
TARGET_DB, TARGET_SCHEMA, targetTable));
Thread t1 =
new Thread(
new FlatfileRead(NUM_RECORDS, TARGET_DB, TARGET_SCHEMA, TARGET_STAGE, targetTable));
Thread t2 =
new Thread(
new FlatfileRead(NUM_RECORDS, TARGET_DB, TARGET_SCHEMA, TARGET_STAGE, targetTable));
FlatfileRead r1 =
new FlatfileRead(NUM_RECORDS, TARGET_DB, TARGET_SCHEMA, TARGET_STAGE, targetTable);
FlatfileRead r2 =
new FlatfileRead(NUM_RECORDS, TARGET_DB, TARGET_SCHEMA, TARGET_STAGE, targetTable);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);

t1.start();
t2.start();
t1.join();
t2.join();

if (r1.getError() != null) {
throw new AssertionError("Thread 1 failed", r1.getError());
}
if (r2.getError() != null) {
throw new AssertionError("Thread 2 failed", r2.getError());
}

try (ResultSet rs =
statement.executeQuery(
String.format(
Expand All @@ -103,6 +110,7 @@ class FlatfileRead implements Runnable {
private final String schemaName;
private final String tableName;
private final String stageName;
private volatile Throwable error;

FlatfileRead(
int totalRows, String dbName, String schemaName, String stageName, String tableName) {
Expand All @@ -113,14 +121,17 @@ class FlatfileRead implements Runnable {
this.tableName = tableName;
}

Throwable getError() {
return error;
}

@Override
public void run() {
try (Connection testConnection = AbstractDriverIT.getConnection();
Connection putConnection = AbstractDriverIT.getConnection()) {

ResultListener _resultListener = new ResultListener();

// init properties
Map<LoaderProperty, Object> prop = new HashMap<>();
prop.put(LoaderProperty.tableName, this.tableName);
prop.put(LoaderProperty.schemaName, this.schemaName);
Expand All @@ -142,25 +153,21 @@ public void run() {
for (int i = 0; i < this.totalRows; ++i) {
Object[] row = new Object[2];
row[0] = i;
// random timestamp data
long ms = -946771200000L + (Math.abs(rnd.nextLong()) % (70L * 365 * 24 * 60 * 60 * 1000));
row[1] = new Date(ms);
underTest.submitRow(row);
}

try {
underTest.finish();
underTest.close();
} catch (Exception e) {
e.printStackTrace();
}
underTest.finish();
underTest.close();

assertThat("must be no error", _resultListener.getErrorCount(), equalTo(0));
assertThat(
"total number of rows",
_resultListener.getSubmittedRowCount(),
equalTo(this.totalRows));
} catch (SQLException e) {
e.printStackTrace();
} catch (Throwable e) {
error = e;
}
}

Expand Down
Loading