Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -95,6 +96,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements

private final SubscriberStub subscriberStub;
private final int channelAffinity;
private final long protocolVersion;
private final String subscription;
private final SubscriptionName subscriptionNameObject;
private final ScheduledExecutorService systemExecutor;
Expand Down Expand Up @@ -127,6 +129,17 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
private final SubscriberShutdownSettings subscriberShutdownSettings;

private final boolean enableKeepalive;
private static final long KEEP_ALIVE_SUPPORT_VERSION = 1;
private static final Duration CLIENT_PING_INTERVAL = Duration.ofSeconds(30);
private ScheduledFuture<?> pingSchedulerHandle;

private static final Duration SERVER_MONITOR_INTERVAL = Duration.ofSeconds(10);
private static final Duration SERVER_PING_TIMEOUT_DURATION = Duration.ofSeconds(15);
private final AtomicLong lastServerResponseTime;
private final AtomicLong lastClientPingTime;
private ScheduledFuture<?> serverMonitorHandle;

private StreamingSubscriberConnection(Builder builder) {
subscription = builder.subscription;
subscriptionNameObject = SubscriptionName.parse(builder.subscription);
Expand Down Expand Up @@ -154,6 +167,7 @@ private StreamingSubscriberConnection(Builder builder) {

subscriberStub = builder.subscriberStub;
channelAffinity = builder.channelAffinity;
protocolVersion = builder.protocolVersion;

MessageDispatcher.Builder messageDispatcherBuilder;
if (builder.receiver != null) {
Expand Down Expand Up @@ -190,6 +204,9 @@ private StreamingSubscriberConnection(Builder builder) {

flowControlSettings = builder.flowControlSettings;
useLegacyFlowControl = builder.useLegacyFlowControl;
enableKeepalive = protocolVersion >= KEEP_ALIVE_SUPPORT_VERSION;
lastServerResponseTime = new AtomicLong(clock.nanoTime());
lastClientPingTime = new AtomicLong(-1L);
}

public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled(
Expand Down Expand Up @@ -218,6 +235,12 @@ protected void doStop() {
} finally {
lock.unlock();
}

if (enableKeepalive) {
stopClientPinger();
stopServerMonitor();
}

runShutdown();
notifyStopped();
}
Expand Down Expand Up @@ -266,6 +289,10 @@ public void onStart(StreamController controller) {

@Override
public void onResponse(StreamingPullResponse response) {
if (enableKeepalive) {
lastServerResponseTime.set(clock.nanoTime());
}

channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());

boolean exactlyOnceDeliveryEnabledResponse =
Expand Down Expand Up @@ -295,11 +322,19 @@ public void onResponse(StreamingPullResponse response) {

@Override
public void onError(Throwable t) {
if (enableKeepalive) {
stopClientPinger();
stopServerMonitor();
}
errorFuture.setException(t);
}

@Override
public void onComplete() {
if (enableKeepalive) {
stopClientPinger();
stopServerMonitor();
}
logger.fine("Streaming pull terminated successfully!");
errorFuture.set(null);
}
Expand Down Expand Up @@ -336,6 +371,7 @@ private void initialize() {
this.useLegacyFlowControl
? 0
: valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
.setProtocolVersion(protocolVersion)
.build());

/**
Expand All @@ -350,6 +386,13 @@ private void initialize() {
lock.unlock();
}

if (enableKeepalive) {
lastServerResponseTime.set(clock.nanoTime());
lastClientPingTime.set(-1L);
startClientPinger();
startServerMonitor();
}

ApiFutures.addCallback(
errorFuture,
new ApiFutureCallback<Void>() {
Expand All @@ -366,6 +409,10 @@ public void onSuccess(@Nullable Void result) {

@Override
public void onFailure(Throwable cause) {
if (enableKeepalive) {
stopClientPinger();
stopServerMonitor();
}
if (!isAlive()) {
// we don't care about subscription failures when we're no longer running.
logger.log(Level.FINE, "pull failure after service no longer running", cause);
Expand Down Expand Up @@ -410,6 +457,100 @@ private boolean isAlive() {
return state == State.RUNNING || state == State.STARTING;
}

private void startClientPinger() {
if (pingSchedulerHandle != null) {
pingSchedulerHandle.cancel(false);
}

pingSchedulerHandle =
systemExecutor.scheduleAtFixedRate(
() -> {
try {
lock.lock();
try {
if (clientStream != null && isAlive()) {
clientStream.send(StreamingPullRequest.newBuilder().build());
lastClientPingTime.set(clock.nanoTime());
logger.log(Level.FINEST, "Sent client keepalive ping");
}
} finally {
lock.unlock();
}
} catch (Exception e) {
logger.log(Level.FINE, "Error sending client keepalive ping", e);
}
},
0,
CLIENT_PING_INTERVAL.getSeconds(),
TimeUnit.SECONDS);
}

private void stopClientPinger() {
if (pingSchedulerHandle != null) {
pingSchedulerHandle.cancel(false);
pingSchedulerHandle = null;
}
}

private void startServerMonitor() {
if (serverMonitorHandle != null) {
serverMonitorHandle.cancel(false);
}

serverMonitorHandle =
systemExecutor.scheduleAtFixedRate(
() -> {
try {
if (!isAlive()) {
return;
}

long now = clock.nanoTime();
long lastResponse = lastServerResponseTime.get();
long lastPing = lastClientPingTime.get();

if (lastPing <= lastResponse) {
return;
}

Duration elapsedSincePing = Duration.ofNanos(now - lastPing);
if (elapsedSincePing.compareTo(SERVER_PING_TIMEOUT_DURATION) < 0) {
return;
}

logger.log(
Level.WARNING,
"No response from server for {0} seconds since last ping. Closing stream.",
elapsedSincePing.getSeconds());

lock.lock();
try {
if (clientStream != null) {
clientStream.closeSendWithError(
Status.UNAVAILABLE
.withDescription("Keepalive timeout with server")
.asException());
}
} finally {
lock.unlock();
}
stopServerMonitor();
} catch (Exception e) {
logger.log(Level.FINE, "Error in server keepalive monitor", e);
}
},
SERVER_MONITOR_INTERVAL.getSeconds(),
SERVER_MONITOR_INTERVAL.getSeconds(),
TimeUnit.SECONDS);
}

private void stopServerMonitor() {
if (serverMonitorHandle != null) {
serverMonitorHandle.cancel(false);
serverMonitorHandle = null;
}
}

public void setResponseOutstandingMessages(AckResponse ackResponse) {
// We will close the futures with ackResponse - if there are multiple references to the same
// future they will be handled appropriately
Expand Down Expand Up @@ -769,6 +910,7 @@ public static final class Builder {
private Distribution ackLatencyDistribution;
private SubscriberStub subscriberStub;
private int channelAffinity;
private long protocolVersion;
private FlowController flowController;
private FlowControlSettings flowControlSettings;
private boolean useLegacyFlowControl;
Expand Down Expand Up @@ -840,6 +982,11 @@ public Builder setChannelAffinity(int channelAffinity) {
return this;
}

public Builder setProtocolVersion(long protocolVersion) {
this.protocolVersion = protocolVersion;
return this;
}

public Builder setFlowController(FlowController flowController) {
this.flowController = flowController;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private final boolean maxDurationPerAckExtensionDefaultUsed;
private final java.time.Duration minDurationPerAckExtension;
private final boolean minDurationPerAckExtensionDefaultUsed;
private final long protocolVersion;

// The ExecutorProvider used to generate executors for processing messages.
private final ExecutorProvider executorProvider;
Expand Down Expand Up @@ -182,6 +183,7 @@ private Subscriber(Builder builder) {
maxDurationPerAckExtensionDefaultUsed = builder.maxDurationPerAckExtensionDefaultUsed;
minDurationPerAckExtension = builder.minDurationPerAckExtension;
minDurationPerAckExtensionDefaultUsed = builder.minDurationPerAckExtensionDefaultUsed;
protocolVersion = builder.protocolVersion;

clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock();

Expand Down Expand Up @@ -428,6 +430,7 @@ private void startStreamingConnections() {
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
.setTracer(tracer)
.setSubscriberShutdownSettings(subscriberShutdownSettings)
.setProtocolVersion(protocolVersion)
.build();

streamingSubscriberConnections.add(streamingSubscriberConnection);
Expand Down Expand Up @@ -548,6 +551,8 @@ public static final class Builder {
private boolean enableOpenTelemetryTracing = false;
private OpenTelemetry openTelemetry = null;

private long protocolVersion = 0L;

private SubscriberShutdownSettings subscriberShutdownSettings =
SubscriberShutdownSettings.newBuilder().build();

Expand Down Expand Up @@ -771,6 +776,12 @@ Builder setClock(ApiClock clock) {
return this;
}

/** Gives the ability to override the protocol version */
public Builder setProtocolVersion(long protocolVersion) {
this.protocolVersion = protocolVersion;
return this;
}

/**
* OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of
* OpenTelemetry has been provied. Warning: traces are subject to change. The name and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public FakeClock getClock() {
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return schedulePendingCallable(
new PendingCallable<>(
Duration.ofMillis(unit.toMillis(delay)), command, PendingCallableType.NORMAL));
Duration.ofMillis(unit.toMillis(delay)), command, null, PendingCallableType.NORMAL));
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return schedulePendingCallable(
new PendingCallable<>(
Duration.ofMillis(unit.toMillis(delay)), callable, PendingCallableType.NORMAL));
Duration.ofMillis(unit.toMillis(delay)), callable, null, PendingCallableType.NORMAL));
}

@Override
Expand All @@ -72,6 +72,7 @@ public ScheduledFuture<?> scheduleAtFixedRate(
new PendingCallable<>(
Duration.ofMillis(unit.toMillis(initialDelay)),
command,
Duration.ofMillis(unit.toMillis(period)),
PendingCallableType.FIXED_RATE));
}

Expand All @@ -82,6 +83,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
new PendingCallable<>(
Duration.ofMillis(unit.toMillis(initialDelay)),
command,
Duration.ofMillis(unit.toMillis(delay)),
PendingCallableType.FIXED_DELAY));
}

Expand Down Expand Up @@ -212,13 +214,15 @@ enum PendingCallableType {
class PendingCallable<T> implements Comparable<PendingCallable<T>> {
Instant creationTime = Instant.ofEpochMilli(clock.millisTime());
Duration delay;
Duration period;
Callable<T> pendingCallable;
SettableFuture<T> future = SettableFuture.create();
AtomicBoolean cancelled = new AtomicBoolean(false);
AtomicBoolean done = new AtomicBoolean(false);
PendingCallableType type;

PendingCallable(Duration delay, final Runnable runnable, PendingCallableType type) {
PendingCallable(
Duration delay, final Runnable runnable, Duration period, PendingCallableType type) {
pendingCallable =
new Callable<T>() {
@Override
Expand All @@ -229,12 +233,15 @@ public T call() {
};
this.type = type;
this.delay = delay;
this.period = period;
}

PendingCallable(Duration delay, Callable<T> callable, PendingCallableType type) {
PendingCallable(
Duration delay, Callable<T> callable, Duration period, PendingCallableType type) {
pendingCallable = callable;
this.type = type;
this.delay = delay;
this.period = period;
}

private Instant getScheduledTime() {
Expand Down Expand Up @@ -305,10 +312,12 @@ T call() {
break;
case FIXED_DELAY:
this.creationTime = Instant.ofEpochMilli(clock.millisTime());
this.delay = period;
schedulePendingCallable(this);
break;
case FIXED_RATE:
this.creationTime = this.creationTime.plus(delay);
this.delay = period;
schedulePendingCallable(this);
break;
default:
Expand Down
Loading