Skip to content

Commit e670394

Browse files
authored
apply code style plugin and spotless fixes (#49)
1 parent e0991f9 commit e670394

35 files changed

+417
-357
lines changed

build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ plugins {
77
id("org.hypertrace.avro-plugin") version "0.4.0" apply false
88
id("org.hypertrace.publish-plugin") version "1.0.4" apply false
99
id("org.hypertrace.jacoco-report-plugin") version "0.2.0" apply false
10+
id("org.hypertrace.code-style-plugin") version "1.1.2" apply false
1011
}
1112

1213
subprojects {
@@ -16,10 +17,13 @@ subprojects {
1617
license.set(License.APACHE_2_0)
1718
}
1819
}
20+
1921
pluginManager.withPlugin("java") {
2022
configure<JavaPluginExtension> {
2123
sourceCompatibility = JavaVersion.VERSION_11
2224
targetCompatibility = JavaVersion.VERSION_11
25+
26+
apply(plugin = "org.hypertrace.code-style-plugin")
2327
}
2428
}
2529
}

kafka-streams-framework/build.gradle.kts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,11 @@ tasks.named<org.hypertrace.gradle.avro.CheckAvroCompatibility>("avroCompatibilit
3838
setAgainstFiles(null)
3939
}
4040

41-
if(project.hasProperty("includeSource")) {
41+
if (project.hasProperty("includeSource")) {
4242
tasks {
4343
withType<Jar> {
4444
from(sourceSets["main"].allSource)
4545
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
4646
}
4747
}
4848
}
49-
50-

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/ConsolidatedKafkaStreamsApp.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,16 @@
1010
import org.apache.kafka.streams.StreamsBuilder;
1111
import org.apache.kafka.streams.kstream.KStream;
1212
import org.hypertrace.core.serviceframework.config.ConfigClient;
13-
import org.hypertrace.core.serviceframework.config.ConfigUtils;
1413

15-
/**
16-
* Base class for consolidating multiple independent KafkaStreamApps
17-
*/
14+
/** Base class for consolidating multiple independent KafkaStreamApps */
1815
public abstract class ConsolidatedKafkaStreamsApp extends KafkaStreamsApp {
1916

2017
static final String SUB_TOPOLOGY_NAMES_CONFIG_KEY = "sub.topology.names";
2118
static final String CONFIG_OVERRIDES = "overrides";
2219

2320
private final Map<String, SubTopologyKStreamApp> jobNameToSubTopologyKStreamApp = new HashMap<>();
2421

25-
public ConsolidatedKafkaStreamsApp(
26-
ConfigClient configClient) {
22+
public ConsolidatedKafkaStreamsApp(ConfigClient configClient) {
2723
super(configClient);
2824
}
2925

@@ -43,16 +39,21 @@ public StreamsBuilder buildTopology(
4339
}
4440

4541
public void doStop() {
46-
jobNameToSubTopologyKStreamApp.values()
47-
.forEach(subTopologyKStreamApp -> subTopologyKStreamApp.getInstance().doCleanUpForConsolidatedKStreamApp());
42+
jobNameToSubTopologyKStreamApp
43+
.values()
44+
.forEach(
45+
subTopologyKStreamApp ->
46+
subTopologyKStreamApp.getInstance().doCleanUpForConsolidatedKStreamApp());
4847
super.doStop();
4948
}
5049

5150
@Override
5251
public List<String> getInputTopics(Map<String, Object> properties) {
5352
Set<String> inputTopics = new HashSet<>();
54-
for (Map.Entry<String, SubTopologyKStreamApp> entry : jobNameToSubTopologyKStreamApp.entrySet()) {
55-
List<String> subTopologyInputTopics = entry.getValue().getInstance().getInputTopics(properties);
53+
for (Map.Entry<String, SubTopologyKStreamApp> entry :
54+
jobNameToSubTopologyKStreamApp.entrySet()) {
55+
List<String> subTopologyInputTopics =
56+
entry.getValue().getInstance().getInputTopics(properties);
5657
inputTopics.addAll(subTopologyInputTopics);
5758
}
5859
return new ArrayList<>(inputTopics);
@@ -61,9 +62,10 @@ public List<String> getInputTopics(Map<String, Object> properties) {
6162
@Override
6263
public List<String> getOutputTopics(Map<String, Object> properties) {
6364
Set<String> outputTopics = new HashSet<>();
64-
for (Map.Entry<String, SubTopologyKStreamApp> entry : jobNameToSubTopologyKStreamApp.entrySet()) {
65-
List<String> subTopologyOutputTopics = entry.getValue().getInstance()
66-
.getOutputTopics(properties);
65+
for (Map.Entry<String, SubTopologyKStreamApp> entry :
66+
jobNameToSubTopologyKStreamApp.entrySet()) {
67+
List<String> subTopologyOutputTopics =
68+
entry.getValue().getInstance().getOutputTopics(properties);
6769
outputTopics.addAll(subTopologyOutputTopics);
6870
}
6971
return new ArrayList<>(outputTopics);
@@ -78,8 +80,8 @@ private StreamsBuilder buildSubTopology(
7880
Map<String, KStream<?, ?>> inputStreams) {
7981
// create an instance and retain its reference to be used later in other methods
8082
KafkaStreamsApp subTopology = getSubTopologyInstance(subTopologyName);
81-
jobNameToSubTopologyKStreamApp
82-
.put(subTopologyName, new SubTopologyKStreamApp (subTopology.getJobConfigKey(), subTopology));
83+
jobNameToSubTopologyKStreamApp.put(
84+
subTopologyName, new SubTopologyKStreamApp(subTopology.getJobConfigKey(), subTopology));
8385

8486
// need to use its own copy as input/output topics are different
8587
Config subTopologyJobConfig = getSubJobConfig(subTopologyName);
@@ -105,8 +107,8 @@ private StreamsBuilder buildSubTopology(
105107
streamsBuilder = subTopology.buildTopology(properties, streamsBuilder, inputStreams);
106108

107109
// retain per job key and its topology
108-
jobNameToSubTopologyKStreamApp
109-
.put(subTopologyName, new SubTopologyKStreamApp (subTopology.getJobConfigKey(), subTopology));
110+
jobNameToSubTopologyKStreamApp.put(
111+
subTopologyName, new SubTopologyKStreamApp(subTopology.getJobConfigKey(), subTopology));
110112
return streamsBuilder;
111113
}
112114

@@ -115,11 +117,7 @@ private List<String> getSubTopologiesNames(Map<String, Object> properties) {
115117
}
116118

117119
private Config getSubJobConfig(String jobName) {
118-
return configClient.getConfig(
119-
jobName,
120-
CONFIG_OVERRIDES,
121-
null,
122-
null);
120+
return configClient.getConfig(jobName, CONFIG_OVERRIDES, null, null);
123121
}
124122

125123
private Config getJobConfig(Map<String, Object> properties) {
@@ -139,8 +137,7 @@ static class SubTopologyKStreamApp {
139137
private final String subTopologyName;
140138
private final KafkaStreamsApp instance;
141139

142-
public SubTopologyKStreamApp(String subTopologyName,
143-
KafkaStreamsApp instance) {
140+
public SubTopologyKStreamApp(String subTopologyName, KafkaStreamsApp instance) {
144141
this.subTopologyName = subTopologyName;
145142
this.instance = instance;
146143
}
@@ -153,5 +150,4 @@ public KafkaStreamsApp getInstance() {
153150
return instance;
154151
}
155152
}
156-
157153
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 30 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.hypertrace.core.kafkastreams.framework;
22

3-
43
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
54
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
65
import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
@@ -106,12 +105,15 @@ protected void doInit() {
106105

107106
app.setStateListener(new LoggingStateListener(app));
108107
app.setGlobalStateRestoreListener(new LoggingStateRestoreListener());
109-
app.setUncaughtExceptionHandler((thread, exception) -> {
110-
getLogger().error("Thread=[{}] encountered=[{}]. Will exit.", thread.getName(),
111-
ExceptionUtils.getStackTrace(exception));
108+
app.setUncaughtExceptionHandler(
109+
(thread, exception) -> {
110+
getLogger()
111+
.error(
112+
"Thread=[{}] encountered=[{}]. Will exit.",
113+
thread.getName(),
114+
ExceptionUtils.getStackTrace(exception));
112115
System.exit(1);
113-
}
114-
);
116+
});
115117

116118
getLogger().info("kafka streams topologies: {}", topology.describe());
117119
} catch (Exception e) {
@@ -141,16 +143,15 @@ protected void doStop() {
141143
}
142144

143145
/**
144-
* This method is invoked just before a subtopology is created
145-
* Any dependencies that need to be initialized need to be done here
146+
* This method is invoked just before a subtopology is created. Any dependencies that need to be
147+
* initialized need to be done here
148+
*
146149
* @param subTopologyJobConfig
147150
*/
148-
protected void doInitForConsolidatedKStreamApp(Config subTopologyJobConfig){}
151+
protected void doInitForConsolidatedKStreamApp(Config subTopologyJobConfig) {}
149152

150-
/**
151-
* Cleanup any dependencies before the {@link ConsolidatedKafkaStreamsApp#doStop()} is invoked
152-
*/
153-
protected void doCleanUpForConsolidatedKStreamApp(){}
153+
/** Cleanup any dependencies before the {@link ConsolidatedKafkaStreamsApp#doStop()} is invoked */
154+
protected void doCleanUpForConsolidatedKStreamApp() {}
154155

155156
@Override
156157
public boolean healthCheck() {
@@ -168,9 +169,10 @@ public Map<String, Object> getBaseStreamsConfig() {
168169
// ##########################
169170
baseStreamsConfig.put(TOPOLOGY_OPTIMIZATION, "all");
170171
baseStreamsConfig.put(METRICS_RECORDING_LEVEL_CONFIG, "INFO");
171-
baseStreamsConfig
172-
.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, UseWallclockTimeOnInvalidTimestamp.class);
173-
baseStreamsConfig.put(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
172+
baseStreamsConfig.put(
173+
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, UseWallclockTimeOnInvalidTimestamp.class);
174+
baseStreamsConfig.put(
175+
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
174176
LogAndContinueExceptionHandler.class);
175177
baseStreamsConfig.put(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryConfigSetter.class);
176178

@@ -210,13 +212,13 @@ public Map<String, Object> getBaseStreamsConfig() {
210212
return baseStreamsConfig;
211213
}
212214

213-
public abstract StreamsBuilder buildTopology(Map<String, Object> streamsConfig,
215+
public abstract StreamsBuilder buildTopology(
216+
Map<String, Object> streamsConfig,
214217
StreamsBuilder streamsBuilder,
215218
Map<String, KStream<?, ?>> sourceStreams);
216219

217220
public Map<String, Object> getStreamsConfig(Config jobConfig) {
218-
return new HashMap<>(
219-
ConfigUtils.getFlatMapConfig(jobConfig, getStreamsConfigKey()));
221+
return new HashMap<>(ConfigUtils.getFlatMapConfig(jobConfig, getStreamsConfigKey()));
220222
}
221223

222224
public String getStreamsConfigKey() {
@@ -247,27 +249,22 @@ private Map<String, Object> getJobStreamsConfig(Config jobConfig) {
247249
return properties;
248250
}
249251

250-
/**
251-
* Merge the props into baseProps
252-
*/
253-
private Map<String, Object> mergeProperties(Map<String, Object> baseProps,
254-
Map<String, Object> props) {
252+
/** Merge the props into baseProps */
253+
private Map<String, Object> mergeProperties(
254+
Map<String, Object> baseProps, Map<String, Object> props) {
255255
props.forEach(baseProps::put);
256256
return baseProps;
257257
}
258258

259259
private void preCreateTopics(Map<String, Object> properties) {
260260
Config jobConfig = (Config) properties.get(getJobConfigKey());
261261
if (jobConfig.hasPath(PRE_CREATE_TOPICS) && jobConfig.getBoolean(PRE_CREATE_TOPICS)) {
262-
List<String> topics = Streams.concat(
263-
getInputTopics(properties).stream(),
264-
getOutputTopics(properties).stream()
265-
).collect(Collectors.toList());
266-
267-
KafkaTopicCreator
268-
.createTopics((String) properties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
269-
topics
270-
);
262+
List<String> topics =
263+
Streams.concat(getInputTopics(properties).stream(), getOutputTopics(properties).stream())
264+
.collect(Collectors.toList());
265+
266+
KafkaTopicCreator.createTopics(
267+
(String) properties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), topics);
271268
}
272269
}
273270
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/exceptionhandlers/IgnoreProductionExceptionHandler.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,34 @@
11
package org.hypertrace.core.kafkastreams.framework.exceptionhandlers;
22

3-
import org.apache.kafka.clients.producer.ProducerRecord;
4-
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
5-
import org.slf4j.Logger;
6-
import org.slf4j.LoggerFactory;
7-
83
import java.util.ArrayList;
94
import java.util.Arrays;
105
import java.util.List;
116
import java.util.Map;
7+
import org.apache.kafka.clients.producer.ProducerRecord;
8+
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
1211

1312
public class IgnoreProductionExceptionHandler implements ProductionExceptionHandler {
14-
private static final String IGNORE_PRODUCTION_EXCEPTION_CLASSES = "ignore.production.exception.classes";
13+
private static final String IGNORE_PRODUCTION_EXCEPTION_CLASSES =
14+
"ignore.production.exception.classes";
1515

16-
private static final Logger LOGGER = LoggerFactory.getLogger(IgnoreProductionExceptionHandler.class);
16+
private static final Logger LOGGER =
17+
LoggerFactory.getLogger(IgnoreProductionExceptionHandler.class);
1718

1819
private List<Class<Exception>> ignoreExceptionClasses = new ArrayList<>();
1920

2021
@Override
21-
public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
22+
public ProductionExceptionHandlerResponse handle(
23+
ProducerRecord<byte[], byte[]> record, Exception exception) {
2224
for (Class<Exception> exceptionClass : ignoreExceptionClasses) {
2325
if (exceptionClass.isInstance(exception)) {
24-
LOGGER.error("Failed to produce record to topic={}, partition={}, size={} due to exception {}. will skip this record.",
25-
record.topic(), record.partition(), record.value().length, exception.getLocalizedMessage());
26+
LOGGER.error(
27+
"Failed to produce record to topic={}, partition={}, size={} due to exception {}. will skip this record.",
28+
record.topic(),
29+
record.partition(),
30+
record.value().length,
31+
exception.getLocalizedMessage());
2632
return ProductionExceptionHandlerResponse.CONTINUE;
2733
}
2834
}
@@ -35,7 +41,8 @@ public void configure(Map<String, ?> configs) {
3541
Object configValue = configs.get(IGNORE_PRODUCTION_EXCEPTION_CLASSES);
3642
if (configValue instanceof String) {
3743
LOGGER.info("found {}={}", IGNORE_PRODUCTION_EXCEPTION_CLASSES, configValue);
38-
List<String> classNameList = Arrays.asList(((String) configValue).trim().split("\\s*,\\s*"));
44+
List<String> classNameList =
45+
Arrays.asList(((String) configValue).trim().split("\\s*,\\s*"));
3946
for (String className : classNameList) {
4047
try {
4148
ignoreExceptionClasses.add((Class<Exception>) Class.forName(className));

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/listeners/LoggingStateListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,4 @@ public void onChange(State newState, State oldState) {
2727
LOGGER.info("Application is entering [REBALANCING] phase");
2828
}
2929
}
30-
}
30+
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/listeners/LoggingStateRestoreListener.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,23 @@ public void onRestoreStart(TopicPartition topicPartition, String store, long sta
2323
long toRestore = end - start;
2424
totalToRestore.put(topicPartition, toRestore);
2525
startTimes.put(topicPartition, System.currentTimeMillis());
26-
LOGGER
27-
.info("Starting restoration for [{}] on topic-partition [{}] total to restore [{}]", store,
28-
topicPartition, toRestore);
26+
LOGGER.info(
27+
"Starting restoration for [{}] on topic-partition [{}] total to restore [{}]",
28+
store,
29+
topicPartition,
30+
toRestore);
2931
}
3032

3133
@Override
32-
public void onBatchRestored(TopicPartition topicPartition, String store, long start,
33-
long batchCompleted) {
34+
public void onBatchRestored(
35+
TopicPartition topicPartition, String store, long start, long batchCompleted) {
3436
NumberFormat formatter = new DecimalFormat("#.##");
3537

3638
long currentProgress = batchCompleted + restoredSoFar.getOrDefault(topicPartition, 0L);
3739
double percentComplete = (double) currentProgress / totalToRestore.get(topicPartition);
3840

39-
LOGGER.info("Completed [{}] for [{}]% of total restoration for [{}] on [{}]",
41+
LOGGER.info(
42+
"Completed [{}] for [{}]% of total restoration for [{}] on [{}]",
4043
batchCompleted, formatter.format(percentComplete * 100.00), store, topicPartition);
4144
restoredSoFar.put(topicPartition, currentProgress);
4245
}
@@ -46,7 +49,9 @@ public void onRestoreEnd(TopicPartition topicPartition, String store, long total
4649
long startTs = startTimes.remove(topicPartition);
4750
LOGGER.info(
4851
"Restoration completed for [{}] on topic-partition [{}]. Total restored [{}] records. Duration [{}]",
49-
store, topicPartition, totalRestored,
52+
store,
53+
topicPartition,
54+
totalRestored,
5055
Duration.between(Instant.ofEpochMilli(startTs), Instant.now()));
5156
restoredSoFar.remove(topicPartition);
5257
totalToRestore.remove(topicPartition);
@@ -63,4 +68,4 @@ Map<TopicPartition, Long> getRestoredSoFar() {
6368
Map<TopicPartition, Long> getStartTimes() {
6469
return startTimes;
6570
}
66-
}
71+
}

0 commit comments

Comments
 (0)