From 9ed3a2e17d176205ff9749d189bd7ad3adaa7fc9 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 1 Jan 2025 22:31:03 -0500 Subject: [PATCH 1/3] rewrite 1 --- .../stub/BigtableUnaryOperationCallable.java | 1 + .../bigtable/data/v2/stub/Callable2.java | 163 +++++++++++ .../data/v2/stub/EnhancedBigtableStub.java | 55 ++-- .../bigtable/data/v2/stub/Operation.java | 214 +++++++++++++++ .../bigtable/data/v2/stub/RetryCallable.java | 29 ++ .../data/v2/stub/SafeResponseObserver.java | 1 + .../v2/stub/StreamingAttemptCallable.java | 19 ++ .../data/v2/stub/StreamingAttemptFactory.java | 55 ++++ .../data/v2/stub/StreamingOperation.java | 111 ++++++++ .../data/v2/stub/StreamingRetryCallable.java | 26 ++ .../bigtable/data/v2/stub/TestRetry.java | 259 ++++++++++++++++++ 11 files changed, 908 insertions(+), 25 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RetryCallable.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptCallable.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptFactory.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingOperation.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingRetryCallable.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java index 
78d507665e..3cc6a10b79 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java @@ -181,6 +181,7 @@ public void onError(Throwable throwable) { @Override public void onComplete() { + System.out.println("onComplete called on unary operation callable"); if (allowNoResponse && set(null)) { tracer.operationSucceeded(); return;
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java
new file mode 100644
index 0000000000..e560f05bba
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java
@@ -0,0 +1,197 @@
+package com.google.cloud.bigtable.data.v2.stub;
+
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.StreamController;
+import io.grpc.Status;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Server-streaming call with pull-style flow control: the caller asks for each message through
+ * {@link StreamController2#onReady()} and learns about the end of the stream, successful or not,
+ * through {@link ResponseObserver2#onClose(Status)}.
+ */
+interface Callable2<RequestT, ResponseT> {
+
+  /** Starts the call, delivering the stream events for {@code request} to {@code observer}. */
+  void call(RequestT request, ResponseObserver2<ResponseT> observer, ApiCallContext context);
+}
+
+/** Exposes a {@link Callable2} through the gax {@link ServerStreamingCallable} API. */
+class ToOldCallableAdapter<RequestT, ResponseT>
+    extends ServerStreamingCallable<RequestT, ResponseT> {
+
+  final Callable2<RequestT, ResponseT> callable;
+
+  ToOldCallableAdapter(Callable2<RequestT, ResponseT> callable) {
+    this.callable = callable;
+  }
+
+  @Override
+  public void call(
+      RequestT requestT,
+      ResponseObserver<ResponseT> responseObserver,
+      ApiCallContext apiCallContext) {
+    callable.call(requestT, new ToOldObserverAdapter<>(responseObserver), apiCallContext);
+  }
+}
+
+/** Exposes a gax {@link ServerStreamingCallable} through the {@link Callable2} API. */
+class ToNewCallableAdapter<RequestT, ResponseT> implements Callable2<RequestT, ResponseT> {
+
+  final ServerStreamingCallable<RequestT, ResponseT> callable;
+
+  ToNewCallableAdapter(ServerStreamingCallable<RequestT, ResponseT> callable) {
+    this.callable = callable;
+  }
+
+  @Override
+  public void call(
+      RequestT request, ResponseObserver2<ResponseT> observer, ApiCallContext context) {
+    callable.call(request, new ToNewObserverAdapter<>(observer), context);
+  }
+}
+
+/**
+ * Feeds the events of a gax stream into a {@link ResponseObserver2}. Automatic inbound flow
+ * control is switched off on the gax stream and one message is requested for every
+ * {@link StreamController2#onReady()} call.
+ */
+class ToNewObserverAdapter<ResponseT> implements ResponseObserver<ResponseT> {
+
+  private final ResponseObserver2<ResponseT> userObserver;
+  private StreamController grpcController;
+
+  ToNewObserverAdapter(ResponseObserver2<ResponseT> observer) {
+    this.userObserver = observer;
+  }
+
+  @Override
+  public void onStart(StreamController streamController) {
+    grpcController = streamController;
+    grpcController.disableAutoInboundFlowControl();
+    userObserver.onStart(
+        new StreamController2() {
+          @Override
+          public void cancel(String reason) {
+            // StreamController#cancel() takes no argument, so the reason is not forwarded.
+            grpcController.cancel();
+          }
+
+          @Override
+          public void onReady() {
+            grpcController.request(1);
+          }
+        });
+  }
+
+  @Override
+  public void onResponse(ResponseT responseT) {
+    userObserver.onResponse(responseT);
+  }
+
+  @Override
+  public void onError(Throwable throwable) {
+    userObserver.onClose(Status.fromThrowable(throwable));
+  }
+
+  @Override
+  public void onComplete() {
+    userObserver.onClose(Status.OK);
+  }
+}
+
+/**
+ * Feeds the events of a {@link Callable2} stream into a gax {@link ResponseObserver},
+ * translating gax's automatic flow control or {@code request(n)} into one {@code onReady()} per
+ * message.
+ */
+class ToOldObserverAdapter<ResponseT> implements ResponseObserver2<ResponseT> {
+
+  private final ResponseObserver<ResponseT> userObserver;
+  private StreamController2 grpcController;
+  // Messages asked for through request(n) that have not been delivered yet.
+  private final AtomicInteger userRequested = new AtomicInteger(0);
+  // Cleared by the user's onStart callback and read again whenever a response is delivered.
+  private volatile boolean autoFlowControl = true;
+
+  ToOldObserverAdapter(ResponseObserver<ResponseT> observer) {
+    this.userObserver = observer;
+  }
+
+  @Override
+  public void onStart(StreamController2 streamController2) {
+    grpcController = streamController2;
+    userObserver.onStart(
+        new StreamController() {
+          @Override
+          public void cancel() {
+            grpcController.cancel("user cancelled stream");
+          }
+
+          @Override
+          public void disableAutoInboundFlowControl() {
+            autoFlowControl = false;
+          }
+
+          @Override
+          public void request(int i) {
+            // Only a request that finds nothing outstanding has to start pulling; the
+            // remaining pulls are chained from onResponse while the counter stays positive.
+            int oldN = userRequested.getAndAdd(i);
+            if (oldN == 0) {
+              grpcController.onReady();
+            }
+          }
+        });
+
+    // With automatic flow control the first message is pulled as soon as onStart returns.
+    if (autoFlowControl) {
+      grpcController.onReady();
+    }
+  }
+
+  @Override
+  public void onResponse(ResponseT response) {
+    userObserver.onResponse(response);
+    if (autoFlowControl) {
+      grpcController.onReady();
+    } else if (userRequested.decrementAndGet() > 0) {
+      grpcController.onReady();
+    }
+  }
+
+  @Override
+  public void onClose(Status status) {
+    if (status.isOk()) {
+      userObserver.onComplete();
+    } else {
+      userObserver.onError(status.asException());
+    }
+  }
+}
+
+/** Receiver of the events of one {@link Callable2} stream. */
+interface ResponseObserver2<ResponseT> {
+
+  /** Hands the observer the controller used to drive the stream. */
+  void onStart(StreamController2 streamController2);
+
+  /** Delivers one message of the stream. */
+  void onResponse(ResponseT response);
+
+  /** Reports the end of the stream; {@code status} is OK on success. */
+  void onClose(Status status);
+}
+
+/** Handle through which a {@link ResponseObserver2} controls its stream. */
+interface StreamController2 {
+
+  /** Cancels the stream. */
+  void cancel(String reason);
+
+  /** Signals that one more message may be delivered. */
+  void onReady();
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 46377fbc41..78aa012ea4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -475,17 +475,17 @@ private ServerStreamingCallable createReadRo .build(), readRowsSettings.getRetryableCodes()); - ServerStreamingCallable withStatsHeaders = - new StatsHeadersServerStreamingCallable<>(base); +// ServerStreamingCallable withStatsHeaders = +// new StatsHeadersServerStreamingCallable<>(base); // Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client.
- ServerStreamingCallable convertException = - new ConvertExceptionCallable<>(withStatsHeaders); +// ServerStreamingCallable convertException = +// new ConvertExceptionCallable<>(withStatsHeaders); ServerStreamingCallable merging = - new RowMergingCallable<>(convertException, rowAdapter); + new RowMergingCallable<>(base, rowAdapter); // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner // ReadRowsRequest -> ReadRowsResponse callable). @@ -498,21 +498,22 @@ private ServerStreamingCallable createReadRo .setWaitTimeout(readRowsSettings.getWaitTimeout()) .build(); - ServerStreamingCallable watched = - Callables.watched(merging, innerSettings, clientContext); +// ServerStreamingCallable watched = +// Callables.watched(merging, innerSettings, clientContext); - ServerStreamingCallable withBigtableTracer = - new BigtableTracerStreamingCallable<>(watched); +// ServerStreamingCallable withBigtableTracer = +// new BigtableTracerStreamingCallable<>(watched); // Retry logic is split into 2 parts to workaround a rare edge case described in // ReadRowsRetryCompletedCallable - ServerStreamingCallable retrying1 = - new ReadRowsRetryCompletedCallable<>(withBigtableTracer); +// ServerStreamingCallable retrying1 = +// new ReadRowsRetryCompletedCallable<>(withBigtableTracer); ServerStreamingCallable retrying2 = - withRetries(retrying1, innerSettings); + withRetries(merging, innerSettings); - return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); + return retrying2; +// return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } /** @@ -1268,18 +1269,22 @@ private ServerStreamingCallable withR ServerStreamingCallable innerCallable, ServerStreamingCallSettings serverStreamingCallSettings) { - ServerStreamingCallable retrying; - if (settings.getEnableRetryInfo()) { - retrying = - com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( - innerCallable, serverStreamingCallSettings, clientContext); - } else { - retrying = 
Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext); - } - if (settings.getEnableRoutingCookie()) { - return new CookiesServerStreamingCallable<>(retrying); - } - return retrying; +// ServerStreamingCallable retrying; +// if (settings.getEnableRetryInfo()) { +// retrying = +// com.google.cloud.bigtable.gaxx.retrying.Callables.retrying( +// innerCallable, serverStreamingCallSettings, clientContext); +// } else { +// retrying = Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext); +// } +// if (settings.getEnableRoutingCookie()) { +// return new CookiesServerStreamingCallable<>(retrying); +// } +// return retrying; + + Callable2 toNewCallable = new ToNewCallableAdapter(innerCallable); + Callable2 retryCallable = new RetryCallable<>(toNewCallable, serverStreamingCallSettings.getResumptionStrategy(), clientContext.getExecutor(), serverStreamingCallSettings.getRetrySettings()); + return new ToOldCallableAdapter<>(retryCallable); } //
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java
new file mode 100644
index 0000000000..21b1e31104
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java
@@ -0,0 +1,296 @@
+package com.google.cloud.bigtable.data.v2.stub;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.retrying.StreamResumptionStrategy;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.StreamingCallSettings;
+import io.grpc.Deadline;
+import io.grpc.Status;
+
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * One logical server-streaming call that is re-attempted when an attempt ends with
+ * {@code DEADLINE_EXCEEDED} or {@code UNAVAILABLE}.
+ *
+ * <p>The operation is a small state machine: {@code Idle} (no attempt in flight; the next
+ * {@code onReady()} starts one), {@code Active} (an attempt is streaming) and {@code Scheduled}
+ * (a retry is waiting for its backoff delay). State transitions are not synchronized.
+ */
+public class Operation<RequestT, ResponseT> {
+
+  private final StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy;
+  private StateListener operationState;
+  private final ResponseObserver2<ResponseT> outerObserver;
+  private final Callable2<RequestT, ResponseT> callable;
+  private final ScheduledExecutorService executor;
+  private RequestT request;
+  private ApiCallContext context;
+  private final RetrySettings settings;
+  // Overall deadline of the operation, fixed when the caller first signals readiness.
+  private Deadline deadline;
+  // Number of retries so far; 0 while the first attempt is running.
+  private final AtomicInteger attempt;
+
+  Operation(
+      Callable2<RequestT, ResponseT> callable,
+      StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy,
+      ResponseObserver2<ResponseT> outerObserver,
+      ScheduledExecutorService executor,
+      RequestT request,
+      ApiCallContext context,
+      RetrySettings settings) {
+    this.callable = callable;
+    // The strategy records the responses of one stream, so each operation takes its own copy
+    // instead of sharing the instance held by the call settings.
+    this.resumptionStrategy = resumptionStrategy.createNew();
+    this.outerObserver = outerObserver;
+    this.executor = executor;
+    this.operationState = new Idle(this, outerObserver);
+    this.request = request;
+    this.context = context;
+    this.settings = settings;
+    this.attempt = new AtomicInteger(0);
+  }
+
+  /** Gives the caller its controller; the first {@code onReady()} starts an attempt. */
+  void start() {
+    outerObserver.onStart(
+        new StreamController2() {
+          @Override
+          public void cancel(String reason) {
+            onCancel(reason);
+          }
+
+          @Override
+          public void onReady() {
+            // Start the clock only once: re-arming it on every onReady() would keep pushing
+            // the total timeout into the future.
+            if (deadline == null) {
+              deadline =
+                  Deadline.after(
+                      settings.getTotalTimeoutDuration().toMillis(), TimeUnit.MILLISECONDS);
+            }
+            operationState.onReady();
+          }
+        });
+  }
+
+  /** Cancels the operation by delegating to the current state. */
+  void onCancel(String reason) {
+    operationState.onCancel();
+  }
+
+  /** Makes {@code state} the current state. */
+  public void onStateChange(StateListener state) {
+    this.operationState = state;
+  }
+
+  /** Returns the current state. */
+  public StateListener getState() {
+    return this.operationState;
+  }
+
+  /** Replaces the request that the next attempt will send. */
+  public void updateRequest(RequestT updated) {
+    this.request = updated;
+  }
+
+  /** Replaces the call context that the next attempt will use. */
+  public void updateContext(ApiCallContext updated) {
+    this.context = updated;
+  }
+
+  /**
+   * Returns the delay in milliseconds before the next attempt: a random value below the
+   * exponential backoff for the current retry count, which is capped at the maximum retry delay.
+   */
+  long getRetryDelay() {
+    long maxRetryDelay = settings.getMaxRetryDelayDuration().toMillis();
+    long initialRetryDelay = settings.getInitialRetryDelayDuration().toMillis();
+    double multiplier = settings.getRetryDelayMultiplier();
+
+    long nextDelay =
+        (long) Math.min(initialRetryDelay * Math.pow(multiplier, attempt.get()), maxRetryDelay);
+    // ThreadLocalRandom#nextLong(bound) throws for a non-positive bound.
+    return nextDelay > 0 ? ThreadLocalRandom.current().nextLong(nextDelay) : 0;
+  }
+
+  /** Returns the next attempt's timeout in milliseconds, capped by the time left overall. */
+  long getTimeout() {
+    long rpcTimeout = settings.getInitialRpcTimeoutDuration().toMillis();
+    return Math.min(rpcTimeout, deadline.timeRemaining(TimeUnit.MILLISECONDS));
+  }
+
+  /** Returns whether an attempt that ended with {@code status} may be retried. */
+  private boolean shouldRetry(Status status) {
+    // Status does not override equals(), so statuses have to be compared by code.
+    Status.Code code = status.getCode();
+    if (code != Status.Code.DEADLINE_EXCEEDED && code != Status.Code.UNAVAILABLE) {
+      return false;
+    }
+    // RetrySettings uses a zero total timeout to mean "no overall limit".
+    boolean hasTotalTimeout = settings.getTotalTimeoutDuration().toMillis() > 0;
+    if (hasTotalTimeout && deadline.isExpired()) {
+      return false;
+    }
+    // attempt counts retries, so attempt + 1 attempts have been made; 0 means no limit.
+    int maxAttempts = settings.getMaxAttempts();
+    return maxAttempts <= 0 || attempt.get() + 1 < maxAttempts;
+  }
+
+  /** Behaviour of the operation while it is in one particular state. */
+  abstract class StateListener {
+
+    protected Operation<RequestT, ResponseT> operation;
+    protected ResponseObserver2<ResponseT> outerObserver;
+
+    StateListener(
+        Operation<RequestT, ResponseT> operation, ResponseObserver2<ResponseT> outerObserver) {
+      this.operation = operation;
+      this.outerObserver = outerObserver;
+    }
+
+    /** The caller is ready for one more message. */
+    public abstract void onReady();
+
+    /** The caller cancelled the operation. */
+    public abstract void onCancel();
+  }
+
+  /** No attempt is in flight; the next {@code onReady()} starts one. */
+  private class Idle extends StateListener {
+
+    Idle(Operation<RequestT, ResponseT> operation, ResponseObserver2<ResponseT> outerObserver) {
+      super(operation, outerObserver);
+    }
+
+    @Override
+    public void onReady() {
+      Active active = new Active(operation, outerObserver);
+      // Switch state before calling: an attempt that fails inside call() moves the operation
+      // on to its next state, which must not be overwritten afterwards.
+      operation.onStateChange(active);
+      callable.call(operation.request, active, operation.context);
+    }
+
+    @Override
+    public void onCancel() {
+      outerObserver.onClose(Status.CANCELLED);
+    }
+  }
+
+  /** An attempt is streaming; this state is also the observer of that attempt. */
+  private class Active extends StateListener implements ResponseObserver2<ResponseT> {
+
+    StreamController2 grpcController;
+    // True from the caller asking for a message until that message is delivered.
+    boolean userWaitingResponse = false;
+    // Set when the caller cancels before the attempt has handed over its controller.
+    boolean cancelRequested = false;
+
+    Active(Operation<RequestT, ResponseT> operation, ResponseObserver2<ResponseT> outerObserver) {
+      super(operation, outerObserver);
+    }
+
+    @Override
+    public void onReady() {
+      userWaitingResponse = true;
+      // Without a controller there is nothing to pull from yet; onStart issues the request.
+      if (grpcController != null) {
+        grpcController.onReady();
+      }
+    }
+
+    @Override
+    public void onCancel() {
+      cancelRequested = true;
+      if (grpcController != null) {
+        grpcController.cancel("user cancelled");
+      }
+    }
+
+    @Override
+    public void onStart(StreamController2 streamController) {
+      this.grpcController = streamController;
+      if (cancelRequested) {
+        streamController.cancel("user cancelled");
+        return;
+      }
+      onReady();
+    }
+
+    @Override
+    public void onResponse(ResponseT response) {
+      userWaitingResponse = false;
+      resumptionStrategy.processResponse(response);
+      outerObserver.onResponse(response);
+    }
+
+    @Override
+    public void onClose(Status status) {
+      if (status.isOk() || !shouldRetry(status)) {
+        outerObserver.onClose(status);
+        return;
+      }
+
+      attempt.getAndIncrement();
+      operation.updateRequest(resumptionStrategy.getResumeRequest(request));
+      operation.updateContext(context.withTimeoutDuration(Duration.ofMillis(getTimeout())));
+
+      if (!userWaitingResponse) {
+        // Nobody is waiting for data, so the retry is deferred until the next onReady().
+        onStateChange(new Idle(operation, outerObserver));
+        return;
+      }
+
+      Scheduled scheduled = new Scheduled(operation, outerObserver);
+      // Enter the state before arming the timer so that a timer firing at once still finds it.
+      onStateChange(scheduled);
+      scheduled.setScheduledFuture(
+          executor.schedule(scheduled::onReady, getRetryDelay(), TimeUnit.MILLISECONDS));
+    }
+  }
+
+  /** A retry is waiting for its backoff delay to elapse. */
+  class Scheduled extends StateListener {
+
+    ScheduledFuture<?> scheduledFuture;
+
+    Scheduled(
+        Operation<RequestT, ResponseT> operation, ResponseObserver2<ResponseT> outerObserver) {
+      super(operation, outerObserver);
+    }
+
+    @Override
+    public void onReady() {
+      // Invoked by the backoff timer, and also if the caller signals readiness while waiting;
+      // whichever comes second finds another state current and must not start an attempt.
+      if (operation.getState() != this) {
+        return;
+      }
+      Active active = new Active(operation, outerObserver);
+      onStateChange(active);
+      callable.call(request, active, context);
+    }
+
+    @Override
+    public void onCancel() {
+      if (scheduledFuture != null) {
+        scheduledFuture.cancel(true);
+        scheduledFuture = null;
+      }
+      // With the retry abandoned nothing else would close the caller's stream.
+      outerObserver.onClose(Status.CANCELLED);
+    }
+
+    void setScheduledFuture(ScheduledFuture<?> future) {
+      this.scheduledFuture = future;
+    }
+  }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RetryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RetryCallable.java new file mode 100644 index 0000000000..c159d05c6b --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RetryCallable.java @@ -0,0 +1,29 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import
com.google.api.gax.rpc.ApiCallContext; + +import java.util.concurrent.ScheduledExecutorService; + +public class RetryCallable implements Callable2 { + + Callable2 callable; + StreamResumptionStrategy resumptionStrategy; + ScheduledExecutorService executor; + RetrySettings settings; + + RetryCallable(Callable2 callable, StreamResumptionStrategy streamResumptionStrategy, ScheduledExecutorService executor, RetrySettings settings) { + this.callable = callable; + this.resumptionStrategy = streamResumptionStrategy; + this.executor = executor; + this.settings = settings; + } + + @Override + public void call(RequestT request, ResponseObserver2 observer, ApiCallContext context) { + System.out.println("retry callable is called"); + Operation operation = new Operation<>(callable, resumptionStrategy, observer, executor, request, context, settings); + operation.start(); + } +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java index 0133dd3c2b..d1d54219bc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/SafeResponseObserver.java @@ -98,6 +98,7 @@ public final void onError(Throwable throwable) { @Override public final void onComplete() { if (!isClosed.compareAndSet(false, true)) { + System.out.println("OuterObserver is " + outerObserver.getClass().getName()); logException("Tried to double close the stream"); return; } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptCallable.java new file mode 100644 index 0000000000..87b9e1d5d9 --- /dev/null +++ 
b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptCallable.java @@ -0,0 +1,19 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; + +public class StreamingAttemptCallable extends ServerStreamingCallable { + + private ServerStreamingCallable innerCallable; + + public StreamingAttemptCallable(ServerStreamingCallable callable) { + this.innerCallable = callable; + } + + @Override + public void call(RequestT requestT, ResponseObserver responseObserver, ApiCallContext apiCallContext) { + innerCallable.call(requestT, responseObserver, apiCallContext); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptFactory.java new file mode 100644 index 0000000000..98c5fbf57c --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingAttemptFactory.java @@ -0,0 +1,55 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; + +public class StreamingAttemptFactory { + + private RequestT request; + private StreamResumptionStrategy resumptionStrategy; + private ServerStreamingCallable callable; + + StreamingAttemptFactory(ServerStreamingCallable callable, + RequestT request, + StreamResumptionStrategy resumptionStrategy) { + this.request = request; + this.resumptionStrategy = resumptionStrategy; + this.callable = callable; + } + + SettableApiFuture sendAttempt(ResponseObserver observer, 
ApiCallContext context) { + StreamingAttemptCallable attemptCallable = new StreamingAttemptCallable<>(callable); + + request = resumptionStrategy.getResumeRequest(request); + + SettableApiFuture future = SettableApiFuture.create(); + attemptCallable.call(request, new ResponseObserver() { + @Override + public void onStart(StreamController streamController) { + + } + + @Override + public void onResponse(ResponseT responseT) { + resumptionStrategy.processResponse(responseT); + observer.onResponse(responseT); + } + + @Override + public void onError(Throwable throwable) { + future.setException(throwable); + } + + @Override + public void onComplete() { + future.set(null); + } + }, context); + + return future; + } +}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingOperation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingOperation.java
new file mode 100644
index 0000000000..1673c5eada
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingOperation.java
@@ -0,0 +1,142 @@
+package com.google.cloud.bigtable.data.v2.stub;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.retrying.StreamResumptionStrategy;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallSettings;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.StreamController;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.Deadline;
+
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Runs one server-streaming call as a series of attempts, retrying retryable failures with
+ * jittered exponential backoff while the retry settings allow it.
+ *
+ * <p>An instance holds the deadline and the attempt count of a single call, so a new instance has
+ * to be created for every call.
+ */
+public class StreamingOperation<RequestT, ResponseT> {
+
+  private final ScheduledExecutorService executor;
+  private final StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy;
+  private final ServerStreamingCallable<RequestT, ResponseT> callable;
+  private final RetrySettings retrySettings;
+  private final Deadline deadline;
+  // Number of attempts started so far.
+  private final AtomicInteger attemptNumber = new AtomicInteger(0);
+  private StreamingAttemptFactory<RequestT, ResponseT> attemptFactory;
+
+  StreamingOperation(
+      StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy,
+      ScheduledExecutorService executor,
+      ServerStreamingCallSettings<RequestT, ResponseT> callSettings,
+      ServerStreamingCallable<RequestT, ResponseT> callable) {
+    this.resumptionStrategy = resumptionStrategy;
+    this.executor = executor;
+    this.retrySettings = callSettings.getRetrySettings();
+    this.callable = callable;
+    this.deadline =
+        Deadline.after(
+            retrySettings.getTotalTimeoutDuration().toMillis(), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Starts the call. A successful attempt completes {@code observer}; a failed attempt is
+   * retried when {@link #shouldRetry(Throwable)} allows it and reported to {@code observer}
+   * otherwise.
+   */
+  public void startOperation(
+      RequestT request, ResponseObserver<ResponseT> observer, ApiCallContext context) {
+    this.attemptFactory = new StreamingAttemptFactory<>(callable, request, resumptionStrategy);
+
+    retry(attemptFactory, observer, context);
+  }
+
+  /** Sends one attempt and, if it fails with a retryable error, schedules the next one. */
+  public void retry(
+      StreamingAttemptFactory<RequestT, ResponseT> attemptFactory,
+      ResponseObserver<ResponseT> observer,
+      ApiCallContext context) {
+    // Count the attempt up front; without this the max-attempts check never triggers and the
+    // backoff never grows.
+    attemptNumber.incrementAndGet();
+    SettableApiFuture<Void> future = attemptFactory.sendAttempt(observer, context);
+
+    ApiFutures.addCallback(
+        future,
+        new ApiFutureCallback<Void>() {
+          @Override
+          public void onFailure(Throwable throwable) {
+            if (shouldRetry(throwable)) {
+              executor.schedule(
+                  () ->
+                      retry(
+                          attemptFactory,
+                          observer,
+                          context.withTimeoutDuration(Duration.ofMillis(getTimeout()))),
+                  getRetryDelay(),
+                  TimeUnit.MILLISECONDS);
+            } else {
+              observer.onError(throwable);
+            }
+          }
+
+          @Override
+          public void onSuccess(Void unused) {
+            observer.onComplete();
+          }
+        },
+        MoreExecutors.directExecutor());
+  }
+
+  /** Returns whether a failed attempt may be retried. */
+  boolean shouldRetry(Throwable t) {
+    if (deadline.isExpired()) {
+      return false;
+    }
+    int maxAttempt = retrySettings.getMaxAttempts();
+    if (maxAttempt != 0 && attemptNumber.get() >= maxAttempt) {
+      return false;
+    }
+    if (t instanceof ApiException) {
+      return ((ApiException) t).isRetryable();
+    }
+    return false;
+  }
+
+  /**
+   * Returns the delay in milliseconds before the next attempt: a random value below the
+   * exponential backoff, which starts at the initial retry delay and is capped at the maximum.
+   */
+  long getRetryDelay() {
+    long maxRetryDelay = retrySettings.getMaxRetryDelayDuration().toMillis();
+    long initialRetryDelay = retrySettings.getInitialRetryDelayDuration().toMillis();
+    double multiplier = retrySettings.getRetryDelayMultiplier();
+
+    // When called from retry() one attempt has been counted, so the first retry backs off by
+    // the initial delay.
+    int retriesSoFar = Math.max(0, attemptNumber.get() - 1);
+    long nextDelay =
+        (long) Math.min(initialRetryDelay * Math.pow(multiplier, retriesSoFar), maxRetryDelay);
+    // ThreadLocalRandom#nextLong(bound) throws for a non-positive bound.
+    return nextDelay > 0 ? ThreadLocalRandom.current().nextLong(nextDelay) : 0;
+  }
+
+  /** Returns the next attempt's timeout in milliseconds, capped by the time left overall. */
+  long getTimeout() {
+    long rpcTimeout = retrySettings.getInitialRpcTimeoutDuration().toMillis();
+    return Math.min(rpcTimeout, deadline.timeRemaining(TimeUnit.MILLISECONDS));
+  }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingRetryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingRetryCallable.java
new file mode 100644
index 0000000000..83052bf90b
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/StreamingRetryCallable.java
@@ -0,0 +1,43 @@
+package com.google.cloud.bigtable.data.v2.stub;
+
+import com.google.api.gax.retrying.StreamResumptionStrategy;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallSettings;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Server-streaming callable that runs every call as a retried {@link StreamingOperation}. */
+public class StreamingRetryCallable<RequestT, ResponseT>
+    extends ServerStreamingCallable<RequestT, ResponseT> {
+
+  private final ServerStreamingCallable<RequestT, ResponseT> innerCallable;
+  private final ScheduledExecutorService executor;
+  private final ServerStreamingCallSettings<RequestT, ResponseT> settings;
+  private final StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy;
+
+  StreamingRetryCallable(
+      ServerStreamingCallable<RequestT, ResponseT> innerCallable,
+      ScheduledExecutorService executor,
+      ServerStreamingCallSettings<RequestT, ResponseT> settings,
+      StreamResumptionStrategy<RequestT, ResponseT> resumptionStrategy) {
+    this.innerCallable = innerCallable;
+    this.executor = executor;
+    this.settings = settings;
+    this.resumptionStrategy = resumptionStrategy;
+  }
+
+  @Override
+  public void call(
+      RequestT request,
+      ResponseObserver<ResponseT> responseObserver,
+      ApiCallContext apiCallContext) {
+    // The operation owns the deadline, the attempt count and the resumption state of one call,
+    // so each call gets a fresh operation and a fresh copy of the strategy.
+    StreamingOperation<RequestT, ResponseT> operation =
+        new StreamingOperation<>(
+            resumptionStrategy.createNew(), executor, settings, innerCallable);
+    operation.startOperation(request, responseObserver, apiCallContext);
+  }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java new file mode 100644 index 0000000000..ec9ad10213 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java @@ -0,0 +1,259 @@ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.grpc.GrpcCallSettings; +import
com.google.api.gax.grpc.GrpcRawCallableFactory; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.SimpleStreamResumptionStrategy; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StreamController; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.TableName; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.TableId; +import com.google.cloud.bigtable.data.v2.models.TargetId; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.StringValue; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.concurrent.Executors; + +public class TestRetry { + + 
EnhancedBigtableStub stub; + Server server; + + @Before + public void setup() throws Exception { + server = ServerBuilder.forPort(1234) + .addService(new FakeService()) + .build(); + + BigtableDataSettings.Builder settings = + BigtableDataSettings + .newBuilderForEmulator(1234) + .setProjectId("project") + .setInstanceId("instance"); + + settings.stubSettings().mutateRowSettings() + .setRetrySettings(RetrySettings.newBuilder() + .setInitialRpcTimeoutDuration(Duration.ofMillis(1000)) + .setMaxRpcTimeoutDuration(Duration.ofMillis(1000)) + .setTotalTimeoutDuration(Duration.ofSeconds(60)) + .setRetryDelayMultiplier(1.5) + .setInitialRetryDelayDuration(Duration.ofSeconds(1)) + .setMaxRetryDelayDuration(Duration.ofSeconds(10)) + .build()); + + settings.stubSettings().readRowsSettings() + .setRetrySettings(RetrySettings.newBuilder() + .setInitialRpcTimeoutDuration(Duration.ofMillis(1000)) + .setMaxRpcTimeoutDuration(Duration.ofMillis(1000)) + .setTotalTimeoutDuration(Duration.ofSeconds(60)) + .setRetryDelayMultiplier(1.5) + .setInitialRetryDelayDuration(Duration.ofSeconds(1)) + .setMaxRetryDelayDuration(Duration.ofSeconds(10)) + .build()); + +// settings.stubSettings().setEnableSkipTrailers(false); + + server.start(); + + stub = EnhancedBigtableStub.create(settings.stubSettings().build()); + } + + + @After + public void tearDown() { + stub.close(); + server.shutdown(); + } + + @Test + public void testUnary() throws Exception { + + ApiFuture future = stub.mutateRowCallable() + .futureCall(RowMutation.create(TableId.of("table"), "row") + .setCell("cf", "q", "c"), null); + + future.get(); + } + + @Test + public void testServerStream() throws Exception { +// ServerStreamingCallable callable = GrpcRawCallableFactory.createServerStreamingCallable( +// GrpcCallSettings.newBuilder() +// .setMethodDescriptor(BigtableGrpc.getReadRowsMethod()) +// .setParamsExtractor(r -> ImmutableMap.of()) +// .build(), +// ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED)); +// +// 
ToNewCallableAdapter toNew = new ToNewCallableAdapter(callable); +// RetryCallable retry = new RetryCallable<>(toNew, new FakeResumptionStrategy(), Executors.newScheduledThreadPool(1)); +// ToOldCallableAdapter toOld = new ToOldCallableAdapter<>(retry); +// +// +// ReadRowsRequest request = ReadRowsRequest.newBuilder() +// .setTableName(TableName.of("project", "instance", "table").toString()) +// .build(); +// +// Channel channel = ManagedChannelBuilder.forAddress("localhost", 1234).usePlaintext().build(); +// +// ServerStreamingCallable userCallable = toOld.withDefaultCallContext(GrpcCallContext.of(channel, CallOptions.DEFAULT)); +// +// ServerStream responses = userCallable.call(request); +// for (ReadRowsResponse response : responses) { +// System.out.println(response); +// } +// +// userCallable.call(request, new ResponseObserver() { +// private StreamController controller; +// @Override +// public void onStart(StreamController controller) { +// } +// +// @Override +// public void onResponse(ReadRowsResponse response) { +// System.out.println("Received response " + response); +// } +// +// @Override +// public void onError(Throwable t) { +// +// } +// +// @Override +// public void onComplete() { +// System.out.println("complete"); +// } +// }); + +// Thread.sleep(50000); + +// userCallable.all().call(request); + + stub.readRowsCallable().all().call(Query.create("test")); +// ServerStream stream = stub.readRowsCallable().call(Query.create("table")); +// +// for (Row row : stream) { +// System.out.println(row); +// } + } + + class FakeService extends BigtableGrpc.BigtableImplBase { + int count = 0; + + @Override + public void readRows(ReadRowsRequest request, StreamObserver responseObserver) { + count++; + if (count == 1) { + System.out.println("attempt 1: " + request); + responseObserver.onNext( + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("key1")) + 
.setFamilyName(StringValue.newBuilder().setValue("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(0) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build()); + responseObserver.onError(new StatusRuntimeException(Status.DEADLINE_EXCEEDED)); + } else if (count == 2) { + System.out.println("attempt 2: " + request); + responseObserver.onNext(ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("key2")) + .setFamilyName(StringValue.newBuilder().setValue("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(0) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build()); + responseObserver.onNext(ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("key3")) + .setFamilyName(StringValue.newBuilder().setValue("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(0) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build()); + responseObserver.onError(new StatusRuntimeException(Status.DEADLINE_EXCEEDED)); + } else { + System.out.println("attempt 3: " + request); + responseObserver.onCompleted(); + } + } + + @Override + public void mutateRow(MutateRowRequest request, StreamObserver responseObserver) { + System.out.println("server got mutateRow attempt " + count); + if (count ++ < 2) { + responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + } else { + responseObserver.onNext(MutateRowResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + } + } + + class FakeResumptionStrategy implements StreamResumptionStrategy { + @Nonnull + @Override + public StreamResumptionStrategy createNew() { + return null; + } + + @Nonnull + @Override + public Object processResponse(Object 
response) { + return null; + } + + @Nullable + @Override + public Object getResumeRequest(Object originalRequest) { + return originalRequest; + } + + @Override + public boolean canResume() { + return true; + } + } +} From bf2ba0b8338e92fe86c8ee938abdf226889be6e8 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Thu, 2 Jan 2025 14:30:26 -0500 Subject: [PATCH 2/3] retry-write not thread safe --- .../bigtable/data/v2/stub/Callable2.java | 6 +- .../bigtable/data/v2/stub/Operation.java | 75 +++++++++---------- .../bigtable/data/v2/stub/TestRetry.java | 17 +++-- 3 files changed, 50 insertions(+), 48 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java index e560f05bba..a535d30f59 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Callable2.java @@ -89,7 +89,7 @@ class ToOldObserverAdapter implements ResponseObserver2 { private ResponseObserver userOberver; private StreamController2 grpcController; private AtomicInteger userRequested = new AtomicInteger(0); - private boolean autoFlowControl = true; + private volatile boolean autoFlowControl = true; ToOldObserverAdapter(ResponseObserver observer) { this.userOberver = observer; @@ -130,9 +130,7 @@ public void onResponse(ResponseT response) { userOberver.onResponse(response); if (userRequested.decrementAndGet() > 0) { grpcController.onReady(); - } - - if (autoFlowControl) { + } else if (autoFlowControl) { grpcController.onReady(); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java index 21b1e31104..2745694511 100644 --- 
a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java @@ -3,18 +3,18 @@ import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.StreamResumptionStrategy; import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.StreamingCallSettings; +import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer; import io.grpc.Deadline; import io.grpc.Status; import java.time.Duration; +import java.util.LinkedList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - public class Operation { private final StreamResumptionStrategy resumptionStrategy; @@ -30,14 +30,16 @@ public class Operation { Operation(Callable2 callable, StreamResumptionStrategy resumptionStrategy, ResponseObserver2 outerObserver, ScheduledExecutorService executor, RequestT request, ApiCallContext context, RetrySettings settings) { + // TODO future debuggability + // TODO for example: ring buffer to buffer the last few steps this.callable = callable; this.resumptionStrategy = resumptionStrategy; this.outerObserver = outerObserver; this.executor = executor; - operationState = new Idle(this, outerObserver); + this.operationState = new Idle(this, outerObserver); this.request = request; - this.context = context; this.settings = settings; + this.context = context; this.attempt = new AtomicInteger(0); } @@ -51,31 +53,23 @@ public void cancel(String reason) { @Override public void onReady() { deadline = Deadline.after(settings.getTotalTimeoutDuration().toMillis(), TimeUnit.MILLISECONDS); + if (context.getTracer() instanceof BigtableTracer) { + ((BigtableTracer) context.getTracer()).attemptStarted(attempt.get()); + } + operationState.onReady(); } }); } 
void onCancel(String reason) { - + operationState.onCancel(reason); } public void onStateChange(StateListener state) { this.operationState = state; } - public StateListener getState() { - return this.operationState; - } - - public void updateRequest(RequestT updated) { - this.request = updated; - } - - public void updateContext(ApiCallContext updated) { - this.context = updated; - } - long getRetryDelay() { long maxRetryDelay = settings.getMaxRetryDelayDuration().toMillis(); long initialRetryDelay = settings.getInitialRetryDelayDuration().toMillis(); @@ -87,9 +81,8 @@ long getRetryDelay() { long getTimeout() { long rpcTimeout = settings.getInitialRpcTimeoutDuration().toMillis(); - long timeout = Math.min(rpcTimeout, deadline.timeRemaining(TimeUnit.MILLISECONDS)); - return timeout; + return Math.min(rpcTimeout, deadline.timeRemaining(TimeUnit.MILLISECONDS)); } abstract class StateListener { @@ -103,7 +96,7 @@ abstract class StateListener { } abstract public void onReady(); - abstract public void onCancel(); + abstract public void onCancel(String reason); } private class Idle extends StateListener { @@ -114,15 +107,15 @@ private class Idle extends StateListener { @Override public void onReady() { - System.out.println("attempt=" + attempt.get() + " timeout=" + context.getTimeoutDuration()); Active active = new Active(super.operation, super.outerObserver); - callable.call(operation.request, active, operation.context); + callable.call(operation.request, active, operation.context.withTimeoutDuration(Duration.ofMillis(getTimeout()))); + System.out.println("attempt=" + attempt.get() + " timeout=" + context.getTimeoutDuration()); operation.onStateChange(active); } @Override - public void onCancel() { - outerObserver.onClose(Status.CANCELLED); + public void onCancel(String reason) { + outerObserver.onClose(Status.CANCELLED.withDescription(reason)); } } @@ -142,8 +135,8 @@ public void onReady() { } @Override - public void onCancel() { - grpcController.cancel("user 
cancelled"); + public void onCancel(String reason) { + grpcController.cancel(reason); } @Override @@ -162,28 +155,34 @@ public void onResponse(ResponseT response) { @Override public void onClose(Status status) { if (!status.isOk()) { + // TODO placeholder, need to check for retryable code and error details if (status.equals(Status.DEADLINE_EXCEEDED) || status.equals(Status.UNAVAILABLE)) { attempt.getAndIncrement(); - operation.updateRequest(resumptionStrategy.getResumeRequest(request)); - operation.updateContext(context.withTimeoutDuration(Duration.ofMillis(getTimeout()))); + request = resumptionStrategy.getResumeRequest(request); if (!userWaitingResponse) { + // TODO wait retry delay ? Idle idle = new Idle(super.operation, outerObserver); onStateChange(idle); + return; } else { - Scheduled scheduled = new Scheduled(super.operation, outerObserver); - ScheduledFuture future = executor.schedule(scheduled::onReady, 5, TimeUnit.SECONDS); - scheduled.setScheduledFuture(future); - onStateChange(scheduled); + long retryDelay = getRetryDelay(); + if (deadline.timeRemaining(TimeUnit.MILLISECONDS) - retryDelay > 1) { + Scheduled scheduled = new Scheduled(super.operation, outerObserver); + ScheduledFuture future = executor.schedule(scheduled::onReady, retryDelay, TimeUnit.MILLISECONDS); + scheduled.setScheduledFuture(future); + onStateChange(scheduled); + return; + } } - } else { - outerObserver.onClose(status); } - } else { - outerObserver.onClose(status); } + outerObserver.onClose(status); } + + // TODO state transition } + class Scheduled extends StateListener { ScheduledFuture scheduledFuture; @@ -194,13 +193,13 @@ class Scheduled extends StateListener { @Override public void onReady() { Active active = new Active(super.operation, super.outerObserver); + callable.call(request, active, context.withTimeoutDuration(Duration.ofMillis(getTimeout()))); System.out.println("attempt=" + attempt.get() + " timeout=" + context.getTimeoutDuration()); - callable.call(request, active, 
context); onStateChange(active); } @Override - public void onCancel() { + public void onCancel(String reason) { if (scheduledFuture != null) { scheduledFuture.cancel(true); scheduledFuture = null; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java index ec9ad10213..1b88b9eb40 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java @@ -163,12 +163,17 @@ public void testServerStream() throws Exception { // userCallable.all().call(request); - stub.readRowsCallable().all().call(Query.create("test")); -// ServerStream stream = stub.readRowsCallable().call(Query.create("table")); -// -// for (Row row : stream) { -// System.out.println(row); -// } +// stub.readRowsCallable().all().call(Query.create("test")); + ServerStream stream = stub.readRowsCallable().call(Query.create("table")); + + int count = 0; + for (Row row : stream) { + System.out.println(row); + count++; + if (count == 2) { + stream.cancel(); + } + } } class FakeService extends BigtableGrpc.BigtableImplBase { From 7b32edadf65d2bffa9f34593dea165e8f4dbc23e Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Thu, 2 Jan 2025 15:59:46 -0500 Subject: [PATCH 3/3] thread safe --- .../data/v2/stub/EnhancedBigtableStub.java | 27 +++++----- .../bigtable/data/v2/stub/Operation.java | 54 +++++++++++-------- .../bigtable/data/v2/stub/TestRetry.java | 11 +--- 3 files changed, 47 insertions(+), 45 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 78aa012ea4..0e0e7ba175 100644 --- 
a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -475,17 +475,17 @@ private ServerStreamingCallable createReadRo .build(), readRowsSettings.getRetryableCodes()); -// ServerStreamingCallable withStatsHeaders = -// new StatsHeadersServerStreamingCallable<>(base); + ServerStreamingCallable withStatsHeaders = + new StatsHeadersServerStreamingCallable<>(base); // Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client. -// ServerStreamingCallable convertException = -// new ConvertExceptionCallable<>(withStatsHeaders); + ServerStreamingCallable convertException = + new ConvertExceptionCallable<>(withStatsHeaders); ServerStreamingCallable merging = - new RowMergingCallable<>(base, rowAdapter); + new RowMergingCallable<>(convertException, rowAdapter); // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner // ReadRowsRequest -> ReadRowsResponse callable). 
@@ -498,22 +498,21 @@ private ServerStreamingCallable createReadRo .setWaitTimeout(readRowsSettings.getWaitTimeout()) .build(); -// ServerStreamingCallable watched = -// Callables.watched(merging, innerSettings, clientContext); + ServerStreamingCallable watched = + Callables.watched(merging, innerSettings, clientContext); -// ServerStreamingCallable withBigtableTracer = -// new BigtableTracerStreamingCallable<>(watched); + ServerStreamingCallable withBigtableTracer = + new BigtableTracerStreamingCallable<>(watched); // Retry logic is split into 2 parts to workaround a rare edge case described in // ReadRowsRetryCompletedCallable -// ServerStreamingCallable retrying1 = -// new ReadRowsRetryCompletedCallable<>(withBigtableTracer); + ServerStreamingCallable retrying1 = + new ReadRowsRetryCompletedCallable<>(withBigtableTracer); ServerStreamingCallable retrying2 = - withRetries(merging, innerSettings); + withRetries(retrying1, innerSettings); - return retrying2; -// return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); + return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } /** diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java index 2745694511..19175ac3a4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/Operation.java @@ -7,8 +7,8 @@ import io.grpc.Deadline; import io.grpc.Status; +import javax.annotation.concurrent.GuardedBy; import java.time.Duration; -import java.util.LinkedList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; @@ -18,7 +18,6 @@ public class Operation { private final StreamResumptionStrategy resumptionStrategy; - private StateListener operationState; private 
final ResponseObserver2 outerObserver; private final Callable2 callable; private final ScheduledExecutorService executor; @@ -27,11 +26,14 @@ public class Operation { private final RetrySettings settings; private Deadline deadline; private final AtomicInteger attempt; + private final Object lock = new Object(); + + @GuardedBy("lock") + private StateListener operationState; Operation(Callable2 callable, StreamResumptionStrategy resumptionStrategy, ResponseObserver2 outerObserver, ScheduledExecutorService executor, RequestT request, ApiCallContext context, RetrySettings settings) { - // TODO future debuggability - // TODO for example: ring buffer to buffer the last few steps + // TODO future debuggability: for example: ring buffer to buffer the last few states this.callable = callable; this.resumptionStrategy = resumptionStrategy; this.outerObserver = outerObserver; @@ -57,17 +59,23 @@ public void onReady() { ((BigtableTracer) context.getTracer()).attemptStarted(attempt.get()); } - operationState.onReady(); + synchronized (lock) { + operationState.onReady(); + } } }); } void onCancel(String reason) { - operationState.onCancel(reason); + synchronized (lock) { + operationState.onCancel(reason); + } } public void onStateChange(StateListener state) { - this.operationState = state; + synchronized (lock) { + this.operationState = state; + } } long getRetryDelay() { @@ -108,9 +116,10 @@ private class Idle extends StateListener { @Override public void onReady() { Active active = new Active(super.operation, super.outerObserver); - callable.call(operation.request, active, operation.context.withTimeoutDuration(Duration.ofMillis(getTimeout()))); - System.out.println("attempt=" + attempt.get() + " timeout=" + context.getTimeoutDuration()); - operation.onStateChange(active); + synchronized (lock) { + callable.call(operation.request, active, operation.context.withTimeoutDuration(Duration.ofMillis(getTimeout()))); + operation.onStateChange(active); + } } @Override @@ -161,25 +170,27 
@@ public void onClose(Status status) { request = resumptionStrategy.getResumeRequest(request); if (!userWaitingResponse) { // TODO wait retry delay ? - Idle idle = new Idle(super.operation, outerObserver); - onStateChange(idle); + synchronized (lock) { + Idle idle = new Idle(super.operation, outerObserver); + onStateChange(idle); + } return; } else { long retryDelay = getRetryDelay(); if (deadline.timeRemaining(TimeUnit.MILLISECONDS) - retryDelay > 1) { Scheduled scheduled = new Scheduled(super.operation, outerObserver); - ScheduledFuture future = executor.schedule(scheduled::onReady, retryDelay, TimeUnit.MILLISECONDS); - scheduled.setScheduledFuture(future); - onStateChange(scheduled); + synchronized (lock) { + ScheduledFuture future = executor.schedule(scheduled::onReady, retryDelay, TimeUnit.MILLISECONDS); + scheduled.setScheduledFuture(future); + onStateChange(scheduled); + } return; } } - } + } } outerObserver.onClose(status); } - - // TODO state transition } @@ -193,9 +204,10 @@ class Scheduled extends StateListener { @Override public void onReady() { Active active = new Active(super.operation, super.outerObserver); - callable.call(request, active, context.withTimeoutDuration(Duration.ofMillis(getTimeout()))); - System.out.println("attempt=" + attempt.get() + " timeout=" + context.getTimeoutDuration()); - onStateChange(active); + synchronized (lock) { + callable.call(request, active, context.withTimeoutDuration(Duration.ofMillis(getTimeout()))); + onStateChange(active); + } } @Override diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java index 1b88b9eb40..db0c391b9b 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/TestRetry.java @@ -163,17 +163,8 @@ public void testServerStream() throws 
Exception { // userCallable.all().call(request); -// stub.readRowsCallable().all().call(Query.create("test")); - ServerStream stream = stub.readRowsCallable().call(Query.create("table")); + stub.readRowsCallable().all().call(Query.create("test")); - int count = 0; - for (Row row : stream) { - System.out.println(row); - count++; - if (count == 2) { - stream.cancel(); - } - } } class FakeService extends BigtableGrpc.BigtableImplBase {