Skip to content

Commit 01f0441

Browse files
authored
Initial code
1 parent 6164b6a commit 01f0441

File tree

28 files changed

+2381
-1
lines changed

28 files changed

+2381
-1
lines changed

.gitpod.yml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
image: gitpod/workspace-java-11
2+
3+
tasks:
4+
- before: |
5+
./install_flink.sh
6+
export PATH="$PATH:/workspace/learn-building-flink-applications-in-java-exercises/flink-1.17.0/bin"
7+
start-cluster.sh
8+
taskmanager.sh start
9+
taskmanager.sh start
10+
taskmanager.sh start
11+
exit
12+
- command: gp preview http://localhost:8081 && exit
13+
- name: Data Generator
14+
command: |
15+
export PATH="$PATH:/workspace/learn-building-flink-applications-in-java-exercises/flink-1.17.0/bin"
16+
cd exercises
17+
- name: Flight Importer
18+
command: |
19+
export PATH="$PATH:/workspace/learn-building-flink-applications-in-java-exercises/flink-1.17.0/bin"
20+
cd exercises
21+
- name: User Statistics
22+
command: |
23+
export PATH="$PATH:/workspace/learn-building-flink-applications-in-java-exercises/flink-1.17.0/bin"
24+
cd exercises
25+
- name: Misc
26+
command: |
27+
export PATH="$PATH:/workspace/learn-building-flink-applications-in-java-exercises/flink-1.17.0/bin"
28+
cd exercises
29+
30+
ports:
31+
- port: 8081
32+
onOpen: ignore
33+
34+
vscode:
35+
extensions:
36+
- vscjava.vscode-java-pack
37+
38+
workspaceLocation: /workspace/learn-building-flink-applications-in-java-exercises

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,8 @@
22

33
## Version 0.1
44

5-
* Initial commit of required files for a public repo.
5+
* Initial commit of required files for a public repo.
6+
7+
## Version 1.0
8+
9+
* Initial commit of exercise code.

exercises/exercise.sh

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#!/bin/bash
2+
set -euo pipefail
3+
IFS=$'\n\t'
4+
5+
EXERCISE_DIR=./
6+
SOLUTIONS_DIR=../solutions
7+
STAGING_DIR=../staging
8+
9+
if [ ! -d $STAGING_DIR ]; then
10+
echo "$STAGING_DIR could not be found."
11+
exit 1
12+
fi
13+
14+
if [ ! -d $SOLUTIONS_DIR ]; then
15+
echo "$SOLUTIONS_DIR could not be found."
16+
exit 1
17+
fi
18+
19+
function help() {
20+
echo "Usage:"
21+
echo " exercises.sh <Command>"
22+
echo " Commands:"
23+
echo " stage <Exercise Filter> - Setup the exercise."
24+
echo " <Exercise Filter> - A portion of the exercise name (eg. the exercise number) that will be used to select the exercise."
25+
echo " solve <Exercise Filter> <File Filter> - Solve the exercise."
26+
echo " <Exercise Filter> - A portion of the exercise name (eg. the exercise number) that will be used to select the exercise."
27+
echo " <File Filter> - (Optional) A portion of a file name that will be used to select while file to copy from the solution."
28+
echo " list - List all exercises."
29+
echo " Exercise Filter: A portion of the name of the exercise. Eg. The Exercise Number. If multiple matches are found, the first one will be chosen."
30+
}
31+
32+
function stage() {
33+
EXERCISE_FILTER=$1
34+
MATCHED_EXERCISES=($(ls $STAGING_DIR | grep ".*$EXERCISE_FILTER.*"))
35+
EXERCISE=${MATCHED_EXERCISES[0]}
36+
37+
echo "STAGING $EXERCISE"
38+
39+
cp -r $STAGING_DIR/$EXERCISE/. $EXERCISE_DIR
40+
}
41+
42+
function solve() {
43+
EXERCISE_FILTER=$1
44+
FILE_FILTER=${2:-""}
45+
MATCHED_EXERCISES=($(ls $SOLUTIONS_DIR | grep ".*$EXERCISE_FILTER.*"))
46+
EXERCISE=${MATCHED_EXERCISES[0]}
47+
SOLUTION=$SOLUTIONS_DIR/$EXERCISE
48+
49+
if [ -z $FILE_FILTER ]; then
50+
echo "SOLVING $EXERCISE"
51+
52+
cp -r $SOLUTION/. $EXERCISE_DIR
53+
else
54+
WORKING_DIR=$(pwd)
55+
cd $SOLUTION
56+
MATCHED_FILES=($(find . -iname "*$FILE_FILTER*"))
57+
cd $WORKING_DIR
58+
59+
if [ -z ${MATCHED_FILES:-""} ]; then
60+
echo "FILE NOT FOUND: $FILE_FILTER"
61+
exit 1
62+
fi
63+
64+
FILE_PATH=${MATCHED_FILES[0]}
65+
66+
echo "COPYING $FILE_PATH FROM $EXERCISE"
67+
68+
cp $SOLUTION/$FILE_PATH $EXERCISE_DIR/$FILE_PATH
69+
fi
70+
71+
}
72+
73+
function list() {
74+
EXERCISES=$(ls $SOLUTIONS_DIR)
75+
76+
for ex in "${EXERCISES[@]}"
77+
do
78+
echo "$ex"
79+
done
80+
}
81+
82+
COMMAND=${1:-"help"}
83+
84+
## Determine which command is being requested, and execute it.
85+
if [ "$COMMAND" = "stage" ]; then
86+
EXERCISE_FILTER=${2:-""}
87+
if [ -z $EXERCISE_FILTER ]; then
88+
echo "MISSING EXERCISE ID"
89+
help
90+
exit 1
91+
fi
92+
stage $EXERCISE_FILTER
93+
elif [ "$COMMAND" = "solve" ]; then
94+
EXERCISE_FILTER=${2:-""}
95+
FILE_FILTER=${3:-""}
96+
if [ -z $EXERCISE_FILTER ]; then
97+
echo "MISSING EXERCISE ID"
98+
help
99+
exit 1
100+
fi
101+
solve $EXERCISE_FILTER $FILE_FILTER
102+
elif [ "$COMMAND" = "list" ]; then
103+
list
104+
elif [ "$COMMAND" = "help" ]; then
105+
help
106+
else
107+
echo "INVALID COMMAND: $COMMAND"
108+
help
109+
exit 1
110+
fi

install_flink.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
curl https://dlcdn.apache.org/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz --output flink-1.17.0.tgz
2+
tar -xzf flink-*.tgz

solutions/05-running-a-job/.gitignore

Whitespace-only changes.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package flightimporter;
2+
3+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
4+
import org.apache.flink.api.common.serialization.SimpleStringSchema;
5+
import org.apache.flink.connector.kafka.source.KafkaSource;
6+
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
7+
import org.apache.flink.streaming.api.datastream.DataStream;
8+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
9+
10+
import java.io.InputStream;
11+
import java.util.Properties;
12+
13+
public class FlightImporterJob {
14+
15+
public static void main(String[] args) throws Exception {
16+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
17+
18+
Properties consumerConfig = new Properties();
19+
try (InputStream stream = FlightImporterJob.class.getClassLoader().getResourceAsStream("consumer.properties")) {
20+
consumerConfig.load(stream);
21+
}
22+
23+
KafkaSource<String> skyOneSource = KafkaSource.<String>builder()
24+
.setProperties(consumerConfig)
25+
.setTopics("skyone")
26+
.setStartingOffsets(OffsetsInitializer.latest())
27+
.setValueOnlyDeserializer(new SimpleStringSchema())
28+
.build();
29+
30+
DataStream<String> skyOneStream = env
31+
.fromSource(skyOneSource, WatermarkStrategy.noWatermarks(), "skyone_source");
32+
33+
skyOneStream.print();
34+
35+
env.execute("FlightImporter");
36+
}
37+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package flightimporter;
2+
3+
import models.SkyOneAirlinesFlightData;
4+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
5+
import org.apache.flink.connector.kafka.source.KafkaSource;
6+
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
7+
import org.apache.flink.formats.json.JsonDeserializationSchema;
8+
import org.apache.flink.streaming.api.datastream.DataStream;
9+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10+
11+
import java.io.InputStream;
12+
import java.util.Properties;
13+
14+
public class FlightImporterJob {
15+
16+
public static void main(String[] args) throws Exception {
17+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
18+
19+
Properties consumerConfig = new Properties();
20+
try (InputStream stream = FlightImporterJob.class.getClassLoader().getResourceAsStream("consumer.properties")) {
21+
consumerConfig.load(stream);
22+
}
23+
24+
KafkaSource<SkyOneAirlinesFlightData> skyOneSource = KafkaSource.<SkyOneAirlinesFlightData>builder()
25+
.setProperties(consumerConfig)
26+
.setTopics("skyone")
27+
.setStartingOffsets(OffsetsInitializer.latest())
28+
.setValueOnlyDeserializer(new JsonDeserializationSchema(SkyOneAirlinesFlightData.class))
29+
.build();
30+
31+
DataStream<SkyOneAirlinesFlightData> skyOneStream = env
32+
.fromSource(skyOneSource, WatermarkStrategy.noWatermarks(), "skyone_source");
33+
34+
skyOneStream.print();
35+
36+
env.execute("FlightImporter");
37+
}
38+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package models;
2+
3+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
4+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
5+
6+
import java.time.ZonedDateTime;
7+
import java.util.Objects;
8+
9+
@JsonIgnoreProperties(ignoreUnknown = true)
10+
public class SkyOneAirlinesFlightData {
11+
private String emailAddress;
12+
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
13+
private ZonedDateTime flightDepartureTime;
14+
private String iataDepartureCode;
15+
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
16+
private ZonedDateTime flightArrivalTime;
17+
private String iataArrivalCode;
18+
private String flightNumber;
19+
private String confirmation;
20+
21+
public SkyOneAirlinesFlightData() {
22+
23+
}
24+
25+
public String getEmailAddress() {
26+
return emailAddress;
27+
}
28+
29+
public void setEmailAddress(String emailAddress) {
30+
this.emailAddress = emailAddress;
31+
}
32+
33+
public ZonedDateTime getFlightDepartureTime() {
34+
return flightDepartureTime;
35+
}
36+
37+
public void setFlightDepartureTime(ZonedDateTime flightDepartureTime) {
38+
this.flightDepartureTime = flightDepartureTime;
39+
}
40+
41+
public String getIataDepartureCode() {
42+
return iataDepartureCode;
43+
}
44+
45+
public void setIataDepartureCode(String iataDepartureCode) {
46+
this.iataDepartureCode = iataDepartureCode;
47+
}
48+
49+
public ZonedDateTime getFlightArrivalTime() {
50+
return flightArrivalTime;
51+
}
52+
53+
public void setFlightArrivalTime(ZonedDateTime flightArrivalTime) {
54+
this.flightArrivalTime = flightArrivalTime;
55+
}
56+
57+
public String getIataArrivalCode() {
58+
return iataArrivalCode;
59+
}
60+
61+
public void setIataArrivalCode(String iataArrivalCode) {
62+
this.iataArrivalCode = iataArrivalCode;
63+
}
64+
65+
public String getFlightNumber() {
66+
return flightNumber;
67+
}
68+
69+
public void setFlightNumber(String flightNumber) {
70+
this.flightNumber = flightNumber;
71+
}
72+
73+
public String getConfirmation() {
74+
return confirmation;
75+
}
76+
77+
public void setConfirmation(String confirmation) {
78+
this.confirmation = confirmation;
79+
}
80+
81+
@Override
82+
public boolean equals(Object o) {
83+
if (this == o) return true;
84+
if (o == null || getClass() != o.getClass()) return false;
85+
SkyOneAirlinesFlightData that = (SkyOneAirlinesFlightData) o;
86+
return Objects.equals(emailAddress, that.emailAddress) && Objects.equals(flightDepartureTime, that.flightDepartureTime) && Objects.equals(iataDepartureCode, that.iataDepartureCode) && Objects.equals(flightArrivalTime, that.flightArrivalTime) && Objects.equals(iataArrivalCode, that.iataArrivalCode) && Objects.equals(flightNumber, that.flightNumber) && Objects.equals(confirmation, that.confirmation);
87+
}
88+
89+
@Override
90+
public int hashCode() {
91+
return Objects.hash(emailAddress, flightDepartureTime, iataDepartureCode, flightArrivalTime, iataArrivalCode, flightNumber, confirmation);
92+
}
93+
94+
@Override
95+
public String toString() {
96+
return "SkyOneAirlinesFlightData{" +
97+
"emailAddress='" + emailAddress + '\'' +
98+
", flightDepartureTime=" + flightDepartureTime +
99+
", iataDepartureCode='" + iataDepartureCode + '\'' +
100+
", flightArrivalTime=" + flightArrivalTime +
101+
", iataArrivalCode='" + iataArrivalCode + '\'' +
102+
", flightNumber='" + flightNumber + '\'' +
103+
", confirmation='" + confirmation + '\'' +
104+
'}';
105+
}
106+
}

0 commit comments

Comments
 (0)