Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth.VendoredCredentialsAdapter;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.FailoverChannel;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
Expand All @@ -114,6 +116,8 @@
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
Expand Down Expand Up @@ -381,7 +385,8 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
MemoryMonitor memoryMonitor,
GrpcDispatcherClient dispatcherClient) {
WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
ChannelCache channelCache = createChannelCache(options, checkNotNull(configFetcher));
ChannelCache channelCache =
createChannelCache(options, checkNotNull(configFetcher), dispatcherClient);
@SuppressWarnings("methodref.receiver.bound")
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
FanOutStreamingEngineWorkerHarness.create(
Expand Down Expand Up @@ -804,20 +809,37 @@ private static void validateWorkerOptions(DataflowWorkerHarnessOptions options)
}

private static ChannelCache createChannelCache(
DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher configFetcher) {
DataflowWorkerHarnessOptions workerOptions,
ComputationConfig.Fetcher configFetcher,
GrpcDispatcherClient dispatcherClient) {
ChannelCache channelCache =
ChannelCache.create(
(currentFlowControlSettings, serviceAddress) -> {
// IsolationChannel will create and manage separate RPC channels to the same
// serviceAddress.
return IsolationChannel.create(
() ->
remoteChannel(
serviceAddress,
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
currentFlowControlSettings),
currentFlowControlSettings.getOnReadyThresholdBytes());
ManagedChannel primaryChannel =
IsolationChannel.create(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's being set up this way, IsolationChannel's connectivity callbacks are what will be used. I'm not sure how that will work, since IsolationChannel internally has multiple channels. Looking at it, it appears to rely on the default ManagedChannel implementation of those callbacks, which throws an unimplemented exception.

What about having IsolationChannel on top of fallback channels? That seems simpler to me since IsolationChannel just internally creates the separate channels and otherwise doesn't do much than forward things on.

It would be good to have a unit test of whatever setup we do use so that we flush out the issues there instead of requiring an integration test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed this. IsolationChannel now wraps FailoverChannel which creates two channels per active RPC.

The original intent was to keep IsolationChannel unmodified (since it is used by the dispatcher client) and handle fallback at the per-worker level. The new ordering (IsolationChannel over FailoverChannel) changes the semantics to per-RPC failover, which means that in case of connectivity issues, each RPC would independently discover the failure and switch at a different time, rather than all switching together in a coordinated way.

I do agree that managing state at the per-RPC level seems less error-prone, but I would like to call out this semantic change.

() ->
remoteChannel(
serviceAddress,
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
currentFlowControlSettings),
currentFlowControlSettings.getOnReadyThresholdBytes());
// Create an isolated fallback channel from dispatcher endpoints.
// This ensures both primary and fallback use separate isolated channels.
ManagedChannel fallbackChannel =
IsolationChannel.create(
() ->
remoteChannel(
dispatcherClient.getDispatcherEndpoints().iterator().next(),
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
currentFlowControlSettings),
currentFlowControlSettings.getOnReadyThresholdBytes());
return FailoverChannel.create(
primaryChannel,
fallbackChannel,
MoreCallCredentials.from(
new VendoredCredentialsAdapter(workerOptions.getGcpCredential())));
});

configFetcher
.getGlobalConfigHandle()
.registerConfigObserver(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,15 +410,18 @@ private GlobalDataStreamSender getOrCreateGlobalDataSteam(
}

private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint endpoint) {
GetWorkRequest.Builder getWorkRequestBuilder =
GetWorkRequest.newBuilder()
.setClientId(jobHeader.getClientId())
.setJobId(jobHeader.getJobId())
.setProjectId(jobHeader.getProjectId())
.setWorkerId(jobHeader.getWorkerId());
endpoint.workerToken().ifPresent(getWorkRequestBuilder::setBackendWorkerToken);

WindmillStreamSender windmillStreamSender =
WindmillStreamSender.create(
WindmillConnection.from(endpoint, this::createWindmillStub),
GetWorkRequest.newBuilder()
.setClientId(jobHeader.getClientId())
.setJobId(jobHeader.getJobId())
.setProjectId(jobHeader.getProjectId())
.setWorkerId(jobHeader.getWorkerId())
.build(),
getWorkRequestBuilder.build(),
GetWorkBudget.noBudget(),
streamFactory,
workItemScheduler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
: randomlySelectNextStub(windmillServiceStubs));
}

ImmutableSet<HostAndPort> getDispatcherEndpoints() {
/**
 * Returns the current set of dispatcher endpoints.
 *
 * <p>Widened to {@code public} so that callers outside this package (e.g. channel-cache
 * construction, which needs a dispatcher address for a fallback channel) can read the
 * endpoints. The returned set is an immutable snapshot taken from {@code dispatcherStubs}
 * at call time; subsequent endpoint updates are not reflected in it.
 */
public ImmutableSet<HostAndPort> getDispatcherEndpoints() {
return dispatcherStubs.get().dispatcherEndpoints();
}

Expand Down
Loading
Loading