3737import java .util .Iterator ;
3838import java .util .List ;
3939import java .util .Map ;
40+ import java .util .Set ;
4041import java .util .concurrent .atomic .AtomicLong ;
4142import java .util .stream .Collectors ;
4243
@@ -131,6 +132,21 @@ public static TNetworkAddress getHost(ImmutableMap<Long, Backend> backends,
131132 if (backends .isEmpty ()) {
132133 throw new UserException (SystemInfoService .NO_SCAN_NODE_BACKEND_AVAILABLE_MSG );
133134 }
135+ if (ConnectContext .get () != null
136+ && !ConnectContext .get ().getSessionVariable ().getSelectedBackendIds ().isEmpty ()) {
137+ Set <Long > beIds = ConnectContext .get ().getSessionVariable ().getSelectedBackendIds ();
138+ long id = nextId .getAndIncrement () % beIds .size ();
139+ Long selectedId = beIds .stream ().skip (id ).filter (e -> backends .containsKey (e )
140+ && isAvailable (backends .get (e ))).findFirst ().orElse (null );
141+ Backend backend = backends .getOrDefault (selectedId , null );
142+ if (isAvailable (backend )) {
143+ backendIdRef .setRef (selectedId );
144+ return new TNetworkAddress (backend .getHost (), backend .getBePort ());
145+ }
146+ // no backend returned
147+ throw new UserException (SystemInfoService .NO_SCAN_NODE_BACKEND_AVAILABLE_MSG
148+ + " BEID: " + selectedId + " host: " + backend .getHost ());
149+ }
134150 long id = nextId .getAndIncrement () % backends .size ();
135151 Map .Entry <Long , Backend > backendEntry = backends .entrySet ().stream ().skip (id ).filter (
136152 e -> isAvailable (e .getValue ())).findFirst ().orElse (null );
@@ -148,6 +164,22 @@ public static TNetworkAddress getHost(ImmutableMap<Long, Backend> backends,
148164 + getBackendErrorMsg (Lists .newArrayList (backends .keySet ()), backends , 3 ));
149165 }
150166
167+ public static TNetworkAddress getHost (ImmutableMap <Long , Backend > backends ,
168+ Reference <Long > backendIdRef , Long beId )
169+ throws UserException {
170+ if (backends .isEmpty ()) {
171+ throw new UserException (SystemInfoService .NO_SCAN_NODE_BACKEND_AVAILABLE_MSG );
172+ }
173+ Backend backend = backends .getOrDefault (beId , null );
174+ if (backend != null && isAvailable (backend )) {
175+ backendIdRef .setRef (beId );
176+ return new TNetworkAddress (backend .getHost (), backend .getBePort ());
177+ }
178+ // no backend returned
179+ throw new UserException (SystemInfoService .NO_SCAN_NODE_BACKEND_AVAILABLE_MSG
180+ + " BEID: " + beId + " host: " + backend .getHost ());
181+ }
182+
151183 // get the reason why backends can not be chosen.
152184 private static String getBackendErrorMsg (List <Long > backendIds , ImmutableMap <Long , Backend > backends , int limit ) {
153185 List <String > res = Lists .newArrayList ();
0 commit comments