2828import okhttp3 .RequestBody ;
2929import okhttp3 .Response ;
3030import okhttp3 .ResponseBody ;
31- import org .apache .commons .codec .digest .DigestUtils ;
3231import org .apache .commons .lang .StringUtils ;
3332import org .slf4j .Logger ;
3433
@@ -59,12 +58,9 @@ public class DataCheckerDao {
5958 "AND (UNIX_TIMESTAMP() - UNIX_TIMESTAMP(STR_TO_DATE(modify_time, '%Y-%m-%d %H:%i:%s'))) <= ? AND status = '1';" ;
6059
6160 private static final String SQL_DOPS_CHECK_TABLE =
62- "SELECT * FROM dops_clean_task_list WHERE db_name = ? AND tb_name = ? AND part_name is null AND task_state NOT IN (10,13) order by order_id desc limit 1 " ;
61+ "SELECT * FROM dops_clean_task_list WHERE task_state = 10 AND db_name = ? AND tb_name = ? " ;
6362 private static final String SQL_DOPS_CHECK_PARTITION =
64- "SELECT * FROM dops_clean_task_list WHERE db_name = ? AND tb_name = ? AND part_name = ? AND task_state NOT IN (10,13) order by order_id desc limit 1" ;
65-
66- private static final String SQL_DOPS_CHECK_ALL_PARTITION =
67- "SELECT * FROM dops_clean_task_list WHERE db_name = ? AND tb_name = ? AND part_name is not null AND task_state != 13 order by order_id desc limit 1" ;
63+ "SELECT * FROM dops_clean_task_list WHERE task_state = 10 AND db_name = ? AND tb_name = ? AND part_name = ?" ;
6864 private static final String HIVE_SOURCE_TYPE = "hivedb" ;
6965 private static final String MASK_SOURCE_TYPE = "maskdb" ;
7066
@@ -181,7 +177,6 @@ private boolean getDataCheckResult(Map<String, String> proObjectMap,
181177 if (StringUtils .isNotBlank (dataObjectStr )) {
182178 dataObjectStr = dataObjectStr .replace (" " , "" ).trim ();
183179 }
184- String objectNum = proObjectMap .get (DataChecker .DATA_OBJECT_NUM );
185180 CheckDataObject dataObject ;
186181 try {
187182 dataObject = parseDataObject (dataObjectStr );
@@ -218,7 +213,8 @@ private boolean getDataCheckResult(Map<String, String> proObjectMap,
218213 }
219214 }
220215 return normalCheck
221- && checkQualitisData ( objectNum ,dataObject , log , props , dopsConn ,qualitisUtil );
216+ && getDopsTotalCount (dataObject , dopsConn , log ) > 0
217+ && checkQualitisData (dataObject , log , props , qualitisUtil );
222218
223219 }
224220
@@ -253,15 +249,13 @@ private Map<String, String> key2Map(Object key, Properties p) {
253249 proMap .put (DataChecker .SOURCE_TYPE , String .valueOf (p .get (stKey )));
254250 }
255251 proMap .put (DataChecker .DATA_OBJECT , String .valueOf (p .get (doKey )));
256- proMap .put (DataChecker .DATA_OBJECT_NUM , keyNum );
257252 } else {
258253 String stKey = DataChecker .SOURCE_TYPE ;
259254 String doKey = DataChecker .DATA_OBJECT ;
260255 if (null != p .get (stKey )) {
261256 proMap .put (DataChecker .SOURCE_TYPE , String .valueOf (p .get (stKey )));
262257 }
263258 proMap .put (DataChecker .DATA_OBJECT , String .valueOf (p .get (doKey )));
264- proMap .put (DataChecker .DATA_OBJECT_NUM , "0" );
265259 }
266260 }
267261
@@ -324,17 +318,6 @@ private PreparedStatement getDopsStatement(Connection conn, CheckDataObject data
324318 }
325319 }
326320
327- /**
328- * 构造查询dops库的查询,分区表全表校验场景
329- */
330- private PreparedStatement getDopsStatementCheckAllPartition (Connection conn , CheckDataObject dataObject ) throws SQLException {
331- PreparedStatement pstmt = conn .prepareCall (SQL_DOPS_CHECK_ALL_PARTITION );
332- pstmt .setString (1 , dataObject .getDbName ());
333- pstmt .setString (2 , dataObject .getTableName ());
334- return pstmt ;
335-
336- }
337-
338321
339322 /**
340323 * 反序列化检查对象
@@ -343,7 +326,7 @@ private PreparedStatement getDopsStatementCheckAllPartition(Connection conn, Che
343326 */
344327 private CheckDataObject parseDataObject (String dataObjectStr )throws SQLException {
345328 CheckDataObject dataObject ;
346- if (!dataObjectStr .contains ("." )){
329+ if (!dataObjectStr .contains ("\\ ." )){
347330 throw new SQLException ("Error for DataObject format!" +dataObjectStr );
348331 }
349332 String dbName = dataObjectStr .split ("\\ ." )[0 ];
@@ -368,12 +351,10 @@ private CheckDataObject parseDataObject(String dataObjectStr)throws SQLException
368351 */
369352 private long getJobTotalCount (CheckDataObject dataObject , Connection conn , Logger log ) {
370353 log .info ("-------------------------------------- search hive/spark/mr data " );
371- log .info ("-------------------------------------- dataObject : " + dataObject );
354+ log .info ("-------------------------------------- : " + dataObject );
372355 try (PreparedStatement pstmt = getJobStatement (conn , dataObject )) {
373356 ResultSet rs = pstmt .executeQuery ();
374- long ret = rs .last () ? rs .getRow () : 0 ;
375- log .info ("-------------------------------------- hive/spark/mr data result:" +ret );
376- return ret ;
357+ return rs .last () ? rs .getRow () : 0 ;
377358 } catch (SQLException e ) {
378359 log .error ("fetch data from Hive MetaStore error" , e );
379360 return 0 ;
@@ -386,84 +367,45 @@ private long getJobTotalCount(CheckDataObject dataObject, Connection conn, Logge
386367 private long getBdpTotalCount (CheckDataObject dataObject , Connection conn , Logger log , Properties props ) {
387368 String timeScape = props .getOrDefault (DataChecker .TIME_SCAPE , "NULL" ).toString ();
388369 log .info ("-------------------------------------- search bdp data " );
389- log .info ("-------------------------------------- dataObject : " + dataObject .toString ());
370+ log .info ("-------------------------------------- : " + dataObject .toString ());
390371 try (PreparedStatement pstmt = getBdpStatement (conn , dataObject , timeScape )) {
391372 ResultSet rs = pstmt .executeQuery ();
392- long ret =rs .last () ? rs .getRow () : 0 ;
393- log .info ("-------------------------------------- bdp data result:" +ret );
394- return ret ;
373+ return rs .last () ? rs .getRow () : 0 ;
395374 } catch (SQLException e ) {
396375 log .error ("fetch data from bdp error" , e );
397376 return 0 ;
398377 }
399378 }
400379
401380 /**
402- * - 返回0表示未找到任何记录 ;
403- * - 返回1表示非分区表的全表校验场景找到了记录;
404- * - 返回2表示分区表的分区校验场景找到了记录;
405- * - 返回3表示分区表的全表校验场景找到了记录;
406- * - 返回4表示查询出错了
381+ * 查询dops库
407382 */
408- private int checkDops (CheckDataObject dataObject , Connection conn , Logger log ){
383+ private long getDopsTotalCount (CheckDataObject dataObject , Connection conn , Logger log ) {
384+
409385 log .info ("-------------------------------------- search dops data " );
410- log .info ("-------------------------------------- dataObject : " + dataObject .toString ());
386+ log .info ("-------------------------------------- : " + dataObject .toString ());
411387 try (PreparedStatement pstmt = getDopsStatement (conn , dataObject )) {
412388 ResultSet rs = pstmt .executeQuery ();
413- long count = rs .last () ? rs .getRow () : 0 ;
414- log .info ("-------------------------------------- dops data check table or partition,count:" +count );
415- if (count >0 ){
416- return CheckDataObject .Type .PARTITION == dataObject .getType () ? 2 : 1 ;
417- }
389+ return rs .last () ? rs .getRow () : 0 ;
418390 } catch (SQLException e ) {
419- log .error ("fetch data from dops error while check table or partition " , e );
391+ log .error ("fetch data from dops error" , e );
420392 //如果查询出错,还是认为dops处理过这个表/分区
421- return 4 ;
393+ return 1 ;
422394 }
423-
424- try (PreparedStatement pstmt = getDopsStatementCheckAllPartition (conn , dataObject )){
425- ResultSet rs = pstmt .executeQuery ();
426- long count = rs .last () ? rs .getRow () : 0 ;
427- log .info ("-------------------------------------- dops data check all partition result count:" +count );
428- if (count >0 ){
429- return 3 ;
430- }
431- }catch (SQLException e ) {
432- log .error ("fetch data from dops error while check all partition" , e );
433- //如果查询出错,还是认为dops处理过这个表/分区
434- return 4 ;
435- }
436- return 0 ;
437395 }
438396
439397 /**
440398 * 从qualitis去check数据
441399 */
442- private boolean checkQualitisData (String objectNum , CheckDataObject dataObject , Logger log , Properties props , Connection conn , QualitisUtil qualitisUtil ) {
400+ private boolean checkQualitisData (CheckDataObject dataObject , Logger log , Properties props , QualitisUtil qualitisUtil ) {
443401 boolean systemCheck = Boolean .valueOf (props .getProperty ("job.eventchecker.qualitis.switch" ));
444402 boolean userCheck = Boolean .valueOf (props .getProperty (DataChecker .QUALITIS_CHECK , "true" ));
445403 if (systemCheck && userCheck ) {
446-
447- int dopsState =checkDops (dataObject ,conn ,log );
448- if (dopsState ==0 ){
449- //没找到记录,直接通过校验
450- return true ;
451- } else if (dopsState == 3 || dopsState == 4 ) {
452- //找记录失败、或者是找到了分区表的全表校验记录,直接校验不通过。
453- return false ;
454- }
455- // 其他情况,继续走qualitis校验
456404 log .info (
457405 "=============================Data Check Qualitis Start==========================================" );
458406 try {
459- String projectName = props .getProperty (DataChecker .CONTEXTID_PROJECT_NAME );
460- String user = props .getProperty (DataChecker .CONTEXTID_USER );
461- String flowName = props .getProperty (DataChecker .CONTEXTID_FLOW_NAME );
462- String nodeName =props .getProperty (DataChecker .NAME_NAME );
463-
464- String ruleName = getMD5Str (projectName + flowName + nodeName + objectNum );
465407 String applicationId = qualitisUtil
466- .createAndSubmitRule (dataObject , projectName , ruleName , user );
408+ .createAndSubmitRule (dataObject );
467409 if (StringUtils .isEmpty (applicationId )) {
468410 return false ;
469411 }
@@ -478,7 +420,7 @@ private boolean checkQualitisData(String objectNum,CheckDataObject dataObject, L
478420 case 12 :
479421 try {
480422 Thread
481- .sleep (Double .valueOf (props .getProperty ("qualitis.getStatus.interval" )). longValue ( ));
423+ .sleep (Integer .valueOf (props .getProperty ("qualitis.getStatus.interval" )));
482424 } catch (InterruptedException e ) {
483425 log .error ("get datachecker result from qualitis InterruptedException" , e );
484426 }
@@ -502,9 +444,6 @@ private boolean checkQualitisData(String objectNum,CheckDataObject dataObject, L
502444 return true ;
503445 }
504446 }
505- public static String getMD5Str (String str ){
506- return DigestUtils .md5Hex (str );
507- }
508447
509448 private Map <String , String > fetchMaskCode (CheckDataObject dataObject , Logger log , Properties props ) {
510449 log .info ("=============================调用BDP MASK接口查询数据状态==========================================" );
0 commit comments