5656import java .io .IOException ;
5757import java .util .ArrayList ;
5858import java .util .HashMap ;
59+ import java .util .Iterator ;
5960import java .util .List ;
6061import java .util .Map ;
6162import java .util .stream .Collectors ;
@@ -536,23 +537,68 @@ public void releaseClients() {
536537 beToAddr = null ;
537538 }
538539
540+ private String getBackendEndpoint (long beId ) {
541+ if (beToAddr != null ) {
542+ TNetworkAddress addr = beToAddr .get (beId );
543+ if (addr != null ) {
544+ String host = addr .getHostname ();
545+ if (host == null ) {
546+ host = "unknown" ;
547+ }
548+ return host + ":" + addr .getPort ();
549+ }
550+ }
551+ if (beToThriftAddress != null ) {
552+ String addr = beToThriftAddress .get (beId );
553+ if (addr != null ) {
554+ return addr ;
555+ }
556+ }
557+ return "unknown" ;
558+ }
559+
539560 private final void clearJobOnBEs () {
540561 try {
541562 initClients ();
542- for (Map .Entry <Long , Client > entry : beToClient .entrySet ()) {
563+ // Iterate with explicit iterator so we can remove invalidated clients during iteration.
564+ Iterator <Map .Entry <Long , Client >> iter = beToClient .entrySet ().iterator ();
565+ while (iter .hasNext ()) {
566+ Map .Entry <Long , Client > entry = iter .next ();
567+ long beId = entry .getKey ();
568+ Client client = entry .getValue ();
543569 TWarmUpTabletsRequest request = new TWarmUpTabletsRequest ();
544570 request .setType (TWarmUpTabletsRequestType .CLEAR_JOB );
545571 request .setJobId (jobId );
546572 if (this .isEventDriven ()) {
547573 TWarmUpEventType event = getTWarmUpEventType ();
548574 if (event == null ) {
549- throw new IllegalArgumentException ("Unknown SyncEvent " + syncEvent );
575+ // If event type is unknown, skip this BE but continue others.
576+ LOG .warn ("Unknown SyncEvent {}, skip CLEAR_JOB for BE {} ({})" ,
577+ syncEvent , beId , getBackendEndpoint (beId ));
578+ continue ;
550579 }
551580 request .setEvent (event );
552581 }
553- LOG .info ("send warm up request to BE {}. job_id={}, request_type=CLEAR_JOB" ,
554- entry .getKey (), jobId );
555- entry .getValue ().warmUpTablets (request );
582+ LOG .info ("send warm up request to BE {} ({}). job_id={}, request_type=CLEAR_JOB" ,
583+ beId , getBackendEndpoint (beId ), jobId );
584+ try {
585+ client .warmUpTablets (request );
586+ } catch (Exception e ) {
587+ // If RPC to this BE fails, invalidate this client and remove it from map,
588+ // then continue to next BE so that one bad BE won't block others.
589+ LOG .warn ("send warm up request to BE {} ({}) failed: {}" ,
590+ beId , getBackendEndpoint (beId ), e .getMessage ());
591+ try {
592+ TNetworkAddress addr = beToAddr == null ? null : beToAddr .get (beId );
593+ if (addr != null ) {
594+ ClientPool .backendPool .invalidateObject (addr , client );
595+ }
596+ } catch (Exception ie ) {
597+ LOG .warn ("invalidate client for BE {} failed: {}" , beId , ie .getMessage ());
598+ }
599+ // remove from local map so releaseClients won't try to return an invalidated client
600+ iter .remove ();
601+ }
556602 }
557603 } catch (Exception e ) {
558604 LOG .warn ("send warm up request failed. job_id={}, request_type=CLEAR_JOB, exception={}" ,
@@ -653,8 +699,8 @@ private void runEventDrivenJob() throws Exception {
653699 throw new IllegalArgumentException ("Unknown SyncEvent " + syncEvent );
654700 }
655701 request .setEvent (event );
656- LOG .debug ("send warm up request to BE {}. job_id={}, event={}, request_type=SET_JOB(EVENT)" ,
657- entry .getKey (), jobId , syncEvent );
702+ LOG .debug ("send warm up request to BE {} ({}) . job_id={}, event={}, request_type=SET_JOB(EVENT)" ,
703+ entry .getKey (), getBackendEndpoint ( entry . getKey ()), jobId , syncEvent );
658704 TWarmUpTabletsResponse response = entry .getValue ().warmUpTablets (request );
659705 if (response .getStatus ().getStatusCode () != TStatusCode .OK ) {
660706 if (!response .getStatus ().getErrorMsgs ().isEmpty ()) {
@@ -698,9 +744,10 @@ private void runRunningJob() throws Exception {
698744 request .setJobId (jobId );
699745 request .setBatchId (lastBatchId + 1 );
700746 request .setJobMetas (buildJobMetas (entry .getKey (), request .batch_id ));
701- LOG .info ("send warm up request to BE {}. job_id={}, batch_id={}"
747+ LOG .info ("send warm up request to BE {} ({}) . job_id={}, batch_id={}"
702748 + ", job_size={}, request_type=SET_JOB" ,
703- entry .getKey (), jobId , request .batch_id , request .job_metas .size ());
749+ entry .getKey (), getBackendEndpoint (entry .getKey ()),
750+ jobId , request .batch_id , request .job_metas .size ());
704751 TWarmUpTabletsResponse response = entry .getValue ().warmUpTablets (request );
705752 if (response .getStatus ().getStatusCode () != TStatusCode .OK ) {
706753 if (!response .getStatus ().getErrorMsgs ().isEmpty ()) {
@@ -715,8 +762,9 @@ private void runRunningJob() throws Exception {
715762 for (Map .Entry <Long , Client > entry : beToClient .entrySet ()) {
716763 TWarmUpTabletsRequest request = new TWarmUpTabletsRequest ();
717764 request .setType (TWarmUpTabletsRequestType .GET_CURRENT_JOB_STATE_AND_LEASE );
718- LOG .info ("send warm up request to BE {}. job_id={}, request_type=GET_CURRENT_JOB_STATE_AND_LEASE" ,
719- entry .getKey (), jobId );
765+ LOG .info ("send warm up request to BE {} ({}). job_id={}"
766+ + ", request_type=GET_CURRENT_JOB_STATE_AND_LEASE" ,
767+ entry .getKey (), getBackendEndpoint (entry .getKey ()), jobId );
720768 TWarmUpTabletsResponse response = entry .getValue ().warmUpTablets (request );
721769 if (response .getStatus ().getStatusCode () != TStatusCode .OK ) {
722770 if (!response .getStatus ().getErrorMsgs ().isEmpty ()) {
@@ -754,9 +802,10 @@ private void runRunningJob() throws Exception {
754802 if (!request .job_metas .isEmpty ()) {
755803 // check all batches is done or not
756804 allBatchesDone = false ;
757- LOG .info ("send warm up request to BE {}. job_id={}, batch_id={}"
805+ LOG .info ("send warm up request to BE {} ({}) . job_id={}, batch_id={}"
758806 + ", job_size={}, request_type=SET_BATCH" ,
759- entry .getKey (), jobId , request .batch_id , request .job_metas .size ());
807+ entry .getKey (), getBackendEndpoint (entry .getKey ()),
808+ jobId , request .batch_id , request .job_metas .size ());
760809 TWarmUpTabletsResponse response = entry .getValue ().warmUpTablets (request );
761810 if (response .getStatus ().getStatusCode () != TStatusCode .OK ) {
762811 if (!response .getStatus ().getErrorMsgs ().isEmpty ()) {
0 commit comments