@@ -3133,41 +3133,95 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
31333133 request .setTimeout (getOdpTable ().getObTableOperationTimeout ());
31343134 return getOdpTable ().execute (request );
31353135 } else {
3136+ int maxRetries = getRuntimeRetryTimes (); // Define the maximum number of retries
3137+ int tryTimes = 0 ;
3138+ long startExecute = System .currentTimeMillis ();
3139+ boolean needRefreshTableEntry = false ;
31363140 Map <Long , ObTableParam > partIdMapObTable = new HashMap <Long , ObTableParam >();
3137- for (ObNewRange rang : tableQuery .getKeyRanges ()) {
3138- ObRowKey startKey = rang .getStartKey ();
3139- int startKeySize = startKey .getObjs ().size ();
3140- ObRowKey endKey = rang .getEndKey ();
3141- int endKeySize = endKey .getObjs ().size ();
3142- Object [] start = new Object [startKeySize ];
3143- Object [] end = new Object [endKeySize ];
3144- for (int i = 0 ; i < startKeySize ; i ++) {
3145- start [i ] = startKey .getObj (i ).getValue ();
3141+ while (true ) {
3142+ long currentExecute = System .currentTimeMillis ();
3143+ long costMillis = currentExecute - startExecute ;
3144+ if (costMillis > getRuntimeMaxWait ()) {
3145+ logger .error (
3146+ "tablename:{} it has tried " + tryTimes
3147+ + " times and it has waited " + costMillis
3148+ + "/ms which exceeds response timeout "
3149+ + getRuntimeMaxWait () + "/ms" , request .getTableName ());
3150+ throw new ObTableTimeoutExcetion ("it has tried " + tryTimes
3151+ + " times and it has waited " + costMillis
3152+ + "/ms which exceeds response timeout "
3153+ + getRuntimeMaxWait () + "/ms" );
31463154 }
3155+ try {
3156+ // Recalculate partIdMapObTable
3157+ // Clear the map before recalculating
3158+ partIdMapObTable .clear ();
3159+ for (ObNewRange rang : tableQuery .getKeyRanges ()) {
3160+ ObRowKey startKey = rang .getStartKey ();
3161+ int startKeySize = startKey .getObjs ().size ();
3162+ ObRowKey endKey = rang .getEndKey ();
3163+ int endKeySize = endKey .getObjs ().size ();
3164+ Object [] start = new Object [startKeySize ];
3165+ Object [] end = new Object [endKeySize ];
3166+ for (int i = 0 ; i < startKeySize ; i ++) {
3167+ start [i ] = startKey .getObj (i ).getValue ();
3168+ }
31473169
3148- for (int i = 0 ; i < endKeySize ; i ++) {
3149- end [i ] = endKey .getObj (i ).getValue ();
3150- }
3151- ObBorderFlag borderFlag = rang .getBorderFlag ();
3152- List <ObPair <Long , ObTableParam >> pairList = getTables (request .getTableName (),
3153- tableQuery , start , borderFlag .isInclusiveStart (), end ,
3154- borderFlag .isInclusiveEnd (), false , false );
3155- for (ObPair <Long , ObTableParam > pair : pairList ) {
3156- partIdMapObTable .put (pair .getLeft (), pair .getRight ());
3157- }
3158- }
3159- if (partIdMapObTable .size () > 1 ) {
3160- throw new ObTablePartitionConsistentException (
3161- "query and mutate must be a atomic operation" );
3162- }
3170+ for (int i = 0 ; i < endKeySize ; i ++) {
3171+ end [i ] = endKey .getObj (i ).getValue ();
3172+ }
3173+ ObBorderFlag borderFlag = rang .getBorderFlag ();
3174+ List <ObPair <Long , ObTableParam >> pairList = getTables (request .getTableName (),
3175+ tableQuery , start , borderFlag .isInclusiveStart (), end ,
3176+ borderFlag .isInclusiveEnd (), needRefreshTableEntry , isTableEntryRefreshIntervalWait ());
3177+ for (ObPair <Long , ObTableParam > pair : pairList ) {
3178+ partIdMapObTable .put (pair .getLeft (), pair .getRight ());
3179+ }
3180+ }
31633181
3164- for (Long partId : partIdMapObTable .keySet ()) {
3165- ObTableParam tableParam = partIdMapObTable .get (partId );
3166- request .setTableId (tableParam .getTableId ());
3167- request .setPartitionId (tableParam .getPartitionId ());
3168- request .setTimeout (tableParam .getObTable ().getObTableOperationTimeout ());
3169- ObTable obTable = tableParam .getObTable ();
3170- return executeWithRetry (obTable , request , request .getTableName ());
3182+ // Check if partIdMapObTable size is greater than 1
3183+ if (partIdMapObTable .size () > 1 ) {
3184+ throw new ObTablePartitionConsistentException (
3185+ "query and mutate must be a atomic operation" );
3186+ }
3187+ // Proceed with the operation
3188+ Map .Entry <Long , ObTableParam > entry = partIdMapObTable .entrySet ().iterator ().next ();
3189+ ObTableParam tableParam = entry .getValue ();
3190+ request .setTableId (tableParam .getTableId ());
3191+ request .setPartitionId (tableParam .getPartitionId ());
3192+ request .setTimeout (tableParam .getObTable ().getObTableOperationTimeout ());
3193+ ObTable obTable = tableParam .getObTable ();
3194+
3195+ // Attempt to execute the operation
3196+ return executeWithRetry (obTable , request , request .getTableName ());
3197+ } catch (Exception ex ) {
3198+ tryTimes ++;
3199+ if (ex instanceof ObTableException && ((ObTableException ) ex ).isNeedRefreshTableEntry ()) {
3200+ needRefreshTableEntry = true ;
3201+ logger .warn (
3202+ "tablename:{} partition id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}" ,
3203+ request .getTableName (), request .getPartitionId (), ((ObTableException ) ex ).getErrorCode (), ex );
3204+
3205+ if (isRetryOnChangeMasterTimes () && tryTimes < maxRetries ) {
3206+ logger .warn (
3207+ "tablename:{} partition id:{} batch ops retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}" ,
3208+ request .getTableName (), request .getPartitionId (), ((ObTableException ) ex ).getErrorCode (),
3209+ tryTimes , ex );
3210+
3211+ if (ex instanceof ObTableNeedFetchAllException ) {
3212+ // Refresh table info
3213+ getOrRefreshTableEntry (request .getTableName (), needRefreshTableEntry , isTableEntryRefreshIntervalWait (), true );
3214+ }
3215+ } else {
3216+ calculateContinuousFailure (request .getTableName (), ex .getMessage ());
3217+ throw ex ;
3218+ }
3219+ } else {
3220+ calculateContinuousFailure (request .getTableName (), ex .getMessage ());
3221+ // Handle other exceptions or rethrow
3222+ throw ex ;
3223+ }
3224+ }
31713225 }
31723226 }
31733227 }
0 commit comments