diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java index 78d507665e..3cc6a10b79 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java @@ -181,6 +181,7 @@ public void onError(Throwable throwable) { @Override public void onComplete() { + System.out.println("onComplete called on unary operation callable"); if (allowNoResponse && set(null)) { tracer.operationSucceeded(); return; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java new file mode 100644 index 0000000000..a535d30f59 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java @@ -0,0 +1,161 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import io.grpc.Status; + +import java.util.concurrent.atomic.AtomicInteger; + +interface Callable2 { + + void call(RequestT request, ResponseObserver2 observer, ApiCallContext context); + +} + +class ToOldCallableAdapter extends ServerStreamingCallable { + + Callable2 callable; + + ToOldCallableAdapter(Callable2 callable) { + this.callable = callable; + } + + @Override + public void call(RequestT requestT, ResponseObserver responseObserver, ApiCallContext apiCallContext) { + callable.call(requestT, new ToOldObserverAdapter<>(responseObserver), apiCallContext); + } +} + +class ToNewCallableAdapter implements Callable2 { + + ServerStreamingCallable callable; + + ToNewCallableAdapter(ServerStreamingCallable callable) { + this.callable = callable; + } + + @Override + public void call(RequestT request, ResponseObserver2 observer, ApiCallContext context) { + callable.call(request, new ToNewObserverAdapter<>(observer), context); + } +} + +class ToNewObserverAdapter implements ResponseObserver { + + private ResponseObserver2 userObserver; + private StreamController grpcController; + + ToNewObserverAdapter(ResponseObserver2 observer) { + this.userObserver = observer; + } + + @Override + public void onStart(StreamController streamController) { + grpcController = streamController; + grpcController.disableAutoInboundFlowControl(); + userObserver.onStart(new StreamController2() { + @Override + public void cancel(String reason) { + grpcController.cancel(); + } + + @Override + public void onReady() { + grpcController.request(1); + } + }); + } + + @Override + public void onResponse(ResponseT responseT) { + userObserver.onResponse(responseT); + } + + @Override + public void onError(Throwable throwable) { + userObserver.onClose(Status.fromThrowable(throwable)); + } + + @Override + public void onComplete() { + userObserver.onClose(Status.OK); + } +} + +class ToOldObserverAdapter implements ResponseObserver2 { + + private ResponseObserver userOberver; + private StreamController2 grpcController; + private AtomicInteger userRequested = new AtomicInteger(0); + private volatile boolean autoFlowControl = true; + + ToOldObserverAdapter(ResponseObserver observer) { + this.userOberver = observer; + } + + @Override + public void onStart(StreamController2 streamController2) { + grpcController = streamController2; + userOberver.onStart(new StreamController() { + @Override + public void cancel() { + grpcController.cancel("user cancelled stream"); + } + + @Override + public void disableAutoInboundFlowControl() { + System.out.println("already disabled"); + autoFlowControl = false; + } + + @Override + public void request(int i) { + int oldN = userRequested.getAndAdd(i); + if (oldN == 0) { + grpcController.onReady(); + } + + } + }); + + if (autoFlowControl) { + grpcController.onReady(); + } + } + + @Override + public void onResponse(ResponseT response) { + userOberver.onResponse(response); + if (userRequested.decrementAndGet() > 0) { + grpcController.onReady(); + } else if (autoFlowControl) { + grpcController.onReady(); + } + } + + @Override + public void onClose(Status status) { + if (status.isOk()) { + userOberver.onComplete(); + } else { + userOberver.onError(status.asException()); + } + } +} + +interface ResponseObserver2 { + void onStart(StreamController2 streamController2); + void onResponse(ResponseT response); + void onClose(Status status); +} + +interface StreamController2 { + void cancel(String reason); + void onReady(); +} + + + + diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 46377fbc41..0e0e7ba175 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -1268,18 +1268,22 @@ private ServerStreamingCallable withR ServerStreamingCallable innerCallable, ServerStreamingCallSettings serverStreamingCallSettings) { - ServerStreamingCallable retrying; - if (settings.getEnableRetryInfo()) { - retrying = - com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( - innerCallable, serverStreamingCallSettings, clientContext); - } else { - retrying = Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext); - } - if (settings.getEnableRoutingCookie()) { - return new CookiesServerStreamingCallable<>(retrying); - } - return retrying; +// ServerStreamingCallable retrying; +// if (settings.getEnableRetryInfo()) { +// retrying = +// com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( +// innerCallable, serverStreamingCallSettings, clientContext); +// } else { +// retrying = Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext); +// } +// if (settings.getEnableRoutingCookie()) { +// return new CookiesServerStreamingCallable<>(retrying); +// } +// return retrying; + + Callable2 toNewCallable = new ToNewCallableAdapter(innerCallable); + Callable2 retryCallable = new RetryCallable<>(toNewCallable, serverStreamingCallSettings.getResumptionStrategy(), clientContext.getExecutor(), serverStreamingCallSettings.getRetrySettings()); + return new ToOldCallableAdapter<>(retryCallable); } // diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java new file mode 100644 index 0000000000..19175ac3a4 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java @@ -0,0 +1,225 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; +import io.grpc.Deadline; +import io.grpc.Status; + +import javax.annotation.concurrent.GuardedBy; +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class Operation { + + private final StreamResumptionStrategy resumptionStrategy; + private final ResponseObserver2 outerObserver; + private final Callable2 callable; + private final ScheduledExecutorService executor; + private RequestT request; + private ApiCallContext context; + private final RetrySettings settings; + private Deadline deadline; + private final AtomicInteger attempt; + private final Object lock = new Object(); + + @GuardedBy("lock") + private StateListener operationState; + + Operation(Callable2 callable, StreamResumptionStrategy resumptionStrategy, ResponseObserver2 outerObserver, + ScheduledExecutorService executor, RequestT request, ApiCallContext context, RetrySettings settings) { + // TODO future debuggability: for example: ring buffer to buffer the last few states + this.callable = callable; + this.resumptionStrategy = resumptionStrategy; + this.outerObserver = outerObserver; + this.executor = executor; + this.operationState = new Idle(this, outerObserver); + this.request = request; + this.settings = settings; + this.context = context; + this.attempt = new AtomicInteger(0); + } + + void start() { + outerObserver.onStart(new StreamController2() { + @Override + public void cancel(String reason) { + onCancel(reason); + } + + @Override + public void onReady() { + deadline = Deadline.after(settings.getTotalTimeoutDuration().toMillis(), TimeUnit.MILLISECONDS); + if (context.getTracer() instanceof BigtableTracer) { + ((BigtableTracer) context.getTracer()).attemptStarted(attempt.get()); + } + + synchronized (lock) { + operationState.onReady(); + } + } + }); + } + + void onCancel(String reason) { + synchronized (lock) { + operationState.onCancel(reason); + } + } + + public void onStateChange(StateListener state) { + synchronized (lock) { + this.operationState = state; + } + } + + long getRetryDelay() { + long maxRetryDelay = settings.getMaxRetryDelayDuration().toMillis(); + long initialRetryDelay = settings.getInitialRetryDelayDuration().toMillis(); + double multiplier = settings.getRetryDelayMultiplier(); + + long nextDelay = (long) Math.min(initialRetryDelay * Math.pow(multiplier, attempt.get()), maxRetryDelay); + return ThreadLocalRandom.current().nextLong(nextDelay); + } + + long getTimeout() { + long rpcTimeout = settings.getInitialRpcTimeoutDuration().toMillis(); + + return Math.min(rpcTimeout, deadline.timeRemaining(TimeUnit.MILLISECONDS)); + } + + abstract class StateListener { + + protected Operation operation; + protected ResponseObserver2 outerObserver; + + StateListener(Operation operation, ResponseObserver2 outerObserver) { + this.operation = operation; + this.outerObserver = outerObserver; + } + + abstract public void onReady(); + abstract public void onCancel(String reason); + } + + private class Idle extends StateListener { + + Idle(Operation operation, ResponseObserver2 outerObserver) { + super(operation, outerObserver); + } + + @Override + public void onReady() { + Active active = new Active(super.operation, super.outerObserver); + synchronized (lock) { + callable.call(operation.request, active, operation.context.withTimeoutDuration(Duration.ofMillis(getTimeout()))); + operation.onStateChange(active); + } + } + + @Override + public void onCancel(String reason) { + outerObserver.onClose(Status.CANCELLED.withDescription(reason)); + } + } + + private class Active extends StateListener implements ResponseObserver2 { + + StreamController2 grpcController; + boolean userWaitingResponse = false; + + Active(Operation operation, ResponseObserver2 outerObserver) { + super(operation, outerObserver); + } + + @Override + public void onReady() { + userWaitingResponse = true; + grpcController.onReady(); + } + + @Override + public void onCancel(String reason) { + grpcController.cancel(reason); + } + + @Override + public void onStart(StreamController2 streamController) { + this.grpcController = streamController; + onReady(); + } + + @Override + public void onResponse(ResponseT response) { + userWaitingResponse = false; + resumptionStrategy.processResponse(response); + outerObserver.onResponse(response); + } + + @Override + public void onClose(Status status) { + if (!status.isOk()) { + // TODO placeholder, need to check for retryable code and error details + if (status.equals(Status.DEADLINE_EXCEEDED) || status.equals(Status.UNAVAILABLE)) { + attempt.getAndIncrement(); + request = resumptionStrategy.getResumeRequest(request); + if (!userWaitingResponse) { + // TODO wait retry delay ? + synchronized (lock) { + Idle idle = new Idle(super.operation, outerObserver); + onStateChange(idle); + } + return; + } else { + long retryDelay = getRetryDelay(); + if (deadline.timeRemaining(TimeUnit.MILLISECONDS) - retryDelay > 1) { + Scheduled scheduled = new Scheduled(super.operation, outerObserver); + synchronized (lock) { + ScheduledFuture future = executor.schedule(scheduled::onReady, retryDelay, TimeUnit.MILLISECONDS); + scheduled.setScheduledFuture(future); + onStateChange(scheduled); + } + return; + } + } + } + } + outerObserver.onClose(status); + } + } + + + class Scheduled extends StateListener { + + ScheduledFuture scheduledFuture; + Scheduled(Operation operation, ResponseObserver2 outerObserver) { + super(operation, outerObserver); + } + + @Override + public void onReady() { + Active active = new Active(super.operation, super.outerObserver); + synchronized (lock) { + callable.call(request, active, context.withTimeoutDuration(Duration.ofMillis(getTimeout()))); + onStateChange(active); + } + } + + @Override + public void onCancel(String reason) { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + scheduledFuture = null; + } + } + + void setScheduledFuture(ScheduledFuture future) { + this.scheduledFuture = future; + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RetryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RetryCallable.java new file mode 100644 index 0000000000..c159d05c6b --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RetryCallable.java @@ -0,0 +1,29 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; + +import java.util.concurrent.ScheduledExecutorService; + +public class RetryCallable implements Callable2 { + + Callable2 callable; + StreamResumptionStrategy resumptionStrategy; + ScheduledExecutorService executor; + RetrySettings settings; + + RetryCallable(Callable2 callable, StreamResumptionStrategy streamResumptionStrategy, ScheduledExecutorService executor, RetrySettings settings) { + this.callable = callable; + this.resumptionStrategy = streamResumptionStrategy; + this.executor = executor; + this.settings = settings; + } + + @Override + public void call(RequestT request, ResponseObserver2 observer, ApiCallContext context) { + System.out.println("retry callable is called"); + Operation operation = new Operation<>(callable, resumptionStrategy, observer, executor, request, context, settings); + operation.start(); + } +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java index 0133dd3c2b..d1d54219bc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java @@ -98,6 +98,7 @@ public final void onError(Throwable throwable) { @Override public final void onComplete() { if (!isClosed.compareAndSet(false, true)) { + System.out.println("OuterObserver is " + outerObserver.getClass().getName()); logException("Tried to double close the stream"); return; } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptCallable.java new file mode 100644 index 0000000000..87b9e1d5d9 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptCallable.java @@ -0,0 +1,19 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; + +public class StreamingAttemptCallable extends ServerStreamingCallable { + + private ServerStreamingCallable innerCallable; + + public StreamingAttemptCallable(ServerStreamingCallable callable) { + this.innerCallable = callable; + } + + @Override + public void call(RequestT requestT, ResponseObserver responseObserver, ApiCallContext apiCallContext) { + innerCallable.call(requestT, responseObserver, apiCallContext); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptFactory.java new file mode 100644 index 0000000000..98c5fbf57c --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptFactory.java @@ -0,0 +1,55 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; + +public class StreamingAttemptFactory { + + private RequestT request; + private StreamResumptionStrategy resumptionStrategy; + private ServerStreamingCallable callable; + + StreamingAttemptFactory(ServerStreamingCallable callable, + RequestT request, + StreamResumptionStrategy resumptionStrategy) { + this.request = request; + this.resumptionStrategy = resumptionStrategy; + this.callable = callable; + } + + SettableApiFuture sendAttempt(ResponseObserver observer, ApiCallContext context) { + StreamingAttemptCallable attemptCallable = new StreamingAttemptCallable<>(callable); + + request = resumptionStrategy.getResumeRequest(request); + + SettableApiFuture future = SettableApiFuture.create(); + attemptCallable.call(request, new ResponseObserver() { + @Override + public void onStart(StreamController streamController) { + + } + + @Override + public void onResponse(ResponseT responseT) { + resumptionStrategy.processResponse(responseT); + observer.onResponse(responseT); + } + + @Override + public void onError(Throwable throwable) { + future.setException(throwable); + } + + @Override + public void onComplete() { + future.set(null); + } + }, context); + + return future; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingOperation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingOperation.java new file mode 100644 index 0000000000..1673c5eada --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingOperation.java @@ -0,0 +1,111 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Deadline; + +import java.time.Duration; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class StreamingOperation { + + private final ScheduledExecutorService executor; + private final StreamResumptionStrategy resumptionStrategy; + private final ServerStreamingCallable callable; + private final RetrySettings retrySettings; + private final Deadline deadline; + private final AtomicInteger attemptNumber = new AtomicInteger(0); + private StreamingAttemptFactory attemptFactory; + + StreamingOperation(StreamResumptionStrategy resumptionStrategy, + ScheduledExecutorService executor, + ServerStreamingCallSettings callSettings, + ServerStreamingCallable callable) { + this.resumptionStrategy = resumptionStrategy; + this.executor = executor; + this.retrySettings = callSettings.getRetrySettings(); + this.callable = callable; + this.deadline = Deadline.after( + callSettings.getRetrySettings().getTotalTimeoutDuration().toMillis(), TimeUnit.MILLISECONDS); + + } + + public void startOperation(RequestT request, ResponseObserver observer, ApiCallContext context) { + /** + * 1. create a new attempt + * new request + * new response observer + * 2. attempt succeeds, notify outer observer + * 3. attempt fails, check if it can be retried + */ + this.attemptFactory = new StreamingAttemptFactory(callable, request, resumptionStrategy); + + retry(attemptFactory, observer, context); + } + + public void retry(StreamingAttemptFactory attemptFactory, ResponseObserver observer, ApiCallContext context) { + SettableApiFuture future = attemptFactory.sendAttempt(observer, context); + + ApiFutures.addCallback(future, new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + if (shouldRetry(throwable)) { + executor.schedule(() -> retry(attemptFactory, observer, context.withTimeoutDuration(Duration.ofMillis(getTimeout()))), getRetryDelay(), TimeUnit.MILLISECONDS); + } else { + observer.onError(throwable); + } + } + + @Override + public void onSuccess(Void unused) { + observer.onComplete(); + } + }, MoreExecutors.directExecutor()); + } + + boolean shouldRetry(Throwable t) { + if (deadline.isExpired()) { + return false; + } + int maxAttempt = retrySettings.getMaxAttempts(); + if (maxAttempt != 0 && attemptNumber.get() >= maxAttempt) { + return false; + } + if (t instanceof ApiException) { + return ((ApiException) t).isRetryable(); + } + return false; + } + + long getRetryDelay() { + long maxRetryDelay = retrySettings.getMaxRetryDelayDuration().toMillis(); + long initialRetryDelay = retrySettings.getInitialRetryDelayDuration().toMillis(); + double multiplier = retrySettings.getRetryDelayMultiplier(); + + long nextDelay = (long) Math.min(initialRetryDelay * Math.pow(multiplier, attemptNumber.get()), maxRetryDelay); + System.out.println("retry attempt " + attemptNumber.get() + " delay " + nextDelay); + return ThreadLocalRandom.current().nextLong(nextDelay); + } + + long getTimeout() { + long rpcTimeout = retrySettings.getInitialRpcTimeoutDuration().toMillis(); + long timeout = Math.min(rpcTimeout, deadline.timeRemaining(TimeUnit.MILLISECONDS)); + + System.out.println("retry attempt " + attemptNumber.get() + " timeout " + timeout); + return timeout; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingRetryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingRetryCallable.java new file mode 100644 index 0000000000..83052bf90b --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingRetryCallable.java @@ -0,0 +1,26 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.ServerStreamingCallable; + +import java.util.concurrent.ScheduledExecutorService; + +public class StreamingRetryCallable extends ServerStreamingCallable { + + private final StreamingOperation operation; + + StreamingRetryCallable(ServerStreamingCallable innerCallable, + ScheduledExecutorService executor, + ServerStreamingCallSettings settings, + StreamResumptionStrategy resumptionStrategy) { + this.operation = new StreamingOperation<>(resumptionStrategy, executor, settings, innerCallable); + } + + @Override + public void call(RequestT request, ResponseObserver responseObserver, ApiCallContext apiCallContext) { + operation.startOperation(request, responseObserver, apiCallContext); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java new file mode 100644 index 0000000000..db0c391b9b --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java @@ -0,0 +1,255 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.grpc.GrpcCallSettings; +import com.google.api.gax.grpc.GrpcRawCallableFactory; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.SimpleStreamResumptionStrategy; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StreamController; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.TableName; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.TableId; +import com.google.cloud.bigtable.data.v2.models.TargetId; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.StringValue; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.concurrent.Executors; + +public class TestRetry { + + EnhancedBigtableStub stub; + Server server; + + @Before + public void setup() throws Exception { + server = ServerBuilder.forPort(1234) + .addService(new FakeService()) + .build(); + + BigtableDataSettings.Builder settings = + BigtableDataSettings + .newBuilderForEmulator(1234) + .setProjectId("project") + .setInstanceId("instance"); + + settings.stubSettings().mutateRowSettings() + .setRetrySettings(RetrySettings.newBuilder() + .setInitialRpcTimeoutDuration(Duration.ofMillis(1000)) + .setMaxRpcTimeoutDuration(Duration.ofMillis(1000)) + .setTotalTimeoutDuration(Duration.ofSeconds(60)) + .setRetryDelayMultiplier(1.5) + .setInitialRetryDelayDuration(Duration.ofSeconds(1)) + .setMaxRetryDelayDuration(Duration.ofSeconds(10)) + .build()); + + settings.stubSettings().readRowsSettings() + .setRetrySettings(RetrySettings.newBuilder() + .setInitialRpcTimeoutDuration(Duration.ofMillis(1000)) + .setMaxRpcTimeoutDuration(Duration.ofMillis(1000)) + .setTotalTimeoutDuration(Duration.ofSeconds(60)) + .setRetryDelayMultiplier(1.5) + .setInitialRetryDelayDuration(Duration.ofSeconds(1)) + .setMaxRetryDelayDuration(Duration.ofSeconds(10)) + .build()); + +// settings.stubSettings().setEnableSkipTrailers(false); + + server.start(); + + stub = EnhancedBigtableStub.create(settings.stubSettings().build()); + } + + + @After + public void tearDown() { + stub.close(); + server.shutdown(); + } + + @Test + public void testUnary() throws Exception { + + ApiFuture future = stub.mutateRowCallable() + .futureCall(RowMutation.create(TableId.of("table"), "row") + .setCell("cf", "q", "c"), null); + + future.get(); + } + + @Test + public void testServerStream() throws Exception { +// ServerStreamingCallable callable = GrpcRawCallableFactory.createServerStreamingCallable( +// GrpcCallSettings.newBuilder() +// .setMethodDescriptor(BigtableGrpc.getReadRowsMethod()) +// .setParamsExtractor(r -> ImmutableMap.of()) +// .build(), +// ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED)); +// +// ToNewCallableAdapter toNew = new ToNewCallableAdapter(callable); +// RetryCallable retry = new RetryCallable<>(toNew, new FakeResumptionStrategy(), Executors.newScheduledThreadPool(1)); +// ToOldCallableAdapter toOld = new ToOldCallableAdapter<>(retry); +// +// +// ReadRowsRequest request = ReadRowsRequest.newBuilder() +// .setTableName(TableName.of("project", "instance", "table").toString()) +// .build(); +// +// Channel channel = ManagedChannelBuilder.forAddress("localhost", 1234).usePlaintext().build(); +// +// ServerStreamingCallable userCallable = toOld.withDefaultCallContext(GrpcCallContext.of(channel, CallOptions.DEFAULT)); +// +// ServerStream responses = userCallable.call(request); +// for (ReadRowsResponse response : responses) { +// System.out.println(response); +// } +// +// userCallable.call(request, new ResponseObserver() { +// private StreamController controller; +// @Override +// public void onStart(StreamController controller) { +// } +// +// @Override +// public void onResponse(ReadRowsResponse response) { +// System.out.println("Received response " + response); +// } +// +// @Override +// public void onError(Throwable t) { +// +// } +// +// @Override +// public void onComplete() { +// System.out.println("complete"); +// } +// }); + +// Thread.sleep(50000); + +// userCallable.all().call(request); + + stub.readRowsCallable().all().call(Query.create("test")); + + } + + class FakeService extends BigtableGrpc.BigtableImplBase { + int count = 0; + + @Override + public void readRows(ReadRowsRequest request, StreamObserver responseObserver) { + count++; + if (count == 1) { + System.out.println("attempt 1: " + request); + responseObserver.onNext( + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("key1")) + .setFamilyName(StringValue.newBuilder().setValue("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(0) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build()); + responseObserver.onError(new StatusRuntimeException(Status.DEADLINE_EXCEEDED)); + } else if (count == 2) { + System.out.println("attempt 2: " + request); + responseObserver.onNext(ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("key2")) + .setFamilyName(StringValue.newBuilder().setValue("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(0) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build()); + responseObserver.onNext(ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("key3")) + .setFamilyName(StringValue.newBuilder().setValue("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(0) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build()); + responseObserver.onError(new StatusRuntimeException(Status.DEADLINE_EXCEEDED)); + } else { + System.out.println("attempt 3: " + request); + responseObserver.onCompleted(); + } + } + + @Override + public void mutateRow(MutateRowRequest request, StreamObserver responseObserver) { + System.out.println("server got mutateRow attempt " + count); + if (count ++ < 2) { + responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + } else { + responseObserver.onNext(MutateRowResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + } + } + + class FakeResumptionStrategy implements StreamResumptionStrategy { + @Nonnull + @Override + public StreamResumptionStrategy createNew() { + return null; + } + + @Nonnull + @Override + public Object processResponse(Object response) { + return null; + } + + @Nullable + @Override + public Object getResumeRequest(Object originalRequest) { + return originalRequest; + } + + @Override + public boolean canResume() { + return true; + } + } +}