Skip to content

Commit 402b604

Browse files
authored
refactor direct load (#176)
* direct load refactor * check parameters is supported
1 parent 01537f2 commit 402b604

File tree

86 files changed

+5777
-727
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+5777
-727
lines changed

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObPacketFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ private byte[] encodePayload(ObPayload payload) {
8989
// compute checksum
9090
rpcHeaderPacket.setChecksum(ObPureCrc32C.calculate(payloadContent));
9191

92+
// group id
93+
rpcHeaderPacket.setGroupId(payload.getGroupId());
94+
9295
// 3. assemble and encode rpc packet
9396
ObRpcPacket rpcPacket = new ObRpcPacket();
9497
rpcPacket.setRpcPacketHeader(rpcHeaderPacket);
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2023 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.direct_load;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
import com.alipay.oceanbase.rpc.direct_load.exception.*;
24+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
25+
import com.alipay.oceanbase.rpc.util.ObByteBuf;
26+
import com.alipay.oceanbase.rpc.util.Serialization;
27+
28+
public class ObDirectLoadBucket {
29+
30+
/**
31+
* buffer的格式如下
32+
* +----------------+-----------+------+------+-----+
33+
* | payload length | row count | row1 | row2 | ... |
34+
* +----------------+-----------+------+------+-----+
35+
*/
36+
37+
private static final ObDirectLoadLogger logger = ObDirectLoadLogger.getLogger();
38+
private final static int integerReservedSize = 5; // 预留5个字节用来编码Integer
39+
private final static int reservedSize = integerReservedSize * 2; // 预留2个Integer
40+
private final static int defaultBufferSize = 1 * 1024 * 1024; // 1M
41+
42+
private final int bufferSize;
43+
private ArrayList<ObByteBuf> payloadBufferList = new ArrayList<ObByteBuf>(64);
44+
private int totalRowCount = 0;
45+
46+
private ObByteBuf buffer = null;
47+
private int currentRowCount = 0;
48+
private Row row = new Row();
49+
50+
public ObDirectLoadBucket() {
51+
bufferSize = defaultBufferSize;
52+
}
53+
54+
public ObDirectLoadBucket(int bufferSize) {
55+
this.bufferSize = bufferSize;
56+
}
57+
58+
public boolean isEmpty() {
59+
return (getRowNum() == 0);
60+
}
61+
62+
public int getRowNum() {
63+
return totalRowCount + currentRowCount;
64+
}
65+
66+
@Override
67+
public String toString() {
68+
return String.format("{rowNum:%d}", getRowNum());
69+
}
70+
71+
public void addRow(ObObj[] cells) throws ObDirectLoadException {
72+
if (cells == null || cells.length == 0) {
73+
logger.warn("cells cannot be null or empty, cells:" + cells);
74+
throw new ObDirectLoadIllegalArgumentException("cells cannot be null or empty, cells:"
75+
+ cells);
76+
}
77+
row.setCells(cells);
78+
appendRow(row);
79+
}
80+
81+
public void addRow(List<ObObj> cells) throws ObDirectLoadException {
82+
if (cells == null || cells.isEmpty()) {
83+
logger.warn("cells cannot be null or empty, cells:" + cells);
84+
throw new ObDirectLoadIllegalArgumentException("cells cannot be null or empty, cells:"
85+
+ cells);
86+
}
87+
row.setCells(cells);
88+
appendRow(row);
89+
}
90+
91+
private void appendRow(Row row) {
92+
final int rowEncodedSize = row.getEncodedSize();
93+
while (true) {
94+
if (buffer == null) {
95+
allocBuffer(rowEncodedSize);
96+
} else if (buffer.writableBytes() < rowEncodedSize) {
97+
sealBuffer();
98+
} else {
99+
row.encode(buffer);
100+
++currentRowCount;
101+
break;
102+
}
103+
}
104+
}
105+
106+
private void allocBuffer(int encodedSize) {
107+
final int needSize = encodedSize + reservedSize;
108+
final int allocBufferSize = (needSize + bufferSize - 1) / bufferSize * bufferSize;
109+
buffer = new ObByteBuf(allocBufferSize);
110+
buffer.reserve(reservedSize);
111+
}
112+
113+
private void sealBuffer() {
114+
// 编码row count
115+
encodeVi32(buffer.bytes, integerReservedSize, currentRowCount);
116+
// 编码payload length
117+
encodeVi32(buffer.bytes, 0, buffer.readableBytes() - integerReservedSize);
118+
payloadBufferList.add(buffer);
119+
totalRowCount += currentRowCount;
120+
currentRowCount = 0;
121+
buffer = null;
122+
}
123+
124+
private void encodeVi32(byte[] buf, int pos, int value) {
125+
// 前面的byte的高位都设置为1
126+
for (int i = 0; i < integerReservedSize - 1; ++i, ++pos) {
127+
buf[pos] = (byte) (value | 0x80);
128+
value >>>= 7;
129+
}
130+
// 最后一个byte的高位设置为0
131+
buf[pos] = (byte) (value & 0x7f);
132+
}
133+
134+
public List<ObByteBuf> getPayloadBufferList() {
135+
if (buffer != null) {
136+
sealBuffer();
137+
}
138+
return payloadBufferList;
139+
}
140+
141+
private static class Row {
142+
143+
private final long SeqNo = 0;
144+
private ObObj[] cells = null;
145+
146+
public Row() {
147+
}
148+
149+
public void setCells(ObObj[] cells) {
150+
this.cells = cells;
151+
}
152+
153+
public void setCells(List<ObObj> cells) {
154+
this.cells = cells.toArray(new ObObj[0]);
155+
}
156+
157+
/**
158+
* Encode.
159+
*/
160+
public void encode(ObByteBuf buf) {
161+
Serialization.encodeVi64(buf, SeqNo);
162+
Serialization.encodeVi32(buf, cells.length);
163+
for (int i = 0; i < cells.length; ++i) {
164+
cells[i].encode(buf);
165+
}
166+
}
167+
168+
/**
169+
* Get encoded size.
170+
*/
171+
public int getEncodedSize() {
172+
int size = 0;
173+
size += Serialization.getNeedBytes(SeqNo);
174+
size += Serialization.getNeedBytes(cells.length);
175+
for (int i = 0; i < cells.length; ++i) {
176+
size += cells[i].getEncodedSize();
177+
}
178+
return size;
179+
}
180+
181+
}
182+
183+
}

0 commit comments

Comments
 (0)