3636import com .alipay .oceanbase .rpc .protocol .payload .impl .execute .ObTableStreamRequest ;
3737import com .alipay .oceanbase .rpc .protocol .payload .impl .execute .QueryStreamResult ;
3838import com .alipay .oceanbase .rpc .protocol .payload .impl .execute .syncquery .ObTableQueryAsyncResult ;
39+ import com .alipay .oceanbase .rpc .stream .ObTableClientQueryStreamResult ;
3940import com .alipay .oceanbase .rpc .table .ObTable ;
4041import com .alipay .oceanbase .rpc .table .ObTableParam ;
4142import io .netty .buffer .ByteBuf ;
4243import org .slf4j .Logger ;
44+ import org .slf4j .LoggerFactory ;
4345
4446import java .util .*;
4547import java .util .concurrent .atomic .AtomicReference ;
4648import java .util .concurrent .locks .ReentrantLock ;
49+ import java .util .function .Consumer ;
4750
4851import static com .alipay .oceanbase .rpc .util .TableClientLoggerFactory .RUNTIME ;
4952
@@ -70,8 +73,9 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
7073 private ObReadConsistency readConsistency = ObReadConsistency .STRONG ;
7174 // ObRowKey objs: [startKey, MIN_OBJECT, MIN_OBJECT]
7275 public List <ObObj > currentStartKey ;
73- protected ObTableClient client ;
74-
76+ protected ObTableClient client ;
77+ private static final Logger logger = LoggerFactory
78+ .getLogger (AbstractQueryStreamResult .class );
7579 /*
7680 * Get pcode.
7781 */
@@ -163,8 +167,12 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
163167 result = subObTable .execute (request );
164168 if (result instanceof ObTableApiMove ) {
165169 ObTableApiMove move = (ObTableApiMove ) result ;
166- logger .warn ("The server has not yet completed the master switch, and returned an incorrect leader with an IP address of {}. " +
167- "Rerouting return IP is {}" , moveResponse .getReplica ().getServer ().ipToString (), move .getReplica ().getServer ().ipToString ());
170+ logger
171+ .warn (
172+ "The server has not yet completed the master switch, and returned an incorrect leader with an IP address of {}. "
173+ + "Rerouting return IP is {}" , moveResponse
174+ .getReplica ().getServer ().ipToString (), move .getReplica ()
175+ .getServer ().ipToString ());
168176 throw new ObTableRoutingWrongException ();
169177 }
170178 }
@@ -492,9 +500,11 @@ protected abstract ObTableQueryAsyncResult executeAsync(ObPair<Long, ObTablePara
492500 ObPayload streamRequest )
493501 throws Exception ;
494502
495- protected abstract Map <Long , ObPair <Long , ObTableParam >> refreshPartition (ObTableQuery tableQuery ,
496- String tableName )
497- throws Exception ;
503+ protected Map <Long , ObPair <Long , ObTableParam >> refreshPartition (ObTableQuery tableQuery ,
504+ String tableName )
505+ throws Exception {
506+ return buildPartitions (client , tableQuery , tableName );
507+ }
498508
499509 protected void cacheResultRows (ObTableQueryResult tableQueryResult ) {
500510 cacheRows .addAll (tableQueryResult .getPropertiesRows ());
@@ -525,6 +535,93 @@ protected void cacheStreamNext(ObPair<Long, ObTableParam> partIdWithObTable,
525535 }
526536 }
527537
538+ @ FunctionalInterface
539+ public interface ThrowingConsumer <T > {
540+ void accept (T t ) throws Exception ;
541+ }
542+
543+ @ FunctionalInterface
544+ public interface ExceptionHandler <T > {
545+ Iterator <Map .Entry <Long , ObPair <Long , ObTableParam >>> handle (ObTableClient client ,
546+ int maxRetryTimes ,
547+ String tableName ,
548+ Iterator <Map .Entry <Long , ObPair <Long , ObTableParam >>> iterator ,
549+ Map .Entry <Long , ObPair <Long , ObTableParam >> entry ,
550+ Exception e , int retryTimes )
551+ throws Exception ;
552+ }
553+
554+ public void executeWithRetry (ObTableClient client ,
555+ int maxRetryTimes ,
556+ String tableName ,
557+ Iterator <Map .Entry <Long , ObPair <Long , ObTableParam >>> iterator ,
558+ ThrowingConsumer <ObPair <Long , ObTableParam >> operation ,
559+ ExceptionHandler <Iterator <Map .Entry <Long , ObPair <Long , ObTableParam >>>> exceptionHandler )
560+ throws Exception {
561+ int retryTimes = 0 ;
562+
563+ while (iterator .hasNext ()) {
564+ retryTimes ++;
565+ Map .Entry <Long , ObPair <Long , ObTableParam >> entry = iterator .next ();
566+
567+ try {
568+ if (retryTimes > 1 ) {
569+ TableEntry tableEntry = client .getOrRefreshTableEntry (tableName , false , false ,
570+ false );
571+ client .refreshTableLocationByTabletId (tableEntry , tableName , entry .getValue ()
572+ .getRight ().getPartitionId ());
573+ }
574+ operation .accept (entry .getValue ());
575+
576+ } catch (Exception e ) {
577+ iterator = exceptionHandler .handle (client , maxRetryTimes , tableName , iterator ,
578+ entry , e , retryTimes );
579+ }
580+ }
581+ }
582+
583+ protected Iterator <Map .Entry <Long , ObPair <Long , ObTableParam >>> handleException (ObTableClient client ,
584+ int maxRetryTimes ,
585+ String tableName ,
586+ Iterator <Map .Entry <Long , ObPair <Long , ObTableParam >>> iterator ,
587+ Map .Entry <Long , ObPair <Long , ObTableParam >> entry ,
588+ Exception e ,
589+ int retryTimes )
590+ throws Exception {
591+
592+ if (client .isOdpMode ()) {
593+ if ((retryTimes - 1 ) < maxRetryTimes ) {
594+ if (e instanceof ObTableException ) {
595+ logger .warn (
596+ "execute while meet Exception, errorCode: {} , errorMsg: {}, try times {}" ,
597+ ((ObTableException ) e ).getErrorCode (), e .getMessage (), retryTimes );
598+ } else {
599+ logger .warn ("execute while meet Exception, exception: {}, try times {}" , e ,
600+ retryTimes );
601+ }
602+ } else {
603+ throw e ;
604+ }
605+ } else {
606+ if (e instanceof ObTableException && ((ObTableException ) e ).isNeedRefreshTableEntry ()) {
607+ if (client .isRetryOnChangeMasterTimes () && retryTimes <= maxRetryTimes ) {
608+ if (e instanceof ObTableNeedFetchAllException ) {
609+ // Refresh table info
610+ client .getOrRefreshTableEntry (tableName , true ,
611+ client .isTableEntryRefreshIntervalWait (), true );
612+ setExpectant (refreshPartition (tableQuery , tableName ));
613+ // Return a new iterator
614+ return expectant .entrySet ().iterator ();
615+ }
616+ } else {
617+ client .calculateContinuousFailure (tableName , e .getMessage ());
618+ throw e ;
619+ }
620+ }
621+ }
622+ return iterator ; // Return the original iterator if no changes are made
623+ }
624+
528625 /**
529626 * Get row.
530627 */
@@ -554,30 +651,8 @@ public void init() throws Exception {
554651 if (tableQuery .getBatchSize () == -1 ) {
555652 if (!expectant .isEmpty ()) {
556653 Iterator <Map .Entry <Long , ObPair <Long , ObTableParam >>> it = expectant .entrySet ()
557- .iterator ();
558- int retryTimes = 0 ;
559- while (it .hasNext ()) {
560- Map .Entry <Long , ObPair <Long , ObTableParam >> entry = it .next ();
561- try {
562- // try access new partition, async will not remove useless expectant
563- referToNewPartition (entry .getValue ());
564- } catch (Exception e ) {
565- if (e instanceof ObTableNeedFetchAllException ) {
566- setExpectant (refreshPartition (tableQuery , tableName ));
567- it = expectant .entrySet ().iterator ();
568- retryTimes ++;
569- if (retryTimes > client .getRuntimeRetryTimes ()) {
570- RUNTIME .error ("Fail to get refresh table entry response after {}" ,
571- retryTimes );
572- throw new ObTableRetryExhaustedException (
573- "Fail to get refresh table entry response after " + retryTimes );
574-
575- }
576- } else {
577- throw e ;
578- }
579- }
580- }
654+ .iterator ();
655+ executeWithRetry (client , client .getRuntimeRetryTimes (), tableName , it , this ::referToNewPartition , this ::handleException );
581656 }
582657 expectant .clear ();
583658 } else {
0 commit comments