Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ task integrationTest(type: Test) {
exclude '**/BigQueryMetastoreCatalogIT.class'
}

maxParallelForks 4
maxParallelForks 1
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryMetastoreCatalogIT");
static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
static final String DATASET = "managed_iceberg_bqms_tests_" + System.nanoTime();;
static final long SALT = System.nanoTime();
static long salt = System.nanoTime();

@BeforeClass
public static void createDataset() throws IOException, InterruptedException {
Expand All @@ -62,11 +62,12 @@ public static void deleteDataset() {

@Override
public String tableId() {
return DATASET + "." + testName.getMethodName() + "_" + SALT;
return DATASET + "." + testName.getMethodName() + "_" + salt;
}

@Override
public Catalog createCatalog() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since createCatalog is called in setup() which is a non-static method, salt should also be non-static member

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

salt = System.nanoTime();
return CatalogUtil.loadCatalog(
BQMS_CATALOG,
"bqms_" + catalogName,
Expand All @@ -82,7 +83,7 @@ public Catalog createCatalog() {
public void catalogCleanup() {
for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
// only delete tables that were created in this test run
if (tableIdentifier.name().contains(String.valueOf(SALT))) {
if (tableIdentifier.name().contains(String.valueOf(salt))) {
catalog.dropTable(tableIdentifier);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@
*/
public class HiveCatalogIT extends IcebergCatalogBaseIT {
private static HiveMetastoreExtension hiveMetastoreExtension;
private static long salt = System.nanoTime();

private String testDb() {
return "test_db_" + testName.getMethodName();
}

@Override
public String tableId() {
return String.format("%s.%s", testDb(), "test_table");
return String.format("%s.%s", testDb(), "test_table" + "_" + salt);
}

@BeforeClass
Expand All @@ -73,6 +74,7 @@ public void catalogSetup() throws Exception {

@Override
public Catalog createCatalog() {
salt = System.nanoTime();
return CatalogUtil.loadCatalog(
HiveCatalog.class.getName(),
"hive_" + catalogName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@
* #numRecords()}.
*/
public abstract class IcebergCatalogBaseIT implements Serializable {
private static final long SETUP_TEARDOWN_SLEEP_MS = 5000;

public abstract Catalog createCatalog();

public abstract Map<String, Object> managedIcebergConfig(String tableId);
Expand All @@ -142,7 +144,7 @@ public void catalogSetup() throws Exception {}
public void catalogCleanup() throws Exception {}

public Integer numRecords() {
return 1000;
return 100;
}

public String tableId() {
Expand All @@ -159,7 +161,8 @@ public static String warehouse(Class<? extends IcebergCatalogBaseIT> testClass)

@Before
public void setUp() throws Exception {
OPTIONS.as(DirectOptions.class).setTargetParallelism(3);
catalogName += System.nanoTime();
OPTIONS.as(DirectOptions.class).setTargetParallelism(1);
warehouse =
String.format(
"%s/%s/%s",
Expand All @@ -169,12 +172,14 @@ public void setUp() throws Exception {
warehouse = warehouse(getClass());
catalogSetup();
catalog = createCatalog();
Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
}

@After
public void cleanUp() throws Exception {
try {
catalogCleanup();
Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
} catch (Exception e) {
LOG.warn("Catalog cleanup failed.", e);
}
Expand All @@ -201,6 +206,7 @@ public void cleanUp() throws Exception {
.collect(Collectors.toList());
gcsUtil.remove(filesToDelete);
}
Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
} catch (Exception e) {
LOG.warn("Failed to clean up GCS files.", e);
}
Expand All @@ -216,9 +222,9 @@ public void cleanUp() throws Exception {

@Rule
public transient Timeout globalTimeout =
Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 180 : 20 * 60);
Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 300 : 20 * 60);

private static final int NUM_SHARDS = 10;
private static final int NUM_SHARDS = 1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this remove test coverage of sharded case? We can decrease shard / number of records on direct runner but good to keep parallelism on other runners (Dataflow)

cc: @ahmedabu98

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogBaseIT.class);
private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
Schema.builder()
Expand Down
Loading