Skip to content

Commit 9c223d0

Browse files
committed
Spotless code style changes
1 parent 479349f commit 9c223d0

File tree

127 files changed

+11812
-9680
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

127 files changed

+11812
-9680
lines changed

src/integrationTest/java/com/mongodb/kafka/connect/ConnectorValidationTest.java

Lines changed: 446 additions & 396 deletions
Large diffs are not rendered by default.

src/integrationTest/java/com/mongodb/kafka/connect/MongoSinkConnectorTest.java

Lines changed: 248 additions & 207 deletions
Large diffs are not rendered by default.

src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorTest.java

Lines changed: 509 additions & 466 deletions
Large diffs are not rendered by default.

src/integrationTest/java/com/mongodb/kafka/connect/avro/TweetMsg.java

Lines changed: 339 additions & 319 deletions
Large diffs are not rendered by default.

src/integrationTest/java/com/mongodb/kafka/connect/embedded/ConnectStandalone.java

Lines changed: 90 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -43,99 +43,104 @@
4343

4444
class ConnectStandalone {
4545

46-
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectStandalone.class);
47-
48-
private final String connectionString;
49-
private final Herder herder;
50-
private final Connect connect;
51-
52-
@SuppressWarnings("unchecked")
53-
ConnectStandalone(final Properties workerProperties) {
54-
Time time = Time.SYSTEM;
55-
LOGGER.info("Kafka Connect standalone worker initializing ...");
56-
long initStart = time.hiResClockMs();
57-
WorkerInfo initInfo = new WorkerInfo();
58-
initInfo.logAll();
59-
60-
Map<String, String> workerProps = (Map) workerProperties;
61-
62-
LOGGER.info("Scanning for plugin classes. This might take a moment ...");
63-
Plugins plugins = new Plugins(workerProps);
64-
plugins.compareAndSwapWithDelegatingLoader();
65-
StandaloneConfig config = new StandaloneConfig(workerProps);
66-
67-
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
68-
LOGGER.debug("Kafka cluster ID: {}", kafkaClusterId);
69-
70-
RestServer rest = new RestServer(config);
71-
URI advertisedUrl = rest.advertisedUrl();
72-
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
73-
74-
Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
75-
this.herder = new StandaloneHerder(worker, kafkaClusterId);
76-
connectionString = advertisedUrl.toString() + herder.kafkaClusterId();
77-
78-
this.connect = new Connect(herder, rest);
79-
LOGGER.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);
80-
}
81-
82-
String getConnectionString() {
83-
return connectionString;
84-
}
85-
86-
void start() {
87-
connect.start();
88-
}
89-
90-
@SuppressWarnings({"unchecked", "rawtypes"})
91-
void addConnector(final String name, final Properties properties) {
92-
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
93-
if (error != null) {
46+
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectStandalone.class);
47+
48+
private final String connectionString;
49+
private final Herder herder;
50+
private final Connect connect;
51+
52+
@SuppressWarnings("unchecked")
53+
ConnectStandalone(final Properties workerProperties) {
54+
Time time = Time.SYSTEM;
55+
LOGGER.info("Kafka Connect standalone worker initializing ...");
56+
long initStart = time.hiResClockMs();
57+
WorkerInfo initInfo = new WorkerInfo();
58+
initInfo.logAll();
59+
60+
Map<String, String> workerProps = (Map) workerProperties;
61+
62+
LOGGER.info("Scanning for plugin classes. This might take a moment ...");
63+
Plugins plugins = new Plugins(workerProps);
64+
plugins.compareAndSwapWithDelegatingLoader();
65+
StandaloneConfig config = new StandaloneConfig(workerProps);
66+
67+
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
68+
LOGGER.debug("Kafka cluster ID: {}", kafkaClusterId);
69+
70+
RestServer rest = new RestServer(config);
71+
URI advertisedUrl = rest.advertisedUrl();
72+
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
73+
74+
Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
75+
this.herder = new StandaloneHerder(worker, kafkaClusterId);
76+
connectionString = advertisedUrl.toString() + herder.kafkaClusterId();
77+
78+
this.connect = new Connect(herder, rest);
79+
LOGGER.info(
80+
"Kafka Connect standalone worker initialization took {}ms",
81+
time.hiResClockMs() - initStart);
82+
}
83+
84+
String getConnectionString() {
85+
return connectionString;
86+
}
87+
88+
void start() {
89+
connect.start();
90+
}
91+
92+
@SuppressWarnings({"unchecked", "rawtypes"})
93+
void addConnector(final String name, final Properties properties) {
94+
FutureCallback<Herder.Created<ConnectorInfo>> cb =
95+
new FutureCallback<>(
96+
(error, info) -> {
97+
if (error != null) {
9498
LOGGER.error("Failed to create job for {}", properties);
95-
} else {
99+
} else {
96100
LOGGER.info("Created connector {}", info.result().name());
97-
}
98-
});
99-
try {
100-
herder.putConnectorConfig(name, (Map) properties, true, cb);
101-
cb.get();
102-
sleep(1000);
103-
} catch (Exception e) {
104-
LOGGER.error("Failed to add connector for {}", properties);
105-
throw new ConnectorConfigurationException(e);
106-
}
101+
}
102+
});
103+
try {
104+
herder.putConnectorConfig(name, (Map) properties, true, cb);
105+
cb.get();
106+
sleep(1000);
107+
} catch (Exception e) {
108+
LOGGER.error("Failed to add connector for {}", properties);
109+
throw new ConnectorConfigurationException(e);
107110
}
111+
}
108112

109-
void deleteConnector(final String name) {
110-
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
111-
if (error != null) {
113+
void deleteConnector(final String name) {
114+
FutureCallback<Herder.Created<ConnectorInfo>> cb =
115+
new FutureCallback<>(
116+
(error, info) -> {
117+
if (error != null) {
112118
LOGGER.error("Failed to delete connector: {}", name);
113-
} else {
119+
} else {
114120
LOGGER.info("Deleted connector {}", name);
115-
}
116-
});
117-
try {
118-
herder.deleteConnectorConfig(name, cb);
119-
cb.get();
120-
} catch (NotFoundException e) {
121-
// Ignore
122-
} catch (Exception e) {
123-
if (!(e.getCause() instanceof NotFoundException)) {
124-
throw new ConnectorConfigurationException(e);
125-
}
126-
}
121+
}
122+
});
123+
try {
124+
herder.deleteConnectorConfig(name, cb);
125+
cb.get();
126+
} catch (NotFoundException e) {
127+
// Ignore
128+
} catch (Exception e) {
129+
if (!(e.getCause() instanceof NotFoundException)) {
130+
throw new ConnectorConfigurationException(e);
131+
}
127132
}
133+
}
128134

129-
void stop() {
130-
LOGGER.debug("Connect Standalone stop called");
131-
connect.stop();
132-
connect.awaitStop();
133-
}
135+
void stop() {
136+
LOGGER.debug("Connect Standalone stop called");
137+
connect.stop();
138+
connect.awaitStop();
139+
}
134140

135-
class ConnectorConfigurationException extends RuntimeException {
136-
ConnectorConfigurationException(final Throwable cause) {
137-
super(cause);
138-
}
141+
class ConnectorConfigurationException extends RuntimeException {
142+
ConnectorConfigurationException(final Throwable cause) {
143+
super(cause);
139144
}
140-
145+
}
141146
}

0 commit comments

Comments
 (0)