Skip to content

Commit ced1afb

Browse files
authored
[Dataflow Streaming] Refactor ResettableThrowingStreamObserver to inject instead of use factory (#34940)
1 parent ed1ef55 commit ced1afb

File tree

3 files changed

+32
-44
lines changed

3 files changed

+32
-44
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.atomic.AtomicBoolean;
3030
import java.util.function.Function;
31+
import java.util.function.Supplier;
3132
import javax.annotation.concurrent.GuardedBy;
3233
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
3334
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
35+
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.TerminatingStreamObserver;
3436
import org.apache.beam.sdk.util.BackOff;
3537
import org.apache.beam.vendor.grpc.v1p69p0.com.google.api.client.util.Sleeper;
3638
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
@@ -83,6 +85,8 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
8385
private final int logEveryNStreamFailures;
8486
private final String backendWorkerToken;
8587
private final ResettableThrowingStreamObserver<RequestT> requestObserver;
88+
89+
private final Supplier<TerminatingStreamObserver<RequestT>> requestObserverFactory;
8690
private final StreamDebugMetrics debugMetrics;
8791
private final AtomicBoolean isHealthCheckScheduled;
8892

@@ -120,13 +124,11 @@ protected AbstractWindmillStream(
120124
this.isHealthCheckScheduled = new AtomicBoolean(false);
121125
this.finishLatch = new CountDownLatch(1);
122126
this.logger = logger;
123-
this.requestObserver =
124-
new ResettableThrowingStreamObserver<>(
125-
() ->
126-
streamObserverFactory.from(
127-
clientFactory,
128-
new AbstractWindmillStream<RequestT, ResponseT>.ResponseObserver()),
129-
logger);
127+
this.requestObserver = new ResettableThrowingStreamObserver<>(logger);
128+
this.requestObserverFactory =
129+
() ->
130+
streamObserverFactory.from(
131+
clientFactory, new AbstractWindmillStream<RequestT, ResponseT>.ResponseObserver());
130132
this.sleeper = Sleeper.DEFAULT;
131133
this.debugMetrics = StreamDebugMetrics.create();
132134
}
@@ -184,7 +186,7 @@ private void startStream() {
184186
try {
185187
synchronized (this) {
186188
debugMetrics.recordStart();
187-
requestObserver.reset();
189+
requestObserver.reset(requestObserverFactory.get());
188190
onNewStream();
189191
if (clientClosed) {
190192
halfClose();

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.beam.runners.dataflow.worker.windmill.client;
1919

20-
import java.util.function.Supplier;
2120
import javax.annotation.Nullable;
2221
import javax.annotation.concurrent.GuardedBy;
2322
import javax.annotation.concurrent.ThreadSafe;
@@ -29,7 +28,7 @@
2928
import org.slf4j.Logger;
3029

3130
/**
32-
* Request observer that allows resetting its internal delegate using the given {@link
31+
* Request observer that allows resetting its internal delegate using a {@link
3332
* #streamObserverFactory}.
3433
*
3534
* @implNote {@link StreamObserver}s generated by {@link #streamObserverFactory} are expected to be
@@ -40,7 +39,7 @@
4039
@ThreadSafe
4140
@Internal
4241
final class ResettableThrowingStreamObserver<T> {
43-
private final Supplier<TerminatingStreamObserver<T>> streamObserverFactory;
42+
4443
private final Logger logger;
4544

4645
@GuardedBy("this")
@@ -57,9 +56,7 @@ final class ResettableThrowingStreamObserver<T> {
5756
@GuardedBy("this")
5857
private boolean isCurrentStreamClosed = true;
5958

60-
ResettableThrowingStreamObserver(
61-
Supplier<TerminatingStreamObserver<T>> streamObserverFactory, Logger logger) {
62-
this.streamObserverFactory = streamObserverFactory;
59+
ResettableThrowingStreamObserver(Logger logger) {
6360
this.logger = logger;
6461
this.delegateStreamObserver = null;
6562
}
@@ -79,12 +76,13 @@ private synchronized StreamObserver<T> delegate()
7976
}
8077

8178
/** Creates a new delegate to use for future {@link StreamObserver} methods. */
82-
synchronized void reset() throws WindmillStreamShutdownException {
79+
synchronized void reset(TerminatingStreamObserver<T> observer)
80+
throws WindmillStreamShutdownException {
8381
if (isPoisoned) {
8482
throw new WindmillStreamShutdownException("Stream is already shutdown.");
8583
}
8684

87-
delegateStreamObserver = streamObserverFactory.get();
85+
delegateStreamObserver = observer;
8886
isCurrentStreamClosed = false;
8987
}
9088

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserverTest.java

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,7 @@
2424
import static org.mockito.Mockito.verify;
2525
import static org.mockito.Mockito.verifyNoInteractions;
2626

27-
import java.util.ArrayList;
28-
import java.util.List;
29-
import java.util.function.Supplier;
3027
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.TerminatingStreamObserver;
31-
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
3228
import org.junit.Test;
3329
import org.junit.runner.RunWith;
3430
import org.junit.runners.JUnit4;
@@ -57,36 +53,36 @@ public void terminate(Throwable terminationException) {}
5753

5854
@Test
5955
public void testPoison_beforeDelegateSet() {
60-
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver(() -> delegate);
56+
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
6157
observer.poison();
6258
verifyNoInteractions(delegate);
6359
}
6460

6561
@Test
6662
public void testPoison_afterDelegateSet() throws WindmillStreamShutdownException {
67-
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver(() -> delegate);
68-
observer.reset();
63+
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
64+
observer.reset(delegate);
6965
observer.poison();
7066
verify(delegate).terminate(isA(WindmillStreamShutdownException.class));
7167
}
7268

7369
@Test
7470
public void testReset_afterPoisonedThrows() {
75-
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver(() -> delegate);
71+
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
7672
observer.poison();
77-
assertThrows(WindmillStreamShutdownException.class, observer::reset);
73+
assertThrows(WindmillStreamShutdownException.class, () -> observer.reset(newDelegate()));
7874
}
7975

8076
@Test
8177
public void testOnNext_afterPoisonedThrows() {
82-
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver(() -> delegate);
78+
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
8379
observer.poison();
8480
assertThrows(WindmillStreamShutdownException.class, () -> observer.onNext(1));
8581
}
8682

8783
@Test
8884
public void testOnError_afterPoisonedThrows() {
89-
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver(() -> delegate);
85+
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
9086
observer.poison();
9187
assertThrows(
9288
WindmillStreamShutdownException.class,
@@ -95,7 +91,7 @@ public void testOnError_afterPoisonedThrows() {
9591

9692
@Test
9793
public void testOnCompleted_afterPoisonedThrows() {
98-
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver(() -> delegate);
94+
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
9995
observer.poison();
10096
assertThrows(WindmillStreamShutdownException.class, observer::onCompleted);
10197
}
@@ -104,28 +100,20 @@ public void testOnCompleted_afterPoisonedThrows() {
104100
public void testReset_usesNewDelegate()
105101
throws WindmillStreamShutdownException,
106102
ResettableThrowingStreamObserver.StreamClosedException {
107-
List<StreamObserver<Integer>> delegates = new ArrayList<>();
108-
ResettableThrowingStreamObserver<Integer> observer =
109-
newStreamObserver(
110-
() -> {
111-
TerminatingStreamObserver<Integer> delegate = newDelegate();
112-
delegates.add(delegate);
113-
return delegate;
114-
});
115-
observer.reset();
103+
ResettableThrowingStreamObserver<Integer> observer = newStreamObserver();
104+
TerminatingStreamObserver<Integer> firstObserver = newDelegate();
105+
observer.reset(firstObserver);
116106
observer.onNext(1);
117-
observer.reset();
118-
observer.onNext(2);
119107

120-
StreamObserver<Integer> firstObserver = delegates.get(0);
121-
StreamObserver<Integer> secondObserver = delegates.get(1);
108+
TerminatingStreamObserver<Integer> secondObserver = newDelegate();
109+
observer.reset(secondObserver);
110+
observer.onNext(2);
122111

123112
verify(firstObserver).onNext(eq(1));
124113
verify(secondObserver).onNext(eq(2));
125114
}
126115

127-
private <T> ResettableThrowingStreamObserver<T> newStreamObserver(
128-
Supplier<TerminatingStreamObserver<T>> delegate) {
129-
return new ResettableThrowingStreamObserver<>(delegate, LoggerFactory.getLogger(getClass()));
116+
private <T> ResettableThrowingStreamObserver<T> newStreamObserver() {
117+
return new ResettableThrowingStreamObserver<>(LoggerFactory.getLogger(getClass()));
130118
}
131119
}

0 commit comments

Comments
 (0)