Skip to content

Commit 8bf1b79

Browse files
committed
Implementing Exercise 20 - Managing State (#11)
1 parent e6ab988 commit 8bf1b79

File tree

4 files changed

+248
-0
lines changed

4 files changed

+248
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@
2424

2525
* Implemented Exercise 16 - Merging Streams.
2626
* Implemented Exercise 18 - Aggregating Flink Data using Windowing.
27+
* Implemented Exercise 20 - Managing State in Flink.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package userstatistics;
2+
3+
import org.apache.flink.api.common.state.ValueState;
4+
import org.apache.flink.api.common.state.ValueStateDescriptor;
5+
import org.apache.flink.configuration.Configuration;
6+
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
7+
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
8+
import org.apache.flink.util.Collector;
9+
import models.UserStatistics;
10+
11+
class ProcessUserStatisticsFunction extends ProcessWindowFunction<UserStatistics, UserStatistics, String, TimeWindow> {
12+
private ValueStateDescriptor<UserStatistics> stateDescriptor;
13+
14+
@Override
15+
public void open(Configuration parameters) throws Exception {
16+
stateDescriptor = new ValueStateDescriptor<>("User Statistics", UserStatistics.class);
17+
super.open(parameters);
18+
}
19+
20+
@Override
21+
public void process(String emailAddress, ProcessWindowFunction<UserStatistics, UserStatistics, String, TimeWindow>.Context context, Iterable<UserStatistics> statsList, Collector<UserStatistics> collector) throws Exception {
22+
ValueState<UserStatistics> state = context.globalState().getState(stateDescriptor);
23+
UserStatistics accumulatedStats = state.value();
24+
25+
for (UserStatistics newStats: statsList) {
26+
if(accumulatedStats == null)
27+
accumulatedStats = newStats;
28+
else
29+
accumulatedStats = accumulatedStats.merge(newStats);
30+
}
31+
32+
state.update(accumulatedStats);
33+
34+
collector.collect(accumulatedStats);
35+
}
36+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package userstatistics;

import datagen.DataGeneratorJob;
import models.FlightData;
import models.UserStatistics;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class UserStatisticsJob {
26+
public static void main(String[] args) throws Exception {
27+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
28+
29+
Properties consumerConfig = new Properties();
30+
try (InputStream stream = DataGeneratorJob.class.getClassLoader().getResourceAsStream("consumer.properties")) {
31+
consumerConfig.load(stream);
32+
}
33+
34+
Properties producerConfig = new Properties();
35+
try (InputStream stream = DataGeneratorJob.class.getClassLoader().getResourceAsStream("producer.properties")) {
36+
producerConfig.load(stream);
37+
}
38+
39+
KafkaSource<FlightData> flightDataSource = KafkaSource.<FlightData>builder()
40+
.setProperties(consumerConfig)
41+
.setTopics("flightdata")
42+
.setStartingOffsets(OffsetsInitializer.latest())
43+
.setValueOnlyDeserializer(new JsonDeserializationSchema<>(FlightData.class))
44+
.build();
45+
46+
DataStreamSource<FlightData> flightDataStream = env
47+
.fromSource(flightDataSource, WatermarkStrategy.forMonotonousTimestamps(), "flightdata_source");
48+
49+
KafkaRecordSerializationSchema<UserStatistics> statisticsSerializer = KafkaRecordSerializationSchema.<UserStatistics>builder()
50+
.setTopic("userstatistics")
51+
.setValueSerializationSchema(new JsonSerializationSchema<>(
52+
() -> new ObjectMapper().registerModule(new JavaTimeModule())
53+
))
54+
.build();
55+
56+
KafkaSink<UserStatistics> statsSink = KafkaSink.<UserStatistics>builder()
57+
.setKafkaProducerConfig(producerConfig)
58+
.setRecordSerializer(statisticsSerializer)
59+
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
60+
.build();
61+
62+
defineWorkflow(flightDataStream)
63+
.sinkTo(statsSink)
64+
.name("userstatistics_sink")
65+
.uid("userstatistics_sink");
66+
67+
env.execute("UserStatistics");
68+
}
69+
70+
public static DataStream<UserStatistics> defineWorkflow(
71+
DataStream<FlightData> flightDataSource
72+
) {
73+
return flightDataSource
74+
.map(UserStatistics::new)
75+
.keyBy(UserStatistics::getEmailAddress)
76+
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
77+
.reduce(UserStatistics::merge, new ProcessUserStatisticsFunction());
78+
}
79+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package userstatistics;
2+
3+
import models.FlightData;
4+
import models.TestHelpers;
5+
import models.UserStatistics;
6+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
7+
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
8+
import org.apache.flink.streaming.api.datastream.DataStream;
9+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10+
import org.apache.flink.test.junit5.MiniClusterExtension;
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.Test;
13+
import org.junit.jupiter.api.extension.RegisterExtension;
14+
15+
import java.time.Duration;
16+
import java.util.ArrayList;
17+
import java.util.Arrays;
18+
import java.util.List;
19+
20+
import static org.junit.jupiter.api.Assertions.*;
21+
22+
class UserStatisticsJobTest {
23+
24+
StreamExecutionEnvironment env;
25+
WatermarkStrategy<FlightData> defaultWatermarkStrategy;
26+
27+
DataStream.Collector<UserStatistics> collector;
28+
29+
static final MiniClusterResourceConfiguration miniClusterConfig = new MiniClusterResourceConfiguration.Builder()
30+
.setNumberSlotsPerTaskManager(2)
31+
.setNumberTaskManagers(1)
32+
.build();
33+
34+
@RegisterExtension
35+
static final MiniClusterExtension FLINK = new MiniClusterExtension(miniClusterConfig);
36+
37+
private void assertContains(DataStream.Collector<UserStatistics> collector, List<UserStatistics> expected) {
38+
List<UserStatistics> actual = new ArrayList<>();
39+
collector.getOutput().forEachRemaining(actual::add);
40+
41+
assertEquals(expected.size(), actual.size());
42+
43+
assertTrue(actual.containsAll(expected));
44+
}
45+
46+
@BeforeEach
47+
public void setup() {
48+
env = StreamExecutionEnvironment.getExecutionEnvironment();
49+
defaultWatermarkStrategy = WatermarkStrategy
50+
.<FlightData>forMonotonousTimestamps()
51+
.withTimestampAssigner((event, timestamp) -> System.currentTimeMillis());
52+
53+
collector = new DataStream.Collector<>();
54+
}
55+
56+
@Test
57+
public void defineWorkflow_shouldConvertFlightDataToUserStatistics() throws Exception {
58+
FlightData flight = new TestHelpers.FlightDataBuilder().build();
59+
60+
DataStream<FlightData> stream = env
61+
.fromElements(flight)
62+
.assignTimestampsAndWatermarks(defaultWatermarkStrategy);
63+
64+
UserStatisticsJob
65+
.defineWorkflow(stream)
66+
.collectAsync(collector);
67+
68+
env.executeAsync();
69+
70+
UserStatistics expected = new UserStatistics(flight);
71+
72+
assertContains(collector, Arrays.asList(expected));
73+
}
74+
75+
@Test
76+
public void defineWorkflow_shouldGroupStatisticsByEmailAddress() throws Exception {
77+
String email1 = TestHelpers.generateEmail();
78+
String email2 = TestHelpers.generateEmail();
79+
80+
FlightData flight1 = new TestHelpers.FlightDataBuilder().setEmailAddress(email1).build();
81+
FlightData flight2 = new TestHelpers.FlightDataBuilder().setEmailAddress(email2).build();
82+
FlightData flight3 = new TestHelpers.FlightDataBuilder().setEmailAddress(email1).build();
83+
84+
DataStream<FlightData> stream = env
85+
.fromElements(flight1, flight2, flight3)
86+
.assignTimestampsAndWatermarks(defaultWatermarkStrategy);
87+
88+
UserStatisticsJob
89+
.defineWorkflow(stream)
90+
.collectAsync(collector);
91+
92+
env.executeAsync();
93+
94+
UserStatistics expected1 = new UserStatistics(flight1).merge(new UserStatistics(flight3));
95+
UserStatistics expected2 = new UserStatistics(flight2);
96+
97+
assertContains(collector, Arrays.asList(expected1, expected2));
98+
}
99+
100+
@Test
101+
public void defineWorkflow_shouldWindowStatisticsByMinute() throws Exception {
102+
String email = TestHelpers.generateEmail();
103+
FlightData flight1 = new TestHelpers.FlightDataBuilder().setEmailAddress(email).build();
104+
FlightData flight2 = new TestHelpers.FlightDataBuilder().setEmailAddress(email).build();
105+
FlightData flight3 = new TestHelpers.FlightDataBuilder().setEmailAddress(email).setDepartureAirportCode("LATE").build();
106+
107+
WatermarkStrategy<FlightData> watermarkStrategy = WatermarkStrategy
108+
.<FlightData>forMonotonousTimestamps()
109+
.withTimestampAssigner((event, timestamp) -> {
110+
if(event.getDepartureAirportCode().equals("LATE")) {
111+
return System.currentTimeMillis() + Duration.ofMinutes(1).toMillis();
112+
} else {
113+
return System.currentTimeMillis();
114+
}
115+
});
116+
117+
DataStream<FlightData> stream = env
118+
.fromElements(flight1, flight2, flight3)
119+
.assignTimestampsAndWatermarks(watermarkStrategy);
120+
121+
UserStatisticsJob
122+
.defineWorkflow(stream)
123+
.collectAsync(collector);
124+
125+
env.executeAsync();
126+
127+
UserStatistics expected1 = new UserStatistics(flight1).merge(new UserStatistics(flight2));
128+
UserStatistics expected2 = expected1.merge(new UserStatistics(flight3));
129+
130+
assertContains(collector, Arrays.asList(expected1, expected2));
131+
}
132+
}

0 commit comments

Comments
 (0)