Skip to content

Commit 6b0f120

Browse files
authored
Merge pull request #5 from aiven/add-integration-tests
Add integration tests #5
2 parents 3f5a547 + 0a51ca6 commit 6b0f120

File tree

8 files changed

+543
-0
lines changed

8 files changed

+543
-0
lines changed

.travis.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,6 @@ cache:
1414
directories:
1515
- $HOME/.gradle/caches/
1616
- $HOME/.gradle/wrapper/
17+
18+
script:
19+
- ./gradlew check --info

build.gradle

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ plugins {
2323

2424
// https://docs.gradle.org/current/userguide/checkstyle_plugin.html
2525
id 'checkstyle'
26+
27+
// https://docs.gradle.org/current/userguide/idea_plugin.html
28+
id 'idea'
2629
}
2730

2831
repositories {
@@ -34,6 +37,8 @@ targetCompatibility = JavaVersion.VERSION_1_8
3437

3538
ext {
3639
kafkaVersion = "2.0.1"
40+
41+
testcontainersVersion = "1.12.1"
3742
}
3843

3944
distributions {
@@ -45,6 +50,27 @@ distributions {
4550
}
4651
}
4752

53+
sourceSets {
54+
integrationTest {
55+
java.srcDir file('src/integration-test/java')
56+
resources.srcDir file('src/integration-test/resources')
57+
compileClasspath += sourceSets.main.output + configurations.testRuntime
58+
runtimeClasspath += output + compileClasspath
59+
}
60+
}
61+
62+
idea {
63+
module {
64+
testSourceDirs += project.sourceSets.integrationTest.java.srcDirs
65+
testSourceDirs += project.sourceSets.integrationTest.resources.srcDirs
66+
}
67+
}
68+
69+
configurations {
70+
integrationTestImplementation.extendsFrom testImplementation
71+
integrationTestRuntime.extendsFrom testRuntime
72+
}
73+
4874
dependencies {
4975
compileOnly "org.apache.kafka:connect-api:$kafkaVersion"
5076

@@ -53,13 +79,48 @@ dependencies {
5379
testImplementation "org.junit.jupiter:junit-jupiter:5.5.1"
5480
testImplementation "org.hamcrest:hamcrest:2.1"
5581
testImplementation "org.apache.kafka:connect-api:$kafkaVersion"
82+
testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
83+
5684
testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:2.12.1"
85+
testRuntime "org.apache.logging.log4j:log4j-api:2.12.1"
86+
testRuntime "org.apache.logging.log4j:log4j-core:2.12.1"
87+
88+
integrationTestImplementation "org.apache.kafka:connect-api:$kafkaVersion"
89+
integrationTestImplementation("org.apache.kafka:connect-runtime:$kafkaVersion") {
90+
exclude group: "org.slf4j", module: "slf4j-log4j12"
91+
}
92+
integrationTestImplementation "org.apache.kafka:connect-json:$kafkaVersion"
93+
integrationTestImplementation "org.apache.kafka:connect-transforms:$kafkaVersion"
94+
95+
integrationTestImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
96+
integrationTestImplementation "org.testcontainers:kafka:$testcontainersVersion" // this is not Kafka version
97+
// Make test utils from 'test' available in 'integration-test'
98+
integrationTestImplementation sourceSets.test.output
5799
}
58100

59101
checkstyle {
60102
toolVersion "8.21"
61103
}
62104

105+
task integrationTest(type: Test) {
106+
description = 'Runs the integration tests.'
107+
group = 'verification'
108+
testClassesDirs = sourceSets.integrationTest.output.classesDirs
109+
classpath = sourceSets.integrationTest.runtimeClasspath
110+
111+
dependsOn test, distTar
112+
113+
useJUnitPlatform()
114+
115+
// Run always.
116+
outputs.upToDateWhen { false }
117+
118+
// Pass the distribution file path to the tests.
119+
systemProperty("integration-test.distribution.file.path", distTar.archiveFile.get().asFile.path)
120+
systemProperty("integration-test.classes.path", sourceSets.integrationTest.output.classesDirs.getAsPath())
121+
}
122+
check.dependsOn integrationTest
123+
63124
test {
64125
useJUnitPlatform {
65126
includeEngines 'junit-jupiter'

config/checkstyle/checkstyle.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@
2727

2828
<!-- See http://checkstyle.sourceforge.net/config.html#Checker -->
2929
<module name="Checker">
30+
<!-- See http://checkstyle.sourceforge.net/config_filters.html#SuppressionFilter -->
31+
<module name="SuppressionFilter">
32+
<property name="file" value="${config_loc}/suppressions.xml" default="checkstyle/suppressions.xml"/>
33+
</module>
34+
3035
<property name="charset" value="UTF-8"/>
3136
<!-- <property name="severity" value="warning"/>-->
3237
<property name="localeLanguage" value="en"/>

config/checkstyle/suppressions.xml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
// Copyright 2019 Aiven Oy
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
-->
17+
<!DOCTYPE suppressions PUBLIC
18+
"-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
19+
"https://checkstyle.org/dtds/suppressions_1_2.dtd">
20+
<suppressions>
21+
<!-- Switch off these complexity metrics for integration tests,
22+
as in them we must instantiate many classes explicitly. -->
23+
<suppress checks="ClassFanOutComplexity"
24+
files="(IntegrationTest|ConnectRunner).java"/>
25+
<suppress checks="ClassDataAbstractionCoupling"
26+
files="(IntegrationTest|ConnectRunner).java"/>
27+
</suppressions>
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright 2019 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.io.File;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import java.util.concurrent.ExecutionException;
23+
24+
import org.apache.kafka.common.utils.Time;
25+
import org.apache.kafka.connect.runtime.Connect;
26+
import org.apache.kafka.connect.runtime.ConnectorConfig;
27+
import org.apache.kafka.connect.runtime.Herder;
28+
import org.apache.kafka.connect.runtime.Worker;
29+
import org.apache.kafka.connect.runtime.isolation.Plugins;
30+
import org.apache.kafka.connect.runtime.rest.RestServer;
31+
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
32+
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
33+
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
34+
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
35+
import org.apache.kafka.connect.util.Callback;
36+
import org.apache.kafka.connect.util.FutureCallback;
37+
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
40+
41+
final class ConnectRunner {
42+
private static final Logger log = LoggerFactory.getLogger(ConnectRunner.class);
43+
44+
private final File pluginDir;
45+
private final String bootstrapServers;
46+
47+
private Herder herder;
48+
private Connect connect;
49+
50+
public ConnectRunner(final File pluginDir,
51+
final String bootstrapServers) {
52+
this.pluginDir = pluginDir;
53+
this.bootstrapServers = bootstrapServers;
54+
}
55+
56+
void start() {
57+
final Map<String, String> workerProps = new HashMap<>();
58+
workerProps.put("bootstrap.servers", bootstrapServers);
59+
60+
workerProps.put("offset.flush.interval.ms", "5000");
61+
62+
// These don't matter much (each connector sets its own converters), but need to be filled with valid classes.
63+
workerProps.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
64+
workerProps.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
65+
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
66+
workerProps.put("internal.key.converter.schemas.enable", "false");
67+
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
68+
workerProps.put("internal.value.converter.schemas.enable", "false");
69+
70+
// Don't need it since we'll memory MemoryOffsetBackingStore.
71+
workerProps.put("offset.storage.file.filename", "");
72+
73+
workerProps.put("plugin.path", pluginDir.getPath());
74+
75+
final Time time = Time.SYSTEM;
76+
final String workerId = "test-worker";
77+
78+
final Plugins plugins = new Plugins(workerProps);
79+
final StandaloneConfig config = new StandaloneConfig(workerProps);
80+
81+
final Worker worker = new Worker(
82+
workerId, time, plugins, config, new MemoryOffsetBackingStore());
83+
herder = new StandaloneHerder(worker, "cluster-id");
84+
85+
final RestServer rest = new RestServer(config);
86+
87+
connect = new Connect(herder, rest);
88+
89+
connect.start();
90+
}
91+
92+
void createConnector(final Map<String, String> config) throws ExecutionException, InterruptedException {
93+
assert herder != null;
94+
95+
final FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(
96+
new Callback<Herder.Created<ConnectorInfo>>() {
97+
@Override
98+
public void onCompletion(final Throwable error, final Herder.Created<ConnectorInfo> info) {
99+
if (error != null) {
100+
log.error("Failed to create job");
101+
} else {
102+
log.info("Created connector {}", info.result().name());
103+
}
104+
}
105+
});
106+
herder.putConnectorConfig(
107+
config.get(ConnectorConfig.NAME_CONFIG),
108+
config, false, cb
109+
);
110+
111+
final Herder.Created<ConnectorInfo> connectorInfoCreated = cb.get();
112+
assert connectorInfoCreated.created();
113+
}
114+
115+
void stop() {
116+
connect.stop();
117+
}
118+
119+
void awaitStop() {
120+
connect.awaitStop();
121+
}
122+
}

0 commit comments

Comments
 (0)