2121import com .alipay .oceanbase .rpc .bolt .transport .ObTableConnection ;
2222import com .alipay .oceanbase .rpc .exception .ObTableException ;
2323import com .alipay .oceanbase .rpc .exception .ObTableNeedFetchAllException ;
24+ import com .alipay .oceanbase .rpc .exception .ObTableRetryExhaustedException ;
2425import com .alipay .oceanbase .rpc .location .model .partition .ObPair ;
2526import com .alipay .oceanbase .rpc .protocol .payload .Constants ;
2627import com .alipay .oceanbase .rpc .protocol .payload .ObPayload ;
3637import java .util .*;
3738import java .util .concurrent .atomic .AtomicReference ;
3839
40+ import static com .alipay .oceanbase .rpc .util .TableClientLoggerFactory .RUNTIME ;
41+
3942public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResult {
4043 private static final Logger logger = LoggerFactory
4144 .getLogger (ObTableClientQueryStreamResult .class );
@@ -50,6 +53,7 @@ public void init() throws Exception {
5053 if (initialized ) {
5154 return ;
5255 }
56+ int maxRetries = client .getTableEntryRefreshTryTimes ();
5357 // init request
5458 ObTableQueryRequest request = new ObTableQueryRequest ();
5559 request .setTableName (tableName );
@@ -66,8 +70,29 @@ public void init() throws Exception {
6670 if (!expectant .isEmpty ()) {
6771 Iterator <Map .Entry <Long , ObPair <Long , ObTableParam >>> it = expectant .entrySet ()
6872 .iterator ();
69- Map .Entry <Long , ObPair <Long , ObTableParam >> firstEntry = it .next ();
70- referToNewPartition (firstEntry .getValue ());
73+ int retryTimes = 0 ;
74+ while (it .hasNext ()) {
75+ Map .Entry <Long , ObPair <Long , ObTableParam >> firstEntry = it .next ();
76+ try {
77+ // try access new partition, async will not remove useless expectant
78+ referToNewPartition (firstEntry .getValue ());
79+ break ;
80+ } catch (Exception e ) {
81+ if (e instanceof ObTableNeedFetchAllException ) {
82+ setExpectant (refreshPartition (this .asyncRequest .getObTableQueryRequest ()
83+ .getTableQuery (), tableName ));
84+ it = expectant .entrySet ().iterator ();
85+ retryTimes ++;
86+ if (retryTimes > maxRetries ) {
87+ RUNTIME .error ("Fail to get refresh table entry response after {}" , retryTimes );
88+ throw new ObTableRetryExhaustedException ("Fail to get refresh table entry response after " + retryTimes );
89+
90+ }
91+ } else {
92+ throw e ;
93+ }
94+ }
95+ }
7196 if (isEnd ())
7297 it .remove ();
7398 }
@@ -197,6 +222,7 @@ public boolean next() throws Exception {
197222 boolean hasNext = false ;
198223 Iterator <Map .Entry <Long , ObPair <Long , ObTableParam >>> it = expectant .entrySet ()
199224 .iterator ();
225+ int retryTimes = 0 ;
200226 while (it .hasNext ()) {
201227 Map .Entry <Long , ObPair <Long , ObTableParam >> entry = it .next ();
202228 try {
@@ -209,6 +235,11 @@ public boolean next() throws Exception {
209235 setExpectant (refreshPartition (this .asyncRequest .getObTableQueryRequest ()
210236 .getTableQuery (), tableName ));
211237 it = expectant .entrySet ().iterator ();
238+ retryTimes ++;
239+ if (retryTimes > client .getTableEntryRefreshTryTimes ()) {
240+ RUNTIME .error ("Fail to get refresh table entry response after {}" , retryTimes );
241+ throw new ObTableRetryExhaustedException ("Fail to get refresh table entry response after " + retryTimes );
242+ }
212243 continue ;
213244 } else {
214245 throw e ;
0 commit comments