Skip to content

Commit b5191c6

Browse files
Merge pull request #77 from RADAR-base/release-0.3.0
Release 0.3.0
2 parents 7adbef1 + 599809b commit b5191c6

File tree

10 files changed

+116
-23
lines changed

10 files changed

+116
-23
lines changed

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ RUN ./gradlew distTar && \
3232
tar xf build/distributions/*.tar && \
3333
rm build/distributions/*.tar
3434

35-
FROM confluentinc/cp-base:3.3.1
35+
FROM confluentinc/cp-base:4.1.0
3636

37-
MAINTAINER Nivethika M <nivethika@thehyve.nl> , Joris Borgdorff <joris@thehyve.nl>
37+
MAINTAINER Nivethika M <nivethika@thehyve.nl> , Joris Borgdorff <joris@thehyve.nl> , Yatharth Ranjan <yatharth.ranjan@kcl.ac.uk>
3838

3939
LABEL description="RADAR-CNS Backend streams and monitor"
4040

build.gradle

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ plugins {
1111
//---------------------------------------------------------------------------//
1212

1313
group = 'org.radarcns'
14-
version = '0.2.3'
14+
version = '0.3.0'
15+
1516
ext.description = 'Kafka backend for processing device data.'
1617

1718
mainClassName = 'org.radarcns.RadarBackend'
@@ -24,9 +25,9 @@ sourceCompatibility = '1.8'
2425

2526
ext.boundaryVersion = '1.0.6'
2627
ext.codacyVersion = '1.0.10'
27-
ext.confluentVersion = '3.3.1'
28+
ext.confluentVersion = '4.1.0'
2829
ext.hamcrestVersion = '1.3'
29-
ext.kafkaVersion = '0.11.0.2'
30+
ext.kafkaVersion = '1.1.0'
3031
ext.jacksonVersion='2.8.5'
3132
ext.javaMailVersion = '1.5.6'
3233
ext.junitVersion = '4.12'

gradle/codacy.gradle

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,21 @@ configurations {
88
codacy
99
}
1010

11+
jacoco {
12+
toolVersion = "0.8.1"
13+
}
14+
1115
dependencies {
12-
codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: '2.0.1'
16+
codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: '4.0.1'
1317
}
1418

1519
jacocoTestReport {
16-
executionData test, integrationTest
1720
reports {
1821
xml.enabled true
1922
csv.enabled false
2023
html.enabled true
2124
}
25+
executionData test, integrationTest
2226
}
2327

2428
task sendCoverageToCodacy(type: JavaExec, dependsOn: jacocoTestReport) {

gradle/test.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ dependencies {
3434
integrationTestImplementation group: 'org.slf4j', name: 'slf4j-log4j12', version: slf4jVersion
3535

3636
// For Topic name validation based on Kafka classes
37-
testImplementation (group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion) {
37+
testCompile (group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion) {
3838
exclude group: 'org.apache.kafka', module: 'kafka-clients'
3939
exclude group: 'net.sf.jopt-simple'
4040
exclude group: 'com.yammer.metrics'

radar.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ broker:
1919

2020
#Kafka internal parameters
2121
stream_properties:
22-
auto_commit_interval_ms: 1000
2322
max.request.size: 3500042 #Set message.max.bytes for kafka brokers higher than or equal to this value
2423
retries: 15
2524
session_timeout_ms: 20000

src/integrationTest/docker/docker-compose.yml

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
---
22
version: '2'
33

4+
networks:
5+
kafka:
6+
driver: bridge
47
services:
58
#---------------------------------------------------------------------------#
69
# Zookeeper Cluster #
710
#---------------------------------------------------------------------------#
811
zookeeper-1:
9-
image: confluentinc/cp-zookeeper:3.3.1
12+
image: confluentinc/cp-zookeeper:4.1.0
13+
networks:
14+
- kafka
1015
environment:
1116
ZOOKEEPER_SERVER_ID: 1
1217
ZOOKEEPER_CLIENT_PORT: 2181
@@ -19,9 +24,11 @@ services:
1924
# Kafka Cluster #
2025
#---------------------------------------------------------------------------#
2126
kafka-1:
22-
image: confluentinc/cp-kafka:3.3.1
27+
image: confluentinc/cp-kafka:4.1.0
2328
depends_on:
2429
- zookeeper-1
30+
networks:
31+
- kafka
2532
environment:
2633
KAFKA_BROKER_ID: 1
2734
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
@@ -33,9 +40,11 @@ services:
3340
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
3441

3542
kafka-2:
36-
image: confluentinc/cp-kafka:3.3.1
43+
image: confluentinc/cp-kafka:4.1.0
3744
depends_on:
3845
- zookeeper-1
46+
networks:
47+
- kafka
3948
environment:
4049
KAFKA_BROKER_ID: 2
4150
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
@@ -47,9 +56,11 @@ services:
4756
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
4857

4958
kafka-3:
50-
image: confluentinc/cp-kafka:3.3.1
59+
image: confluentinc/cp-kafka:4.1.0
5160
depends_on:
5261
- zookeeper-1
62+
networks:
63+
- kafka
5364
environment:
5465
KAFKA_BROKER_ID: 3
5566
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
@@ -64,12 +75,14 @@ services:
6475
# Schema Registry #
6576
#---------------------------------------------------------------------------#
6677
schema-registry-1:
67-
image: confluentinc/cp-schema-registry:3.3.1
78+
image: confluentinc/cp-schema-registry:4.1.0
6879
depends_on:
6980
- zookeeper-1
7081
- kafka-1
7182
- kafka-2
7283
- kafka-3
84+
networks:
85+
- kafka
7386
restart: always
7487
ports:
7588
- "8081:8081"
@@ -83,12 +96,14 @@ services:
8396
# REST proxy #
8497
#---------------------------------------------------------------------------#
8598
rest-proxy-1:
86-
image: confluentinc/cp-kafka-rest:3.3.1
99+
image: confluentinc/cp-kafka-rest:4.1.0
87100
depends_on:
88101
- kafka-1
89102
- kafka-2
90103
- kafka-3
91104
- schema-registry-1
105+
networks:
106+
- kafka
92107
ports:
93108
- "8082:8082"
94109
environment:
@@ -109,8 +124,9 @@ services:
109124
depends_on:
110125
- kafka-1
111126
- schema-registry-1
112-
command:
113-
- integrationTest
127+
networks:
128+
- kafka
129+
command: integrationTest
114130
volumes:
115131
- ../../../build/jacoco:/code/build/jacoco
116132
- ../../../build/reports:/code/build/reports

src/main/java/org/radarcns/monitor/SourceStatisticsMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public SourceStatisticsMonitor(RadarPropertyHandler radar,
6060
SourceStatisticsMonitorConfig config) {
6161
super(radar, config.getTopics(), Objects.requireNonNull(config.getName(),
6262
"Source statistics monitor must have a name"), "1-"
63-
+ config.getOutputTopic(),
63+
+ config.getOutputTopic() + UUID.randomUUID(),
6464
new SourceStatisticsState());
6565

6666
if (getStateStore() == null) {

src/main/java/org/radarcns/stream/GeneralStreamGroup.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ protected Collection<StreamDefinition> createWindowedSensorStream(String input,
111111
Collection<StreamDefinition> streams = Arrays.stream(TimeWindowMetadata.values())
112112
.map(w -> new StreamDefinition(
113113
new KafkaTopic(input), new KafkaTopic(w.getTopicLabel(outputBase)),
114-
w.getIntervalInMilliSec()))
114+
w.getIntervalInMilliSec(), getCommitIntervalForTimeWindow(w)))
115115
.collect(Collectors.toList());
116116

117117
topicNames.addAll(streams.stream()
@@ -135,6 +135,25 @@ public void addTopicNames(Collection<String> topicNames) {
135135
this.topicNames.addAll(topicNames);
136136
}
137137

138+
public long getCommitIntervalForTimeWindow(TimeWindowMetadata metadata) {
139+
switch (metadata) {
140+
case ONE_DAY:
141+
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_DAY.getCommitInterval();
142+
case ONE_MIN:
143+
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_MIN.getCommitInterval();
144+
case TEN_MIN:
145+
return CommitInterval.COMMIT_INTERVAL_FOR_TEN_MIN.getCommitInterval();
146+
case ONE_HOUR:
147+
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_HOUR.getCommitInterval();
148+
case ONE_WEEK:
149+
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_WEEK.getCommitInterval();
150+
case TEN_SECOND:
151+
return CommitInterval.COMMIT_INTERVAL_FOR_TEN_SECOND.getCommitInterval();
152+
default:
153+
return CommitInterval.COMMIT_INTERVAL_DEFAULT.getCommitInterval();
154+
}
155+
}
156+
138157
@Override
139158
public Collection<StreamDefinition> getStreamDefinition(String inputTopic) {
140159
Collection<StreamDefinition> topic = topicMap.get(inputTopic);
@@ -150,4 +169,25 @@ public List<String> getTopicNames() {
150169
topicList.sort(String.CASE_INSENSITIVE_ORDER);
151170
return topicList;
152171
}
172+
173+
public enum CommitInterval {
174+
COMMIT_INTERVAL_FOR_TEN_SECOND(10_000L),
175+
COMMIT_INTERVAL_FOR_ONE_MIN(30_000L),
176+
COMMIT_INTERVAL_FOR_TEN_MIN(300_000L),
177+
COMMIT_INTERVAL_FOR_ONE_HOUR(1800_000L),
178+
COMMIT_INTERVAL_FOR_ONE_DAY(7200_000L),
179+
COMMIT_INTERVAL_FOR_ONE_WEEK(10800_000L),
180+
COMMIT_INTERVAL_DEFAULT(30_000L);
181+
182+
private final long commitInterval;
183+
184+
185+
CommitInterval(long commitInterval) {
186+
this.commitInterval = commitInterval;
187+
}
188+
189+
public long getCommitInterval() {
190+
return commitInterval;
191+
}
192+
}
153193
}

src/main/java/org/radarcns/stream/KStreamWorker.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424
import java.util.List;
2525
import java.util.Properties;
2626
import java.util.concurrent.ScheduledFuture;
27+
import java.util.concurrent.ThreadLocalRandom;
2728
import java.util.function.Function;
2829
import java.util.stream.Collectors;
2930
import javax.annotation.Nonnull;
3031
import org.apache.avro.Schema;
3132
import org.apache.avro.specific.SpecificRecord;
3233
import org.apache.kafka.streams.KafkaStreams;
3334
import org.apache.kafka.streams.KeyValue;
35+
import org.apache.kafka.streams.StreamsConfig;
3436
import org.apache.kafka.streams.errors.StreamsException;
3537
import org.apache.kafka.streams.kstream.KStream;
3638
import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -126,8 +128,14 @@ protected Properties getStreamProperties(@Nonnull StreamDefinition definition) {
126128
localClientId += '-' + window.sizeMs + '-' + window.advanceMs;
127129
}
128130

129-
return kafkaProperty.getStreamProperties(localClientId, numThreads,
131+
Properties props = kafkaProperty.getStreamProperties(localClientId, numThreads,
130132
DeviceTimestampExtractor.class);
133+
long interval = (long)(ThreadLocalRandom.current().nextDouble(0.75, 1.25)
134+
* definition.getCommitIntervalMs());
135+
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
136+
String.valueOf(interval));
137+
138+
return props;
131139
}
132140

133141
/**

src/main/java/org/radarcns/stream/StreamDefinition.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.radarcns.stream;
1818

19+
import static org.radarcns.stream.GeneralStreamGroup.CommitInterval.COMMIT_INTERVAL_DEFAULT;
1920
import static org.radarcns.util.Comparison.compare;
2021

2122
import java.util.Objects;
@@ -24,10 +25,12 @@
2425
import org.apache.kafka.streams.kstream.TimeWindows;
2526
import org.radarcns.topic.KafkaTopic;
2627

28+
2729
public class StreamDefinition implements Comparable<StreamDefinition> {
2830
private final KafkaTopic inputTopic;
2931
private final KafkaTopic outputTopic;
3032
private final TimeWindows window;
33+
private final long commitIntervalMs;
3134

3235
/**
3336
* Constructor. It takes in input the topic name to be consumed and to topic name where the
@@ -36,7 +39,7 @@ public class StreamDefinition implements Comparable<StreamDefinition> {
3639
* @param output output {@link KafkaTopic}
3740
*/
3841
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output) {
39-
this(input, output, 0L);
42+
this(input, output, 0L, COMMIT_INTERVAL_DEFAULT.getCommitInterval());
4043
}
4144

4245
/**
@@ -47,24 +50,41 @@ public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output) {
4750
* @param window time window for aggregation.
4851
*/
4952
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output, long window) {
50-
this(input, output, window == 0 ? null : TimeWindows.of(window));
53+
this(input, output, window == 0 ? null : TimeWindows.of(window),
54+
COMMIT_INTERVAL_DEFAULT.getCommitInterval());
55+
}
56+
57+
/**
58+
* Constructor. It takes in input the topic name to be consumed and to topic name where the
59+
* related stream will write the computed values.
60+
* @param input source {@link KafkaTopic}
61+
* @param output output {@link KafkaTopic}
62+
* @param window time window for aggregation.
63+
* @param commitIntervalMs The commit.interval.ms config for the stream
64+
*/
65+
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output, long window,
66+
long commitIntervalMs) {
67+
this(input, output, window == 0 ? null : TimeWindows.of(window), commitIntervalMs);
5168
}
5269

70+
5371
/**
5472
* Constructor. It takes in input the topic name to be consumed and to topic name where the
5573
* related stream will write the computed values.
5674
* @param input source {@link KafkaTopic}
5775
* @param output output {@link KafkaTopic}
5876
* @param window time window for aggregation.
77+
* @param commitIntervalMs The commit.interval.ms config for the stream
5978
*/
6079
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output,
61-
@Nullable TimeWindows window) {
80+
@Nullable TimeWindows window, @Nonnull long commitIntervalMs) {
6281
Objects.requireNonNull(input);
6382
Objects.requireNonNull(output);
6483

6584
this.inputTopic = input;
6685
this.outputTopic = output;
6786
this.window = window;
87+
this.commitIntervalMs = commitIntervalMs;
6888
}
6989

7090
@Nonnull
@@ -94,6 +114,11 @@ public TimeWindows getTimeWindows() {
94114
return window;
95115
}
96116

117+
@Nullable
118+
public long getCommitIntervalMs(){
119+
return commitIntervalMs;
120+
}
121+
97122
@Override
98123
public boolean equals(Object o) {
99124
if (this == o) {

0 commit comments

Comments
 (0)