diff --git a/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java b/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java index fc9a383d572..36055185e67 100644 --- a/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java +++ b/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java @@ -70,6 +70,7 @@ protected InternalServer newServer(List streamTracer .setListenAddress(addr) .setExecutorPool(serverExecutorPool) .setExecutorServicePool(executorServicePool) + .setOffloadExecutorPool(offloadExecutorPool) .setStreamTracerFactories(streamTracerFactories) .build(); diff --git a/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java b/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java index c926c853472..1be7eccc523 100644 --- a/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java +++ b/binder/src/main/java/io/grpc/binder/BinderServerBuilder.java @@ -31,6 +31,7 @@ import io.grpc.internal.FixedObjectPool; import io.grpc.internal.ServerImplBuilder; import java.io.File; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; /** Builder for a server that services requests from an Android Service. */ @@ -141,6 +142,27 @@ public BinderServerBuilder useTransportSecurity(File certChain, File privateKey) throw new UnsupportedOperationException("TLS not supported in BinderServer"); } + /** + * Provides an executor to be used for operations that block or are expensive. + * + *

For example, {@link SecurityPolicy}s may be evaluated on this executor as implementations + * commonly require one or more blocking IPC round trips to Android's system server. This allows + * the host application to segregate its threads by workload. + * + *

Optional. By default, the executor associated with {@link ServerBuilder#executor(Executor)} + * will be used for this purpose. + * + *

The server won't take ownership of the given executor. Callers must ensure that it remains + * usable (not shutdown) until the built server terminates. + * + * @return this + */ + public BinderServerBuilder offloadExecutor(Executor executor) { + internalBuilder.setOffloadExecutorPool( + new FixedObjectPool<>(checkNotNull(executor, "offloadExecutor"))); + return this; + } + /** * Builds a {@link Server} according to this builder's parameters and stores its listening {@link * IBinder} in the {@link IBinderReceiver} passed to {@link #forAddress(AndroidComponentAddress, diff --git a/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java b/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java index 6a9361c0eaf..19f4eb5c44e 100644 --- a/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java +++ b/binder/src/main/java/io/grpc/binder/ServerSecurityPolicy.java @@ -22,6 +22,7 @@ import io.grpc.Status; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; import javax.annotation.CheckReturnValue; /** @@ -66,22 +67,18 @@ public Status checkAuthorizationForService(int uid, String serviceName) { * * @param uid The Android UID to authenticate. * @param serviceName The name of the gRPC service being called. + * @param offloadExecutor for evaluating the relevant SecurityPolicy if it's not natively async * @return a future with the result of the authorization check. A failed future represents a * failure to perform the authorization check, not that the access is denied. */ @CheckReturnValue - ListenableFuture checkAuthorizationForServiceAsync(int uid, String serviceName) { + ListenableFuture checkAuthorizationForServiceAsync( + int uid, String serviceName, Executor offloadExecutor) { SecurityPolicy securityPolicy = perServicePolicies.getOrDefault(serviceName, defaultPolicy); if (securityPolicy instanceof AsyncSecurityPolicy) { return ((AsyncSecurityPolicy) securityPolicy).checkAuthorizationAsync(uid); } - - try { - Status status = securityPolicy.checkAuthorization(uid); - return Futures.immediateFuture(status); - } catch (Exception e) { - return Futures.immediateFailedFuture(e); - } + return Futures.submit(() -> securityPolicy.checkAuthorization(uid), offloadExecutor); } public static Builder newBuilder() { diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java index 0ad54fb74d1..2a592ad5837 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java @@ -65,6 +65,8 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder. private final ObjectPool executorServicePool; private final ObjectPool executorPool; + private final ObjectPool offloadExecutorPool; + private final ImmutableList streamTracerFactories; private final AndroidComponentAddress listenAddress; private final LeakSafeOneWayBinder hostServiceBinder; @@ -81,6 +83,10 @@ public final class BinderServer implements InternalServer, LeakSafeOneWayBinder. @GuardedBy("this") private Executor executor; + @Nullable // Before start() and after termination. + @GuardedBy("this") + private Executor offloadExecutor; + @GuardedBy("this") private boolean shutdown; @@ -88,6 +94,10 @@ private BinderServer(Builder builder) { this.listenAddress = checkNotNull(builder.listenAddress); this.executorPool = checkNotNull(builder.executorPool); this.executorServicePool = builder.executorServicePool; + this.offloadExecutorPool = + builder.offloadExecutorPool != null + ? builder.offloadExecutorPool + : builder.executorServicePool; this.streamTracerFactories = ImmutableList.copyOf(checkNotNull(builder.streamTracerFactories, "streamTracerFactories")); this.serverPolicyChecker = BinderInternal.createPolicyChecker(builder.serverSecurityPolicy); @@ -105,6 +115,7 @@ public synchronized void start(ServerListener serverListener) throws IOException listener = new ActiveTransportTracker(serverListener, this::onTerminated); executorService = executorServicePool.getObject(); executor = executorPool.getObject(); + offloadExecutor = offloadExecutorPool.getObject(); } @Override @@ -142,6 +153,7 @@ public synchronized void shutdown() { private synchronized void onTerminated() { executor = executorPool.returnObject(executor); + offloadExecutor = offloadExecutorPool.returnObject(offloadExecutor); } @Override @@ -176,7 +188,9 @@ public synchronized boolean handleTransaction(int code, Parcel parcel) { attrsBuilder, callingUid, serverPolicyChecker, - checkNotNull(executor, "Not started?")); + checkNotNull(executor, "Not started?"), + checkNotNull(offloadExecutor, "Not started?")); + // Create a new transport and let our listener know about it. BinderTransport.BinderServerTransport transport = new BinderTransport.BinderServerTransport( @@ -219,6 +233,7 @@ public boolean handleTransaction(int code, Parcel parcel) { public static class Builder { @Nullable AndroidComponentAddress listenAddress; @Nullable List streamTracerFactories; + @Nullable ObjectPool offloadExecutorPool; @Nullable ObjectPool executorPool; ObjectPool executorServicePool = @@ -275,6 +290,16 @@ public Builder setExecutorServicePool( return this; } + /** + * Sets the executor to be used for blocking work. + * + *

Optional. If unset, 'executorPool' will be used for this work (not recommended). + */ + public Builder setOffloadExecutorPool(ObjectPool offloadExecutorPool) { + this.offloadExecutorPool = offloadExecutorPool; + return this; + } + /** * Sets the {@link ServerSecurityPolicy} to be used for built servers. * diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java b/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java index 430eee3e041..bb97862d565 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderTransportSecurity.java @@ -69,17 +69,20 @@ public static void installAuthInterceptor(ServerBuilder serverBuilder) { * @param remoteUid The remote UID of the transport. * @param serverPolicyChecker The policy checker for this transport. * @param executor used for calling into the application. Must outlive the transport. + * @param offloadExecutor used for blocking or expensive work. Must outlive the transport. */ @Internal public static void attachAuthAttrs( Attributes.Builder builder, int remoteUid, ServerPolicyChecker serverPolicyChecker, - Executor executor) { + Executor executor, + Executor offloadExecutor) { builder .set( TRANSPORT_AUTHORIZATION_STATE, - new TransportAuthorizationState(remoteUid, serverPolicyChecker, executor)) + new TransportAuthorizationState( + remoteUid, serverPolicyChecker, executor, offloadExecutor)) .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY); } @@ -97,9 +100,9 @@ public ServerCall.Listener interceptCall( ListenableFuture authStatusFuture = transportAuthState.checkAuthorization(call.getMethodDescriptor()); - // Most SecurityPolicy will have synchronous implementations that provide an - // immediately-resolved Future. In that case, short-circuit to avoid unnecessary allocations - // and asynchronous code if the authorization result is already present. + // Auth decisions are cached so this future will often already be complete. In that case, we + // use a fast path below that avoids unnecessary allocations and asynchronous code since the + // authorization result is already known. if (!authStatusFuture.isDone()) { return newServerCallListenerForPendingAuthResult( authStatusFuture, transportAuthState.executor, call, headers, next); @@ -166,15 +169,21 @@ private static final class TransportAuthorizationState { private final ServerPolicyChecker serverPolicyChecker; private final ConcurrentHashMap> serviceAuthorization; private final Executor executor; + private final Executor offloadExecutor; /** * @param executor used for calling into the application. Must outlive the transport. + * @param offloadExecutor used to check a non-async SecurityPolicy. Must outlive the transport. */ TransportAuthorizationState( - int uid, ServerPolicyChecker serverPolicyChecker, Executor executor) { + int uid, + ServerPolicyChecker serverPolicyChecker, + Executor executor, + Executor offloadExecutor) { this.uid = uid; this.serverPolicyChecker = serverPolicyChecker; this.executor = executor; + this.offloadExecutor = offloadExecutor; serviceAuthorization = new ConcurrentHashMap<>(8); } @@ -202,7 +211,7 @@ ListenableFuture checkAuthorization(MethodDescriptor method) { // TODO(10669): evaluate if there should be at most a single pending authorization check per // (uid, serviceName) pair at any given time. ListenableFuture authorization = - serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName); + serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName, offloadExecutor); if (useCache) { serviceAuthorization.putIfAbsent(serviceName, authorization); Futures.addCallback( @@ -227,7 +236,7 @@ public void onFailure(Throwable t) { * *

This class provides the asynchronous version of {@link io.grpc.binder.SecurityPolicy}, * allowing implementations of authorization logic that involves slow or asynchronous calls - * without necessarily blocking the calling thread. + * without ever blocking the calling thread. * * @see io.grpc.binder.SecurityPolicy */ @@ -238,11 +247,15 @@ public interface ServerPolicyChecker { *

This method never throws an exception. If the execution of the security policy check * fails, a failed future with such exception is returned. * + *

This method never blocks the calling thread. + * * @param uid The Android UID to authenticate. * @param serviceName The name of the gRPC service being called. + * @param offloadExecutor used for blocking or expensive work if necessary * @return a future with the result of the authorization check. A failed future represents a * failure to perform the authorization check, not that the access is denied. */ - ListenableFuture checkAuthorizationForServiceAsync(int uid, String serviceName); + ListenableFuture checkAuthorizationForServiceAsync( + int uid, String serviceName, Executor offloadExecutor); } } diff --git a/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java b/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java index eedc3f590cd..2272c8f312c 100644 --- a/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java +++ b/binder/src/test/java/io/grpc/binder/ServerSecurityPolicyTest.java @@ -19,20 +19,24 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.robolectric.Shadows.shadowOf; +import android.os.Looper; import android.os.Process; +import androidx.core.content.ContextCompat; +import androidx.test.core.app.ApplicationProvider; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.Uninterruptibles; import io.grpc.Status; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import org.junit.Test; import org.junit.runner.RunWith; @@ -49,6 +53,8 @@ public final class ServerSecurityPolicyTest { private static final int OTHER_UID = MY_UID + 1; ServerSecurityPolicy policy; + final Executor executor = + ContextCompat.getMainExecutor(ApplicationProvider.getApplicationContext()); @Test public void testDefaultInternalOnly() throws Exception { @@ -153,8 +159,7 @@ public void testPerServiceAsync() throws Exception { // Add some extra future transformation to confirm that a chain // of futures gets properly handled. ListenableFuture dependency = Futures.immediateVoidFuture(); - return Futures.transform( - dependency, unused -> Status.OK, MoreExecutors.directExecutor()); + return Futures.transform(dependency, unused -> Status.OK, executor); })) .build(); @@ -181,8 +186,8 @@ public void testPerService_failedSecurityPolicyFuture_returnsAFailedFuture() { .build(); ListenableFuture statusFuture = - policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1); - + policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor); + shadowOf(Looper.getMainLooper()).idle(); assertThrows(ExecutionException.class, statusFuture::get); } @@ -194,8 +199,8 @@ public void testPerServiceAsync_cancelledFuture_propagatesStatus() { .build(); ListenableFuture statusFuture = - policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1); - + policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor); + shadowOf(Looper.getMainLooper()).idle(); assertThrows(CancellationException.class, statusFuture::get); } @@ -231,8 +236,8 @@ public void testPerServiceAsync_interrupted_cancelledFuture() { })) .build(); ListenableFuture statusFuture = - policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1); - + policy.checkAuthorizationForServiceAsync(MY_UID, SERVICE1, executor); + shadowOf(Looper.getMainLooper()).idle(); assertThrows(InterruptedException.class, statusFuture::get); listeningExecutorService.shutdownNow(); } @@ -337,11 +342,12 @@ public void testPerServiceNoDefaultAsync() throws Exception { * Shortcut for invoking {@link ServerSecurityPolicy#checkAuthorizationForServiceAsync} without * dealing with concurrency details. Returns a {link @Status.Code} for convenience. */ - private static Status.Code checkAuthorizationForServiceAsync( + private Status.Code checkAuthorizationForServiceAsync( ServerSecurityPolicy policy, int callerUid, String service) throws ExecutionException { ListenableFuture statusFuture = - policy.checkAuthorizationForServiceAsync(callerUid, service); - return Uninterruptibles.getUninterruptibly(statusFuture).getCode(); + policy.checkAuthorizationForServiceAsync(callerUid, service, executor); + shadowOf(Looper.getMainLooper()).idle(); + return Futures.getDone(statusFuture).getCode(); } private static SecurityPolicy policy(Function func) {