Skip to content

Commit 6c24ec2

Browse files
authored
Merge branch 'apache:master' into fix-python-postcommit-30513
2 parents 6e28454 + 3b41e8b commit 6c24ec2

File tree

28 files changed

+176
-373
lines changed

28 files changed

+176
-373
lines changed

.github/workflows/build_wheels.yml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -229,15 +229,6 @@ jobs:
229229
]
230230
# Keep in sync (remove asterisks) with PY_VERSIONS_FULL env var above - if changed, change that as well.
231231
py_version: ["cp310-", "cp311-", "cp312-", "cp313-"]
232-
# The following exclude/include skips intermediate Python versions on GitHub hosted runners for PR run
233-
is_pr:
234-
- ${{ github.event_name == 'pull_request' }}
235-
exclude:
236-
- is_pr: true
237-
include:
238-
- os_python.os: "ubuntu-20.04"
239-
- py_version: "cp310-"
240-
- py_version: "cp313-"
241232
steps:
242233
- name: Download python source distribution from artifacts
243234
uses: actions/download-artifact@v5

CONTRIBUTING.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ These steps and instructions on getting started are outlined below as well.
4141
- A [GitHub](https://github.com/) account.
4242
- A Linux, macOS, or Microsoft Windows development environment.
4343
- Java JDK 11 (preferred, or 8, 17, 21) installed.
44+
- General note:
45+
- Set `JAVA_HOME` to the JDK **installation directory**, not the `bin` directory
46+
- (Window for example: `C:\Program Files\Eclipse Adoptium\jdk-11.x.x`)
47+
- If multiple JDK versions are installed, ensure Java 11 appears first on `PATH`
48+
- The initial Gradle build may take 10–15 minutes due to dependency downloads
4449
- Latest [Go](https://golang.org) 1.x installed.
4550
- [Docker](https://www.docker.com/) installed for some tasks including building worker containers and testing changes to this website locally.
4651
- For SDK Development:

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
555555
return timestamp();
556556
}
557557

558+
@Override
559+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
560+
return elem.causedByDrain();
561+
}
562+
558563
@Override
559564
public String timerId(DoFn<InputT, OutputT> doFn) {
560565
throw new UnsupportedOperationException(
@@ -831,6 +836,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
831836
return timestamp();
832837
}
833838

839+
@Override
840+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
841+
return causedByDrain;
842+
}
843+
834844
@Override
835845
public String timerId(DoFn<InputT, OutputT> doFn) {
836846
return timerId;
@@ -1119,6 +1129,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
11191129
return timestamp;
11201130
}
11211131

1132+
@Override
1133+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
1134+
throw new UnsupportedOperationException("CausedByDrain parameters are not supported.");
1135+
}
1136+
11221137
@Override
11231138
public String timerId(DoFn<InputT, OutputT> doFn) {
11241139
throw new UnsupportedOperationException("Timer parameters are not supported.");

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
126126

127127
void setWindmillMessagesBetweenIsReadyChecks(int value);
128128

129-
@Description("If true, a most a single active rpc will be used per channel.")
129+
/** @deprecated since 2.73.0 */
130+
@Deprecated
131+
@Description("Unused flag.")
130132
Boolean getUseWindmillIsolatedChannels();
131133

132134
void setUseWindmillIsolatedChannels(Boolean value);
133135

134-
@Description(
135-
"If true, separate streaming rpcs will be used for heartbeats instead of sharing streams with state reads.")
136+
/** @deprecated since beam 2.73.0 */
137+
@Deprecated
138+
@Description("Unused Flag")
136139
Boolean getUseSeparateWindmillHeartbeatStreams();
137140

138141
void setUseSeparateWindmillHeartbeatStreams(Boolean value);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
5555
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
5656
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
57-
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
5857
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
5958
import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness;
6059
import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness;
@@ -460,12 +459,7 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
460459
windmillServer::getDataStream);
461460
GetDataClient getDataClient =
462461
new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
463-
HeartbeatSender heartbeatSender =
464-
createStreamingEngineHeartbeatSender(
465-
options,
466-
windmillServer,
467-
getDataStreamPool,
468-
checkNotNull(configFetcher).getGlobalConfigHandle());
462+
HeartbeatSender heartbeatSender = createStreamingEngineHeartbeatSender(windmillServer);
469463
@SuppressWarnings("methodref.receiver.bound")
470464
WorkCommitter workCommitter =
471465
StreamingEngineWorkCommitter.builder()
@@ -603,25 +597,9 @@ private static ChannelzServlet createChannelzServlet(
603597
}
604598

605599
private static HeartbeatSender createStreamingEngineHeartbeatSender(
606-
DataflowWorkerHarnessOptions options,
607-
WindmillServerStub windmillClient,
608-
WindmillStreamPool<GetDataStream> getDataStreamPool,
609-
StreamingGlobalConfigHandle globalConfigHandle) {
610-
// Experiment gates the logic till backend changes are rollback safe
611-
if (!DataflowRunner.hasExperiment(
612-
options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT)
613-
|| options.getUseSeparateWindmillHeartbeatStreams() != null) {
614-
return StreamPoolHeartbeatSender.create(
615-
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
616-
? WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream)
617-
: getDataStreamPool);
618-
619-
} else {
620-
return StreamPoolHeartbeatSender.create(
621-
WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream),
622-
getDataStreamPool,
623-
globalConfigHandle);
624-
}
600+
WindmillServerStub windmillClient) {
601+
return StreamPoolHeartbeatSender.create(
602+
WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream));
625603
}
626604

627605
public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
@@ -725,7 +703,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
725703
Function<ComputationConfig.Fetcher, ComputationStateCache> computationStateCacheFactory) {
726704
if (options.isEnableStreamingEngine()) {
727705
GrpcDispatcherClient dispatcherClient =
728-
GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options));
706+
GrpcDispatcherClient.create(new WindmillStubFactoryFactoryImpl(options));
729707
ComputationConfig.Fetcher configFetcher =
730708
StreamingEngineComputationConfigFetcher.create(
731709
options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient);
@@ -753,7 +731,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
753731
if (options.getWindmillServiceEndpoint() != null
754732
|| options.getLocalWindmillHostport().startsWith("grpc:")) {
755733
GrpcDispatcherClient dispatcherClient =
756-
GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options));
734+
GrpcDispatcherClient.create(new WindmillStubFactoryFactoryImpl(options));
757735
GrpcWindmillStreamFactory windmillStreamFactory =
758736
windmillStreamFactoryBuilder
759737
.setHealthCheckIntervalMillis(
@@ -920,7 +898,7 @@ static StreamingDataflowWorker forTesting(
920898
createGrpcwindmillStreamFactoryBuilder(options, 1)
921899
.setProcessHeartbeatResponses(
922900
new WorkHeartbeatResponseProcessor(computationStateCache::get));
923-
GrpcDispatcherClient grpcDispatcherClient = GrpcDispatcherClient.create(options, stubFactory);
901+
GrpcDispatcherClient grpcDispatcherClient = GrpcDispatcherClient.create(stubFactory);
924902
grpcDispatcherClient.consumeWindmillDispatcherEndpoints(
925903
ImmutableSet.<HostAndPort>builder()
926904
.add(HostAndPort.fromHost("StreamingDataflowWorkerTest"))

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java

Lines changed: 7 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,9 @@
2828
import java.util.Set;
2929
import java.util.concurrent.CountDownLatch;
3030
import java.util.concurrent.TimeUnit;
31-
import java.util.concurrent.atomic.AtomicBoolean;
3231
import java.util.concurrent.atomic.AtomicReference;
3332
import javax.annotation.concurrent.GuardedBy;
3433
import javax.annotation.concurrent.ThreadSafe;
35-
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
3634
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
3735
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
3836
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub;
@@ -66,42 +64,25 @@ public class GrpcDispatcherClient {
6664
@GuardedBy("this")
6765
private final Random rand;
6866

69-
private final WindmillStubFactoryFactory windmillStubFactoryFactory;
70-
71-
private final AtomicReference<WindmillStubFactory> windmillStubFactory = new AtomicReference<>();
72-
73-
private final AtomicBoolean useIsolatedChannels = new AtomicBoolean();
74-
private final boolean reactToIsolatedChannelsJobSetting;
67+
private final WindmillStubFactory windmillStubFactory;
7568

7669
private GrpcDispatcherClient(
77-
DataflowWorkerHarnessOptions options,
7870
WindmillStubFactoryFactory windmillStubFactoryFactory,
7971
DispatcherStubs initialDispatcherStubs,
8072
Random rand) {
81-
this.windmillStubFactoryFactory = windmillStubFactoryFactory;
82-
if (options.getUseWindmillIsolatedChannels() != null) {
83-
this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
84-
this.reactToIsolatedChannelsJobSetting = false;
85-
} else {
86-
this.useIsolatedChannels.set(false);
87-
this.reactToIsolatedChannelsJobSetting = true;
88-
}
89-
this.windmillStubFactory.set(
90-
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
73+
this.windmillStubFactory = windmillStubFactoryFactory.makeWindmillStubFactory();
9174
this.rand = rand;
9275
this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
9376
this.onInitializedEndpoints = new CountDownLatch(1);
9477
}
9578

96-
public static GrpcDispatcherClient create(
97-
DataflowWorkerHarnessOptions options, WindmillStubFactoryFactory windmillStubFactoryFactory) {
79+
public static GrpcDispatcherClient create(WindmillStubFactoryFactory windmillStubFactoryFactory) {
9880
return new GrpcDispatcherClient(
99-
options, windmillStubFactoryFactory, DispatcherStubs.empty(), new Random());
81+
windmillStubFactoryFactory, DispatcherStubs.empty(), new Random());
10082
}
10183

10284
@VisibleForTesting
10385
public static GrpcDispatcherClient forTesting(
104-
DataflowWorkerHarnessOptions options,
10586
WindmillStubFactoryFactory windmillStubFactoryFactory,
10687
List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
10788
List<CloudWindmillMetadataServiceV1Alpha1Stub> windmillMetadataServiceStubs,
@@ -110,7 +91,6 @@ public static GrpcDispatcherClient forTesting(
11091
dispatcherEndpoints.size() == windmillServiceStubs.size()
11192
&& windmillServiceStubs.size() == windmillMetadataServiceStubs.size());
11293
return new GrpcDispatcherClient(
113-
options,
11494
windmillStubFactoryFactory,
11595
DispatcherStubs.create(
11696
dispatcherEndpoints, windmillServiceStubs, windmillMetadataServiceStubs),
@@ -172,31 +152,17 @@ public void onJobConfig(StreamingGlobalConfig config) {
172152
LOG.warn("Dispatcher client received empty windmill service endpoints from global config");
173153
return;
174154
}
175-
boolean forceRecreateStubs = false;
176-
if (reactToIsolatedChannelsJobSetting) {
177-
boolean useIsolatedChannels = config.userWorkerJobSettings().getUseWindmillIsolatedChannels();
178-
if (this.useIsolatedChannels.getAndSet(useIsolatedChannels) != useIsolatedChannels) {
179-
windmillStubFactory.set(
180-
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels));
181-
forceRecreateStubs = true;
182-
}
183-
}
184-
consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints(), forceRecreateStubs);
155+
consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints());
185156
}
186157

187158
public synchronized void consumeWindmillDispatcherEndpoints(
188159
ImmutableSet<HostAndPort> dispatcherEndpoints) {
189-
consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /* forceRecreateStubs= */ false);
190-
}
191-
192-
private synchronized void consumeWindmillDispatcherEndpoints(
193-
ImmutableSet<HostAndPort> dispatcherEndpoints, boolean forceRecreateStubs) {
194160
ImmutableSet<HostAndPort> currentDispatcherEndpoints =
195161
dispatcherStubs.get().dispatcherEndpoints();
196162
Preconditions.checkArgument(
197163
dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
198164
"Cannot set dispatcher endpoints to nothing.");
199-
if (!forceRecreateStubs && currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
165+
if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
200166
// The endpoints are equal don't recreate the stubs.
201167
return;
202168
}
@@ -207,7 +173,7 @@ private synchronized void consumeWindmillDispatcherEndpoints(
207173
}
208174

209175
LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", dispatcherEndpoints);
210-
dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, windmillStubFactory.get()));
176+
dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, windmillStubFactory));
211177
onInitializedEndpoints.countDown();
212178
}
213179

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ static GrpcWindmillServer newTestInstance(
166166
Set<HostAndPort> dispatcherEndpoints = Sets.newHashSet(HostAndPort.fromHost(name));
167167
GrpcDispatcherClient dispatcherClient =
168168
GrpcDispatcherClient.forTesting(
169-
testOptions,
170169
windmillStubFactoryFactory,
171170
windmillServiceStubs,
172171
windmillMetadataServiceStubs,
@@ -198,7 +197,7 @@ static GrpcWindmillServer newApplianceTestInstance(
198197
options,
199198
GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(),
200199
// No-op, Appliance does not use Dispatcher to call Streaming Engine.
201-
GrpcDispatcherClient.create(options, windmillStubFactoryFactory));
200+
GrpcDispatcherClient.create(windmillStubFactoryFactory));
202201
testServer.syncApplianceStub = createWindmillApplianceStubWithDeadlineInterceptor(channel);
203202
return testServer;
204203
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@
2121

2222
@Internal
2323
public interface WindmillStubFactoryFactory {
24-
WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels);
24+
WindmillStubFactory makeWindmillStubFactory();
2525
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,24 +33,19 @@ public WindmillStubFactoryFactoryImpl(DataflowWorkerHarnessOptions workerOptions
3333
}
3434

3535
@Override
36-
public WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels) {
36+
public WindmillStubFactory makeWindmillStubFactory() {
3737
ChannelCache channelCache =
3838
ChannelCache.create(
3939
(flowControlSettings, serviceAddress) ->
4040
// IsolationChannel will create and manage separate RPC channels to the same
4141
// serviceAddress via calling the channelFactory, else just directly return the
4242
// RPC channel.
43-
useIsolatedChannels
44-
? IsolationChannel.create(
45-
() ->
46-
remoteChannel(
47-
serviceAddress.getServiceAddress(),
48-
windmillServiceRpcChannelAliveTimeoutSec,
49-
flowControlSettings))
50-
: remoteChannel(
51-
serviceAddress.getServiceAddress(),
52-
windmillServiceRpcChannelAliveTimeoutSec,
53-
flowControlSettings));
43+
IsolationChannel.create(
44+
() ->
45+
remoteChannel(
46+
serviceAddress.getServiceAddress(),
47+
windmillServiceRpcChannelAliveTimeoutSec,
48+
flowControlSettings)));
5449
return ChannelCachingRemoteStubFactory.create(gcpCredential, channelCache);
5550
}
5651
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.util.concurrent.atomic.AtomicReference;
2121
import javax.annotation.Nonnull;
22-
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
2322
import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
2423
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
2524
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
@@ -47,32 +46,6 @@ public static StreamPoolHeartbeatSender create(
4746
return new StreamPoolHeartbeatSender(heartbeatStreamPool);
4847
}
4948

50-
/**
51-
* Creates StreamPoolHeartbeatSender that switches between the passed in stream pools depending on
52-
* global config.
53-
*
54-
* @param dedicatedHeartbeatPool stream to use when using separate streams for heartbeat is
55-
* enabled.
56-
* @param getDataPool stream to use when using separate streams for heartbeat is disabled.
57-
*/
58-
public static StreamPoolHeartbeatSender create(
59-
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> dedicatedHeartbeatPool,
60-
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
61-
@Nonnull StreamingGlobalConfigHandle configHandle) {
62-
// Use getDataPool as the default, settings callback will
63-
// switch to the separate pool if enabled before processing any elements are processed.
64-
StreamPoolHeartbeatSender heartbeatSender = new StreamPoolHeartbeatSender(getDataPool);
65-
configHandle.registerConfigObserver(
66-
streamingGlobalConfig ->
67-
heartbeatSender.heartbeatStreamPool.set(
68-
streamingGlobalConfig
69-
.userWorkerJobSettings()
70-
.getUseSeparateWindmillHeartbeatStreams()
71-
? dedicatedHeartbeatPool
72-
: getDataPool));
73-
return heartbeatSender;
74-
}
75-
7649
@Override
7750
public void sendHeartbeats(Heartbeats heartbeats) {
7851
try (CloseableStream<WindmillStream.GetDataStream> closeableStream =

0 commit comments

Comments
 (0)