Skip to content

Commit 8c1e9b8

Browse files
committed
merge master into get_partition_meta
2 parents 5a3bbe0 + b3f35e5 commit 8c1e9b8

File tree

12 files changed

+367
-735
lines changed

12 files changed

+367
-735
lines changed

pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<groupId>com.oceanbase</groupId>
66
<artifactId>obkv-table-client</artifactId>
7-
<version>1.2.14-SNAPSHOT</version>
7+
<version>1.3.1-SNAPSHOT</version>
88

99
<name>${project.groupId}:${project.artifactId}</name>
1010
<description>OceanBase JavaClient for TableApi</description>
@@ -157,12 +157,12 @@
157157
<dependency>
158158
<groupId>org.testcontainers</groupId>
159159
<artifactId>jdbc</artifactId>
160-
<scope>test</scope>
161160
<version>1.15.3</version>
161+
<scope>test</scope>
162162
<exclusions>
163163
<exclusion>
164-
<groupId>org.rnorth.visible-assertions</groupId>
165164
<artifactId>visible-assertions</artifactId>
165+
<groupId>org.rnorth.visible-assertions</groupId>
166166
</exclusion>
167167
</exclusions>
168168
</dependency>
@@ -380,7 +380,7 @@
380380
</goals>
381381
<configuration>
382382
<rules>
383-
<DependencyConvergence/>
383+
<DependencyConvergence />
384384
</rules>
385385
</configuration>
386386
</execution>
@@ -390,4 +390,4 @@
390390
</build>
391391
</profile>
392392
</profiles>
393-
</project>
393+
</project>

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 122 additions & 247 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObConnectionFactory.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222
import com.alipay.remoting.codec.Codec;
2323
import com.alipay.remoting.config.ConfigManager;
2424
import com.alipay.remoting.config.ConfigurableInstance;
25-
import com.alipay.remoting.connection.AbstractConnectionFactory;
2625
import com.alipay.remoting.connection.ConnectionFactory;
27-
import com.alipay.remoting.log.BoltLoggerFactory;
2826
import com.alipay.remoting.rpc.RpcHandler;
2927
import com.alipay.remoting.rpc.protocol.UserProcessor;
3028
import com.alipay.remoting.util.NettyEventLoopUtil;
@@ -35,14 +33,14 @@
3533
import io.netty.channel.*;
3634
import io.netty.channel.socket.SocketChannel;
3735
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
3837

3938
import java.net.InetSocketAddress;
4039
import java.util.concurrent.ConcurrentHashMap;
4140

4241
public class ObConnectionFactory implements ConnectionFactory {
4342

44-
private static final Logger logger = BoltLoggerFactory
45-
.getLogger(AbstractConnectionFactory.class);
43+
private static final Logger logger = LoggerFactory.getLogger(ObConnectionFactory.class);
4644

4745
private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(Runtime
4846
.getRuntime().availableProcessors() + 1,

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,12 @@ private void login() throws Exception {
173173
credential = result.getCredential();
174174
tenantId = result.getTenantId();
175175
// Set version if missing
176-
if (ObGlobal.obVsnMajor() == 0 && !result.getServerVersion().isEmpty()) {
176+
if (ObGlobal.obVsnMajor() == 0) {
177177
// version should be set before login when direct mode
178+
if (result.getServerVersion().isEmpty()) {
179+
throw new RuntimeException(
180+
"Failed to get server version from login result");
181+
}
178182
LocationUtil.parseObVerionFromLogin(result.getServerVersion());
179183
LOGGER.info("The OB_VERSION parsed from login result is: {}",
180184
ObGlobal.OB_VERSION);

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 146 additions & 374 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableSingleOpEntity.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,25 @@ public void adjustRowkeyColumnName(Map<String, Long> columnNameIdxMap) {
283283
this.rowKeyBitMap = byteArray;
284284
}
285285

286+
// Support class, which is used for column name sorted
287+
private static class ColumnNameValuePair implements Comparable<ColumnNameValuePair> {
288+
long number;
289+
// we could use idx here, and adjust obj after compare
290+
String name;
291+
ObObj obj;
292+
293+
ColumnNameValuePair(long number, String name, ObObj obj) {
294+
this.number = number;
295+
this.name = name;
296+
this.obj = obj;
297+
}
298+
299+
@Override
300+
public int compareTo(ColumnNameValuePair other) {
301+
return Long.compare(this.number, other.number);
302+
}
303+
}
304+
286305
public void adjustPropertiesColumnName(Map<String, Long> columnNameIdxMap) {
287306
if (!ignoreEncodePropertiesColumnNames) {
288307
this.propertiesBitLen = columnNameIdxMap.size();
@@ -301,15 +320,17 @@ public void adjustPropertiesColumnName(Map<String, Long> columnNameIdxMap) {
301320
}
302321
}
303322

304-
List<ColumnNamePair> pairs = new ArrayList<>();
323+
List<ColumnNameValuePair> pairs = new ArrayList<>();
305324
for (int i = 0; i < columnNameIdx.size(); i++) {
306-
pairs.add(new ColumnNamePair(columnNameIdx.get(i), propertiesValues.get(i)));
325+
pairs.add(new ColumnNameValuePair(columnNameIdx.get(i), propertiesNames.get(i), propertiesValues.get(i)));
307326
}
308327

309328
Collections.sort(pairs);
310329

330+
propertiesNames = new ArrayList<>(pairs.size());
311331
propertiesValues = new ArrayList<>(pairs.size());
312-
for (ColumnNamePair pair : pairs) {
332+
for (ColumnNameValuePair pair : pairs) {
333+
propertiesNames.add(pair.name);
313334
propertiesValues.add(pair.obj);
314335
}
315336

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 12 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@
1717

1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;
1919

20-
import com.alipay.oceanbase.rpc.ObGlobal;
2120
import com.alipay.oceanbase.rpc.ObTableClient;
2221
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
2322
import com.alipay.oceanbase.rpc.exception.*;
2423
import com.alipay.oceanbase.rpc.location.model.ObReadConsistency;
2524
import com.alipay.oceanbase.rpc.location.model.ObServerRoute;
26-
import com.alipay.oceanbase.rpc.location.model.TableEntry;
2725
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2826
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
2927
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -46,8 +44,6 @@
4644
import java.util.concurrent.atomic.AtomicReference;
4745
import java.util.concurrent.locks.ReentrantLock;
4846

49-
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME;
50-
5147
public abstract class AbstractQueryStreamResult extends AbstractPayload implements
5248
QueryStreamResult {
5349

@@ -56,6 +52,7 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
5652
protected volatile boolean closed = false;
5753
protected volatile List<ObObj> row = null;
5854
protected volatile int rowIndex = -1;
55+
// 调整它的startKey
5956
protected ObTableQuery tableQuery;
6057
protected long operationTimeout = -1;
6158
protected String tableName;
@@ -71,8 +68,7 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
7168
private ObReadConsistency readConsistency = ObReadConsistency.STRONG;
7269
// ObRowKey objs: [startKey, MIN_OBJECT, MIN_OBJECT]
7370
public List<ObObj> currentStartKey;
74-
protected ObTableClient client;
75-
71+
7672
/*
7773
* Get pcode.
7874
*/
@@ -148,11 +144,6 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
148144
if (failedServerList != null) {
149145
route.setBlackList(failedServerList);
150146
}
151-
if (ObGlobal.obVsnMajor() >= 4) {
152-
TableEntry tableEntry = client.getOrRefreshTableEntry(indexTableName, false, false, false);
153-
client.refreshTableLocationByTabletId(tableEntry, indexTableName, client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft()));
154-
}
155-
156147
subObTable = client
157148
.getTableWithPartId(indexTableName, partIdWithIndex.getLeft(),
158149
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(),
@@ -163,6 +154,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
163154
result = subObTable.executeWithConnection(request, connectionRef);
164155
} else {
165156
result = subObTable.execute(request);
157+
166158
if (result != null && result.getPcode() == Pcodes.OB_TABLE_API_MOVE) {
167159
ObTableApiMove moveResponse = (ObTableApiMove) result;
168160
client.getRouteTableRefresher().addTableIfAbsent(indexTableName, true);
@@ -251,8 +243,9 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
251243
} else if (e instanceof ObTableException) {
252244
if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e)
253245
.getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode)
254-
&& ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery())
255-
|| (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery()))
246+
&& ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request)
247+
.getObTableQueryRequest().getTableQuery().isHbaseQuery()) || (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request)
248+
.getTableQuery().isHbaseQuery()))
256249
&& client.getTableGroupInverted().get(indexTableName) != null) {
257250
// table not exists && hbase mode && table group exists , three condition both
258251
client.eraseTableGroupFromCache(tableName);
@@ -356,7 +349,10 @@ public boolean next() throws Exception {
356349

357350
} catch (Exception e) {
358351
if (e instanceof ObTableNeedFetchAllException) {
352+
// Adjust the start key and refresh the expectant
353+
this.tableQuery.adjustStartKey(currentStartKey);
359354
setExpectant(refreshPartition(tableQuery, tableName));
355+
360356
// Reset the iterator to start over
361357
it = expectant.entrySet().iterator();
362358
referPartition.clear(); // Clear the referPartition if needed
@@ -580,33 +576,9 @@ public void init() throws Exception {
580576
return;
581577
}
582578
if (tableQuery.getBatchSize() == -1) {
583-
if (!expectant.isEmpty()) {
584-
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
585-
.iterator();
586-
int retryTimes = 0;
587-
while (it.hasNext()) {
588-
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
589-
try {
590-
// try access new partition, async will not remove useless expectant
591-
referToNewPartition(entry.getValue());
592-
} catch (Exception e) {
593-
if (e instanceof ObTableNeedFetchAllException) {
594-
setExpectant(refreshPartition(tableQuery, tableName));
595-
it = expectant.entrySet().iterator();
596-
retryTimes++;
597-
if (retryTimes > client.getRuntimeRetryTimes()) {
598-
RUNTIME.error("Fail to get refresh table entry response after {}",
599-
retryTimes);
600-
throw new ObTableRetryExhaustedException(
601-
"Fail to get refresh table entry response after " + retryTimes +
602-
"errorCode:" + ((ObTableNeedFetchAllException) e).getErrorCode());
603-
604-
}
605-
} else {
606-
throw e;
607-
}
608-
}
609-
}
579+
for (Map.Entry<Long, ObPair<Long, ObTableParam>> entry : expectant.entrySet()) {
580+
// mark the refer partition
581+
referToNewPartition(entry.getValue());
610582
}
611583
expectant.clear();
612584
} else {
@@ -747,19 +719,4 @@ public ObReadConsistency getReadConsistency() {
747719
public void setReadConsistency(ObReadConsistency readConsistency) {
748720
this.readConsistency = readConsistency;
749721
}
750-
751-
/**
752-
* Get client.
753-
* @return client
754-
*/
755-
public ObTableClient getClient() {
756-
return client;
757-
}
758-
759-
/*
760-
* Set client.
761-
*/
762-
public void setClient(ObTableClient client) {
763-
this.client = client;
764-
}
765722
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResult {
4545
private static final Logger logger = LoggerFactory
4646
.getLogger(ObTableClientQueryStreamResult.class);
47+
protected ObTableClient client;
4748
private boolean isEnd = true;
4849
private long sessionId = Constants.OB_INVALID_ID;
4950
private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest();
@@ -230,22 +231,11 @@ public boolean next() throws Exception {
230231
} catch (Exception e) {
231232
if (e instanceof ObTableNeedFetchAllException) {
232233
String realTableName = client.getPhyTableNameFromTableGroup(entityType, tableName);
233-
TableEntry entry = client.getOrRefreshTableEntry(realTableName, false, false,
234-
false);
235-
// Calculate the next partition only when the range partition is affected by a split, based on the keys already scanned.
236-
if (ObGlobal.obVsnMajor() >= 4
237-
&& entry.isPartitionTable()
238-
&& entry.getPartitionInfo().getFirstPartDesc().getPartFuncType()
239-
.isRangePart()) {
240-
this.asyncRequest.getObTableQueryRequest().getTableQuery()
241-
.adjustStartKey(currentStartKey);
242-
setExpectant(refreshPartition(this.asyncRequest
243-
.getObTableQueryRequest().getTableQuery(), realTableName));
244-
setEnd(true);
245-
} else {
246-
setExpectant(refreshPartition(this.asyncRequest
247-
.getObTableQueryRequest().getTableQuery(), realTableName));
248-
}
234+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
235+
.adjustStartKey(currentStartKey);
236+
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
237+
.getTableQuery(), realTableName));
238+
setEnd(true);
249239
} else {
250240
throw e;
251241
}
@@ -273,17 +263,10 @@ public boolean next() throws Exception {
273263
} catch (Exception e) {
274264
if (e instanceof ObTableNeedFetchAllException) {
275265
String realTableName = client.getPhyTableNameFromTableGroup(entityType, tableName);
276-
TableEntry tableEntry = client.getOrRefreshTableEntry(realTableName, false,
277-
false, false);
278-
if (ObGlobal.obVsnMajor() >= 4
279-
&& tableEntry.isPartitionTable()
280-
&& tableEntry.getPartitionInfo().getFirstPartDesc().getPartFuncType()
281-
.isRangePart()) {
282-
this.asyncRequest.getObTableQueryRequest().getTableQuery()
283-
.adjustStartKey(currentStartKey);
284-
setExpectant(refreshPartition(this.asyncRequest
285-
.getObTableQueryRequest().getTableQuery(), realTableName));
286-
}
266+
this.asyncRequest.getObTableQueryRequest().getTableQuery()
267+
.adjustStartKey(currentStartKey);
268+
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
269+
.getTableQuery(), realTableName));
287270
it = expectant.entrySet().iterator();
288271
retryTimes++;
289272
if (retryTimes > client.getTableEntryRefreshTryTimes()) {
@@ -369,7 +352,19 @@ public void close() throws Exception {
369352
closeLastStreamResult(lastEntry.getValue());
370353
}
371354
}
372-
355+
356+
public ObTableClient getClient() {
357+
return client;
358+
}
359+
360+
/**
361+
* Set client.
362+
* @param client client want to set
363+
*/
364+
public void setClient(ObTableClient client) {
365+
this.client = client;
366+
}
367+
373368
public boolean isEnd() {
374369
return isEnd;
375370
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434
import java.util.concurrent.atomic.AtomicReference;
3535

3636
public class ObTableClientQueryStreamResult extends AbstractQueryStreamResult {
37+
3738
private static final Logger logger = TableClientLoggerFactory
3839
.getLogger(ObTableClientQueryStreamResult.class);
40+
protected ObTableClient client;
3941

4042
protected ObTableQueryResult referToNewPartition(ObPair<Long, ObTableParam> partIdWithObTable)
4143
throws Exception {
@@ -82,4 +84,19 @@ protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery ta
8284
throws Exception {
8385
return buildPartitions(client, tableQuery, tableName);
8486
}
87+
88+
/**
89+
* Get client.
90+
* @return client
91+
*/
92+
public ObTableClient getClient() {
93+
return client;
94+
}
95+
96+
/*
97+
* Set client.
98+
*/
99+
public void setClient(ObTableClient client) {
100+
this.client = client;
101+
}
85102
}

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -367,14 +367,11 @@ public void partitionExecute(ObTableOperationResult[] results,
367367
if (failedServerList != null) {
368368
route.setBlackList(failedServerList);
369369
}
370-
TableEntry entry = obTableClient.getOrRefreshTableEntry(tableName, false,
371-
false, false);
372-
if (ObGlobal.obVsnMajor() >= 4) {
373-
obTableClient.refreshTableLocationByTabletId(entry, tableName, partId);
374-
}
375-
ObTableParam newParam = obTableClient.getTableWithPartId(tableName, partId,
376-
false, obTableClient.isTableEntryRefreshIntervalWait(),
377-
needFetchAllRouteInfo, route).getRight();
370+
ObTableParam newParam = obTableClient.getTableWithPartId(tableName,
371+
originPartId, needRefreshTableEntry,
372+
obTableClient.isTableEntryRefreshIntervalWait(), needFetchAllRouteInfo,
373+
route).getRight();
374+
378375
subObTable = newParam.getObTable();
379376
subRequest.setPartitionId(newParam.getPartitionId());
380377
}

0 commit comments

Comments
 (0)