Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,15 @@ public PipelineExecutionException(Throwable cause) {

/** Constructs a pipeline from default {@link PipelineOptions}. */
public static Pipeline create() {
Pipeline pipeline = new Pipeline(PipelineOptionsFactory.create());
PipelineOptions options = PipelineOptionsFactory.create();
if (options.getRunner() == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want this. The runner is intentionally not something the pipeline should know about while it is being created.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(we actually did a lot of work to remove the runner a while back :-)

LOG.warn(
"Pipeline created with no effective PipelineOptions specified. Command-line arguments "
+ "were parsed, but no runner or critical options were found. This may lead to "
+ "unexpected behavior. Consider providing explicit options via "
+ "Pipeline.create(PipelineOptions) or command-line arguments.");
}
Pipeline pipeline = new Pipeline(options);
LOG.debug("Creating {}", pipeline);
return pipeline;
}
Expand All @@ -156,7 +164,12 @@ public static Pipeline create() {
public static Pipeline create(PipelineOptions options) {
// TODO: fix runners that mutate PipelineOptions in this method, then remove this line
PipelineRunner.fromOptions(options);

if (options.getRunner() == null) {
LOG.warn(
"Pipeline created with no effective PipelineOptions specified. The provided options "
+ "lack a runner or critical configuration. This may lead to unexpected behavior. "
+ "Consider providing a PipelineOptions instance with explicit settings.");
}
Pipeline pipeline = new Pipeline(options);
LOG.debug("Creating {}", pipeline);
return pipeline;
Expand Down Expand Up @@ -263,7 +276,11 @@ private void checkForMatches(Node node) {
});
}

private void replace(final PTransformOverride override) {
private <
InputT extends PInput,
OutputT extends POutput,
TransformT extends PTransform<? super InputT, OutputT>>
void replace(final PTransformOverride override) {
final Set<Node> matches = new HashSet<>();
final Set<Node> freedNodes = new HashSet<>();
traverseTopologically(
Expand Down Expand Up @@ -298,7 +315,10 @@ public void visitPrimitiveTransform(Node node) {
usedFullNames.remove(freedNode.getFullName());
}
for (Node match : matches) {
applyReplacement(match, override.getOverrideFactory());
@SuppressWarnings("unchecked")
PTransformOverrideFactory<InputT, OutputT, TransformT> factory =
(PTransformOverrideFactory<InputT, OutputT, TransformT>) override.getOverrideFactory();
applyReplacement(match, factory);
}
}

Expand Down
Loading