Skip to content

Commit fc67437

Browse files
committed
perf:优化
1 parent e4d43f2 commit fc67437

File tree

14 files changed

+52
-81
lines changed

14 files changed

+52
-81
lines changed

.idea/encodings.xml

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
# Flink 从入门到实战 - Scala/Java双语言版本
22

3-
# 1 核心知识点
3+
## 1 核心知识点
44
![](https://upload-images.jianshu.io/upload_images/16782311-dc4156dc0a34d557.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
5-
## 编程模型及核心概念
5+
### 编程模型及核心概念
66
Flink中的DataSet & DataStream
77

88
Flink编程模型、延迟执行
99

1010
Flink中支持的数据类型
1111

1212
![](https://upload-images.jianshu.io/upload_images/16782311-77ad64efc672f89b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
13-
## DataSet API编程
13+
### DataSet API编程
1414

1515
DataSet中的Data Source、Sink
1616

@@ -20,7 +20,7 @@ Transformation核心应用
2020

2121

2222
![](https://upload-images.jianshu.io/upload_images/16782311-6d7c1b11f42bb125.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
23-
## DataStream API编程
23+
### DataStream API编程
2424

2525
DataStream中的Data Source及自定义实现
2626

@@ -31,7 +31,7 @@ Transformation核心应用
3131

3232

3333
![](https://upload-images.jianshu.io/upload_images/16782311-8ec74ae8f1b686da.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
34-
## Flink Table API&SQL编程
34+
### Flink Table API&SQL编程
3535

3636
Flink关系型API介绍
3737

@@ -41,7 +41,7 @@ Table&SQL API其他功能
4141

4242

4343
![](https://upload-images.jianshu.io/upload_images/16782311-7f673e9fb901a151.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
44-
## Flink中的Time及Windows操作
44+
### Flink中的Time及Windows操作
4545

4646
三种不同Time详解
4747

@@ -79,11 +79,8 @@ Flink常见的监控方式
7979

8080
常见的Flink调优策略
8181

82-
# 2 以下知名公司都在使用Flink
82+
# 2 公司都在用Flink
8383
![](https://upload-images.jianshu.io/upload_images/16782311-9447a7c96178832b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
8484

85-
# X 交流学习
86-
![](https://upload-images.jianshu.io/upload_images/16782311-8d7acde57fdce062.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
87-
## [Java交流群](https://jq.qq.com/?_wv=1027&k=5UB4P1T)
88-
## [博客](http://www.shishusheng.com)
89-
## [Github](https://github.com/Wasabi1234)
85+
## X 交流学习
86+
### [Github](https://github.com/Java-Edge)

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,4 +311,4 @@
311311
</profile>
312312
</profiles>
313313

314-
</project>
314+
</project>

src/main/java/com/javaedge/java/chapter3/JavaDataSetSinkApp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.javaedge.java.chapter3;
22

33
/**
4-
* @author sss
4+
* @author JavaEdge
55
* @date 2019-07-18
66
*/
77

src/main/java/com/javaedge/java/chapter4/JavaCustomSinkToMySQL.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,6 @@
55
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
66
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
77

8-
/**
9-
* @author JavaEdge
10-
*
11-
* @date 2019-07-21
12-
*/
138
public class JavaCustomSinkToMySQL {
149
public static void main(String[] args) throws Exception {
1510
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

src/main/java/com/javaedge/java/chapter4/SinkToMySQL.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,7 @@
77
import java.sql.DriverManager;
88
import java.sql.PreparedStatement;
99

10-
/**
11-
* @author JavaEdge
12-
*
13-
* @date 2019-07-21
14-
*/
15-
public class SinkToMySQL extends RichSinkFunction<Student>{
10+
public class SinkToMySQL extends RichSinkFunction<Student> {
1611

1712
Connection connection;
1813
PreparedStatement pstmt;
@@ -25,7 +20,7 @@ private Connection getConnection() {
2520

2621
String url = "jdbc:mysql://localhost:3306/javaedge_flink";
2722

28-
conn = DriverManager.getConnection(url,"root","root");
23+
conn = DriverManager.getConnection(url, "root", "123456");
2924

3025
} catch (Exception e) {
3126
e.printStackTrace();
@@ -36,6 +31,7 @@ private Connection getConnection() {
3631

3732
/**
3833
* 在open方法中建立connection
34+
*
3935
* @param parameters
4036
* @throws Exception
4137
*/
@@ -73,17 +69,18 @@ public void invoke(Student value, Context context) throws Exception {
7369

7470
/**
7571
* 在close方法中要释放资源
72+
*
7673
* @throws Exception
7774
*/
7875
@Override
7976
public void close() throws Exception {
8077
super.close();
8178

82-
if(pstmt != null) {
79+
if (pstmt != null) {
8380
pstmt.close();
8481
}
8582

86-
if(connection != null) {
83+
if (connection != null) {
8784
connection.close();
8885
}
8986
}

src/main/java/com/javaedge/java/chapter4/Student.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,4 @@ public class Student {
1414
private String name;
1515

1616
private int age;
17-
}
18-
17+
}

src/main/java/com/javaedge/java/chapter6/JavaWindowsProcessApp.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,23 @@
1010
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
1111
import org.apache.flink.util.Collector;
1212

13-
/**
14-
* @author JavaEdge
15-
*
16-
* @date 2019-07-23
17-
*/
1813
public class JavaWindowsProcessApp {
1914

2015
public static void main(String[] args) throws Exception {
2116
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2217
DataStreamSource<String> text = env.socketTextStream("localhost", 9999);
2318

2419
text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
25-
@Override
26-
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
27-
String[] tokens = value.toLowerCase().split(",");
28-
for (String token : tokens) {
29-
if (token.length() > 0) {
30-
out.collect(new Tuple2<>(1, Integer.parseInt(token)));
20+
@Override
21+
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
22+
String[] tokens = value.toLowerCase().split(",");
23+
for (String token : tokens) {
24+
if (token.length() > 0) {
25+
out.collect(new Tuple2<>(1, Integer.parseInt(token)));
26+
}
27+
}
3128
}
32-
}
33-
}
34-
}).keyBy(0)
29+
}).keyBy(0)
3530
.timeWindow(Time.seconds(5))
3631
.process(new ProcessWindowFunction<Tuple2<Integer, Integer>, Object, Tuple, TimeWindow>() {
3732
@Override

src/main/java/com/javaedge/java/chapter6/JavaWindowsReduceApp.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,23 @@
88
import org.apache.flink.streaming.api.windowing.time.Time;
99
import org.apache.flink.util.Collector;
1010

11-
/**
12-
* @author JavaEdge
13-
*
14-
* @date 2019-07-23
15-
*/
1611
public class JavaWindowsReduceApp {
1712

1813
public static void main(String[] args) throws Exception {
1914
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
20-
DataStreamSource<String> text = env.socketTextStream("localhost",9999);
15+
DataStreamSource<String> text = env.socketTextStream("localhost", 9999);
2116

22-
text.flatMap(new FlatMapFunction<String, Tuple2<Integer,Integer>>() {
23-
@Override
24-
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
25-
String[] tokens = value.toLowerCase().split(",");
26-
for(String token : tokens) {
27-
if(token.length() > 0) {
28-
out.collect(new Tuple2<>(1, Integer.parseInt(token)));
17+
text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
18+
@Override
19+
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
20+
String[] tokens = value.toLowerCase().split(",");
21+
for (String token : tokens) {
22+
if (token.length() > 0) {
23+
out.collect(new Tuple2<>(1, Integer.parseInt(token)));
24+
}
25+
}
2926
}
30-
}
31-
}
32-
}).keyBy(0)
27+
}).keyBy(0)
3328
.timeWindow(Time.seconds(5))
3429
.reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
3530
@Override

src/main/resources/init.sql

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
create database javaedge_flink;
2+
3+
create table student
4+
(
5+
id int(11) NOT NULL AUTO_INCREMENT,
6+
name varchar(25),
7+
age int(10),
8+
primary key (id)
9+
);

0 commit comments

Comments
 (0)