Skip to content

Commit b9d7e76

Browse files
Add Documentation Hint for Template Job Creation in DataflowRunner (#34204)
1 parent 3c1d760 commit b9d7e76

File tree

2 files changed

+35
-6
lines changed

2 files changed

+35
-6
lines changed

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,17 @@
201201
*
202202
* <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
203203
* Dataflow Security and Permissions</a> for more details.
204+
*
205+
* <p>{@code DataflowRunner} can also create job templates via the {@code --templateLocation}
205+
* option. When this option is set, the runner generates a template at that location instead of
206+
* running the pipeline immediately.
208+
*
209+
* <p>Example:
210+
*
211+
* <pre>{@code
212+
* --runner=DataflowRunner
213+
* --templateLocation=gs://your-bucket/templates/my-template
214+
* }</pre>
204215
*/
205216
@SuppressWarnings({
206217
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
@@ -595,6 +606,7 @@ protected DataflowRunner(DataflowPipelineOptions options) {
595606

596607
private static class AlwaysCreateViaRead<T>
597608
implements PTransformOverrideFactory<PBegin, PCollection<T>, Create.Values<T>> {
609+
598610
@Override
599611
public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<T>>
600612
getReplacementTransform(
@@ -775,7 +787,7 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
775787
PTransformOverride.of(
776788
PTransformMatchers.requiresStableInputParDoMulti(),
777789
RequiresStableInputParDoOverrides.multiOutputOverrideFactory()));
778-
*/
790+
*/
779791
overridesBuilder
780792
.add(
781793
PTransformOverride.of(
@@ -949,7 +961,7 @@ public Map<PCollection<?>, ReplacementOutput> mapOutputs(
949961
The PCollectionView itself must have the same tag since that tag may have been embedded in serialized DoFns
950962
previously and cannot easily be rewired. The PCollection may differ, so we rewire it, even if the rewiring
951963
is a noop.
952-
*/
964+
*/
953965
return ReplacementOutputs.singleton(outputs, newOutput);
954966
}
955967
}
@@ -1179,6 +1191,7 @@ private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {
11791191
@VisibleForTesting
11801192
static boolean isMultiLanguagePipeline(Pipeline pipeline) {
11811193
class IsMultiLanguageVisitor extends PipelineVisitor.Defaults {
1194+
11821195
private boolean isMultiLanguage = false;
11831196

11841197
private void performMultiLanguageTest(Node node) {
@@ -1656,6 +1669,7 @@ private static EnvironmentInfo getEnvironmentInfoFromEnvironmentId(
16561669

16571670
@AutoValue
16581671
abstract static class EnvironmentInfo {
1672+
16591673
static EnvironmentInfo create(
16601674
String environmentId, String containerUrl, List<String> capabilities) {
16611675
return new AutoValue_DataflowRunner_EnvironmentInfo(
@@ -1954,7 +1968,6 @@ void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
19541968
// ================================================================================
19551969
// PubsubIO translations
19561970
// ================================================================================
1957-
19581971
private static class StreamingPubsubIOReadOverrideFactory
19591972
implements PTransformOverrideFactory<
19601973
PBegin, PCollection<PubsubMessage>, PubsubUnboundedSource> {
@@ -2113,6 +2126,7 @@ protected String getKindString() {
21132126
}
21142127

21152128
private static class StreamingPubsubSinkTranslators {
2129+
21162130
/** Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node. */
21172131
static class StreamingPubsubIOWriteTranslator
21182132
implements TransformTranslator<StreamingPubsubIOWrite> {
@@ -2166,9 +2180,9 @@ private static void translate(
21662180
}
21672181

21682182
// ================================================================================
2169-
21702183
private static class SingleOutputExpandableTransformTranslator
21712184
implements TransformTranslator<External.SingleOutputExpandableTransform> {
2185+
21722186
@Override
21732187
public void translate(
21742188
External.SingleOutputExpandableTransform transform, TranslationContext context) {
@@ -2186,6 +2200,7 @@ public void translate(
21862200

21872201
private static class MultiOutputExpandableTransformTranslator
21882202
implements TransformTranslator<External.MultiOutputExpandableTransform> {
2203+
21892204
@Override
21902205
public void translate(
21912206
External.MultiOutputExpandableTransform transform, TranslationContext context) {
@@ -2734,6 +2749,7 @@ static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) {
27342749
*/
27352750
private static class DataflowPayloadTranslator
27362751
implements TransformPayloadTranslator<PTransform<?, ?>> {
2752+
27372753
@Override
27382754
public String getUrn(PTransform transform) {
27392755
return "dataflow_stub:" + transform.getClass().getName();
@@ -2758,6 +2774,7 @@ public RunnerApi.FunctionSpec translate(
27582774
})
27592775
@AutoService(TransformPayloadTranslatorRegistrar.class)
27602776
public static class DataflowTransformTranslator implements TransformPayloadTranslatorRegistrar {
2777+
27612778
@Override
27622779
public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
27632780
getTransformPayloadTranslators() {

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,19 @@ public interface DataflowPipelineOptions
105105
+ "Must either be local or Cloud Storage.")
106106
String getTemplateLocation();
107107

108+
/**
109+
* Sets the location where the Dataflow template will be stored; the path must be either local or
110+
* on Cloud Storage. Required when creating a classic template with {@code DataflowRunner}.
111+
*
112+
* <p>Example:
113+
*
114+
* <pre>{@code
115+
* DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
116+
* options.setTemplateLocation("gs://your-bucket/templates/my-template");
117+
* }</pre>
118+
*
119+
* @param value local or Cloud Storage path at which the Dataflow template is stored.
120+
*/
108121
void setTemplateLocation(String value);
109122

110123
/**
@@ -181,10 +194,8 @@ public interface DataflowPipelineOptions
181194
enum FlexResourceSchedulingGoal {
182195
/** No goal specified. */
183196
UNSPECIFIED,
184-
185197
/** Optimize for lower execution time. */
186198
SPEED_OPTIMIZED,
187-
188199
/** Optimize for lower cost. */
189200
COST_OPTIMIZED,
190201
}
@@ -198,6 +209,7 @@ enum FlexResourceSchedulingGoal {
198209

199210
/** Returns a default staging location under {@link GcpOptions#getGcpTempLocation}. */
200211
class StagingLocationFactory implements DefaultValueFactory<String> {
212+
201213
private static final Logger LOG = LoggerFactory.getLogger(StagingLocationFactory.class);
202214

203215
@Override

0 commit comments

Comments
 (0)