
Commit e6ab988

Implementing exercise 18 - Aggregating Windows (#9)
1 parent 642f140 commit e6ab988

9 files changed: +849 −1 lines

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
@@ -22,4 +22,5 @@

 ## Version 2.0

-* Implemented Exercise 16 - Merging Streams.
+* Implemented Exercise 16 - Merging Streams.
+* Implemented Exercise 18 - Aggregating Flink Data using Windowing.
Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
package models;

import java.time.Duration;
import java.util.Objects;

public class UserStatistics {
    private String emailAddress;
    private Duration totalFlightDuration;
    private long numberOfFlights;

    public UserStatistics() {
    }

    public UserStatistics(FlightData flightData) {
        this.emailAddress = flightData.getEmailAddress();
        this.totalFlightDuration = Duration.between(
            flightData.getDepartureTime(),
            flightData.getArrivalTime()
        );
        this.numberOfFlights = 1;
    }

    public String getEmailAddress() {
        return emailAddress;
    }

    public void setEmailAddress(String emailAddress) {
        this.emailAddress = emailAddress;
    }

    public Duration getTotalFlightDuration() {
        return totalFlightDuration;
    }

    public void setTotalFlightDuration(Duration totalFlightDuration) {
        this.totalFlightDuration = totalFlightDuration;
    }

    public long getNumberOfFlights() {
        return numberOfFlights;
    }

    public void setNumberOfFlights(long numberOfFlights) {
        this.numberOfFlights = numberOfFlights;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        UserStatistics that = (UserStatistics) o;
        return numberOfFlights == that.numberOfFlights && Objects.equals(emailAddress, that.emailAddress) && Objects.equals(totalFlightDuration, that.totalFlightDuration);
    }

    @Override
    public int hashCode() {
        return Objects.hash(emailAddress, totalFlightDuration, numberOfFlights);
    }

    @Override
    public String toString() {
        return "UserStatistics{" +
                "emailAddress='" + emailAddress + '\'' +
                ", totalFlightDuration=" + totalFlightDuration +
                ", numberOfFlights=" + numberOfFlights +
                '}';
    }

    public UserStatistics merge(UserStatistics that) {
        assert(this.emailAddress.equals(that.emailAddress));

        UserStatistics merged = new UserStatistics();

        merged.setEmailAddress(this.emailAddress);
        merged.setTotalFlightDuration(this.totalFlightDuration.plus(that.getTotalFlightDuration()));
        merged.setNumberOfFlights(this.numberOfFlights + that.getNumberOfFlights());

        return merged;
    }
}
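As a quick illustration of how merge() combines two records, here is a hedged usage sketch that is not part of this commit; the wrapper class, email address, and durations are made up for illustration, and only the setters and merge() shown above are used.

import java.time.Duration;

import models.UserStatistics;

public class MergeSketch {                                   // hypothetical class, not in the repo
    public static void main(String[] args) {
        // Two partial statistics for the same user, built with the setters above.
        UserStatistics first = new UserStatistics();
        first.setEmailAddress("user@example.com");           // illustrative address only
        first.setTotalFlightDuration(Duration.ofHours(2));
        first.setNumberOfFlights(1);

        UserStatistics second = new UserStatistics();
        second.setEmailAddress("user@example.com");
        second.setTotalFlightDuration(Duration.ofHours(3));
        second.setNumberOfFlights(2);

        // merge() sums the durations and the counts: PT5H total and 3 flights.
        UserStatistics combined = first.merge(second);
        System.out.println(combined);
        // Note: the assert inside merge() only checks the email addresses when the JVM runs with -ea.
    }
}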
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
package userstatistics;

import datagen.DataGeneratorJob;
import models.FlightData;
import models.UserStatistics;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.io.InputStream;
import java.util.Properties;

public class UserStatisticsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        try (InputStream stream = DataGeneratorJob.class.getClassLoader().getResourceAsStream("consumer.properties")) {
            consumerConfig.load(stream);
        }

        Properties producerConfig = new Properties();
        try (InputStream stream = DataGeneratorJob.class.getClassLoader().getResourceAsStream("producer.properties")) {
            producerConfig.load(stream);
        }

        KafkaSource<FlightData> flightDataSource = KafkaSource.<FlightData>builder()
                .setProperties(consumerConfig)
                .setTopics("flightdata")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new JsonDeserializationSchema<>(FlightData.class))
                .build();

        DataStreamSource<FlightData> flightDataStream = env
                .fromSource(flightDataSource, WatermarkStrategy.forMonotonousTimestamps(), "flightdata_source");

        KafkaRecordSerializationSchema<UserStatistics> statisticsSerializer = KafkaRecordSerializationSchema.<UserStatistics>builder()
                .setTopic("userstatistics")
                .setValueSerializationSchema(new JsonSerializationSchema<>(
                        () -> new ObjectMapper().registerModule(new JavaTimeModule())
                ))
                .build();

        KafkaSink<UserStatistics> statsSink = KafkaSink.<UserStatistics>builder()
                .setKafkaProducerConfig(producerConfig)
                .setRecordSerializer(statisticsSerializer)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        defineWorkflow(flightDataStream)
                .sinkTo(statsSink)
                .name("userstatistics_sink")
                .uid("userstatistics_sink");

        env.execute("UserStatistics");
    }

    public static DataStream<UserStatistics> defineWorkflow(
            DataStream<FlightData> flightDataSource
    ) {
        return flightDataSource
                .map(UserStatistics::new)
                .keyBy(UserStatistics::getEmailAddress)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .reduce(UserStatistics::merge);
    }
}
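defineWorkflow is the heart of the exercise: each FlightData record is mapped to a single-flight UserStatistics, the stream is keyed by email address, grouped into one-minute tumbling event-time windows, and each window is collapsed with reduce(UserStatistics::merge). As a hedged sketch that is not part of this commit, the same aggregation can also be written against Flink's AggregateFunction interface; reduce is the simpler fit here because input, accumulator, and result are all UserStatistics, while aggregate becomes necessary when those types differ. The class name and the null checks below are illustrative additions.

import models.FlightData;
import models.UserStatistics;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowAggregateSketch {                          // hypothetical class, not in the repo
    public static DataStream<UserStatistics> defineWorkflow(DataStream<FlightData> flightDataSource) {
        return flightDataSource
                .map(UserStatistics::new)
                .keyBy(UserStatistics::getEmailAddress)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .aggregate(new AggregateFunction<UserStatistics, UserStatistics, UserStatistics>() {
                    @Override
                    public UserStatistics createAccumulator() {
                        // Empty accumulator: fields stay null/zero until the first element arrives.
                        return new UserStatistics();
                    }

                    @Override
                    public UserStatistics add(UserStatistics value, UserStatistics accumulator) {
                        // Adopt the first element directly; afterwards combine with merge().
                        return accumulator.getEmailAddress() == null ? value : accumulator.merge(value);
                    }

                    @Override
                    public UserStatistics getResult(UserStatistics accumulator) {
                        return accumulator;
                    }

                    @Override
                    public UserStatistics merge(UserStatistics a, UserStatistics b) {
                        if (a.getEmailAddress() == null) return b;
                        if (b.getEmailAddress() == null) return a;
                        return a.merge(b);
                    }
                });
    }
}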
Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
package models;

import java.time.Duration;
import java.util.Objects;

public class UserStatistics {
    private String emailAddress;
    private Duration totalFlightDuration;
    private long numberOfFlights;

    public UserStatistics() {
    }

    public UserStatistics(FlightData flightData) {
        this.emailAddress = flightData.getEmailAddress();
        this.totalFlightDuration = Duration.between(
            flightData.getDepartureTime(),
            flightData.getArrivalTime()
        );
        this.numberOfFlights = 1;
    }

    public String getEmailAddress() {
        return emailAddress;
    }

    public void setEmailAddress(String emailAddress) {
        this.emailAddress = emailAddress;
    }

    public Duration getTotalFlightDuration() {
        return totalFlightDuration;
    }

    public void setTotalFlightDuration(Duration totalFlightDuration) {
        this.totalFlightDuration = totalFlightDuration;
    }

    public long getNumberOfFlights() {
        return numberOfFlights;
    }

    public void setNumberOfFlights(long numberOfFlights) {
        this.numberOfFlights = numberOfFlights;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        UserStatistics that = (UserStatistics) o;
        return numberOfFlights == that.numberOfFlights && Objects.equals(emailAddress, that.emailAddress) && Objects.equals(totalFlightDuration, that.totalFlightDuration);
    }

    @Override
    public int hashCode() {
        return Objects.hash(emailAddress, totalFlightDuration, numberOfFlights);
    }

    @Override
    public String toString() {
        return "UserStatistics{" +
                "emailAddress='" + emailAddress + '\'' +
                ", totalFlightDuration=" + totalFlightDuration +
                ", numberOfFlights=" + numberOfFlights +
                '}';
    }
}
Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
package userstatistics;

import datagen.DataGeneratorJob;
import models.FlightData;
import models.UserStatistics;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.InputStream;
import java.util.Properties;

public class UserStatisticsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        try (InputStream stream = DataGeneratorJob.class.getClassLoader().getResourceAsStream("consumer.properties")) {
            consumerConfig.load(stream);
        }

        Properties producerConfig = new Properties();
        try (InputStream stream = DataGeneratorJob.class.getClassLoader().getResourceAsStream("producer.properties")) {
            producerConfig.load(stream);
        }

        KafkaSource<FlightData> flightDataSource = KafkaSource.<FlightData>builder()
                .setProperties(consumerConfig)
                .setTopics("<INPUT_TOPIC>")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new JsonDeserializationSchema<>(FlightData.class))
                .build();

        DataStreamSource<FlightData> flightDataStream = env
                .fromSource(flightDataSource, WatermarkStrategy.noWatermarks(), "flightdata_source");

        KafkaRecordSerializationSchema<UserStatistics> statisticsSerializer = KafkaRecordSerializationSchema.<UserStatistics>builder()
                .setTopic("<OUTPUT_TOPIC>")
                .setValueSerializationSchema(new JsonSerializationSchema<>(
                        () -> new ObjectMapper().registerModule(new JavaTimeModule())
                ))
                .build();

        KafkaSink<UserStatistics> statsSink = KafkaSink.<UserStatistics>builder()
                .setKafkaProducerConfig(producerConfig)
                .setRecordSerializer(statisticsSerializer)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        defineWorkflow(flightDataStream)
                .sinkTo(statsSink)
                .name("userstatistics_sink")
                .uid("userstatistics_sink");

        env.execute("UserStatistics");
    }

    public static DataStream<UserStatistics> defineWorkflow(
            DataStream<FlightData> flightDataSource
    ) {
        return null;
    }
}
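In this exercise variant the Kafka topics are placeholders, the source uses WatermarkStrategy.noWatermarks(), and defineWorkflow intentionally returns null for the learner to complete. As a hedged sketch of one possible completion (the committed solution earlier in this diff simply uses reduce(UserStatistics::merge)), the method below also exercises the ProcessWindowFunction, TimeWindow, and Collector imports this file already declares; every type it references is already imported above.

public static DataStream<UserStatistics> defineWorkflow(
        DataStream<FlightData> flightDataSource
) {
    return flightDataSource
            // Wrap each flight as a single-flight UserStatistics record.
            .map(UserStatistics::new)
            // Group the statistics per user.
            .keyBy(UserStatistics::getEmailAddress)
            // Collect one minute of events per user into a tumbling event-time window.
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            // Combine records incrementally with merge(); the ProcessWindowFunction then
            // receives the single pre-aggregated value per key and window and forwards it.
            .reduce(
                    UserStatistics::merge,
                    new ProcessWindowFunction<UserStatistics, UserStatistics, String, TimeWindow>() {
                        @Override
                        public void process(
                                String emailAddress,
                                Context context,
                                Iterable<UserStatistics> preAggregated,
                                Collector<UserStatistics> collector) {
                            preAggregated.forEach(collector::collect);
                        }
                    }
            );
}

Note that with WatermarkStrategy.noWatermarks() the one-minute event-time windows would never fire on an unbounded Kafka source; the solution file earlier in this diff switches to WatermarkStrategy.forMonotonousTimestamps(), which lets watermarks advance so the windows can close.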
