Skip to content

Commit fcd5dd9

Browse files
wombatu-kunVova Kolmakov
andauthored
Kafka-connect-runtime: remove code duplications in integration tests (apache#11883)
Co-authored-by: Vova Kolmakov <[email protected]>
1 parent dbfefb0 commit fcd5dd9

File tree

4 files changed

+123
-200
lines changed

4 files changed

+123
-200
lines changed

kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationDynamicTableTest.java

Lines changed: 14 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -20,44 +20,22 @@
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
2222

23-
import java.time.Duration;
2423
import java.time.Instant;
2524
import java.util.List;
2625
import org.apache.iceberg.DataFile;
27-
import org.apache.iceberg.Table;
28-
import org.apache.iceberg.catalog.Namespace;
29-
import org.apache.iceberg.catalog.SupportsNamespaces;
3026
import org.apache.iceberg.catalog.TableIdentifier;
31-
import org.awaitility.Awaitility;
32-
import org.junit.jupiter.api.AfterEach;
33-
import org.junit.jupiter.api.BeforeEach;
27+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
3428
import org.junit.jupiter.params.ParameterizedTest;
3529
import org.junit.jupiter.params.provider.NullSource;
3630
import org.junit.jupiter.params.provider.ValueSource;
3731

3832
public class IntegrationDynamicTableTest extends IntegrationTestBase {
3933

40-
private static final String TEST_DB = "test";
4134
private static final String TEST_TABLE1 = "tbl1";
4235
private static final String TEST_TABLE2 = "tbl2";
4336
private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
4437
private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);
4538

46-
@BeforeEach
47-
public void before() {
48-
createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
49-
((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
50-
}
51-
52-
@AfterEach
53-
public void after() {
54-
context().stopConnector(connectorName());
55-
deleteTopic(testTopic());
56-
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
57-
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
58-
((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
59-
}
60-
6139
@ParameterizedTest
6240
@NullSource
6341
@ValueSource(strings = "test_branch")
@@ -68,7 +46,7 @@ public void testIcebergSink(String branch) {
6846
catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);
6947

7048
boolean useSchema = branch == null; // use a schema for one of the tests
71-
runTest(branch, useSchema);
49+
runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2));
7250

7351
List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
7452
assertThat(files).hasSize(1);
@@ -81,55 +59,27 @@ public void testIcebergSink(String branch) {
8159
assertSnapshotProps(TABLE_IDENTIFIER2, branch);
8260
}
8361

84-
private void runTest(String branch, boolean useSchema) {
85-
// set offset reset to earliest so we don't miss any test messages
86-
KafkaConnectUtils.Config connectorConfig =
87-
new KafkaConnectUtils.Config(connectorName())
88-
.config("topics", testTopic())
89-
.config("connector.class", IcebergSinkConnector.class.getName())
90-
.config("tasks.max", 2)
91-
.config("consumer.override.auto.offset.reset", "earliest")
92-
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
93-
.config("key.converter.schemas.enable", false)
94-
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
95-
.config("value.converter.schemas.enable", useSchema)
96-
.config("iceberg.tables.dynamic-enabled", true)
97-
.config("iceberg.tables.route-field", "payload")
98-
.config("iceberg.control.commit.interval-ms", 1000)
99-
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
100-
.config("iceberg.kafka.auto.offset.reset", "earliest");
101-
102-
context().connectorCatalogProperties().forEach(connectorConfig::config);
103-
104-
if (branch != null) {
105-
connectorConfig.config("iceberg.tables.default-commit-branch", branch);
106-
}
107-
108-
if (!useSchema) {
109-
connectorConfig.config("value.converter.schemas.enable", false);
110-
}
111-
112-
context().startConnector(connectorConfig);
62+
@Override
63+
protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
64+
return createCommonConfig(useSchema)
65+
.config("iceberg.tables.dynamic-enabled", true)
66+
.config("iceberg.tables.route-field", "payload");
67+
}
11368

69+
@Override
70+
protected void sendEvents(boolean useSchema) {
11471
TestEvent event1 = new TestEvent(1, "type1", Instant.now(), TEST_DB + "." + TEST_TABLE1);
11572
TestEvent event2 = new TestEvent(2, "type2", Instant.now(), TEST_DB + "." + TEST_TABLE2);
11673
TestEvent event3 = new TestEvent(3, "type3", Instant.now(), TEST_DB + ".tbl3");
11774

11875
send(testTopic(), event1, useSchema);
11976
send(testTopic(), event2, useSchema);
12077
send(testTopic(), event3, useSchema);
121-
flush();
122-
123-
Awaitility.await()
124-
.atMost(Duration.ofSeconds(30))
125-
.pollInterval(Duration.ofSeconds(1))
126-
.untilAsserted(this::assertSnapshotAdded);
12778
}
12879

129-
private void assertSnapshotAdded() {
130-
Table table = catalog().loadTable(TABLE_IDENTIFIER1);
131-
assertThat(table.snapshots()).hasSize(1);
132-
table = catalog().loadTable(TABLE_IDENTIFIER2);
133-
assertThat(table.snapshots()).hasSize(1);
80+
@Override
81+
void dropTables() {
82+
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
83+
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
13484
}
13585
}

kafka-connect/kafka-connect-runtime/src/integration/java/org/apache/iceberg/connect/IntegrationMultiTableTest.java

Lines changed: 18 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -20,44 +20,22 @@
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
2222

23-
import java.time.Duration;
2423
import java.time.Instant;
2524
import java.util.List;
2625
import org.apache.iceberg.DataFile;
27-
import org.apache.iceberg.Table;
28-
import org.apache.iceberg.catalog.Namespace;
29-
import org.apache.iceberg.catalog.SupportsNamespaces;
3026
import org.apache.iceberg.catalog.TableIdentifier;
31-
import org.awaitility.Awaitility;
32-
import org.junit.jupiter.api.AfterEach;
33-
import org.junit.jupiter.api.BeforeEach;
27+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
3428
import org.junit.jupiter.params.ParameterizedTest;
3529
import org.junit.jupiter.params.provider.NullSource;
3630
import org.junit.jupiter.params.provider.ValueSource;
3731

3832
public class IntegrationMultiTableTest extends IntegrationTestBase {
3933

40-
private static final String TEST_DB = "test";
4134
private static final String TEST_TABLE1 = "foobar1";
4235
private static final String TEST_TABLE2 = "foobar2";
4336
private static final TableIdentifier TABLE_IDENTIFIER1 = TableIdentifier.of(TEST_DB, TEST_TABLE1);
4437
private static final TableIdentifier TABLE_IDENTIFIER2 = TableIdentifier.of(TEST_DB, TEST_TABLE2);
4538

46-
@BeforeEach
47-
public void before() {
48-
createTopic(testTopic(), TEST_TOPIC_PARTITIONS);
49-
((SupportsNamespaces) catalog()).createNamespace(Namespace.of(TEST_DB));
50-
}
51-
52-
@AfterEach
53-
public void after() {
54-
context().stopConnector(connectorName());
55-
deleteTopic(testTopic());
56-
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
57-
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
58-
((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(TEST_DB));
59-
}
60-
6139
@ParameterizedTest
6240
@NullSource
6341
@ValueSource(strings = "test_branch")
@@ -68,7 +46,7 @@ public void testIcebergSink(String branch) {
6846
catalog().createTable(TABLE_IDENTIFIER2, TestEvent.TEST_SCHEMA);
6947

7048
boolean useSchema = branch == null; // use a schema for one of the tests
71-
runTest(branch, useSchema);
49+
runTest(branch, useSchema, ImmutableMap.of(), List.of(TABLE_IDENTIFIER1, TABLE_IDENTIFIER2));
7250

7351
List<DataFile> files = dataFiles(TABLE_IDENTIFIER1, branch);
7452
assertThat(files).hasSize(1);
@@ -81,60 +59,31 @@ public void testIcebergSink(String branch) {
8159
assertSnapshotProps(TABLE_IDENTIFIER2, branch);
8260
}
8361

84-
private void runTest(String branch, boolean useSchema) {
85-
// set offset reset to earliest so we don't miss any test messages
86-
KafkaConnectUtils.Config connectorConfig =
87-
new KafkaConnectUtils.Config(connectorName())
88-
.config("topics", testTopic())
89-
.config("connector.class", IcebergSinkConnector.class.getName())
90-
.config("tasks.max", 2)
91-
.config("consumer.override.auto.offset.reset", "earliest")
92-
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
93-
.config("key.converter.schemas.enable", false)
94-
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
95-
.config("value.converter.schemas.enable", useSchema)
96-
.config(
97-
"iceberg.tables",
98-
String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
99-
.config("iceberg.tables.route-field", "type")
100-
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
101-
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2")
102-
.config("iceberg.control.commit.interval-ms", 1000)
103-
.config("iceberg.control.commit.timeout-ms", Integer.MAX_VALUE)
104-
.config("iceberg.kafka.auto.offset.reset", "earliest");
105-
106-
context().connectorCatalogProperties().forEach(connectorConfig::config);
107-
108-
if (branch != null) {
109-
connectorConfig.config("iceberg.tables.default-commit-branch", branch);
110-
}
111-
112-
// use a schema for one of the cases
113-
if (!useSchema) {
114-
connectorConfig.config("value.converter.schemas.enable", false);
115-
}
116-
117-
context().startConnector(connectorConfig);
62+
@Override
63+
protected KafkaConnectUtils.Config createConfig(boolean useSchema) {
64+
return createCommonConfig(useSchema)
65+
.config(
66+
"iceberg.tables",
67+
String.format("%s.%s, %s.%s", TEST_DB, TEST_TABLE1, TEST_DB, TEST_TABLE2))
68+
.config("iceberg.tables.route-field", "type")
69+
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE1), "type1")
70+
.config(String.format("iceberg.table.%s.%s.route-regex", TEST_DB, TEST_TABLE2), "type2");
71+
}
11872

73+
@Override
74+
protected void sendEvents(boolean useSchema) {
11975
TestEvent event1 = new TestEvent(1, "type1", Instant.now(), "hello world!");
12076
TestEvent event2 = new TestEvent(2, "type2", Instant.now(), "having fun?");
12177
TestEvent event3 = new TestEvent(3, "type3", Instant.now(), "ignore me");
12278

12379
send(testTopic(), event1, useSchema);
12480
send(testTopic(), event2, useSchema);
12581
send(testTopic(), event3, useSchema);
126-
flush();
127-
128-
Awaitility.await()
129-
.atMost(Duration.ofSeconds(30))
130-
.pollInterval(Duration.ofSeconds(1))
131-
.untilAsserted(this::assertSnapshotAdded);
13282
}
13383

134-
private void assertSnapshotAdded() {
135-
Table table = catalog().loadTable(TABLE_IDENTIFIER1);
136-
assertThat(table.snapshots()).hasSize(1);
137-
table = catalog().loadTable(TABLE_IDENTIFIER2);
138-
assertThat(table.snapshots()).hasSize(1);
84+
@Override
85+
void dropTables() {
86+
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE1));
87+
catalog().dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE2));
13988
}
14089
}

0 commit comments

Comments
 (0)