Skip to content

Commit 642f140

Browse files
committed
Implementing Exercise 16 - Merging Streams. (#7)
1 parent 4ca7454 commit 642f140

File tree

6 files changed

+654
-1
lines changed

6 files changed

+654
-1
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,8 @@
1818

1919
## Version 1.1.1
2020

21-
* Upgrade to Flink 1.17.1
21+
* Upgrade to Flink 1.17.1
22+
23+
## Version 2.0
24+
25+
* Implemented Exercise 16 - Merging Streams.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package flightimporter;
2+
3+
import models.SkyOneAirlinesFlightData;
4+
import models.FlightData;
5+
import models.SunsetAirFlightData;
6+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
7+
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
8+
import org.apache.flink.connector.kafka.sink.KafkaSink;
9+
import org.apache.flink.connector.kafka.source.KafkaSource;
10+
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
11+
import org.apache.flink.formats.json.JsonDeserializationSchema;
12+
import org.apache.flink.formats.json.JsonSerializationSchema;
13+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
14+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
15+
import org.apache.flink.streaming.api.datastream.DataStream;
16+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
17+
18+
import java.time.ZonedDateTime;
19+
import java.io.InputStream;
20+
import java.util.Properties;
21+
22+
public class FlightImporterJob {
23+
24+
public static void main(String[] args) throws Exception {
25+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
26+
27+
Properties consumerConfig = new Properties();
28+
try (InputStream stream = FlightImporterJob.class.getClassLoader().getResourceAsStream("consumer.properties")) {
29+
consumerConfig.load(stream);
30+
}
31+
32+
Properties producerConfig = new Properties();
33+
try (InputStream stream = FlightImporterJob.class.getClassLoader().getResourceAsStream("producer.properties")) {
34+
producerConfig.load(stream);
35+
}
36+
37+
KafkaSource<SkyOneAirlinesFlightData> skyOneSource = KafkaSource.<SkyOneAirlinesFlightData>builder()
38+
.setProperties(consumerConfig)
39+
.setTopics("skyone")
40+
.setStartingOffsets(OffsetsInitializer.latest())
41+
.setValueOnlyDeserializer(new JsonDeserializationSchema(SkyOneAirlinesFlightData.class))
42+
.build();
43+
44+
KafkaSource<SunsetAirFlightData> sunsetSource = KafkaSource.<SkyOneAirlinesFlightData>builder()
45+
.setProperties(consumerConfig)
46+
.setTopics("sunset")
47+
.setStartingOffsets(OffsetsInitializer.latest())
48+
.setValueOnlyDeserializer(new JsonDeserializationSchema(SunsetAirFlightData.class))
49+
.build();
50+
51+
DataStream<SkyOneAirlinesFlightData> skyOneStream = env
52+
.fromSource(skyOneSource, WatermarkStrategy.noWatermarks(), "skyone_source");
53+
54+
DataStream<SunsetAirFlightData> sunsetStream = env
55+
.fromSource(sunsetSource, WatermarkStrategy.noWatermarks(), "sunset_source");
56+
57+
KafkaRecordSerializationSchema<FlightData> flightSerializer = KafkaRecordSerializationSchema.<FlightData>builder()
58+
.setTopic("flightdata")
59+
.setValueSerializationSchema(new JsonSerializationSchema<FlightData>(
60+
() -> {
61+
return new ObjectMapper()
62+
.registerModule(new JavaTimeModule());
63+
}
64+
))
65+
.build();
66+
67+
KafkaSink<FlightData> flightSink = KafkaSink.<FlightData>builder()
68+
.setKafkaProducerConfig(producerConfig)
69+
.setRecordSerializer(flightSerializer)
70+
.build();
71+
72+
defineWorkflow(skyOneStream, sunsetStream)
73+
.sinkTo(flightSink)
74+
.name("flightdata_sink");
75+
76+
env.execute("FlightImporter");
77+
}
78+
79+
public static DataStream<FlightData> defineWorkflow(DataStream<SkyOneAirlinesFlightData> skyOneSource, DataStream<SunsetAirFlightData> sunsetSource) {
80+
DataStream<FlightData> skyOneFlightStream = skyOneSource
81+
.filter(flight -> flight.getFlightArrivalTime().isAfter(ZonedDateTime.now()))
82+
.map(flight -> flight.toFlightData());
83+
84+
DataStream<FlightData> sunsetFlightStream = sunsetSource
85+
.filter(flight -> flight.getArrivalTime().isAfter(ZonedDateTime.now()))
86+
.map(flight -> flight.toFlightData());
87+
88+
return skyOneFlightStream.union(sunsetFlightStream);
89+
}
90+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package models;
2+
3+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
4+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
5+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
6+
7+
import java.time.ZonedDateTime;
8+
import java.util.Objects;
9+
10+
@JsonIgnoreProperties(ignoreUnknown = true)
11+
public class SunsetAirFlightData {
12+
private String customerEmailAddress;
13+
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
14+
private ZonedDateTime departureTime;
15+
private String departureAirport;
16+
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
17+
private ZonedDateTime arrivalTime;
18+
private String arrivalAirport;
19+
private String flightId;
20+
private String referenceNumber;
21+
22+
@JsonCreator
23+
public SunsetAirFlightData() {
24+
}
25+
26+
public String getCustomerEmailAddress() {
27+
return customerEmailAddress;
28+
}
29+
30+
public void setCustomerEmailAddress(String customerEmailAddress) {
31+
this.customerEmailAddress = customerEmailAddress;
32+
}
33+
34+
public ZonedDateTime getDepartureTime() {
35+
return departureTime;
36+
}
37+
38+
public void setDepartureTime(ZonedDateTime departureTime) {
39+
this.departureTime = departureTime;
40+
}
41+
42+
public String getDepartureAirport() {
43+
return departureAirport;
44+
}
45+
46+
public void setDepartureAirport(String departureAirport) {
47+
this.departureAirport = departureAirport;
48+
}
49+
50+
public ZonedDateTime getArrivalTime() {
51+
return arrivalTime;
52+
}
53+
54+
public void setArrivalTime(ZonedDateTime arrivalTime) {
55+
this.arrivalTime = arrivalTime;
56+
}
57+
58+
public String getArrivalAirport() {
59+
return arrivalAirport;
60+
}
61+
62+
public void setArrivalAirport(String arrivalAirport) {
63+
this.arrivalAirport = arrivalAirport;
64+
}
65+
66+
public String getFlightId() {
67+
return flightId;
68+
}
69+
70+
public void setFlightId(String flightId) {
71+
this.flightId = flightId;
72+
}
73+
74+
public String getReferenceNumber() {
75+
return referenceNumber;
76+
}
77+
78+
public void setReferenceNumber(String referenceNumber) {
79+
this.referenceNumber = referenceNumber;
80+
}
81+
82+
@Override
83+
public boolean equals(Object o) {
84+
if (this == o) return true;
85+
if (o == null || getClass() != o.getClass()) return false;
86+
SunsetAirFlightData that = (SunsetAirFlightData) o;
87+
return Objects.equals(customerEmailAddress, that.customerEmailAddress) && Objects.equals(departureTime, that.departureTime) && Objects.equals(departureAirport, that.departureAirport) && Objects.equals(arrivalTime, that.arrivalTime) && Objects.equals(arrivalAirport, that.arrivalAirport) && Objects.equals(flightId, that.flightId) && Objects.equals(referenceNumber, that.referenceNumber);
88+
}
89+
90+
@Override
91+
public int hashCode() {
92+
return Objects.hash(customerEmailAddress, departureTime, departureAirport, arrivalTime, arrivalAirport, flightId, referenceNumber);
93+
}
94+
95+
@Override
96+
public String toString() {
97+
return "SkyOneAirlinesFlightData{" +
98+
"customerEmailAddress='" + customerEmailAddress + '\'' +
99+
", departureTime=" + departureTime +
100+
", departureAirport='" + departureAirport + '\'' +
101+
", arrivalTime=" + arrivalTime +
102+
", arrivalAirport='" + arrivalAirport + '\'' +
103+
", flightId='" + flightId + '\'' +
104+
", referenceNumber='" + referenceNumber + '\'' +
105+
'}';
106+
}
107+
108+
public FlightData toFlightData() {
109+
FlightData flightData = new FlightData();
110+
111+
flightData.setEmailAddress(getCustomerEmailAddress());
112+
flightData.setDepartureTime(getDepartureTime());
113+
flightData.setDepartureAirportCode(getDepartureAirport());
114+
flightData.setArrivalTime(getArrivalTime());
115+
flightData.setArrivalAirportCode(getArrivalAirport());
116+
flightData.setFlightNumber(getFlightId());
117+
flightData.setConfirmationCode(getReferenceNumber());
118+
119+
return flightData;
120+
}
121+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package flightimporter;
2+
3+
import models.FlightData;
4+
import models.SkyOneAirlinesFlightData;
5+
import models.SunsetAirFlightData;
6+
import models.TestHelpers;
7+
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
8+
import org.apache.flink.streaming.api.datastream.DataStream;
9+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
10+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
11+
import org.apache.flink.test.junit5.MiniClusterExtension;
12+
import org.junit.jupiter.api.BeforeEach;
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.api.extension.RegisterExtension;
15+
16+
import java.time.ZonedDateTime;
17+
import java.util.ArrayList;
18+
import java.util.Arrays;
19+
import java.util.List;
20+
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
24+
class FlightImporterJobTest {
25+
26+
StreamExecutionEnvironment env;
27+
DataStream.Collector<FlightData> collector;
28+
29+
static final MiniClusterResourceConfiguration miniClusterConfig = new MiniClusterResourceConfiguration.Builder()
30+
.setNumberSlotsPerTaskManager(2)
31+
.setNumberTaskManagers(1)
32+
.build();
33+
34+
@RegisterExtension
35+
static final MiniClusterExtension FLINK = new MiniClusterExtension(miniClusterConfig);
36+
37+
private void assertContains(DataStream.Collector<FlightData> collector, List<FlightData> expected) {
38+
List<FlightData> actual = new ArrayList<>();
39+
collector.getOutput().forEachRemaining(actual::add);
40+
41+
assertEquals(expected.size(), actual.size());
42+
43+
assertTrue(actual.containsAll(expected));
44+
}
45+
46+
@BeforeEach
47+
public void setup() {
48+
env = StreamExecutionEnvironment.getExecutionEnvironment();
49+
collector = new DataStream.Collector<>();
50+
}
51+
52+
@Test
53+
public void defineWorkflow_shouldConvertDataFromTwoStreams() throws Exception {
54+
SkyOneAirlinesFlightData skyOneFlight = new TestHelpers.SkyOneBuilder().build();
55+
SunsetAirFlightData sunsetFlight = new TestHelpers.SunsetBuilder().build();
56+
57+
DataStreamSource<SkyOneAirlinesFlightData> skyOneStream = env.fromElements(skyOneFlight);
58+
DataStreamSource<SunsetAirFlightData> sunsetStream = env.fromElements(sunsetFlight);
59+
60+
FlightImporterJob
61+
.defineWorkflow(skyOneStream, sunsetStream)
62+
.collectAsync(collector);
63+
64+
env.executeAsync();
65+
66+
assertContains(collector, Arrays.asList(skyOneFlight.toFlightData(), sunsetFlight.toFlightData()));
67+
}
68+
69+
@Test
70+
public void defineWorkflow_shouldFilterOutFlightsInThePast() throws Exception {
71+
SkyOneAirlinesFlightData newSkyOneFlight = new TestHelpers.SkyOneBuilder()
72+
.setFlightArrivalTime(ZonedDateTime.now().plusMinutes(1))
73+
.build();
74+
SkyOneAirlinesFlightData oldSkyOneFlight = new TestHelpers.SkyOneBuilder()
75+
.setFlightArrivalTime(ZonedDateTime.now().minusSeconds(1))
76+
.build();
77+
78+
SunsetAirFlightData newSunsetFlight = new TestHelpers.SunsetBuilder()
79+
.setArrivalTime(ZonedDateTime.now().plusMinutes(1))
80+
.build();
81+
SunsetAirFlightData oldSunsetFlight = new TestHelpers.SunsetBuilder()
82+
.setArrivalTime(ZonedDateTime.now().minusSeconds(1))
83+
.build();
84+
85+
DataStreamSource<SkyOneAirlinesFlightData> skyOneStream = env.fromElements(newSkyOneFlight, oldSkyOneFlight);
86+
DataStreamSource<SunsetAirFlightData> sunsetStream = env.fromElements(newSunsetFlight, oldSunsetFlight);
87+
88+
FlightImporterJob
89+
.defineWorkflow(skyOneStream, sunsetStream)
90+
.collectAsync(collector);
91+
92+
env.executeAsync();
93+
94+
assertContains(collector, Arrays.asList(newSkyOneFlight.toFlightData(), newSunsetFlight.toFlightData()));
95+
}
96+
}

0 commit comments

Comments
 (0)