3434import java .time .Clock ;
3535import java .util .ArrayList ;
3636import java .util .List ;
37+ import java .util .Random ;
3738import java .util .concurrent .CancellationException ;
3839import java .util .concurrent .ConcurrentLinkedQueue ;
3940import java .util .concurrent .Executors ;
4243import java .util .concurrent .atomic .AtomicBoolean ;
4344import java .util .concurrent .atomic .AtomicInteger ;
4445import java .util .concurrent .atomic .AtomicReference ;
46+ import java .util .function .Supplier ;
4547import java .util .logging .Level ;
4648import java .util .logging .Logger ;
4749import javax .annotation .Nullable ;
@@ -71,6 +73,8 @@ public class BigtableChannelPool extends ManagedChannel {
7173 private final ChannelPoolHealthChecker channelPoolHealthChecker ;
7274 private final AtomicInteger indexTicker = new AtomicInteger ();
7375 private final String authority ;
76+ private final Random rng = new Random ();
77+ private final Supplier <Integer > picker ;
7478
7579 public static BigtableChannelPool create (
7680 BigtableChannelPoolSettings settings ,
@@ -113,6 +117,23 @@ public static BigtableChannelPool create(
113117
114118 entries .set (initialListBuilder .build ());
115119 authority = entries .get ().get (0 ).channel .authority ();
120+
121+ switch (settings .getLoadBalancingStrategy ()) {
122+ case ROUND_ROBIN :
123+ picker = this ::pickEntryIndexRoundRobin ;
124+ break ;
125+ case LEAST_IN_FLIGHT :
126+ picker = this ::pickEntryIndexLeastInFlight ;
127+ break ;
128+ case POWER_OF_TWO_LEAST_IN_FLIGHT :
129+ picker = this ::pickEntryIndexPowerOfTwoLeastInFlight ;
130+ break ;
131+ default :
132+ throw new IllegalStateException (
133+ String .format (
134+ "Unknown load balancing strategy %s" , settings .getLoadBalancingStrategy ()));
135+ }
136+
116137 this .executor = executor ;
117138
118139 if (!settings .isStaticSize ()) {
@@ -138,19 +159,74 @@ public String authority() {
138159 }
139160
140161 /**
141- * Create a {@link ClientCall} on a Channel from the pool chosen in a round-robin fashion to the
142- * remote operation specified by the given {@link MethodDescriptor}. The returned {@link
143- * ClientCall} does not trigger any remote behavior until {@link
144- * ClientCall#start(ClientCall.Listener, io.grpc.Metadata)} is invoked.
162+ * Create a {@link ClientCall} on a Channel from the pool to the remote operation specified by the
163+ * given {@link MethodDescriptor}. The returned {@link ClientCall} does not trigger any remote
164+ * behavior until {@link ClientCall#start(ClientCall.Listener, io.grpc.Metadata)} is invoked.
145165 */
146166 @ Override
147167 public <ReqT , RespT > ClientCall <ReqT , RespT > newCall (
148168 MethodDescriptor <ReqT , RespT > methodDescriptor , CallOptions callOptions ) {
149- return getChannel (indexTicker .getAndIncrement ()).newCall (methodDescriptor , callOptions );
169+ return new AffinityChannel (pickEntryIndex ()).newCall (methodDescriptor , callOptions );
170+ }
171+
172+ /**
173+ * Pick the index of an entry to use for the next call. The returned value *should* be within
174+ * range, but callers should not assume that this is always the case as race conditions are
175+ * possible.
176+ */
177+ private int pickEntryIndex () {
178+ return picker .get ();
179+ }
180+
181+ /** Pick an entry using the Round Robin algorithm. */
182+ private int pickEntryIndexRoundRobin () {
183+ return Math .abs (indexTicker .getAndIncrement () % entries .get ().size ());
184+ }
185+
186+ /** Pick an entry at random. */
187+ private int pickEntryIndexRandom () {
188+ return rng .nextInt (entries .get ().size ());
150189 }
151190
152- Channel getChannel (int affinity ) {
153- return new AffinityChannel (affinity );
191+ /** Pick an entry using the least-in-flight algorithm. */
192+ private int pickEntryIndexLeastInFlight () {
193+ List <Entry > localEntries = entries .get ();
194+ int minRpcs = Integer .MAX_VALUE ;
195+ List <Integer > candidates = new ArrayList <>();
196+
197+ for (int i = 0 ; i < localEntries .size (); i ++) {
198+ Entry entry = localEntries .get (i );
199+ int rpcs = entry .outstandingRpcs .get ();
200+ if (rpcs < minRpcs ) {
201+ minRpcs = rpcs ;
202+ candidates .clear ();
203+ candidates .add (i );
204+ } else if (rpcs == minRpcs ) {
205+ candidates .add (i );
206+ }
207+ }
208+ // If there are multiple matching entries, pick one at random.
209+ return candidates .get (rng .nextInt (candidates .size ()));
210+ }
211+
212+ /** Pick an entry using the power-of-two algorithm. */
213+ private int pickEntryIndexPowerOfTwoLeastInFlight () {
214+ List <Entry > localEntries = entries .get ();
215+ int choice1 = pickEntryIndexRandom ();
216+ int choice2 = pickEntryIndexRandom ();
217+ if (choice1 == choice2 ) {
218+ // Try to pick two different entries. If this picks the same entry again, it's likely that
219+ // there's only one healthy channel in the pool and we should proceed anyway.
220+ choice2 = pickEntryIndexRandom ();
221+ }
222+
223+ Entry entry1 = localEntries .get (choice1 );
224+ Entry entry2 = localEntries .get (choice2 );
225+ return entry1 .outstandingRpcs .get () < entry2 .outstandingRpcs .get () ? choice1 : choice2 ;
226+ }
227+
228+ Channel getChannel (int index ) {
229+ return new AffinityChannel (index );
154230 }
155231
156232 /** {@inheritDoc} */
@@ -395,7 +471,9 @@ void refresh() {
395471 * Get and retain a Channel Entry. The returned Entry will have its rpc count incremented,
396472 * preventing it from getting recycled.
397473 */
398- Entry getRetainedEntry (int affinity ) {
474+ private Entry getRetainedEntry (int affinity ) {
475+ // If an entry is not retainable, that usually means that it's about to be replaced and if we
476+ // retry we should get a new useable entry.
399477 // The maximum number of concurrent calls to this method for any given time span is at most 2,
400478 // so the loop can actually be 2 times. But going for 5 times for a safety margin for potential
401479 // code evolving
@@ -543,10 +621,10 @@ private void shutdown() {
543621
544622 /** Thin wrapper to ensure that new calls are properly reference counted. */
545623 private class AffinityChannel extends Channel {
546- private final int affinity ;
624+ private final int index ;
547625
548- public AffinityChannel (int affinity ) {
549- this .affinity = affinity ;
626+ public AffinityChannel (int index ) {
627+ this .index = index ;
550628 }
551629
552630 @ Override
@@ -557,9 +635,7 @@ public String authority() {
557635 @ Override
558636 public <RequestT , ResponseT > ClientCall <RequestT , ResponseT > newCall (
559637 MethodDescriptor <RequestT , ResponseT > methodDescriptor , CallOptions callOptions ) {
560-
561- Entry entry = getRetainedEntry (affinity );
562-
638+ Entry entry = getRetainedEntry (index );
563639 return new ReleasingClientCall <>(entry .channel .newCall (methodDescriptor , callOptions ), entry );
564640 }
565641 }
0 commit comments