Skip to content

Commit 89d6bc3

Browse files
authored
Fix Iceberg Integration tests (#34686)
* Skip HiveCatalogIT and BigQueryMetastoreCatalogIT in Iceberg ITs
* Fix Iceberg Integration tests
* Change SALT var name
* Fix comments
* Decrease numRecords for DirectRunner
1 parent 46b2021 commit 89d6bc3

File tree

4 files changed

+18
-9
lines changed

4 files changed

+18
-9
lines changed

sdks/java/io/iceberg/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ task integrationTest(type: Test) {
148148
exclude '**/BigQueryMetastoreCatalogIT.class'
149149
}
150150

151-
maxParallelForks 4
151+
maxParallelForks 1
152152
classpath = sourceSets.test.runtimeClasspath
153153
testClassesDirs = sourceSets.test.output.classesDirs
154154
}

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
4848
private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryMetastoreCatalogIT");
4949
static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
5050
static final String DATASET = "managed_iceberg_bqms_tests_" + System.nanoTime();;
51-
static final long SALT = System.nanoTime();
51+
private long salt = System.nanoTime();
5252

5353
@BeforeClass
5454
public static void createDataset() throws IOException, InterruptedException {
@@ -62,11 +62,12 @@ public static void deleteDataset() {
6262

6363
@Override
6464
public String tableId() {
65-
return DATASET + "." + testName.getMethodName() + "_" + SALT;
65+
return DATASET + "." + testName.getMethodName() + "_" + salt;
6666
}
6767

6868
@Override
6969
public Catalog createCatalog() {
70+
salt += System.nanoTime();
7071
return CatalogUtil.loadCatalog(
7172
BQMS_CATALOG,
7273
"bqms_" + catalogName,
@@ -82,7 +83,7 @@ public Catalog createCatalog() {
8283
public void catalogCleanup() {
8384
for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
8485
// only delete tables that were created in this test run
85-
if (tableIdentifier.name().contains(String.valueOf(SALT))) {
86+
if (tableIdentifier.name().contains(String.valueOf(salt))) {
8687
catalog.dropTable(tableIdentifier);
8788
}
8889
}

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@
4141
*/
4242
public class HiveCatalogIT extends IcebergCatalogBaseIT {
4343
private static HiveMetastoreExtension hiveMetastoreExtension;
44+
private long salt = System.nanoTime();
4445

4546
private String testDb() {
4647
return "test_db_" + testName.getMethodName();
4748
}
4849

4950
@Override
5051
public String tableId() {
51-
return String.format("%s.%s", testDb(), "test_table");
52+
return String.format("%s.%s", testDb(), "test_table" + "_" + salt);
5253
}
5354

5455
@BeforeClass
@@ -73,6 +74,7 @@ public void catalogSetup() throws Exception {
7374

7475
@Override
7576
public Catalog createCatalog() {
77+
salt += System.nanoTime();
7678
return CatalogUtil.loadCatalog(
7779
HiveCatalog.class.getName(),
7880
"hive_" + catalogName,

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@
133133
* #numRecords()}.
134134
*/
135135
public abstract class IcebergCatalogBaseIT implements Serializable {
136+
private static final long SETUP_TEARDOWN_SLEEP_MS = 5000;
137+
136138
public abstract Catalog createCatalog();
137139

138140
public abstract Map<String, Object> managedIcebergConfig(String tableId);
@@ -142,7 +144,7 @@ public void catalogSetup() throws Exception {}
142144
public void catalogCleanup() throws Exception {}
143145

144146
public Integer numRecords() {
145-
return 1000;
147+
return OPTIONS.getRunner().equals(DirectRunner.class) ? 10 : 1000;
146148
}
147149

148150
public String tableId() {
@@ -159,7 +161,8 @@ public static String warehouse(Class<? extends IcebergCatalogBaseIT> testClass)
159161

160162
@Before
161163
public void setUp() throws Exception {
162-
OPTIONS.as(DirectOptions.class).setTargetParallelism(3);
164+
catalogName += System.nanoTime();
165+
OPTIONS.as(DirectOptions.class).setTargetParallelism(1);
163166
warehouse =
164167
String.format(
165168
"%s/%s/%s",
@@ -169,12 +172,14 @@ public void setUp() throws Exception {
169172
warehouse = warehouse(getClass());
170173
catalogSetup();
171174
catalog = createCatalog();
175+
Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
172176
}
173177

174178
@After
175179
public void cleanUp() throws Exception {
176180
try {
177181
catalogCleanup();
182+
Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
178183
} catch (Exception e) {
179184
LOG.warn("Catalog cleanup failed.", e);
180185
}
@@ -201,6 +206,7 @@ public void cleanUp() throws Exception {
201206
.collect(Collectors.toList());
202207
gcsUtil.remove(filesToDelete);
203208
}
209+
Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
204210
} catch (Exception e) {
205211
LOG.warn("Failed to clean up GCS files.", e);
206212
}
@@ -216,9 +222,9 @@ public void cleanUp() throws Exception {
216222

217223
@Rule
218224
public transient Timeout globalTimeout =
219-
Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 180 : 20 * 60);
225+
Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 300 : 20 * 60);
220226

221-
private static final int NUM_SHARDS = 10;
227+
private static final int NUM_SHARDS = OPTIONS.getRunner().equals(DirectRunner.class) ? 1 : 10;
222228
private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogBaseIT.class);
223229
private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
224230
Schema.builder()

0 commit comments

Comments (0)