Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.auth.VendoredCredentialsAdapter;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.FailoverChannel;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
Expand All @@ -114,6 +116,7 @@
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
Expand Down Expand Up @@ -381,7 +384,8 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
MemoryMonitor memoryMonitor,
GrpcDispatcherClient dispatcherClient) {
WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
ChannelCache channelCache = createChannelCache(options, checkNotNull(configFetcher));
ChannelCache channelCache =
createChannelCache(options, checkNotNull(configFetcher), dispatcherClient);
@SuppressWarnings("methodref.receiver.bound")
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
FanOutStreamingEngineWorkerHarness.create(
Expand Down Expand Up @@ -804,20 +808,31 @@ private static void validateWorkerOptions(DataflowWorkerHarnessOptions options)
}

private static ChannelCache createChannelCache(
DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher configFetcher) {
DataflowWorkerHarnessOptions workerOptions,
ComputationConfig.Fetcher configFetcher,
GrpcDispatcherClient dispatcherClient) {
ChannelCache channelCache =
ChannelCache.create(
(currentFlowControlSettings, serviceAddress) -> {
// IsolationChannel will create and manage separate RPC channels to the same
// serviceAddress.
return IsolationChannel.create(
() ->
remoteChannel(
serviceAddress,
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
currentFlowControlSettings),
currentFlowControlSettings.getOnReadyThresholdBytes());
});
(currentFlowControlSettings, serviceAddress) ->
// IsolationChannel wraps FailoverChannel so that each active RPC gets its own
// FailoverChannel instance. FailoverChannel creates two channels (primary,
// fallback)
// per active RPC.
IsolationChannel.create(
() ->
FailoverChannel.create(
remoteChannel(
serviceAddress,
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
currentFlowControlSettings),
remoteChannel(
dispatcherClient.getDispatcherEndpoints().iterator().next(),
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
currentFlowControlSettings),
MoreCallCredentials.from(
new VendoredCredentialsAdapter(workerOptions.getGcpCredential()))),
currentFlowControlSettings.getOnReadyThresholdBytes()));

configFetcher
.getGlobalConfigHandle()
.registerConfigObserver(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,15 +410,18 @@ private GlobalDataStreamSender getOrCreateGlobalDataSteam(
}

private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint endpoint) {
GetWorkRequest.Builder getWorkRequestBuilder =
GetWorkRequest.newBuilder()
.setClientId(jobHeader.getClientId())
.setJobId(jobHeader.getJobId())
.setProjectId(jobHeader.getProjectId())
.setWorkerId(jobHeader.getWorkerId());
endpoint.workerToken().ifPresent(getWorkRequestBuilder::setBackendWorkerToken);

WindmillStreamSender windmillStreamSender =
WindmillStreamSender.create(
WindmillConnection.from(endpoint, this::createWindmillStub),
GetWorkRequest.newBuilder()
.setClientId(jobHeader.getClientId())
.setJobId(jobHeader.getJobId())
.setProjectId(jobHeader.getProjectId())
.setWorkerId(jobHeader.getWorkerId())
.build(),
getWorkRequestBuilder.build(),
GetWorkBudget.noBudget(),
streamFactory,
workItemScheduler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
: randomlySelectNextStub(windmillServiceStubs));
}

ImmutableSet<HostAndPort> getDispatcherEndpoints() {
public ImmutableSet<HostAndPort> getDispatcherEndpoints() {
return dispatcherStubs.get().dispatcherEndpoints();
}

Expand Down
Loading
Loading