2020package com .dtstack .flink .sql .side .rdb .async ;
2121
2222import com .dtstack .flink .sql .enums .ECacheContentType ;
23+ import com .dtstack .flink .sql .metric .MetricConstant ;
2324import com .dtstack .flink .sql .side .BaseAsyncReqRow ;
2425import com .dtstack .flink .sql .side .BaseSideInfo ;
2526import com .dtstack .flink .sql .side .CacheMissVal ;
3132import io .vertx .ext .sql .SQLConnection ;
3233import org .apache .calcite .sql .JoinType ;
3334import org .apache .commons .lang3 .StringUtils ;
35+ import org .apache .flink .metrics .Counter ;
3436import org .apache .flink .streaming .api .functions .async .ResultFuture ;
3537import org .apache .flink .table .runtime .types .CRow ;
3638import org .apache .flink .table .typeutils .TimeIndicatorTypeInfo ;
@@ -77,10 +79,12 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
7779
7880 private final static AtomicBoolean CONN_STATUS = new AtomicBoolean (true );
7981
80- private final static AtomicLong TIMOUT_NUM = new AtomicLong (0 );
82+ private final static AtomicLong FAIL_NUM = new AtomicLong (0 );
8183
8284 private Logger logger = LoggerFactory .getLogger (getClass ());
8385
86+ private Counter counter = getRuntimeContext ().getMetricGroup ().counter (MetricConstant .DT_NUM_SIDE_PARSE_ERROR_RECORDS );
87+
8488 public RdbAsyncReqRow (BaseSideInfo sideInfo ) {
8589 super (sideInfo );
8690 }
@@ -123,7 +127,12 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
123127 logger .error ("getConnection error" , conn .cause ());
124128 }
125129 if (failCounter .get () >= sideInfo .getSideTableInfo ().getAsyncFailMaxNum (3L )){
126- outByJoinType (resultFuture , conn .cause ());
130+ if (FAIL_NUM .incrementAndGet () > sideInfo .getSideTableInfo ().getAsyncFailMaxNum (Long .MAX_VALUE )){
131+ counter .inc ();
132+ resultFuture .completeExceptionally (conn .cause ());
133+ } else {
134+ dealMissKey (input , resultFuture );
135+ }
127136 finishFlag .set (true );
128137 }
129138 conn .result ().close ();
@@ -135,6 +144,7 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, Resul
135144 handleQuery (conn .result (), inputParams , input , resultFuture );
136145 finishFlag .set (true );
137146 } catch (Exception e ) {
147+ dealFillDataError (resultFuture , e , null );
138148 logger .error ("" , e );
139149 } finally {
140150 latch .countDown ();
@@ -200,12 +210,13 @@ private void handleQuery(SQLConnection connection,Map<String, Object> inputParam
200210 JsonArray params = new JsonArray (Lists .newArrayList (inputParams .values ()));
201211 connection .queryWithParams (sideInfo .getSqlCondition (), params , rs -> {
202212 if (rs .failed ()) {
203- if (TIMOUT_NUM .incrementAndGet () > sideInfo .getSideTableInfo ().getAsyncFailMaxNum (Long .MAX_VALUE )){
204- outByJoinType (resultFuture , rs .cause ());
205- return ;
213+ if (FAIL_NUM .incrementAndGet () > sideInfo .getSideTableInfo ().getAsyncFailMaxNum (Long .MAX_VALUE )){
214+ LOG .error ("Cannot retrieve the data from the database" , rs .cause ());
215+ counter .inc ();
216+ resultFuture .completeExceptionally (rs .cause ());
217+ } else {
218+ dealMissKey (input , resultFuture );
206219 }
207- LOG .error ("Cannot retrieve the data from the database" , rs .cause ());
208- resultFuture .complete (null );
209220 return ;
210221 }
211222
@@ -244,12 +255,4 @@ private void handleQuery(SQLConnection connection,Map<String, Object> inputParam
244255 });
245256 }
246257
247- private void outByJoinType (ResultFuture <CRow > resultFuture , Throwable e ){
248- if (sideInfo .getJoinType () == JoinType .LEFT ){
249- resultFuture .complete (null );
250- return ;
251- }
252- resultFuture .completeExceptionally (e );
253- }
254-
255258}
0 commit comments