Skip to content

Retry rewrite #2457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public void onError(Throwable throwable) {

@Override
public void onComplete() {
System.out.println("onComplete called on unary operation callable");
if (allowNoResponse && set(null)) {
tracer.operationSucceeded();
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import io.grpc.Status;

import java.util.concurrent.atomic.AtomicInteger;

interface Callable2<RequestT, ResponseT> {

void call(RequestT request, ResponseObserver2<ResponseT> observer, ApiCallContext context);

}

class ToOldCallableAdapter<RequestT, ResponseT> extends ServerStreamingCallable<RequestT, ResponseT> {

Callable2<RequestT, ResponseT> callable;

ToOldCallableAdapter(Callable2<RequestT, ResponseT> callable) {
this.callable = callable;
}

@Override
public void call(RequestT requestT, ResponseObserver<ResponseT> responseObserver, ApiCallContext apiCallContext) {
callable.call(requestT, new ToOldObserverAdapter<>(responseObserver), apiCallContext);
}
}

class ToNewCallableAdapter<RequestT, ResponseT> implements Callable2<RequestT, ResponseT> {

ServerStreamingCallable<RequestT, ResponseT> callable;

ToNewCallableAdapter(ServerStreamingCallable<RequestT, ResponseT> callable) {
this.callable = callable;
}

@Override
public void call(RequestT request, ResponseObserver2<ResponseT> observer, ApiCallContext context) {
callable.call(request, new ToNewObserverAdapter<>(observer), context);
}
}

class ToNewObserverAdapter<ResponseT> implements ResponseObserver<ResponseT> {

private ResponseObserver2<ResponseT> userObserver;
private StreamController grpcController;

ToNewObserverAdapter(ResponseObserver2 observer) {
this.userObserver = observer;
}

@Override
public void onStart(StreamController streamController) {
grpcController = streamController;
grpcController.disableAutoInboundFlowControl();
userObserver.onStart(new StreamController2() {
@Override
public void cancel(String reason) {
grpcController.cancel();
}

@Override
public void onReady() {
grpcController.request(1);
}
});
}

@Override
public void onResponse(ResponseT responseT) {
userObserver.onResponse(responseT);
}

@Override
public void onError(Throwable throwable) {
userObserver.onClose(Status.fromThrowable(throwable));
}

@Override
public void onComplete() {
userObserver.onClose(Status.OK);
}
}

class ToOldObserverAdapter<ResponseT> implements ResponseObserver2<ResponseT> {

private ResponseObserver<ResponseT> userOberver;
private StreamController2 grpcController;
private AtomicInteger userRequested = new AtomicInteger(0);
private volatile boolean autoFlowControl = true;

ToOldObserverAdapter(ResponseObserver<ResponseT> observer) {
this.userOberver = observer;
}

@Override
public void onStart(StreamController2 streamController2) {
grpcController = streamController2;
userOberver.onStart(new StreamController() {
@Override
public void cancel() {
grpcController.cancel("user cancelled stream");
}

@Override
public void disableAutoInboundFlowControl() {
System.out.println("already disabled");
autoFlowControl = false;
}

@Override
public void request(int i) {
int oldN = userRequested.getAndAdd(i);
if (oldN == 0) {
grpcController.onReady();
}

}
});

if (autoFlowControl) {
grpcController.onReady();
}
}

@Override
public void onResponse(ResponseT response) {
userOberver.onResponse(response);
if (userRequested.decrementAndGet() > 0) {
grpcController.onReady();
} else if (autoFlowControl) {
grpcController.onReady();
}
}

@Override
public void onClose(Status status) {
if (status.isOk()) {
userOberver.onComplete();
} else {
userOberver.onError(status.asException());
}
}
}

interface ResponseObserver2<ResponseT> {
void onStart(StreamController2 streamController2);
void onResponse(ResponseT response);
void onClose(Status status);
}

interface StreamController2 {
void cancel(String reason);
void onReady();
}




Original file line number Diff line number Diff line change
Expand Up @@ -1268,18 +1268,22 @@ private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> withR
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
ServerStreamingCallSettings<RequestT, ResponseT> serverStreamingCallSettings) {

ServerStreamingCallable<RequestT, ResponseT> retrying;
if (settings.getEnableRetryInfo()) {
retrying =
com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
innerCallable, serverStreamingCallSettings, clientContext);
} else {
retrying = Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);
}
if (settings.getEnableRoutingCookie()) {
return new CookiesServerStreamingCallable<>(retrying);
}
return retrying;
// ServerStreamingCallable<RequestT, ResponseT> retrying;
// if (settings.getEnableRetryInfo()) {
// retrying =
// com.google.cloud.bigtable.gaxx.retrying.Callables.retrying(
// innerCallable, serverStreamingCallSettings, clientContext);
// } else {
// retrying = Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);
// }
// if (settings.getEnableRoutingCookie()) {
// return new CookiesServerStreamingCallable<>(retrying);
// }
// return retrying;

Callable2<RequestT, ResponseT> toNewCallable = new ToNewCallableAdapter(innerCallable);
Callable2<RequestT, ResponseT> retryCallable = new RetryCallable<>(toNewCallable, serverStreamingCallSettings.getResumptionStrategy(), clientContext.getExecutor(), serverStreamingCallSettings.getRetrySettings());
return new ToOldCallableAdapter<>(retryCallable);
}

// </editor-fold>
Expand Down
Loading
Loading