@@ -3152,41 +3152,95 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
31523152 request .setTimeout (getOdpTable ().getObTableOperationTimeout ());
31533153 return getOdpTable ().execute (request );
31543154 } else {
3155+ int maxRetries = getRuntimeRetryTimes (); // Define the maximum number of retries
3156+ int tryTimes = 0 ;
3157+ long startExecute = System .currentTimeMillis ();
3158+ boolean needRefreshTableEntry = false ;
31553159 Map <Long , ObTableParam > partIdMapObTable = new HashMap <Long , ObTableParam >();
3156- for (ObNewRange rang : tableQuery .getKeyRanges ()) {
3157- ObRowKey startKey = rang .getStartKey ();
3158- int startKeySize = startKey .getObjs ().size ();
3159- ObRowKey endKey = rang .getEndKey ();
3160- int endKeySize = endKey .getObjs ().size ();
3161- Object [] start = new Object [startKeySize ];
3162- Object [] end = new Object [endKeySize ];
3163- for (int i = 0 ; i < startKeySize ; i ++) {
3164- start [i ] = startKey .getObj (i ).getValue ();
3160+ while (true ) {
3161+ long currentExecute = System .currentTimeMillis ();
3162+ long costMillis = currentExecute - startExecute ;
3163+ if (costMillis > getRuntimeMaxWait ()) {
3164+ logger .error (
3165+ "tablename:{} it has tried " + tryTimes
3166+ + " times and it has waited " + costMillis
3167+ + "/ms which exceeds response timeout "
3168+ + getRuntimeMaxWait () + "/ms" , request .getTableName ());
3169+ throw new ObTableTimeoutExcetion ("it has tried " + tryTimes
3170+ + " times and it has waited " + costMillis
3171+ + "/ms which exceeds response timeout "
3172+ + getRuntimeMaxWait () + "/ms" );
31653173 }
3174+ try {
3175+ // Recalculate partIdMapObTable
3176+ // Clear the map before recalculating
3177+ partIdMapObTable .clear ();
3178+ for (ObNewRange rang : tableQuery .getKeyRanges ()) {
3179+ ObRowKey startKey = rang .getStartKey ();
3180+ int startKeySize = startKey .getObjs ().size ();
3181+ ObRowKey endKey = rang .getEndKey ();
3182+ int endKeySize = endKey .getObjs ().size ();
3183+ Object [] start = new Object [startKeySize ];
3184+ Object [] end = new Object [endKeySize ];
3185+ for (int i = 0 ; i < startKeySize ; i ++) {
3186+ start [i ] = startKey .getObj (i ).getValue ();
3187+ }
31663188
3167- for (int i = 0 ; i < endKeySize ; i ++) {
3168- end [i ] = endKey .getObj (i ).getValue ();
3169- }
3170- ObBorderFlag borderFlag = rang .getBorderFlag ();
3171- List <ObPair <Long , ObTableParam >> pairList = getTables (request .getTableName (),
3172- tableQuery , start , borderFlag .isInclusiveStart (), end ,
3173- borderFlag .isInclusiveEnd (), false , false );
3174- for (ObPair <Long , ObTableParam > pair : pairList ) {
3175- partIdMapObTable .put (pair .getLeft (), pair .getRight ());
3176- }
3177- }
3178- if (partIdMapObTable .size () > 1 ) {
3179- throw new ObTablePartitionConsistentException (
3180- "query and mutate must be a atomic operation" );
3181- }
3189+ for (int i = 0 ; i < endKeySize ; i ++) {
3190+ end [i ] = endKey .getObj (i ).getValue ();
3191+ }
3192+ ObBorderFlag borderFlag = rang .getBorderFlag ();
3193+ List <ObPair <Long , ObTableParam >> pairList = getTables (request .getTableName (),
3194+ tableQuery , start , borderFlag .isInclusiveStart (), end ,
3195+ borderFlag .isInclusiveEnd (), needRefreshTableEntry , isTableEntryRefreshIntervalWait ());
3196+ for (ObPair <Long , ObTableParam > pair : pairList ) {
3197+ partIdMapObTable .put (pair .getLeft (), pair .getRight ());
3198+ }
3199+ }
31823200
3183- for (Long partId : partIdMapObTable .keySet ()) {
3184- ObTableParam tableParam = partIdMapObTable .get (partId );
3185- request .setTableId (tableParam .getTableId ());
3186- request .setPartitionId (tableParam .getPartitionId ());
3187- request .setTimeout (tableParam .getObTable ().getObTableOperationTimeout ());
3188- ObTable obTable = tableParam .getObTable ();
3189- return executeWithRetry (obTable , request , request .getTableName ());
3201+ // Check if partIdMapObTable size is greater than 1
3202+ if (partIdMapObTable .size () > 1 ) {
3203+ throw new ObTablePartitionConsistentException (
3204+ "query and mutate must be a atomic operation" );
3205+ }
3206+ // Proceed with the operation
3207+ Map .Entry <Long , ObTableParam > entry = partIdMapObTable .entrySet ().iterator ().next ();
3208+ ObTableParam tableParam = entry .getValue ();
3209+ request .setTableId (tableParam .getTableId ());
3210+ request .setPartitionId (tableParam .getPartitionId ());
3211+ request .setTimeout (tableParam .getObTable ().getObTableOperationTimeout ());
3212+ ObTable obTable = tableParam .getObTable ();
3213+
3214+ // Attempt to execute the operation
3215+ return executeWithRetry (obTable , request , request .getTableName ());
3216+ } catch (Exception ex ) {
3217+ tryTimes ++;
3218+ if (ex instanceof ObTableException && ((ObTableException ) ex ).isNeedRefreshTableEntry ()) {
3219+ needRefreshTableEntry = true ;
3220+ logger .warn (
3221+ "tablename:{} partition id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}" ,
3222+ request .getTableName (), request .getPartitionId (), ((ObTableException ) ex ).getErrorCode (), ex );
3223+
3224+ if (isRetryOnChangeMasterTimes () && tryTimes < maxRetries ) {
3225+ logger .warn (
3226+ "tablename:{} partition id:{} batch ops retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}" ,
3227+ request .getTableName (), request .getPartitionId (), ((ObTableException ) ex ).getErrorCode (),
3228+ tryTimes , ex );
3229+
3230+ if (ex instanceof ObTableNeedFetchAllException ) {
3231+ // Refresh table info
3232+ getOrRefreshTableEntry (request .getTableName (), needRefreshTableEntry , isTableEntryRefreshIntervalWait (), true );
3233+ }
3234+ } else {
3235+ calculateContinuousFailure (request .getTableName (), ex .getMessage ());
3236+ throw ex ;
3237+ }
3238+ } else {
3239+ calculateContinuousFailure (request .getTableName (), ex .getMessage ());
3240+ // Handle other exceptions or rethrow
3241+ throw ex ;
3242+ }
3243+ }
31903244 }
31913245 }
31923246 }
0 commit comments