Skip to content

Commit 1791a19

Browse files
ll076110OT-XY
andauthored
[hotfix-#1821][hdfs]Modify orc file calculation method (#1822)
Co-authored-by: OT-XY <[email protected]>
1 parent 8204be8 commit 1791a19

File tree

5 files changed

+69
-60
lines changed

5 files changed

+69
-60
lines changed

chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsOrcOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ protected void openSource() {
149149
@Override
150150
// todo the deviation needs to be calculated accurately
151151
protected long getCurrentFileSize() {
152-
return (long) (bytesWriteCounter.getLocalValue() * getDeviation());
152+
return (long) ((bytesWriteCounter.getLocalValue() - lastWriteSize) * getDeviation());
153153
}
154154

155155
@Override

chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsParquetOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ protected void nextBlock() {
156156
public void flushDataInternal() {
157157
log.info(
158158
"Close current parquet record writer, write data size:[{}]",
159-
SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue()));
159+
SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue() - lastWriteSize));
160160
try {
161161
if (writer != null) {
162162
writer.close();

chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsTextOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected void nextBlock() {
7777
public void flushDataInternal() {
7878
log.info(
7979
"Close current text stream, write data size:[{}]",
80-
SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue()));
80+
SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue() - lastWriteSize));
8181

8282
try {
8383
if (stream != null) {

chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public abstract class BaseFileOutputFormat extends BaseRichOutputFormat {
5858
protected List<String> preCommitFilePathList = new ArrayList<>();
5959
protected long nextNumForCheckDataSize;
6060
protected long lastWriteTime = System.currentTimeMillis();
61+
protected long lastWriteSize;
6162

6263
@Override
6364
public void initializeGlobal(int parallelism) {
@@ -155,6 +156,7 @@ public void flushData() {
155156
if (rowsOfCurrentBlock != 0) {
156157
flushDataInternal();
157158
sumRowsOfBlock += rowsOfCurrentBlock;
159+
lastWriteSize = bytesWriteCounter.getLocalValue();
158160
log.info(
159161
"flush file:{}, rowsOfCurrentBlock = {}, sumRowsOfBlock = {}",
160162
currentFileName,

chunjun-local-test/pom.xml

Lines changed: 64 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -51,71 +51,78 @@
5151
<artifactId>chunjun-connector-stream</artifactId>
5252
<version>${project.version}</version>
5353
</dependency>
54-
<dependency>
55-
<groupId>com.dtstack.chunjun</groupId>
56-
<artifactId>chunjun-connector-binlog</artifactId>
57-
<version>${project.version}</version>
58-
</dependency>
54+
<!--rdb test start-->
55+
<!-- <dependency>-->
56+
<!-- <groupId>com.dtstack.chunjun</groupId>-->
57+
<!-- <artifactId>chunjun-connector-mysql</artifactId>-->
58+
<!-- <version>${project.version}</version>-->
59+
<!-- </dependency>-->
60+
<!--rdb test end-->
61+
62+
<!--hdfs test start-->
63+
<!-- <dependency>-->
64+
<!-- <groupId>com.dtstack.chunjun</groupId>-->
65+
<!-- <artifactId>chunjun-connector-hdfs</artifactId>-->
66+
<!-- <version>${project.version}</version>-->
67+
<!-- </dependency>-->
68+
<!-- <dependency>-->
69+
<!-- <artifactId>hadoop-mapreduce-client-core</artifactId>-->
70+
<!-- <exclusions>-->
71+
<!-- <exclusion>-->
72+
<!-- <artifactId>commons-cli</artifactId>-->
73+
<!-- <groupId>commons-cli</groupId>-->
74+
<!-- </exclusion>-->
75+
<!-- <exclusion>-->
76+
<!-- <artifactId>snappy-java</artifactId>-->
77+
<!-- <groupId>org.xerial.snappy</groupId>-->
78+
<!-- </exclusion>-->
79+
<!-- </exclusions>-->
80+
<!-- <groupId>org.apache.hadoop</groupId>-->
81+
<!-- <version>2.7.5</version>-->
82+
<!-- </dependency>-->
83+
<!-- <dependency>-->
84+
<!-- <artifactId>hadoop-common</artifactId>-->
85+
<!-- <exclusions>-->
86+
<!-- <exclusion>-->
87+
<!-- <artifactId>gson</artifactId>-->
88+
<!-- <groupId>com.google.code.gson</groupId>-->
89+
<!-- </exclusion>-->
90+
<!-- <exclusion>-->
91+
<!-- <artifactId>commons-cli</artifactId>-->
92+
<!-- <groupId>commons-cli</groupId>-->
93+
<!-- </exclusion>-->
94+
<!-- <exclusion>-->
95+
<!-- <artifactId>snappy-java</artifactId>-->
96+
<!-- <groupId>org.xerial.snappy</groupId>-->
97+
<!-- </exclusion>-->
98+
<!-- </exclusions>-->
99+
<!-- <groupId>org.apache.hadoop</groupId>-->
100+
<!-- <version>2.7.5</version>-->
101+
<!-- </dependency>-->
102+
<!-- <dependency>-->
103+
<!-- <artifactId>hadoop-hdfs</artifactId>-->
104+
<!-- <exclusions>-->
105+
<!-- <exclusion>-->
106+
<!-- <artifactId>commons-cli</artifactId>-->
107+
<!-- <groupId>commons-cli</groupId>-->
108+
<!-- </exclusion>-->
109+
<!-- <exclusion>-->
110+
<!-- <artifactId>netty-all</artifactId>-->
111+
<!-- <groupId>io.netty</groupId>-->
112+
<!-- </exclusion>-->
113+
<!-- </exclusions>-->
114+
<!-- <groupId>org.apache.hadoop</groupId>-->
115+
<!-- <version>2.7.5</version>-->
116+
<!-- </dependency>-->
117+
<!--hdfs test end-->
59118

60-
<dependency>
61-
<groupId>com.dtstack.chunjun</groupId>
62-
<artifactId>chunjun-connector-iceberg</artifactId>
63-
<version>${project.version}</version>
64-
</dependency>
65-
<dependency>
66-
<groupId>com.dtstack.chunjun</groupId>
67-
<artifactId>chunjun-connector-mongodb</artifactId>
68-
<version>${project.version}</version>
69-
</dependency>
70-
71-
<dependency>
72-
<groupId>com.dtstack.chunjun</groupId>
73-
<artifactId>chunjun-connector-emqx</artifactId>
74-
<version>${project.version}</version>
75-
</dependency>
76-
<dependency>
77-
<groupId>com.dtstack.chunjun</groupId>
78-
<artifactId>chunjun-connector-jdbc-base</artifactId>
79-
<version>${project.version}</version>
80-
</dependency>
81-
82-
<dependency>
83-
<groupId>com.dtstack.chunjun</groupId>
84-
<artifactId>chunjun-connector-mysql</artifactId>
85-
<version>${project.version}</version>
86-
</dependency>
87-
88-
<dependency>
89-
<groupId>com.dtstack.chunjun</groupId>
90-
<artifactId>chunjun-connector-kafka</artifactId>
91-
<version>${project.version}</version>
92-
</dependency>
93-
94-
<dependency>
95-
<groupId>com.dtstack.chunjun</groupId>
96-
<artifactId>chunjun-connector-kingbase</artifactId>
97-
<version>${project.version}</version>
98-
</dependency>
99-
100-
<!-- -->
101-
<dependency>
102-
<groupId>com.dtstack.chunjun</groupId>
103-
<artifactId>chunjun-connector-ftp</artifactId>
104-
<version>${project.version}</version>
105-
</dependency>
106119

107120
<dependency>
108121
<groupId>org.apache.flink</groupId>
109122
<artifactId>flink-json</artifactId>
110123
<version>${flink.version}</version>
111124
</dependency>
112125

113-
<dependency>
114-
<groupId>com.dtstack.chunjun</groupId>
115-
<artifactId>chunjun-connector-oceanbase</artifactId>
116-
<version>master</version>
117-
</dependency>
118-
119126
<dependency>
120127
<groupId>org.apache.avro</groupId>
121128
<artifactId>avro</artifactId>

0 commit comments

Comments
 (0)