Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
ef610e3
Connect integration test harness (WIP)
wicknicks Aug 16, 2018
97dc0fb
MINOR: Fix checkstyle issues
wicknicks Aug 16, 2018
1ec70a5
MINOR: add util functions to produce and consume records
wicknicks Aug 16, 2018
305a5d4
MINOR: Add monitorable sink connector
wicknicks Aug 17, 2018
b047825
Rename variable to producer
wicknicks Aug 20, 2018
52d44ce
Add some javadocs
wicknicks Aug 24, 2018
b9b8c86
DLQ integration test
wicknicks Aug 24, 2018
411b678
Wait for only limited time for records to come through
wicknicks Aug 24, 2018
c5c3494
Use REST endpoint to start/delete connectors
wicknicks Aug 24, 2018
041490a
Use distributed herder to bring up connect cluster
wicknicks Aug 24, 2018
9321248
Make checkstyle happy and setup and tear down clusters with @Before a…
wicknicks Aug 24, 2018
eaa9835
Clean up ConnectIntegrationTest and reset COUNTER in stop
wicknicks Aug 24, 2018
054ba50
Address reviewer comments
wicknicks Sep 18, 2018
aa22a70
Reset log4j.properties to original state
wicknicks Sep 18, 2018
c6db65d
Clean up imports
wicknicks Sep 18, 2018
50b9166
Align variable name to convention
wicknicks Sep 20, 2018
708fd2d
Use a latch to wait for connector to sink records
wicknicks Sep 20, 2018
63bfbb4
Test can wait till task is created instead of busy loop
wicknicks Sep 25, 2018
b9a1f19
Address other review comments
wicknicks Sep 25, 2018
bf72fc6
Make variables final
wicknicks Sep 25, 2018
1f0e9f2
Remove unused imports
wicknicks Sep 25, 2018
b1def66
Add some javadoc and count down on latch when task exits
wicknicks Oct 4, 2018
1fb10b8
Update ConnectIntegrationTest to use multiple topic partitions and tasks
wicknicks Oct 5, 2018
cdcfb9f
Changes to startConnector method
wicknicks Oct 5, 2018
213b31c
Remove before() and after() methods from EmbeddedConnectCluster
wicknicks Oct 5, 2018
2f235c0
Generate clusterName randomly instead of using argument
wicknicks Oct 5, 2018
676721c
bootstrapServers() returns all brokers
wicknicks Oct 5, 2018
4cc9a73
Throw exception if desired replicator is unachievable
wicknicks Oct 5, 2018
7380c1d
Refactor logic for consuming up to n records in maxDuration
wicknicks Oct 6, 2018
b3cf19a
Remove unused methods
wicknicks Oct 8, 2018
00d9177
Rename tests and broaden the scope of error handling test
wicknicks Oct 8, 2018
1cc92a1
Clean up handles for tasks before starting the test
wicknicks Oct 8, 2018
af5ccd8
Correct some typos
wicknicks Oct 8, 2018
56e504f
KAFKA-7503: Fix typos and remove unnecessary task props
wicknicks Oct 25, 2018
fa68baa
KAFKA-7503: Rename taskInstances method
wicknicks Oct 25, 2018
3d91b8e
KAFKA-7503: Add documentation to clarify taskAvailable's contracts
wicknicks Oct 25, 2018
91acd05
KAFKA-7503: Let Jetty pick any available port for integration tests
wicknicks Oct 25, 2018
591f8de
KAFKA-7503: Multi worker connect cluster in integration tests
wicknicks Oct 28, 2018
1d86332
KAFKA-7503: Add counters in connector handle
wicknicks Oct 28, 2018
dab22a1
KAFKA-7503: Remove unnecessary task handle cleanup requests
wicknicks Oct 29, 2018
96d53f5
MINOR: Change example worker props
wicknicks Nov 5, 2018
63cc56f
MINOR: Use better error message
wicknicks Nov 5, 2018
d4ba51d
TEMP: Modify log4j.properties to debug failing test
wicknicks Nov 5, 2018
3af72cd
MINOR: Find free ports using ServerSocket API
wicknicks Nov 6, 2018
a9a3895
KAFKA-7503: Address reviewer comments
wicknicks Nov 13, 2018
85d72fc
KAFKA-7503: Initialize RestServer before starting herder (#8)
wicknicks Nov 16, 2018
016da0a
KAFKA-7503: Set REST_PORT_CONFIG to 0 for integration tests
wicknicks Nov 19, 2018
6c6b5e4
MINOR: Suppress deprecation errors
wicknicks Nov 19, 2018
cc4d11b
KAFKA-7503: Replace premature exception with a warning
wicknicks Nov 22, 2018
c12fedd
KAFKA-7503: Add waitUntil and address reviewer comments
wicknicks Dec 19, 2018
8b24c6f
KAFKA-7503: Use waitForCondition from TestUtils; throw error if Latch…
wicknicks Jan 12, 2019
027e68e
KAFKA-7503: Fix call to RestServer#start in unit test
wicknicks Jan 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,8 @@ project(':connect:runtime') {
testCompile libs.powermockEasymock

testCompile project(':clients').sourceSets.test.output
testCompile project(':core')
testCompile project(':core').sourceSets.test.output

testRuntime libs.slf4jlog4j
}
Expand Down
14 changes: 12 additions & 2 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,6 @@
</subpackage>
</subpackage>



<subpackage name="cli">
<allow pkg="org.apache.kafka.connect.runtime" />
<allow pkg="org.apache.kafka.connect.storage" />
Expand All @@ -366,6 +364,18 @@
<allow pkg="org.reflections.vfs" />
<!-- for annotations to avoid code duplication -->
<allow pkg="com.fasterxml.jackson.annotation" />
<allow pkg="com.fasterxml.jackson.databind" />
<subpackage name="clusters">
<allow pkg="kafka.server" />
<allow pkg="kafka.zk" />
<allow pkg="kafka.utils" />
<allow class="javax.servlet.http.HttpServletResponse" />
</subpackage>
</subpackage>

<subpackage name="integration">
<allow pkg="org.apache.kafka.connect.util.clusters" />
<allow pkg="org.apache.kafka.connect" />
</subpackage>

<subpackage name="json">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.WorkerInfo;
Expand Down Expand Up @@ -54,62 +55,26 @@
public class ConnectDistributed {
private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);

private final Time time = Time.SYSTEM;
private final long initStart = time.hiResClockMs();

public static void main(String[] args) {

if (args.length < 1 || Arrays.asList(args).contains("--help")) {
log.info("Usage: ConnectDistributed worker.properties");
Exit.exit(1);
}

try {
Time time = Time.SYSTEM;
log.info("Kafka Connect distributed worker initializing ...");
long initStart = time.hiResClockMs();
WorkerInfo initInfo = new WorkerInfo();
initInfo.logAll();

String workerPropsFile = args[0];
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();

log.info("Scanning for plugin classes. This might take a moment ...");
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig config = new DistributedConfig(workerProps);

String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
log.debug("Kafka cluster ID: {}", kafkaClusterId);

RestServer rest = new RestServer(config);
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();

KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);

Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
WorkerConfigTransformer configTransformer = worker.configTransformer();

Converter internalValueConverter = worker.getInternalValueConverter();
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
statusBackingStore.configure(config);

ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
internalValueConverter,
config,
configTransformer);

DistributedHerder herder = new DistributedHerder(config, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl.toString());
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
connect.start();
} catch (Exception e) {
log.error("Failed to start Connect", e);
connect.stop();
Exit.exit(3);
}
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();

ConnectDistributed connectDistributed = new ConnectDistributed();
Connect connect = connectDistributed.startConnect(workerProps);

// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();
Expand All @@ -119,4 +84,55 @@ public static void main(String[] args) {
Exit.exit(2);
}
}

public Connect startConnect(Map<String, String> workerProps) {
log.info("Scanning for plugin classes. This might take a moment ...");
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig config = new DistributedConfig(workerProps);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be even more valuable to break this method into multiple protected methods and to have more of the components created in this method be members. However, that's probably work for another ticket.


String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
log.debug("Kafka cluster ID: {}", kafkaClusterId);

RestServer rest = new RestServer(config);
HerderProvider provider = new HerderProvider();
rest.start(provider, plugins);

URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();

KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);

Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
WorkerConfigTransformer configTransformer = worker.configTransformer();

Converter internalValueConverter = worker.getInternalValueConverter();
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
statusBackingStore.configure(config);

ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
internalValueConverter,
config,
configTransformer);

DistributedHerder herder = new DistributedHerder(config, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl.toString());

final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
connect.start();
// herder has initialized now, and ready to be used by the RestServer.
provider.setHerder(herder);
} catch (Exception e) {
log.error("Failed to start Connect", e);
connect.stop();
Exit.exit(3);
}

return connect;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -50,7 +51,6 @@ public void start() {
Runtime.getRuntime().addShutdownHook(shutdownHook);

herder.start();
rest.start(herder);

log.info("Kafka Connect started");
} finally {
Expand Down Expand Up @@ -82,6 +82,11 @@ public void awaitStop() {
}
}

/**
 * Returns the base URL of this worker's embedded REST server, as reported
 * by the underlying {@code RestServer}. Visible for testing so that tests
 * can address the REST API of a running worker.
 *
 * @return the REST server's URL
 */
public URI restUrl() {
return rest.serverUrl();
}

private class ShutdownHook extends Thread {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;

import org.apache.kafka.connect.errors.ConnectException;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Supplies a {@link Herder} to callers, blocking in {@link #get()} until one
 * has been made available either through the constructor or through
 * {@link #setHerder(Herder)}. This lets components (e.g. the REST server) be
 * wired up before the herder itself has finished initializing.
 */
public class HerderProvider {

    // Released exactly once, when a herder has been supplied.
    private final CountDownLatch initialized = new CountDownLatch(1);
    volatile Herder herder = null;

    /**
     * Create a provider with no herder; {@link #get()} will block until one
     * is supplied via {@link #setHerder(Herder)}.
     */
    public HerderProvider() {
    }

    /**
     * Create a herder provider with a herder.
     * @param herder the herder that will be supplied to threads waiting on this provider
     */
    public HerderProvider(Herder herder) {
        this.herder = herder;
        initialized.countDown();
    }

    /**
     * @return the contained herder.
     * @throws ConnectException if a herder was not available within a duration of calling this method
     */
    public Herder get() {
        try {
            // Block until setHerder() has run, up to a fixed one-minute bound.
            boolean ready = initialized.await(1, TimeUnit.MINUTES);
            if (!ready) {
                throw new ConnectException("Timed out waiting for herder to be initialized.");
            }
        } catch (InterruptedException e) {
            throw new ConnectException("Interrupted while waiting for herder to be initialized.", e);
        }
        return herder;
    }

    /**
     * @param herder set a herder, and signal to all threads waiting on get().
     */
    public void setHerder(Herder herder) {
        // Publish the herder before releasing waiters; the field is volatile,
        // and the latch count-down also provides a happens-before edge.
        this.herder = herder;
        initialized.countDown();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.kafka.connect.health.ConnectorState;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.health.TaskState;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.Callback;

Expand All @@ -34,16 +34,16 @@

public class ConnectClusterStateImpl implements ConnectClusterState {

private Herder herder;
private HerderProvider herderProvider;

public ConnectClusterStateImpl(Herder herder) {
this.herder = herder;
public ConnectClusterStateImpl(HerderProvider herderProvider) {
this.herderProvider = herderProvider;
}

@Override
public Collection<String> connectors() {
final Collection<String> connectors = new ArrayList<>();
herder.connectors(new Callback<java.util.Collection<String>>() {
herderProvider.get().connectors(new Callback<java.util.Collection<String>>() {
@Override
public void onCompletion(Throwable error, Collection<String> result) {
connectors.addAll(result);
Expand All @@ -55,7 +55,7 @@ public void onCompletion(Throwable error, Collection<String> result) {
@Override
public ConnectorHealth connectorHealth(String connName) {

ConnectorStateInfo state = herder.connectorStatus(connName);
ConnectorStateInfo state = herderProvider.get().connectorStatus(connName);
ConnectorState connectorState = new ConnectorState(
state.connector().state(),
state.connector().workerId(),
Expand Down
Loading