Skip to content

Commit e4d43f2

Browse files
committed
update
1 parent c3ee72b commit e4d43f2

File tree

8 files changed

+36
-36
lines changed

8 files changed

+36
-36
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,6 @@
33
*.class
44
*.log
55

6+
/.idea
7+
target
8+
.idea

pom.xml

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,19 @@
1111
<name>Flink Tutorial</name>
1212
<url>http://www.myorganization.org</url>
1313

14-
<repositories>
15-
<repository>
16-
<id>apache.snapshots</id>
17-
<name>Apache Development Snapshot Repository</name>
18-
<url>https://repository.apache.org/content/repositories/snapshots/</url>
19-
<releases>
20-
<enabled>false</enabled>
21-
</releases>
22-
<snapshots>
23-
<enabled>true</enabled>
24-
</snapshots>
25-
</repository>
26-
</repositories>
14+
<!-- <repositories>-->
15+
<!-- <repository>-->
16+
<!-- <id>apache.snapshots</id>-->
17+
<!-- <name>Apache Development Snapshot Repository</name>-->
18+
<!-- <url>https://repository.apache.org/content/repositories/snapshots/</url>-->
19+
<!-- <releases>-->
20+
<!-- <enabled>false</enabled>-->
21+
<!-- </releases>-->
22+
<!-- <snapshots>-->
23+
<!-- <enabled>true</enabled>-->
24+
<!-- </snapshots>-->
25+
<!-- </repository>-->
26+
<!-- </repositories>-->
2727

2828
<properties>
2929
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

src/main/java/TestJava.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import org.relaxng.datatype.DatatypeException;
2-
31
import java.text.ParseException;
42
import java.text.SimpleDateFormat;
53
import java.util.Date;

src/main/java/com/javaedge/java/chapter3/JavaDataSetDataSourceApp.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ public class JavaDataSetDataSourceApp {
1414

1515
public static void main(String[] args) throws Exception {
1616
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
17-
// fromCollection(env);
17+
fromCollection(env);
1818
textFile(env);
1919
}
2020

2121
public static void textFile(ExecutionEnvironment env) throws Exception {
22-
String filePath = "file:///Volumes/doc/data/data.txt";
22+
String filePath = "file:///Volumes/Download/data/data.txt";
2323
env.readTextFile(filePath).print();
2424
System.out.println("===========~这是一个分割线~============");
2525

src/main/java/com/javaedge/java/chapter4/JavaDataStreamSourceApp.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ public static void main(String[] args) throws Exception {
1313
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
1414

1515
// socketFunction(env);
16-
// nonParallelSourceFunction(env);
17-
parallelSourceFunction(env);
16+
nonParallelSourceFunction(env);
17+
// parallelSourceFunction(env);
1818

1919
env.execute("JavaDataStreamSourceApp");
2020
}

src/main/java/com/javaedge/java/chapter4/JavaDataStreamTransformationApp.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
public class JavaDataStreamTransformationApp {
1818
public static void main(String[] args) throws Exception {
1919
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
20-
// filterFunction(env);
20+
filterFunction(env);
2121
// unionFunction(env);
22-
splitSelectFunction(env);
22+
// splitSelectFunction(env);
2323
env.execute("JavaDataStreamTransformationApp");
2424
}
2525

src/main/java/com/javaedge/java/chapter5/JavaTableSQLAPI.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public static void main(String[] args) throws Exception {
2222

2323
DataSet<Sales> csv = env.readCsvFile(filePath)
2424
.ignoreFirstLine()
25-
.pojoType(Sales.class,"transactionId","customerId","itemId","amountPaid");
25+
.pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid");
2626
//csv.print();
2727

2828
Table sales = tableEnv.fromDataSet(csv);
@@ -33,11 +33,11 @@ public static void main(String[] args) throws Exception {
3333
result.print();
3434
}
3535

36-
public static class Sales{
36+
public static class Sales {
3737
public String transactionId;
3838
public String customerId;
3939
public String itemId;
40-
public Double amountPaid;
40+
public Double amountPaid;
4141
}
4242
}
4343

src/main/java/com/javaedge/java/chapter6/JavaWindowsApp.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,24 @@
99

1010
/**
1111
* @author JavaEdge
12-
*
1312
* @date 2019-07-23
1413
*/
1514
public class JavaWindowsApp {
1615

1716
public static void main(String[] args) throws Exception {
1817
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
19-
DataStreamSource<String> text = env.socketTextStream("localhost",9999);
20-
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
21-
@Override
22-
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
23-
String[] tokens = value.toLowerCase().split(",");
24-
for(String token : tokens) {
25-
if(token.length() > 0) {
26-
out.collect(new Tuple2<>(token, 1));
18+
DataStreamSource<String> text = env.socketTextStream("localhost", 9999);
19+
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
20+
@Override
21+
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
22+
String[] tokens = value.toLowerCase().split(",");
23+
for (String token : tokens) {
24+
if (token.length() > 0) {
25+
out.collect(new Tuple2<>(token, 1));
26+
}
27+
}
2728
}
28-
}
29-
}
30-
}).keyBy(0)
29+
}).keyBy(0)
3130
.timeWindow(Time.seconds(5))
3231
.sum(1)
3332
.print()

0 commit comments

Comments
 (0)