Skip to content

Commit 210ee9c

Browse files
committed
add default bytes cache for obkvparam and ohtablefilter
1 parent 2d4bee2 commit 210ee9c

File tree

5 files changed

+192
-66
lines changed

5 files changed

+192
-66
lines changed

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/AbstractPayload.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ public abstract class AbstractPayload implements ObPayload {
4646
private long version = 1;
4747
protected long timeout = RPC_OPERATION_TIMEOUT.getDefaultLong();
4848
protected int groupId = 0;
49+
// for perf opt
4950
protected long payLoadContentSize = INVALID_PAYLOAD_CONTENT_SIZE;
50-
51+
protected static volatile byte[] defaultEncodeBytes = null;
52+
protected static volatile long defaultPayLoadSize = INVALID_PAYLOAD_CONTENT_SIZE;
5153
/*
5254
* Get pcode.
5355
*/
@@ -190,4 +192,18 @@ protected void encodeHeader(ObByteBuf buf) {
190192
encodeObUniVersionHeader(buf, getVersion(), getPayloadContentSize());
191193
}
192194

195+
// for perf opt
196+
protected byte[] encodeDefaultBytes() {
197+
if (defaultEncodeBytes == null) {
198+
synchronized (this.getClass()) {
199+
if (defaultEncodeBytes == null) {
200+
defaultEncodeBytes = encode();
201+
}
202+
}
203+
}
204+
return defaultEncodeBytes;
205+
}
206+
207+
protected boolean isUseDefaultEncode() { return false; }
208+
193209
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpQuery.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class ObTableSingleOpQuery extends ObTableQuery {
3636
private byte[] scanRangeBitMap = null;
3737
private long scanRangeBitLen = 0;
3838
private List<String> aggColumnNames = new ArrayList<>();
39+
private final static byte[] primaryIndexByteArray = Serialization.encodeVString("PRIMARY");
3940

4041
/*
4142
* Encode.
@@ -113,7 +114,11 @@ public void encode(ObByteBuf buf) {
113114
encodeHeader(buf);
114115

115116
// 1. encode index name
116-
Serialization.encodeVString(buf, indexName);
117+
if (indexName == null || indexName.isEmpty() || indexName.compareToIgnoreCase("PRIMARY") == 0) {
118+
buf.writeBytes(primaryIndexByteArray);
119+
} else {
120+
Serialization.encodeVString(buf, indexName);
121+
}
117122

118123
// 2. encode scan ranges columns
119124
Serialization.encodeVi64(buf, scanRangeBitLen);

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObHTableFilter.java

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -62,65 +62,73 @@ public byte[] encode() {
6262
int idx = 0;
6363

6464
// 0. encode header
65-
int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize());
66-
System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes,
67-
idx, headerLen);
68-
idx += headerLen;
65+
byte[] headerBytes = encodeObUniVersionHeader(getVersion(), getPayloadContentSize());
66+
System.arraycopy(headerBytes, 0, bytes, idx, headerBytes.length);
67+
idx += headerBytes.length;
6968

7069
// 1. encode
7170
System.arraycopy(Serialization.encodeI8(isValid ? (byte) 1 : (byte) 0), 0, bytes, idx, 1);
7271
idx++;
73-
int len = Serialization.getNeedBytes(selectColumnQualifier.size());
74-
System
75-
.arraycopy(Serialization.encodeVi64(selectColumnQualifier.size()), 0, bytes, idx, len);
76-
idx += len;
72+
byte[] selectColumnQualifierBytes = Serialization.encodeVi64(selectColumnQualifier.size());
73+
System.arraycopy(selectColumnQualifierBytes, 0, bytes, idx, selectColumnQualifierBytes.length);
74+
idx += selectColumnQualifierBytes.length;
75+
7776
for (ObBytesString q : selectColumnQualifier) {
78-
len = Serialization.getNeedBytes(q);
79-
System.arraycopy(Serialization.encodeBytesString(q), 0, bytes, idx, len);
80-
idx += len;
77+
byte[] QualifierBytes = Serialization.encodeBytesString(q);
78+
System.arraycopy(QualifierBytes, 0, bytes, idx, QualifierBytes.length);
79+
idx += QualifierBytes.length;
8180
}
8281

83-
len = Serialization.getNeedBytes(minStamp);
84-
System.arraycopy(Serialization.encodeVi64(minStamp), 0, bytes, idx, len);
85-
idx += len;
86-
len = Serialization.getNeedBytes(maxStamp);
87-
System.arraycopy(Serialization.encodeVi64(maxStamp), 0, bytes, idx, len);
88-
idx += len;
89-
len = Serialization.getNeedBytes(maxVersions);
90-
System.arraycopy(Serialization.encodeVi32(maxVersions), 0, bytes, idx, len);
91-
idx += len;
92-
len = Serialization.getNeedBytes(limitPerRowPerCf);
93-
System.arraycopy(Serialization.encodeVi32(limitPerRowPerCf), 0, bytes, idx, len);
94-
idx += len;
95-
len = Serialization.getNeedBytes(offsetPerRowPerCf);
96-
System.arraycopy(Serialization.encodeVi32(offsetPerRowPerCf), 0, bytes, idx, len);
97-
idx += len;
98-
len = Serialization.getNeedBytes(filterString);
99-
System.arraycopy(Serialization.encodeBytesString(filterString), 0, bytes, idx, len);
100-
idx += len;
82+
byte[] minStampBytes = Serialization.encodeVi64(minStamp);
83+
System.arraycopy(minStampBytes, 0, bytes, idx, minStampBytes.length);
84+
idx += minStampBytes.length;
85+
byte[] maxStampBytes = Serialization.encodeVi64(maxStamp);
86+
System.arraycopy(maxStampBytes, 0, bytes, idx, maxStampBytes.length);
87+
idx += maxStampBytes.length;
88+
byte[] maxVersionsBytes = Serialization.encodeVi32(maxVersions);
89+
System.arraycopy(maxVersionsBytes, 0, bytes, idx, maxVersionsBytes.length);
90+
idx += maxVersionsBytes.length;
91+
byte[] limitPerRowPerCfBytes = Serialization.encodeVi32(limitPerRowPerCf);
92+
System.arraycopy(Serialization.encodeVi32(limitPerRowPerCf), 0, bytes, idx, limitPerRowPerCfBytes.length);
93+
idx += limitPerRowPerCfBytes.length;
94+
byte[] offsetPerRowPerCfBytes = Serialization.encodeVi32(offsetPerRowPerCf);
95+
System.arraycopy(offsetPerRowPerCfBytes, 0, bytes, idx, offsetPerRowPerCfBytes.length);
96+
idx += offsetPerRowPerCfBytes.length;
97+
byte[] filterStringBytes = Serialization.encodeBytesString(filterString);
98+
System.arraycopy(filterStringBytes, 0, bytes, idx, filterStringBytes.length);
10199

102100
return bytes;
103101
}
104102

103+
protected boolean isUseDefaultEncode() {
104+
return isValid == true && selectColumnQualifier.isEmpty() && minStamp == 0 &&
105+
maxStamp == Long.MAX_VALUE && maxVersions == 1 && limitPerRowPerCf == -1 &&
106+
offsetPerRowPerCf == 0 && filterString == null;
107+
}
108+
105109
public void encode(ObByteBuf buf) {
106-
// 0. encode header
107-
encodeObUniVersionHeader(buf, getVersion(), getPayloadContentSize());
110+
if (isUseDefaultEncode()) {
111+
buf.writeBytes(encodeDefaultBytes());
112+
} else {
113+
// 0. encode header
114+
encodeObUniVersionHeader(buf, getVersion(), getPayloadContentSize());
108115

109-
// 1. encode
110-
Serialization.encodeI8(buf, isValid ? (byte) 1 : (byte) 0);
116+
// 1. encode
117+
Serialization.encodeI8(buf, isValid ? (byte) 1 : (byte) 0);
111118

112-
Serialization.encodeVi64(buf, selectColumnQualifier.size());
119+
Serialization.encodeVi64(buf, selectColumnQualifier.size());
113120

114-
for (ObBytesString q : selectColumnQualifier) {
115-
Serialization.encodeBytesString(buf, q);
116-
}
121+
for (ObBytesString q : selectColumnQualifier) {
122+
Serialization.encodeBytesString(buf, q);
123+
}
117124

118-
Serialization.encodeVi64(buf, minStamp);
119-
Serialization.encodeVi64(buf, maxStamp);
120-
Serialization.encodeVi32(buf, maxVersions);
121-
Serialization.encodeVi32(buf, limitPerRowPerCf);
122-
Serialization.encodeVi32(buf, offsetPerRowPerCf);
123-
Serialization.encodeBytesString(buf, filterString);
125+
Serialization.encodeVi64(buf, minStamp);
126+
Serialization.encodeVi64(buf, maxStamp);
127+
Serialization.encodeVi32(buf, maxVersions);
128+
Serialization.encodeVi32(buf, limitPerRowPerCf);
129+
Serialization.encodeVi32(buf, offsetPerRowPerCf);
130+
Serialization.encodeBytesString(buf, filterString);
131+
}
124132
}
125133

126134
/*

src/main/java/com/alipay/oceanbase/rpc/table/ObHBaseParams.java

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@ public class ObHBaseParams extends ObKVParamsBase {
3030
boolean isCacheBlock = false; // whether enable server block cache and row cache or not
3131
boolean checkExistenceOnly = false; // check the existence only
3232
String hbaseVersion = "1.3.6";
33-
final static byte[] hbaseDefaultVersionBytes = Serialization.encodeVString("1.3.6");
3433
private static final int FLAG_ALLOW_PARTIAL_RESULTS = 1 << 0;
3534
private static final int FLAG_IS_CACHE_BLOCK = 1 << 1;
3635
private static final int FLAG_CHECK_EXISTENCE_ONLY = 1 << 2;
36+
// encode perf opt
37+
final static byte[] hbaseDefaultVersionBytes = Serialization.encodeVString("1.3.6");
38+
volatile static byte[] defaultEncodeBytes = null;
39+
3740

3841
public ObHBaseParams() {
3942
pType = paramType.HBase;
@@ -112,35 +115,35 @@ public byte[] encode() {
112115
byte[] b = new byte[] { (byte) pType.ordinal() };
113116
System.arraycopy(b, 0, bytes, idx, 1);
114117
idx += 1;
115-
System.arraycopy(Serialization.encodeVi32(caching), 0, bytes, idx,
116-
Serialization.getNeedBytes(caching));
117-
idx += Serialization.getNeedBytes(caching);
118-
System.arraycopy(Serialization.encodeVi32(callTimeout), 0, bytes, idx,
119-
Serialization.getNeedBytes(callTimeout));
120-
idx += Serialization.getNeedBytes(callTimeout);
118+
byte[] tmpBytes = Serialization.encodeVi32(caching);
119+
System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length);
120+
idx += tmpBytes.length;
121+
tmpBytes = Serialization.encodeVi32(callTimeout);
122+
System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length);
123+
idx += tmpBytes.length;
121124
System.arraycopy(booleansToByteArray(), 0, bytes, idx, 1);
122125
idx += 1;
123-
System.arraycopy(Serialization.encodeVString(hbaseVersion), 0, bytes, idx,
124-
Serialization.getNeedBytes(hbaseVersion));
125-
idx += Serialization.getNeedBytes(hbaseVersion);
126+
tmpBytes = Serialization.encodeVString(hbaseVersion);
127+
System.arraycopy(tmpBytes, 0, bytes, idx, tmpBytes.length);
126128

127129
return bytes;
128130
}
129131

130132
public void encode(ObByteBuf buf) {
131-
buf.writeByte((byte) pType.ordinal());
132-
133-
Serialization.encodeVi32(buf, caching);
134-
135-
Serialization.encodeVi32(buf, callTimeout);
136-
137-
buf.writeBytes(booleansToByteArray());
138-
139-
if (hbaseVersion == "1.3.6") {
140-
buf.writeBytes(hbaseDefaultVersionBytes);
133+
if (isUseDefaultEncode()) {
134+
buf.writeBytes(encodeDefaultBytes());
141135
} else {
142-
Serialization.encodeVString(buf, hbaseVersion);
136+
buf.writeByte((byte) pType.ordinal());
137+
Serialization.encodeVi32(buf, caching);
138+
Serialization.encodeVi32(buf, callTimeout);
139+
buf.writeBytes(booleansToByteArray());
140+
if (hbaseVersion == "1.3.6") {
141+
buf.writeBytes(hbaseDefaultVersionBytes);
142+
} else {
143+
Serialization.encodeVString(buf, hbaseVersion);
144+
}
143145
}
146+
144147
}
145148

146149
public void byteArrayToBooleans(ByteBuf bytes) {
@@ -171,4 +174,20 @@ public String toString() {
171174
+ hbaseVersion + "\n}\n";
172175
}
173176

177+
public boolean isUseDefaultEncode() {
178+
return caching == -1 && callTimeout == -1 && allowPartialResults == false &&
179+
isCacheBlock == false && checkExistenceOnly == false && hbaseVersion == "1.3.6";
180+
}
181+
182+
public byte[] encodeDefaultBytes() {
183+
if (defaultEncodeBytes == null) {
184+
synchronized (this.getClass()) {
185+
if (defaultEncodeBytes == null) {
186+
defaultEncodeBytes = encode();
187+
}
188+
}
189+
}
190+
return defaultEncodeBytes;
191+
}
192+
174193
}

src/test/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLsOperationRequestTest.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,43 @@ public void testHTableFilterEncode() {
192192
perfComparator.printResult("testHTableFilterEncode");
193193
}
194194

195+
@Test
196+
public void testDefaultHTableFilterEncode() {
197+
PerformanceComparator perfComparator = new PerformanceComparator();
198+
{
199+
// caching default
200+
ObHTableFilter defaultHTableFilter = new ObHTableFilter();
201+
defaultHTableFilter.setValid(true);
202+
long defaultPayLoadSize = defaultHTableFilter.getPayloadSize();
203+
byte[] defaultEncodeBytes = new byte[(int) defaultPayLoadSize];
204+
ObByteBuf defaultByteBuf = new ObByteBuf(defaultEncodeBytes);
205+
defaultHTableFilter.encode(defaultByteBuf);
206+
}
207+
for (int i = 0; i < defaultIterSize; i++) {
208+
ObHTableFilter hTableFilter = new ObHTableFilter();
209+
hTableFilter.setValid(true);
210+
hTableFilter.setMaxVersions(generateRandomInt());
211+
hTableFilter.setMaxStamp(generateRandomInt());
212+
hTableFilter.setMinStamp(generateRandomInt());
213+
hTableFilter.setLimitPerRowPerCf(generateRandomInt());
214+
hTableFilter.setOffsetPerRowPerCf(generateRandomInt());
215+
long payLoadSize = hTableFilter.getPayloadSize();
216+
byte[] encodeBytes = new byte[(int) payLoadSize];
217+
ObByteBuf byteBuf = new ObByteBuf(encodeBytes);
218+
perfComparator.execFirst(() -> hTableFilter.encode(byteBuf));
219+
220+
// default
221+
ObHTableFilter defaultHTableFilter = new ObHTableFilter();
222+
defaultHTableFilter.setValid(true);
223+
long defaultPayLoadSize = defaultHTableFilter.getPayloadSize();
224+
byte[] defaultEncodeBytes = new byte[(int) defaultPayLoadSize];
225+
ObByteBuf defaultByteBuf = new ObByteBuf(defaultEncodeBytes);
226+
perfComparator.execSecond(() -> defaultHTableFilter.encode(defaultByteBuf));
227+
}
228+
// Calculate and print average times
229+
perfComparator.printResult("testDefaultHTableFilterEncode");
230+
}
231+
195232
@Test
196233
public void testKVParamsEncode() {
197234
PerformanceComparator perfComparator = new PerformanceComparator();
@@ -218,6 +255,47 @@ public void testKVParamsEncode() {
218255
perfComparator.printResult("testKVParamsEncode");
219256
}
220257

258+
@Test
259+
public void testDefaultKVParamsEncode() {
260+
PerformanceComparator perfComparator = new PerformanceComparator();
261+
// cache default bytes
262+
{
263+
// default encode
264+
ObKVParams defaultObKVParams = new ObKVParams();
265+
ObHBaseParams defaultHbaseParams = (ObHBaseParams) defaultObKVParams.getObParams(HBase);
266+
defaultObKVParams.setObParamsBase(defaultHbaseParams);
267+
long defaultPayLoadSize = defaultObKVParams.getPayloadSize();
268+
byte[] defaultEncodeBytes = new byte[(int) defaultPayLoadSize];
269+
ObByteBuf defaultByteBuf = new ObByteBuf(defaultEncodeBytes);
270+
defaultObKVParams.encode(defaultByteBuf);
271+
}
272+
for (int i = 0; i < defaultIterSize; i++) {
273+
ObKVParams obKVParams = new ObKVParams();
274+
ObHBaseParams hbaseParams = (ObHBaseParams) obKVParams.getObParams(HBase);
275+
obKVParams.setObParamsBase(hbaseParams);
276+
hbaseParams.setCaching(generateRandomInt());
277+
hbaseParams.setCallTimeout(generateRandomInt());
278+
hbaseParams.setAllowPartialResults(generateRandomBoolean());
279+
hbaseParams.setCacheBlock(generateRandomBoolean());
280+
hbaseParams.setCheckExistenceOnly(generateRandomBoolean());
281+
long payLoadSize = obKVParams.getPayloadSize();
282+
byte[] newEncodeBytes = new byte[(int) payLoadSize];
283+
ObByteBuf byteBuf = new ObByteBuf(newEncodeBytes);
284+
perfComparator.execFirst(() -> obKVParams.encode(byteBuf));
285+
// default encode
286+
ObKVParams defaultObKVParams = new ObKVParams();
287+
ObHBaseParams defaultHbaseParams = (ObHBaseParams) defaultObKVParams.getObParams(HBase);
288+
defaultObKVParams.setObParamsBase(defaultHbaseParams);
289+
long defaultPayLoadSize = defaultObKVParams.getPayloadSize();
290+
byte[] defaultEncodeBytes = new byte[(int) defaultPayLoadSize];
291+
ObByteBuf defaultByteBuf = new ObByteBuf(defaultEncodeBytes);
292+
perfComparator.execSecond(() -> defaultObKVParams.encode(defaultByteBuf));
293+
}
294+
// Calculate and print average times
295+
perfComparator.printResult("testDefaultKVParamsEncode");
296+
}
297+
298+
221299
private static void assertEncodeByteArray(byte[] bytes1, byte[] bytes2) {
222300
if (bytes1 == bytes2) return;
223301
if (bytes1 == null || bytes2 == null) Assert.fail();

0 commit comments

Comments
 (0)