Skip to content

Commit b57034b

Browse files
committed
fix hbase put sinlge cf table
1 parent 9728a9b commit b57034b

File tree

3 files changed

+28
-16
lines changed

3 files changed

+28
-16
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,7 +1193,10 @@ public void dealWithRpcTimeoutForBatchTablet(ObServerAddr addr, String tableName
11931193
*/
11941194
public String tryGetTableNameFromTableGroupCache(final String tableGroupName,
11951195
final boolean refresh) throws Exception {
1196-
return tableRoute.tryGetTableNameFromTableGroupCache(tableGroupName, refresh);
1196+
if (isTableGroupName(tableGroupName)) {
1197+
return tableRoute.tryGetTableNameFromTableGroupCache(tableGroupName, refresh);
1198+
}
1199+
return tableGroupName;
11971200
}
11981201

11991202
/**
@@ -2401,17 +2404,13 @@ public ObPayload execute(final ObHbaseRequest request) throws Exception {
24012404
// get the first cell from the first cfRows to route
24022405
// use the first table in tablegroup to route
24032406
String realTableName = null;
2404-
try {
2405-
realTableName = tryGetTableNameFromTableGroupCache(request.getTableName(), false);
2406-
} catch (ObTableNotExistException e) {
2407-
if (request.getCfRows().size() != 1) {
2408-
throw new ObTableUnexpectedException("multi-cf operations must create tablegroup and binding tables");
2409-
} else {
2410-
realTableName = request.getCfRows().get(0).getRealTableName();
2411-
}
2407+
if (request.getCfRows().isEmpty()) {
2408+
throw new ObTableUnexpectedException("no cf rows");
24122409
}
2413-
if (realTableName == null) {
2414-
throw new ObTableUnexpectedException("realTableName is null");
2410+
if (request.getCfRows().size() > 1) {
2411+
realTableName = tryGetTableNameFromTableGroupCache(request.getTableName(), false);
2412+
} else {
2413+
realTableName = request.getCfRows().get(0).getRealTableName();
24152414
}
24162415
int keyIdx = request.getCfRows().get(0).getKeyIndex(0);
24172416
row.add("K", request.getKeys().get(keyIdx).getValue());

src/main/java/com/alipay/oceanbase/rpc/location/model/TableGroupCache.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,11 @@ private String refreshTableNameByTableGroup(String physicalTableName, String tab
150150

151151
public void eraseTableGroupFromCache(String tableGroupName) {
152152
// clear table group cache
153-
TableGroupInverted.remove(TableGroupCache.get(tableGroupName));
153+
String tableName = TableGroupCache.get(tableGroupName);
154+
if (tableName != null) {
155+
// maybe in concurrent situation, other thread has erased name already
156+
TableGroupInverted.remove(tableName);
157+
}
154158
TableGroupCache.remove(tableGroupName);
155159
TableGroupCacheLocks.remove(tableGroupName);
156160
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.alipay.oceanbase.rpc.exception.*;
2222
import com.alipay.oceanbase.rpc.location.model.TableEntry;
2323
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
24+
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
2425
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2526
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
2627
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*;
@@ -190,22 +191,26 @@ public boolean queryLastStreamResultInNext() throws Exception {
190191
TableEntry entry = client.getOrRefreshTableEntry(realTableName, false);
191192
// Calculate the next partition only when the range partition is affected by a split, based on the keys already scanned.
192193
if (entry.isPartitionTable()
194+
&& entry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ONE
193195
&& entry.getPartitionInfo().getFirstPartDesc().getPartFuncType()
194196
.isRangePart()) {
195197
this.asyncRequest.getObTableQueryRequest().getTableQuery()
196198
.adjustStartKey(currentStartKey);
197199
setExpectant(refreshPartition(this.asyncRequest
198200
.getObTableQueryRequest().getTableQuery(), realTableName));
199201
setEnd(true);
202+
} else {
203+
// non one-level-range partitioned table does not retry, inform user to rescan
204+
throw e;
200205
}
201206
} else {
202207
throw e;
203208
}
204209
}
205210
// remove useless expectant if it is end
206-
if (isEnd())
211+
if (isEnd()) {
207212
it.remove();
208-
213+
}
209214
if (!cacheRows.isEmpty()) {
210215
nextRow();
211216
return true;
@@ -230,12 +235,16 @@ public boolean queryNewStreamResultInNext() throws Exception {
230235
tableName);
231236
TableEntry tableEntry = client.getOrRefreshTableEntry(realTableName, false);
232237
if (tableEntry.isPartitionTable()
238+
&& tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ONE
233239
&& tableEntry.getPartitionInfo().getFirstPartDesc().getPartFuncType()
234240
.isRangePart()) {
235241
this.asyncRequest.getObTableQueryRequest().getTableQuery()
236242
.adjustStartKey(currentStartKey);
237243
setExpectant(refreshPartition(this.asyncRequest
238244
.getObTableQueryRequest().getTableQuery(), realTableName));
245+
} else {
246+
// non one-level-range partitioned table does not retry, inform user to rescan
247+
throw e;
239248
}
240249
it = expectant.entrySet().iterator();
241250
retryTimes++;
@@ -253,9 +262,9 @@ public boolean queryNewStreamResultInNext() throws Exception {
253262
}
254263

255264
// remove useless expectant if it is end
256-
if (isEnd())
265+
if (isEnd()) {
257266
it.remove();
258-
267+
}
259268
if (!cacheRows.isEmpty()) {
260269
hasNext = true;
261270
nextRow();

0 commit comments

Comments
 (0)