Skip to content

Commit a82d5d0

Browse files
committed
partical refresh
1 parent ecf8cc6 commit a82d5d0

File tree

7 files changed

+571
-251
lines changed

7 files changed

+571
-251
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 158 additions & 77 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 271 additions & 149 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/location/model/TableEntry.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
2424
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2525

26-
import java.util.HashMap;
2726
import java.util.LinkedHashMap;
2827
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.locks.Lock;
2930

3031
import static com.google.common.base.Preconditions.checkArgument;
3132

@@ -53,7 +54,26 @@ public class TableEntry {
5354
// partition location
5455
private TableEntryKey tableEntryKey = null;
5556
private volatile ObPartitionEntry partitionEntry = null;
56-
57+
58+
// tablet id ==> refresh time
59+
private final ConcurrentHashMap<Long, Long> refreshTimeMap = new ConcurrentHashMap<>();
60+
public ConcurrentHashMap<Long, Lock> refreshLockMap = new ConcurrentHashMap<>();
61+
62+
public long getTabletLocationLastRefreshTimeMills(Long tabletId) {
63+
return refreshTimeMap.getOrDefault(tabletId, 0L);
64+
}
65+
public void setTableLocationLastRefreshTimeMills(Long tabletId, Long lastRefreshTime) {
66+
refreshTimeMap.put(tabletId, lastRefreshTime);
67+
}
68+
69+
public Lock getRefreshLock(Long tabletId) {
70+
return refreshLockMap.get(tabletId);
71+
}
72+
public void setRefreshLock(Long tabletId, Lock refreshLock) {
73+
refreshLockMap.put(tabletId, refreshLock);
74+
}
75+
76+
5777
/*
5878
* Is valid.
5979
*/
@@ -218,8 +238,6 @@ public void prepare() throws IllegalArgumentException {
218238
checkArgument(partitionInfo != null, "partition table partition info is not ready. key"
219239
+ tableEntryKey);
220240
partitionInfo.prepare();
221-
checkArgument(partitionEntry != null,
222-
"partition table partition entry is not ready. key" + tableEntryKey);
223241
}
224242
}
225243

src/main/java/com/alipay/oceanbase/rpc/location/model/partition/ObPartitionEntry.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,30 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
2426

2527
public class ObPartitionEntry {
2628
private Map<Long, ObPartitionLocation> partitionLocation = new HashMap<Long, ObPartitionLocation>();
2729

2830
// mapping from tablet id to ls id, and the part id to tablet id mapping is in ObPartitionInfo
2931
private Map<Long, Long> tabletLsIdMap = new HashMap<>();
30-
32+
33+
// tabelt id -> (PartitionLocation, LsId)
34+
// tablet id 作为索引管理PartitionInfo 其中包含了 PartitionLocation 和LSID
35+
// 外部会通过tablet id并发的读写ObPartitionLocationInfo
36+
// 写的场景就是更新,读的场景是正常的请求执行,需要保证读写的安全性,更新的时候一方面是保证线程安全,另一方面还需要保证不能频繁更新
37+
private ConcurrentHashMap<Long, ObPartitionLocationInfo> partitionInfos = new ConcurrentHashMap<>();
38+
39+
40+
public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
41+
if (!partitionInfos.containsKey(tabletId)) {
42+
ObPartitionLocationInfo partitionInfo = new ObPartitionLocationInfo();
43+
partitionInfos.put(tabletId, partitionInfo);
44+
}
45+
return partitionInfos.get(tabletId);
46+
}
47+
3148
public Map<Long, ObPartitionLocation> getPartitionLocation() {
3249
return partitionLocation;
3350
}
@@ -39,6 +56,16 @@ public void setPartitionLocation(Map<Long, ObPartitionLocation> partitionLocatio
3956
this.partitionLocation = partitionLocation;
4057
}
4158

59+
public Map<Long, Long> getTabletLsIdMap() {
60+
return tabletLsIdMap;
61+
}
62+
63+
public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
64+
this.tabletLsIdMap = tabletLsIdMap;
65+
}
66+
67+
public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
68+
4269
/*
4370
* Get partition location with part id.
4471
*/
@@ -86,14 +113,4 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
86113
public String toString() {
87114
return "ObPartitionEntry{" + "partitionLocation=" + partitionLocation + '}';
88115
}
89-
90-
public Map<Long, Long> getTabletLsIdMap() {
91-
return tabletLsIdMap;
92-
}
93-
94-
public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
95-
this.tabletLsIdMap = tabletLsIdMap;
96-
}
97-
98-
public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
99116
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.location.model.partition;
19+
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.locks.ReentrantReadWriteLock;
22+
23+
import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;
24+
25+
// 这个类不做线程安全之类的处理
26+
public class ObPartitionLocationInfo {
27+
private ObPartitionLocation partitionLocation = null;
28+
private Long tabletLsId = OB_INVALID_ID;
29+
private Long lastUpdateTime; // 最后更新时间
30+
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); // 读写锁
31+
public AtomicBoolean initialized = new AtomicBoolean(false);
32+
33+
public ObPartitionLocationInfo() {
34+
this.lastUpdateTime = System.currentTimeMillis(); // 初始化为当前时间
35+
}
36+
37+
public ObPartitionLocation getPartitionLocation() {
38+
rwLock.readLock().lock();
39+
try {
40+
return partitionLocation;
41+
} finally {
42+
rwLock.readLock().unlock();
43+
}
44+
}
45+
46+
public void setPartitionLocation(ObPartitionLocation partitionLocation) {
47+
this.partitionLocation = partitionLocation;
48+
}
49+
50+
public void updateLocation(ObPartitionLocation newLocation) {
51+
rwLock.writeLock().lock();
52+
try {
53+
this.partitionLocation = newLocation;
54+
this.lastUpdateTime = System.currentTimeMillis();
55+
} finally {
56+
rwLock.writeLock().unlock();
57+
}
58+
}
59+
60+
public Long getTabletLsId() {
61+
return tabletLsId;
62+
}
63+
64+
public void setTabletLsId(Long tabletLsId) {
65+
this.tabletLsId = tabletLsId;
66+
}
67+
68+
public Long getLastUpdateTime() {
69+
rwLock.readLock().lock();
70+
try {
71+
return lastUpdateTime;
72+
} finally {
73+
rwLock.readLock().unlock();
74+
}
75+
}
76+
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@ public void init() throws Exception {
8484
it = expectant.entrySet().iterator();
8585
retryTimes++;
8686
if (retryTimes > maxRetries) {
87-
RUNTIME.error("Fail to get refresh table entry response after {}", retryTimes);
88-
throw new ObTableRetryExhaustedException("Fail to get refresh table entry response after " + retryTimes);
87+
RUNTIME.error("Fail to get refresh table entry response after {}",
88+
retryTimes);
89+
throw new ObTableRetryExhaustedException(
90+
"Fail to get refresh table entry response after " + retryTimes);
8991

9092
}
9193
} else {
@@ -237,8 +239,10 @@ public boolean next() throws Exception {
237239
it = expectant.entrySet().iterator();
238240
retryTimes++;
239241
if (retryTimes > client.getTableEntryRefreshTryTimes()) {
240-
RUNTIME.error("Fail to get refresh table entry response after {}", retryTimes);
241-
throw new ObTableRetryExhaustedException("Fail to get refresh table entry response after " + retryTimes);
242+
RUNTIME.error("Fail to get refresh table entry response after {}",
243+
retryTimes);
244+
throw new ObTableRetryExhaustedException(
245+
"Fail to get refresh table entry response after " + retryTimes);
242246
}
243247
continue;
244248
} else {

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientBatchOpsImpl.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alipay.oceanbase.rpc.ObTableClient;
2121
import com.alipay.oceanbase.rpc.exception.*;
2222
import com.alipay.oceanbase.rpc.location.model.ObServerRoute;
23+
import com.alipay.oceanbase.rpc.location.model.TableEntry;
2324
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2425
import com.alipay.oceanbase.rpc.mutation.result.*;
2526
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
@@ -352,11 +353,12 @@ public void partitionExecute(ObTableOperationResult[] results,
352353
if (failedServerList != null) {
353354
route.setBlackList(failedServerList);
354355
}
355-
ObTableParam newParam = obTableClient.getTableWithPartId(tableName,
356-
originPartId, needRefreshTableEntry,
357-
obTableClient.isTableEntryRefreshIntervalWait(), needFetchAllRouteInfo,
358-
route).getRight();
359-
356+
TableEntry entry = obTableClient.getOrRefreshTableEntry(tableName, false,
357+
false, false);
358+
obTableClient.refreshTableLocationByTabletId(entry, tableName, partId);
359+
ObTableParam newParam = obTableClient.getTableWithPartId(tableName, partId,
360+
false, obTableClient.isTableEntryRefreshIntervalWait(), needFetchAllRouteInfo, route)
361+
.getRight();
360362
subObTable = newParam.getObTable();
361363
subRequest.setPartitionId(newParam.getPartitionId());
362364
}
@@ -418,6 +420,7 @@ public void partitionExecute(ObTableOperationResult[] results,
418420
tableName, partId, ((ObTableException) ex).getErrorCode(),
419421
tryTimes, ex);
420422
if (ex instanceof ObTableNeedFetchAllException) {
423+
// refresh table info
421424
obTableClient.getOrRefreshTableEntry(tableName, needRefreshTableEntry,
422425
obTableClient.isTableEntryRefreshIntervalWait(), true);
423426
throw ex;
@@ -444,7 +447,6 @@ public void partitionExecute(ObTableOperationResult[] results,
444447
throw new ObTableUnexpectedException(
445448
"check batch operation result error: client get unexpected NULL result");
446449
}
447-
448450
List<ObTableOperationResult> subObTableOperationResults = subObTableBatchOperationResult
449451
.getResults();
450452

0 commit comments

Comments
 (0)