Skip to content

Commit 8a507cf

Browse files
committed
add valid flag to obTable to avoid unnecessary reconnect if observer down
1 parent c9076a6 commit 8a507cf

File tree

5 files changed

+84
-21
lines changed

5 files changed

+84
-21
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -556,14 +556,6 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
556556
throw ex;
557557
}
558558
} else {
559-
if (ex instanceof ObTableTransportException &&
560-
((ObTableTransportException) ex).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
561-
syncRefreshMetadata(true);
562-
TableEntry entry = tableRoute.getTableEntry(tableName);
563-
long partId = tableRoute.getPartId(entry, rowKey);
564-
long tabletId = tableRoute.getTabletIdByPartId(entry, partId);
565-
tableRoute.refreshPartitionLocation(tableName, tabletId, entry);
566-
}
567559
String logMessage;
568560
if (ex instanceof ObTableException) {
569561
logMessage = String.format(
@@ -578,6 +570,15 @@ private <T> T execute(String tableName, TableExecuteCallback<T> callback, ObServ
578570
);
579571
}
580572
logger.warn(logMessage, ex);
573+
if (ex instanceof ObTableTransportException &&
574+
((ObTableTransportException) ex).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
575+
syncRefreshMetadata(true);
576+
TableEntry entry = tableRoute.getTableEntry(tableName);
577+
long partId = tableRoute.getPartId(entry, rowKey);
578+
long tabletId = tableRoute.getTabletIdByPartId(entry, partId);
579+
tableRoute.refreshPartitionLocation(tableName, tabletId, entry);
580+
tableParam.getObTable().setDirty();
581+
}
581582
calculateContinuousFailure(tableName, ex.getMessage());
582583
throw ex;
583584
}
@@ -775,14 +776,6 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
775776
throw new ObTableRetryExhaustedException(logMessage, ex);
776777
}
777778
} else {
778-
if (ex instanceof ObTableTransportException &&
779-
((ObTableTransportException) ex).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
780-
syncRefreshMetadata(true);
781-
TableEntry entry = tableRoute.getTableEntry(tableName);
782-
long partId = tableRoute.getPartId(entry, callback.getRowKey());
783-
long tabletId = tableRoute.getTabletIdByPartId(entry, partId);
784-
tableRoute.refreshPartitionLocation(tableName, tabletId, entry);
785-
}
786779
String logMessage;
787780
if (ex instanceof ObTableException) {
788781
logMessage = String.format(
@@ -797,6 +790,15 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
797790
);
798791
}
799792
logger.warn(logMessage, ex);
793+
if (ex instanceof ObTableTransportException &&
794+
((ObTableTransportException) ex).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
795+
syncRefreshMetadata(true);
796+
TableEntry entry = tableRoute.getTableEntry(tableName);
797+
long partId = tableRoute.getPartId(entry, callback.getRowKey());
798+
long tabletId = tableRoute.getTabletIdByPartId(entry, partId);
799+
tableRoute.refreshPartitionLocation(tableName, tabletId, entry);
800+
tableParam.getObTable().setDirty();
801+
}
800802
calculateContinuousFailure(tableName, ex.getMessage());
801803
throw ex;
802804
}

src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
3030
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
3131
import com.alipay.oceanbase.rpc.table.ObTable;
32-
import com.alipay.oceanbase.rpc.table.ObTableClientType;
3332
import com.alipay.oceanbase.rpc.table.ObTableParam;
3433
import com.alipay.oceanbase.rpc.table.ObTableServerCapacity;
3534
import com.alipay.oceanbase.rpc.util.StringUtil;
@@ -532,17 +531,21 @@ public TableEntry refreshPartitionLocation(String tableName, long tabletId, Tabl
532531
throws Exception {
533532
TableEntry tableEntry = entry == null ? tableLocations.getTableEntry(tableName) : entry;
534533
try {
535-
return tableLocations.refreshPartitionLocation(tableEntry, tableName, tabletId,
534+
tableEntry = tableLocations.refreshPartitionLocation(tableEntry, tableName, tabletId,
536535
serverRoster, sysUA);
536+
validCachedObTableStatus(tableName, tableEntry, tabletId, tableClient.getRoute(false));
537+
return tableEntry;
537538
} catch (ObTableGetException e) {
538539
logger
539540
.warn(
540541
"refresh partition location meets tableEntry not initialized exception, tableName: {}",
541542
tableName);
542543
if (e.getMessage().contains("Need to fetch meta")) {
543544
tableEntry = refreshMeta(tableName);
544-
return tableLocations.refreshPartitionLocation(tableEntry, tableName, tabletId,
545+
tableEntry = tableLocations.refreshPartitionLocation(tableEntry, tableName, tabletId,
545546
serverRoster, sysUA);
547+
validCachedObTableStatus(tableName, tableEntry, tabletId, tableClient.getRoute(false));
548+
return tableEntry;
546549
}
547550
throw e;
548551
} catch (Throwable t) {
@@ -556,17 +559,27 @@ public TableEntry refreshPartitionLocation(String tableName, long tabletId, Tabl
556559
public TableEntry refreshTabletLocationBatch(String tableName) throws Exception {
557560
TableEntry tableEntry = tableLocations.getTableEntry(tableName);
558561
try {
559-
return tableLocations.refreshTabletLocationBatch(tableEntry, tableName,
562+
tableEntry = tableLocations.refreshTabletLocationBatch(tableEntry, tableName,
560563
serverRoster, sysUA);
564+
Long[] tablets = getTabletsFromTableEntry(tableEntry);
565+
for (long tablet : tablets) {
566+
validCachedObTableStatus(tableName, tableEntry, tablet, tableClient.getRoute(false));
567+
}
568+
return tableEntry;
561569
} catch (ObTableGetException e) {
562570
logger
563571
.warn(
564572
"refresh location in batch meets tableEntry not initialized exception, tableName: {}",
565573
tableName);
566574
if (e.getMessage().contains("Need to fetch meta")) {
567575
tableEntry = refreshMeta(tableName);
568-
return tableLocations.refreshTabletLocationBatch(tableEntry, tableName,
576+
tableEntry = tableLocations.refreshTabletLocationBatch(tableEntry, tableName,
569577
serverRoster, sysUA);
578+
Long[] tablets = getTabletsFromTableEntry(tableEntry);
579+
for (long tablet : tablets) {
580+
validCachedObTableStatus(tableName, tableEntry, tablet, tableClient.getRoute(false));
581+
}
582+
return tableEntry;
570583
}
571584
throw e;
572585
} catch (Throwable t) {
@@ -584,6 +597,31 @@ public TableEntry refreshODPMeta(String tableName, boolean forceRefresh) throws
584597
return odpTableLocations.refreshODPMeta(tableName, forceRefresh, odpInfo.getObTable());
585598
}
586599

600+
private Long[] getTabletsFromTableEntry(TableEntry tableEntry) {
601+
Long[] tablets = null;
602+
if (tableEntry.isPartitionTable()) {
603+
tablets = tableEntry.getPartitionInfo().getPartTabletIdMap().values()
604+
.toArray(new Long[0]);
605+
} else {
606+
tablets = new Long[1];
607+
}
608+
return tablets;
609+
}
610+
611+
private void validCachedObTableStatus(String tableName, TableEntry tableEntry, long tabletId, ObServerRoute route) throws Exception {
612+
ObPartitionLocationInfo obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
613+
if (obPartitionLocationInfo.getPartitionLocation() == null) {
614+
throw new ObTableNotExistException(
615+
"partition location is null after refresh, table: { " + tableName
616+
+ " } may not exist");
617+
}
618+
ReplicaLocation replica = getPartitionLocation(obPartitionLocationInfo, route);
619+
ObServerAddr addr = replica.getAddr();
620+
ObTable obTable = tableRoster.getTable(addr);
621+
if (obTable != null) {
622+
obTable.setValid();
623+
}
624+
}
587625
/**
588626
* get TableParam by tableName and rowkey
589627
* work for both OcpMode and OdpMode

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
293293
client.syncRefreshMetadata(true);
294294
long tabletId = partIdWithIndex.getRight().getTabletId();
295295
client.refreshTableLocationByTabletId(indexTableName, tabletId);
296+
subObTable.setDirty();
296297
}
297298
client.calculateContinuousFailure(indexTableName, e.getMessage());
298299
throw e;

src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public class ObTable extends AbstractObTable implements Lifecycle {
7777
private boolean enableRerouting = true; // only used for init packet factory
7878

7979
private ReentrantLock statusLock = new ReentrantLock();
80+
private AtomicBoolean valid = new AtomicBoolean(true);
8081

8182
/*
8283
* Init.
@@ -187,6 +188,22 @@ public boolean isEnableRerouting(){
187188
return enableRerouting;
188189
}
189190

191+
// flag this obTable is valid and available
192+
public void setValid() {
193+
log.warn("[latency monitor] set ip:port {}:{} as valid", ip, port);
194+
valid.compareAndSet(false, true);
195+
}
196+
197+
// flag this obTable is invalid and unavailable
198+
public void setDirty() {
199+
log.warn("[latency monitor] set ip:port {}:{} as dirty", ip, port);
200+
valid.compareAndSet(true, false);
201+
}
202+
203+
public boolean isValid() {
204+
return valid.get();
205+
}
206+
190207
/*
191208
* Query.
192209
*/
@@ -396,6 +413,10 @@ public ObTableOperationResult execute(String tableName, ObTableOperationType typ
396413
public ObPayload execute(final ObPayload request) throws RemotingException,
397414
InterruptedException {
398415

416+
if (!isValid()) {
417+
log.warn("[latency monitor] The server is not available, server address: " + ip + ":" + port);
418+
throw new ObTableServerConnectException("The server is not available, server address: " + ip + ":" + port);
419+
}
399420
ObTableConnection connection = null;
400421
try {
401422
connection = getConnection();

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,7 @@ public void partitionExecute(ObTableSingleOpResult[] results,
732732
((ObTableTransportException) ex).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
733733
obTableClient.syncRefreshMetadata(true);
734734
obTableClient.refreshTabletLocationBatch(realTableName);
735+
subObTable.setDirty();
735736
}
736737
obTableClient.calculateContinuousFailure(realTableName, ex.getMessage());
737738
throw ex;

0 commit comments

Comments
 (0)