Skip to content

Commit 12a4582

Browse files
committed
merge obkv master
2 parents fe5f280 + 3171e0d commit 12a4582

File tree

16 files changed

+187
-299
lines changed

16 files changed

+187
-299
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@
166166
</exclusion>
167167
</exclusions>
168168
</dependency>
169+
<dependency>
170+
<groupId>ch.qos.logback</groupId>
171+
<artifactId>logback-classic</artifactId>
172+
<version>1.2.12</version>
173+
<scope>test</scope>
174+
</dependency>
169175
</dependencies>
170176

171177
<build>

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import static com.alipay.oceanbase.rpc.protocol.payload.ResultCodes.OB_ERR_KV_ROUTE_ENTRY_EXPIRE;
6666
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*;
6767
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.*;
68+
import static java.lang.String.format;
6869

6970
public class ObTableClient extends AbstractObTableClient implements Lifecycle {
7071
private static final Logger logger = getLogger(ObTableClient.class);
@@ -791,7 +792,6 @@ private <T> T executeMutation(String tableName, MutationExecuteCallback<T> callb
791792
}
792793
boolean needRefreshTableEntry = false;
793794
boolean needRenew = false;
794-
boolean needFetchAllRouteInfo = false;
795795
int tryTimes = 0;
796796
long startExecute = System.currentTimeMillis();
797797
while (true) {
@@ -1933,10 +1933,22 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
19331933
long partitionId = partId;
19341934
ObPartitionLocationInfo obPartitionLocationInfo = null;
19351935
if (ObGlobal.obVsnMajor() >= 4) {
1936-
19371936
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
1938-
19391937
replica = getPartitionLocation(obPartitionLocationInfo, route);
1938+
/**
1939+
* Normally, getOrRefreshPartitionInfo makes sure that a thread only continues if it finds the leader
1940+
* during a route refresh. But sometimes, there might not be a leader yet. In this case, the thread
1941+
* is released, and since it can't get the replica, it throws an no master exception.
1942+
*/
1943+
if (replica == null && obPartitionLocationInfo.getPartitionLocation().getLeader() == null) {
1944+
RUNTIME.error(LCD.convert("01-00028"), partitionId, tableEntry.getPartitionEntry(), tableEntry);
1945+
RUNTIME.error(format(
1946+
"partition=%d has no leader partitionEntry=%s original tableEntry=%s",
1947+
partitionId, tableEntry.getPartitionEntry(), tableEntry));
1948+
throw new ObTablePartitionNoMasterException(format(
1949+
"partition=%d has no leader partitionEntry=%s original tableEntry=%s",
1950+
partitionId, tableEntry.getPartitionEntry(), tableEntry));
1951+
}
19401952
} else {
19411953
if (tableEntry.isPartitionTable()
19421954
&& null != tableEntry.getPartitionInfo().getSubPartDesc()) {
@@ -2163,7 +2175,7 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery
21632175
boolean refresh, boolean waitForRefresh)
21642176
throws Exception {
21652177
return getTables(tableName, query, start, startInclusive, end, endInclusive, refresh,
2166-
waitForRefresh, false, getRoute(false));
2178+
waitForRefresh, getRoute(false));
21672179
}
21682180

21692181
/**
@@ -2184,10 +2196,10 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery
21842196
Object[] start, boolean startInclusive,
21852197
Object[] end, boolean endInclusive,
21862198
boolean refresh, boolean waitForRefresh,
2187-
boolean needFetchAll, ObServerRoute route) throws Exception {
2199+
ObServerRoute route) throws Exception {
21882200

21892201
// 1. get TableEntry information
2190-
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, needFetchAll);
2202+
TableEntry tableEntry = getOrRefreshTableEntry(tableName, refresh, waitForRefresh, false);
21912203

21922204
List<String> scanRangeColumns = query.getScanRangeColumns();
21932205
if (scanRangeColumns == null || scanRangeColumns.isEmpty()) {
@@ -3571,16 +3583,13 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
35713583
} else if (request instanceof ObTableQueryRequest) {
35723584
// TableGroup -> TableName
35733585
String tableName = request.getTableName();
3574-
tableName = getPhyTableNameFromTableGroup(((ObTableQueryRequest) request), tableName);
35753586
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
35763587
((ObTableQueryRequest) request).getTableQuery(), this);
35773588
tableQuery.setEntityType(request.getEntityType());
35783589
return new ObClusterTableQuery(tableQuery).executeInternal();
35793590
} else if (request instanceof ObTableQueryAsyncRequest) {
35803591
// TableGroup -> TableName
35813592
String tableName = request.getTableName();
3582-
tableName = getPhyTableNameFromTableGroup(
3583-
((ObTableQueryAsyncRequest) request).getObTableQueryRequest(), tableName);
35843593
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
35853594
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
35863595
tableQuery.setEntityType(request.getEntityType());
@@ -4199,6 +4208,15 @@ public String getPhyTableNameFromTableGroup(ObTableQueryRequest request, String
41994208
return tableName;
42004209
}
42014210

4211+
public String getPhyTableNameFromTableGroup(ObTableEntityType type, String tableName) throws Exception {
4212+
if (odpMode) {
4213+
// do nothing
4214+
} else if (type == ObTableEntityType.HKV && isTableGroupName(tableName)) {
4215+
tableName = tryGetTableNameFromTableGroupCache(tableName, false);
4216+
}
4217+
return tableName;
4218+
}
4219+
42024220
/*
42034221
* Get the start keys of different tablets, byte[0] = [] = EMPTY_START_ROW = EMPTY_END_ROW
42044222
* Example:

src/main/java/com/alipay/oceanbase/rpc/direct_load/protocol/v0/ObDirectLoadProtocolV0.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public class ObDirectLoadProtocolV0 implements ObDirectLoadProtocol {
3030

3131
public static final long OB_VERSION_4_3_2_0 = ObGlobal.calcVersion(4, (short) 3,
3232
(byte) 2, (byte) 0);
33+
public static final long OB_VERSION_4_3_5_0 = ObGlobal.calcVersion(4, (short) 3,
34+
(byte) 5, (byte) 0);
3335

3436
private static final int PROTOCOL_VERSION = 0;
3537
private final ObDirectLoadLogger logger;
@@ -63,12 +65,14 @@ public void checkIsSupported(ObDirectLoadStatement statement) throws ObDirectLoa
6365
+ " is not supported, minimum version required is "
6466
+ ObGlobal.getObVsnString(OB_VERSION_4_3_2_0));
6567
}
66-
} else if (statement.getPartitionNames().length > 0) {
68+
} else if (obVersion < OB_VERSION_4_3_5_0 && statement.getPartitionNames().length > 0) {
6769
logger.warn("partition names in ob version " + ObGlobal.getObVsnString(obVersion)
68-
+ "is not supported");
70+
+ "is not supported, minimum version required is "
71+
+ ObGlobal.getObVsnString(OB_VERSION_4_3_5_0));
6972
throw new ObDirectLoadNotSupportedException("partition names in ob version "
70-
+ ObGlobal.getObVsnString(obVersion)
71-
+ " is not supported");
73+
+ ObGlobal.getObVsnString(obVersion)
74+
+ " is not supported, minimum version required is "
75+
+ ObGlobal.getObVsnString(OB_VERSION_4_3_5_0));
7276
}
7377
}
7478

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,8 +1258,17 @@ private static ObPartitionEntry getPartitionLocationFromResultSetByTablet(TableE
12581258
}
12591259
location.addReplicaLocation(replica);
12601260

1261-
if (partitionLocationInfo.initialized.compareAndSet(false, true)) {
1261+
if (location.getLeader() != null && partitionLocationInfo.initialized.compareAndSet(false, true)) {
12621262
partitionLocationInfo.initializationLatch.countDown();
1263+
} else if (rs.isLast() && location.getLeader() == null) {
1264+
partitionLocationInfo.initializationLatch.countDown();
1265+
RUNTIME.error(LCD.convert("01-00028"), partitionId, partitionEntry, tableEntry);
1266+
RUNTIME.error(format(
1267+
"partition=%d has no leader partitionEntry=%s original tableEntry=%s",
1268+
partitionId, partitionEntry, tableEntry));
1269+
throw new ObTablePartitionNoMasterException(format(
1270+
"partition=%d has no leader partitionEntry=%s original tableEntry=%s",
1271+
partitionId, partitionEntry, tableEntry));
12631272
}
12641273
}
12651274

@@ -1312,8 +1321,17 @@ private static ObPartitionEntry getPartitionLocationFromResultSet(TableEntry tab
13121321
}
13131322
location.addReplicaLocation(replica);
13141323

1315-
if (partitionLocationInfo.initialized.compareAndSet(false, true)) {
1324+
if (location.getLeader() != null && partitionLocationInfo.initialized.compareAndSet(false, true)) {
1325+
partitionLocationInfo.initializationLatch.countDown();
1326+
} else if (rs.isLast() && location.getLeader() == null) {
13161327
partitionLocationInfo.initializationLatch.countDown();
1328+
RUNTIME.error(LCD.convert("01-00028"), partitionId, partitionEntry, tableEntry);
1329+
RUNTIME.error(format(
1330+
"partition=%d has no leader partitionEntry=%s original tableEntry=%s",
1331+
partitionId, partitionEntry, tableEntry));
1332+
throw new ObTablePartitionNoMasterException(format(
1333+
"partition=%d has no leader partitionEntry=%s original tableEntry=%s",
1334+
partitionId, partitionEntry, tableEntry));
13171335
}
13181336
} else {
13191337
partitionId = rs.getLong("partition_id");

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableLSOpRequest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,14 @@ public void addTabletOperation(ObTableTabletOp tabletOp) {
121121
public void setLsOperation(ObTableLSOperation lsOperation) {
122122
this.lsOperation = lsOperation;
123123
}
124-
124+
125+
/*
126+
* Get entity type.
127+
*/
128+
public ObTableEntityType getEntityType() {
129+
return entityType;
130+
}
131+
125132
/*
126133
* Set entity type.
127134
*/

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
148148
if (failedServerList != null) {
149149
route.setBlackList(failedServerList);
150150
}
151+
if (ObGlobal.obVsnMajor() >= 4) {
152+
TableEntry tableEntry = client.getOrRefreshTableEntry(indexTableName, false, false, false);
153+
client.refreshTableLocationByTabletId(tableEntry, indexTableName, client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft()));
154+
}
155+
151156
subObTable = client
152157
.getTableWithPartId(indexTableName, partIdWithIndex.getLeft(),
153158
needRefreshTableEntry, client.isTableEntryRefreshIntervalWait(),
@@ -158,7 +163,6 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
158163
result = subObTable.executeWithConnection(request, connectionRef);
159164
} else {
160165
result = subObTable.execute(request);
161-
162166
if (result != null && result.getPcode() == Pcodes.OB_TABLE_API_MOVE) {
163167
ObTableApiMove moveResponse = (ObTableApiMove) result;
164168
client.getRouteTableRefresher().addTableIfAbsent(indexTableName, true);
@@ -269,7 +273,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
269273
((ObTableException) e).getErrorCode(), tryTimes, e);
270274
// tablet not exists, refresh table entry
271275
if (e instanceof ObTableNeedFetchAllException) {
272-
client.getOrRefreshTableEntry(tableName, true, true, true);
276+
client.getOrRefreshTableEntry(indexTableName, true, true, true);
273277
throw e;
274278
}
275279
} else {
@@ -291,6 +295,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
291295
return result;
292296
}
293297

298+
/*
299+
* RenewLease.
300+
*/
301+
public void renewLease() throws Exception {
302+
throw new IllegalStateException("renew only support stream query");
303+
}
304+
294305
/*
295306
* Next.
296307
*/
@@ -587,7 +598,8 @@ public void init() throws Exception {
587598
RUNTIME.error("Fail to get refresh table entry response after {}",
588599
retryTimes);
589600
throw new ObTableRetryExhaustedException(
590-
"Fail to get refresh table entry response after " + retryTimes);
601+
"Fail to get refresh table entry response after " + retryTimes +
602+
"errorCode:" + ((ObTableNeedFetchAllException) e).getErrorCode());
591603

592604
}
593605
} else {

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObQueryOperationType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.Map;
2222

2323
public enum ObQueryOperationType {
24-
QUERY_START(0), QUERY_NEXT(1), QUERY_END(2);
24+
QUERY_START(0), QUERY_NEXT(1), QUERY_END(2), QUERY_RENEW(3);
2525

2626
private int value;
2727
private static Map<Integer, ObQueryOperationType> map = new HashMap<Integer, ObQueryOperationType>();

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,15 @@ public void init() throws Exception {
8181
} catch (Exception e) {
8282
if (e instanceof ObTableNeedFetchAllException) {
8383
setExpectant(refreshPartition(this.asyncRequest.getObTableQueryRequest()
84-
.getTableQuery(), tableName));
84+
.getTableQuery(), client.getPhyTableNameFromTableGroup(entityType, tableName)));
8585
it = expectant.entrySet().iterator();
8686
retryTimes++;
8787
if (retryTimes > maxRetries) {
8888
RUNTIME.error("Fail to get refresh table entry response after {}",
8989
retryTimes);
9090
throw new ObTableRetryExhaustedException(
91-
"Fail to get refresh table entry response after " + retryTimes);
91+
"Fail to get refresh table entry response after " + retryTimes
92+
+ "errorCode:" + ((ObTableNeedFetchAllException) e).getErrorCode());
9293

9394
}
9495
} else {
@@ -180,6 +181,32 @@ protected Map<Long, ObPair<Long, ObTableParam>> refreshPartition(ObTableQuery ta
180181
return buildPartitions(client, tableQuery, tableName);
181182
}
182183

184+
// This function is designed for HBase-type requests.
185+
// It is used to extend the session duration of a scan
186+
@Override
187+
public void renewLease() throws Exception {
188+
if (!isEnd() && !expectant.isEmpty()) {
189+
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
190+
.iterator();
191+
Map.Entry<Long, ObPair<Long, ObTableParam>> lastEntry = it.next();
192+
ObPair<Long, ObTableParam> partIdWithObTable = lastEntry.getValue();
193+
// try access new partition, async will not remove useless expectant
194+
ObTableParam obTableParam = partIdWithObTable.getRight();
195+
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
196+
197+
// refresh request info
198+
queryRequest.setPartitionId(obTableParam.getPartitionId());
199+
queryRequest.setTableId(obTableParam.getTableId());
200+
201+
// refresh async query request
202+
asyncRequest.setQueryType(ObQueryOperationType.QUERY_RENEW);
203+
asyncRequest.setQuerySessionId(sessionId);
204+
executeAsync(partIdWithObTable, asyncRequest);
205+
} else {
206+
throw new ObTableException("query end or expectant is null");
207+
}
208+
}
209+
183210
@Override
184211
public boolean next() throws Exception {
185212
checkStatus();
@@ -202,8 +229,8 @@ public boolean next() throws Exception {
202229
referToLastStreamResult(lastEntry.getValue());
203230
} catch (Exception e) {
204231
if (e instanceof ObTableNeedFetchAllException) {
205-
206-
TableEntry entry = client.getOrRefreshTableEntry(tableName, false, false,
232+
String realTableName = client.getPhyTableNameFromTableGroup(entityType, tableName);
233+
TableEntry entry = client.getOrRefreshTableEntry(realTableName, false, false,
207234
false);
208235
// Calculate the next partition only when the range partition is affected by a split, based on the keys already scanned.
209236
if (ObGlobal.obVsnMajor() >= 4
@@ -213,11 +240,11 @@ public boolean next() throws Exception {
213240
this.asyncRequest.getObTableQueryRequest().getTableQuery()
214241
.adjustStartKey(currentStartKey);
215242
setExpectant(refreshPartition(this.asyncRequest
216-
.getObTableQueryRequest().getTableQuery(), tableName));
243+
.getObTableQueryRequest().getTableQuery(), realTableName));
217244
setEnd(true);
218245
} else {
219246
setExpectant(refreshPartition(this.asyncRequest
220-
.getObTableQueryRequest().getTableQuery(), tableName));
247+
.getObTableQueryRequest().getTableQuery(), realTableName));
221248
}
222249
} else {
223250
throw e;
@@ -245,7 +272,8 @@ public boolean next() throws Exception {
245272
referToNewPartition(entry.getValue());
246273
} catch (Exception e) {
247274
if (e instanceof ObTableNeedFetchAllException) {
248-
TableEntry tableEntry = client.getOrRefreshTableEntry(tableName, false,
275+
String realTableName = client.getPhyTableNameFromTableGroup(entityType, tableName);
276+
TableEntry tableEntry = client.getOrRefreshTableEntry(realTableName, false,
249277
false, false);
250278
if (ObGlobal.obVsnMajor() >= 4
251279
&& tableEntry.isPartitionTable()
@@ -254,7 +282,7 @@ public boolean next() throws Exception {
254282
this.asyncRequest.getObTableQueryRequest().getTableQuery()
255283
.adjustStartKey(currentStartKey);
256284
setExpectant(refreshPartition(this.asyncRequest
257-
.getObTableQueryRequest().getTableQuery(), tableName));
285+
.getObTableQueryRequest().getTableQuery(), realTableName));
258286
}
259287
it = expectant.entrySet().iterator();
260288
retryTimes++;

0 commit comments

Comments
 (0)