Skip to content

Commit 24b9713

Browse files
yanghuaiGit and dujie authored
[Feature-#1948][doris] performance and resource optimization (#1949)
Co-authored-by: dujie <[email protected]>
1 parent d7570c7 commit 24b9713

26 files changed

+1670
-287
lines changed

chunjun-connectors/chunjun-connector-doris/pom.xml

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,24 @@
3838

3939

4040
<dependencies>
41+
<dependency>
42+
<groupId>com.fasterxml.jackson.core</groupId>
43+
<artifactId>jackson-databind</artifactId>
44+
<version>2.9.10.1</version>
45+
</dependency>
46+
47+
<dependency>
48+
<groupId>org.apache.derby</groupId>
49+
<artifactId>derby</artifactId>
50+
<version>10.10.2.0</version>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>io.airlift</groupId>
55+
<artifactId>slice</artifactId>
56+
<version>0.41</version>
57+
</dependency>
58+
4159
<dependency>
4260
<groupId>com.dtstack.chunjun</groupId>
4361
<artifactId>chunjun-connector-mysql</artifactId>
@@ -51,6 +69,43 @@
5169
<groupId>org.apache.maven.plugins</groupId>
5270
<artifactId>maven-antrun-plugin</artifactId>
5371
</plugin>
72+
<plugin>
73+
<groupId>org.apache.maven.plugins</groupId>
74+
<artifactId>maven-shade-plugin</artifactId>
75+
<version>3.1.0</version>
76+
<executions>
77+
<execution>
78+
<phase>package</phase>
79+
<goals>
80+
<goal>shade</goal>
81+
</goals>
82+
<configuration>
83+
<createDependencyReducedPom>false</createDependencyReducedPom>
84+
<transformers>
85+
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
86+
</transformers>
87+
<relocations>
88+
<relocation>
89+
<pattern>com.fasterxml.jackson.core</pattern>
90+
<shadedPattern>shade.dorisbatch.com.fasterxml.jackson.core</shadedPattern>
91+
</relocation>
92+
<relocation>
93+
<pattern>com.fasterxml.jackson.databind</pattern>
94+
<shadedPattern>shade.dorisbatch.com.fasterxml.jackson.databind</shadedPattern>
95+
</relocation>
96+
<relocation>
97+
<pattern>com.fasterxml.jackson.annotation</pattern>
98+
<shadedPattern>shade.dorisbatch.com.fasterxml.jackson.annotation</shadedPattern>
99+
</relocation>
100+
<relocation>
101+
<pattern>org.apache.http</pattern>
102+
<shadedPattern>shade.core.org.apache.http</shadedPattern>
103+
</relocation>
104+
</relocations>
105+
</configuration>
106+
</execution>
107+
</executions>
108+
</plugin>
54109
</plugins>
55110
</build>
56111

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.dtstack.chunjun.connector.doris.buffer;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.util.function.Supplier;
6+
7+
/**
 * Sink that a buffer hands its accumulated bytes to when it is flushed.
 *
 * <p>Extends {@link java.io.Closeable} so that implementations can be managed with
 * try-with-resources; the {@link #close()} signature is unchanged, so existing implementations
 * keep compiling as they are.
 */
public interface BufferFlusher extends java.io.Closeable {

    /**
     * Writes {@code length} bytes read from the given stream.
     *
     * @param inputStream stream positioned at the first byte to write
     * @param length number of bytes to write
     * @throws Exception if the write fails
     */
    void write(InputStream inputStream, int length) throws Exception;

    /**
     * Writes {@code length} bytes obtained from a stream supplier.
     *
     * <p>NOTE(review): presumably a supplier is used so that an implementation can obtain a fresh
     * stream when it has to re-send the payload — confirm against the implementations.
     *
     * @param supplier provider of a stream positioned at the first byte to write
     * @param length number of bytes to write
     * @throws Exception if the write fails
     */
    void write(Supplier<InputStream> supplier, int length) throws Exception;

    /**
     * Releases the resources held by this flusher.
     *
     * @throws IOException if releasing the resources fails
     */
    @Override
    void close() throws IOException;
}
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package com.dtstack.chunjun.connector.doris.buffer;
2+
3+
import io.airlift.slice.Slice;
4+
import io.airlift.slice.Slices;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
import java.util.concurrent.locks.ReentrantLock;
10+
import java.util.function.Consumer;
11+
12+
import static java.nio.charset.StandardCharsets.UTF_8;
13+
14+
class BufferPool implements IBufferPool {
15+
private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);
16+
private static final byte OPEN_BRACKET = 0x5B; // [
17+
private static final byte COMMA = 0x2C; // ,
18+
private static final byte CLOSE_BRACKET = 0x5D; // ]
19+
20+
private final int capacity;
21+
private volatile int position;
22+
private volatile BufferState state;
23+
private final ReentrantLock writeLock = new ReentrantLock();
24+
private final int bufferId;
25+
private final AtomicInteger writeCount = new AtomicInteger(0);
26+
private final BufferFlusher flusher;;
27+
private final Consumer<Long> numWriterCount;
28+
private final Consumer<Long> bytesWriterCount;
29+
30+
private Slice slice;
31+
32+
public enum BufferState {
33+
AVAILABLE, // 可用状态
34+
WRITING, // 正在写入
35+
FLUSHING, // 正在刷写
36+
FULL // 已满待刷写
37+
}
38+
39+
public BufferPool(
40+
int capacity,
41+
int bufferId,
42+
BufferFlusher flusher,
43+
Consumer<Long> numWriterCount,
44+
Consumer<Long> bytesWriterCount) {
45+
this.capacity = capacity;
46+
this.slice = Slices.allocate(capacity);
47+
this.position = 0;
48+
this.bufferId = bufferId;
49+
this.flusher = flusher;
50+
this.numWriterCount = numWriterCount;
51+
this.bytesWriterCount = bytesWriterCount;
52+
this.reset();
53+
}
54+
55+
/**
56+
* 写入数据到缓冲区
57+
*
58+
* @param data 要写入的数据
59+
* @return 实际写入的字节数,-1表示缓冲区已满
60+
*/
61+
public int write(byte[] data) {
62+
writeLock.lock();
63+
try {
64+
return write(Slices.wrappedBuffer(data));
65+
} finally {
66+
writeLock.unlock();
67+
}
68+
}
69+
70+
/**
71+
* 写入数据到缓冲区
72+
*
73+
* @param data 要写入的数据
74+
* @return 实际写入的字节数,-1表示缓冲区已满
75+
*/
76+
public int write(String data) {
77+
writeLock.lock();
78+
try {
79+
return write(Slices.copiedBuffer(data, UTF_8));
80+
} finally {
81+
writeLock.unlock();
82+
}
83+
}
84+
85+
@Override
86+
public void flush() throws Exception {
87+
writeLock.lock();
88+
try {
89+
if (position > 1) {
90+
state = BufferState.FLUSHING;
91+
replaceLastCommonToClosingBracket();
92+
LOG.info(
93+
"小缓冲区 {} 刷写数据,数据总量 {} MB,条数:{}",
94+
bufferId,
95+
(int) Math.ceil((double) position / (1024 * 1024)),
96+
writeCount.get());
97+
numWriterCount.accept((long) writeCount.get());
98+
bytesWriterCount.accept((long) position);
99+
Slice output = slice.slice(0, position);
100+
LOG.info("开始异步刷写缓冲区 {},数据大小:{} 字节", this.bufferId, position);
101+
// 执行实际的刷写操作
102+
flusher.write(output::getInput, position);
103+
LOG.info("缓冲区 {} 刷写完成,数据大小:{} 字节", this.bufferId, position);
104+
// 刷写完成后重置缓冲区并放回可用队列
105+
this.reset();
106+
}
107+
} finally {
108+
writeLock.unlock();
109+
}
110+
}
111+
112+
@Override
113+
public void shutdown() throws Exception {
114+
this.flush();
115+
this.slice.clear();
116+
this.slice = null;
117+
}
118+
119+
/**
120+
* 写入数据到缓冲区
121+
*
122+
* @param data 要写入的数据
123+
* @return 实际写入的字节数,-1表示缓冲区已满
124+
*/
125+
public int write(Slice data) {
126+
writeLock.lock();
127+
try {
128+
if (state != BufferState.WRITING && state != BufferState.AVAILABLE) {
129+
return -1; // 缓冲区不可写
130+
}
131+
132+
int availableSpace = capacity - position;
133+
if (availableSpace <= 0) {
134+
state = BufferState.FULL;
135+
return -1; // 缓冲区已满
136+
}
137+
138+
if (data.length() + 1 + position > capacity) {
139+
state = BufferState.FULL;
140+
return -1; // 数据无法存入缓冲区
141+
}
142+
143+
state = BufferState.WRITING;
144+
int bytesToWrite = data.length();
145+
slice.setBytes(position, data);
146+
position += bytesToWrite;
147+
slice.setByte(position, COMMA);
148+
position += 1;
149+
writeCount.incrementAndGet();
150+
151+
if (position >= capacity) {
152+
state = BufferState.FULL;
153+
}
154+
155+
return bytesToWrite;
156+
} finally {
157+
writeLock.unlock();
158+
}
159+
}
160+
161+
// [{},{}, => [{},{}]
162+
private void replaceLastCommonToClosingBracket() {
163+
if (state == BufferState.FLUSHING) {
164+
slice.setByte(position - 1, CLOSE_BRACKET);
165+
}
166+
}
167+
168+
/** 刷写完成后重置缓冲区 */
169+
public void reset() {
170+
writeLock.lock();
171+
try {
172+
slice.setByte(0, OPEN_BRACKET);
173+
position = 1;
174+
writeCount.set(0);
175+
state = BufferState.AVAILABLE;
176+
LOG.info("SmallBuffer " + bufferId + " 已重置为可用状态");
177+
} finally {
178+
writeLock.unlock();
179+
}
180+
}
181+
182+
public BufferState getState() {
183+
return state;
184+
}
185+
186+
public int getBufferId() {
187+
return bufferId;
188+
}
189+
190+
public int getPosition() {
191+
return position;
192+
}
193+
194+
public int getCapacity() {
195+
return capacity;
196+
}
197+
}

0 commit comments

Comments
 (0)