-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[docs] Add new Python multi-lang quickstart using the SchemaTransform framework #33360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7639,6 +7639,8 @@ In this section, we will use [KafkaIO.Read](https://beam.apache.org/releases/jav | |
|
|
||
| #### 13.1.1. Creating cross-language Java transforms | ||
|
|
||
| For Beam versions 2.60.0+, please follow [this guide](sdks/python-custom-multi-language-pipelines-guide.md) instead. | ||
|
|
||
| There are two ways to make Java transforms available to other SDKs. | ||
|
|
||
| * Option 1: In some cases, you can use existing Java transforms from other SDKs without writing any additional Java code. | ||
|
|
@@ -8040,6 +8042,8 @@ input.apply( | |
|
|
||
| #### 13.2.2. Using cross-language transforms in a Python pipeline | ||
|
|
||
| For Beam versions 2.60.0+, please follow [this guide](sdks/python-custom-multi-language-pipelines-guide.md#use-the-portable-transform-in-a-python-pipeline) instead. | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this section actually need this disclaimer? I think consuming schema transforms is basically the same, right/nothing has changed for this section? |
||
| If a Python-specific wrapper for a cross-language transform is available, use that. Otherwise, you have to use the lower-level [`ExternalTransform`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform) class to access the transform. | ||
|
|
||
| **Using an SDK wrapper** | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,216 @@ | ||||||||
| --- | ||||||||
| type: languages | ||||||||
| title: "Python multi-language pipelines quickstart" | ||||||||
| --- | ||||||||
| <!-- | ||||||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||
| you may not use this file except in compliance with the License. | ||||||||
| You may obtain a copy of the License at | ||||||||
|
|
||||||||
| http://www.apache.org/licenses/LICENSE-2.0 | ||||||||
|
|
||||||||
| Unless required by applicable law or agreed to in writing, software | ||||||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||
| See the License for the specific language governing permissions and | ||||||||
| limitations under the License. | ||||||||
| --> | ||||||||
|
|
||||||||
| # Python multi-language pipelines quickstart | ||||||||
|
|
||||||||
| This page provides instructions for running the updated multi-language examples for the Python SDK. For more details, see this step-by-step [guide](python-custom-multi-language-pipelines-guide.md) on creating and running custom multi-language transforms. | ||||||||
|
|
||||||||
| The code shown in this quickstart is available in a [collection of runnable examples](https://github.com/apache/beam/tree/master/examples/multi-language). | ||||||||
|
|
||||||||
| To build and run a multi-language Python pipeline, you need a Python environment with the Beam SDK installed. If you don’t have an environment set up, first complete the [Apache Beam Python SDK Quickstart](/get-started/quickstart-py/). | ||||||||
|
|
||||||||
| A *multi-language pipeline* is a pipeline that’s built in one Beam SDK language and uses transform(s) from another Beam SDK language. These “other-language” transforms are called [*cross-language transforms*](../glossary.md#cross-language-transforms). The idea is to make pipeline components easier to share across the Beam SDKs, and to grow the pool of available transforms for all the SDKs. In the examples below, the multi-language pipeline is built with the Beam Python SDK, and the cross-language transforms are built with the Beam Java SDK. | ||||||||
|
|
||||||||
| ## Create a cross-language transform | ||||||||
|
|
||||||||
| Here's a Java transform provider, [ExtractWordsProvider](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java), that is uniquely identified with the URN `"beam:schematransform:org.apache.beam:extract_words:v1"`. Given a Configuration object, it will provide a transform: | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you describe what the URN does? (in this context allows the transform to be identified across the language barrier) |
||||||||
|
|
||||||||
| ```java | ||||||||
| @AutoService(SchemaTransformProvider.class) | ||||||||
| public class ExtractWordsProvider extends TypedSchemaTransformProvider<Configuration> { | ||||||||
|
|
||||||||
| @Override | ||||||||
| public String identifier() { | ||||||||
| return "beam:schematransform:org.apache.beam:extract_words:v1"; | ||||||||
| } | ||||||||
|
|
||||||||
| @Override | ||||||||
| protected SchemaTransform from(Configuration configuration) { | ||||||||
| return new ExtractWordsTransform(configuration); | ||||||||
| } | ||||||||
| } | ||||||||
| ``` | ||||||||
| > **NOTE**: To ensure that your URN doesn't run into confilcts with URNs from other transforms, follow the URN conventions described [here](../programming-guide.md#1314-defining-a-urn). | ||||||||
|
|
||||||||
|
|
||||||||
| The config object is a simple Java object (POJO) that has fields required by the transform. AutoValue is encouraged, and the `@DefaultSchema` annotation helps Beam do some necessary conversions in the background: | ||||||||
| ```java | ||||||||
| @DefaultSchema(AutoValueSchema.class) | ||||||||
| @AutoValue | ||||||||
| protected abstract static class Configuration { | ||||||||
| public static Builder builder() { | ||||||||
| return new AutoValue_ExtractWordsProvider_Configuration.Builder(); | ||||||||
| } | ||||||||
|
|
||||||||
| @SchemaFieldDescription("List of words to drop.") | ||||||||
| public abstract List<String> getDrop(); | ||||||||
|
|
||||||||
| @AutoValue.Builder | ||||||||
| public abstract static class Builder { | ||||||||
| public abstract Builder setDrop(List<String> drop); | ||||||||
|
|
||||||||
| public abstract Configuration build(); | ||||||||
| } | ||||||||
| } | ||||||||
| ``` | ||||||||
|
|
||||||||
| Beam uses this configuration to generate a Python transform with the following signature: | ||||||||
| ```python | ||||||||
| Extract(drop=["foo", "bar"]) | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Saying the existing code snippet is a signature is not quite right. Thoughts on providing the full Python class definition? This might be a bit clearer. Alternately, we could change |
||||||||
| ``` | ||||||||
|
|
||||||||
| The transform can be any implementation of your choice, as long as it meets the requirements of a [SchemaTransform](../glossary.md#schematransform). For this example, the transform does the following: | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need to similarly describe what a valid configuration is above. I assume not all field types are valid? |
||||||||
|
|
||||||||
| ```java | ||||||||
| static class ExtractWordsTransform extends SchemaTransform { | ||||||||
| private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build(); | ||||||||
| private final List<String> drop; | ||||||||
|
|
||||||||
| ExtractWordsTransform(Configuration configuration) { | ||||||||
| this.drop = configuration.getDrop(); | ||||||||
| } | ||||||||
|
|
||||||||
| @Override | ||||||||
| public PCollectionRowTuple expand(PCollectionRowTuple input) { | ||||||||
| return PCollectionRowTuple.of( | ||||||||
| "output", | ||||||||
| input | ||||||||
| .getSinglePCollection() | ||||||||
| .apply( | ||||||||
| ParDo.of( | ||||||||
| new DoFn<Row, Row>() { | ||||||||
| @ProcessElement | ||||||||
| public void process(@Element Row element, OutputReceiver<Row> receiver) { | ||||||||
| // Split the line into words. | ||||||||
| String line = Preconditions.checkStateNotNull(element.getString("line")); | ||||||||
| String[] words = line.split("[^\\p{L}]+", -1); | ||||||||
| Arrays.stream(words) | ||||||||
| .filter(w -> !drop.contains(w)) | ||||||||
| .forEach( | ||||||||
| word -> | ||||||||
| receiver.output( | ||||||||
| Row.withSchema(OUTPUT_SCHEMA) | ||||||||
| .withFieldValue("word", word) | ||||||||
| .build())); | ||||||||
| } | ||||||||
| })) | ||||||||
| .setRowSchema(OUTPUT_SCHEMA)); | ||||||||
| } | ||||||||
| } | ||||||||
| ``` | ||||||||
|
|
||||||||
| This example uses other Java transform providers as well, [JavaCountProvider](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java) and [WriteWordsProvider](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java), but they follow the same pattern. | ||||||||
|
|
||||||||
| ## Choose an expansion service | ||||||||
|
|
||||||||
| When building a job for a multi-language pipeline, Beam uses an [expansion service](../glossary#expansion-service) to expand [composite transforms](../glossary#composite-transform). You must have at least one expansion service per remote SDK. | ||||||||
|
|
||||||||
| Before running a multi-language pipeline, you need to build an expansion service that can access your Java transform. It’s often easier to create a single shaded JAR that contains both. Both Python and Java dependencies will be staged for the runner by the Python SDK. | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'm not sure what this is saying - both of what?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be nice to include an example command or additional info that shows how you can do this as well |
||||||||
|
|
||||||||
| Assuming you've built a JAR named **java-multilang-bundled-0.1.jar**, you can start the service with a command like the following, where `12345` is the port on which the expansion service will run: | ||||||||
|
|
||||||||
| ``` | ||||||||
| java -jar java-multilang-bundled-0.1.jar 12345 | ||||||||
| ``` | ||||||||
|
|
||||||||
| For instructions on running an example expansion service, see [this README](https://github.com/apache/beam/blob/master/examples/multi-language/README.md#instructions-for-running-the-pipelines). | ||||||||
|
|
||||||||
| ## Create a Python pipeline | ||||||||
|
|
||||||||
| Your Python pipeline can now use the [**ExternalTransformProvider**](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external_transform_provider.html#apache_beam.transforms.external_transform_provider.ExternalTransformProvider) API to configure your cross-language transform. Here’s an example constructed from [wordcount_external.py](https://github.com/apache/beam/blob/master/examples/multi-language/python/wordcount_external.py): | ||||||||
|
|
||||||||
| First, determine the transform identifiers you are looking for: | ||||||||
|
|
||||||||
| ```python | ||||||||
| EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1" | ||||||||
| COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1" | ||||||||
| WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1" | ||||||||
| ``` | ||||||||
|
|
||||||||
| Then, initialize the `ExternalTransformProvider` with your expansion service. This can take two parameters: | ||||||||
|
|
||||||||
| * `expansion_services`: an expansion service, or list of expansion services | ||||||||
| * `urn_pattern`: (optional) a regex pattern to match valid transforms | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
It would be good to add information on what this does/what happens if it is missing |
||||||||
|
|
||||||||
| ```python | ||||||||
| provider = ExternalTransformProvider("localhost:" + expansion_service_port) | ||||||||
| ``` | ||||||||
|
|
||||||||
| Next, retrieve each portable transform: | ||||||||
| ```python | ||||||||
| Extract = provider.get_urn(EXTRACT_IDENTIFIER) | ||||||||
| Count = provider.get_urn(COUNT_IDENTIFIER) | ||||||||
| Write = provider.get_urn(WRITE_IDENTIFIER) | ||||||||
| ``` | ||||||||
|
|
||||||||
| Finally, build your multi-language Python pipeline using a mix of native and portable transforms: | ||||||||
| ```python | ||||||||
| with beam.Pipeline(options=pipeline_options) as p: | ||||||||
| _ = (p | ||||||||
| | 'Read' >> beam.io.ReadFromText(input_path) | ||||||||
| | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line)) | ||||||||
| | 'Extract Words' >> Extract(drop=["king", "palace"]) | ||||||||
| | 'Count Words' >> Count() | ||||||||
| | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % ( | ||||||||
| row.word, row.count))).with_output_types( | ||||||||
| RowTypeConstraint.from_fields([('line', str)])) | ||||||||
| | 'Write' >> Write(file_path_prefix=output_path)) | ||||||||
| ``` | ||||||||
|
|
||||||||
| ## Run the pipeline | ||||||||
|
|
||||||||
| The exact commands for running the Python pipeline will vary based on your environment. Assuming that your pipeline is coded in a file named **wordcount_external.py**, the steps should be similar to those below. For more information, see [the comments in addprefix.py](https://github.com/apache/beam/blob/41d585f82b10195f758d14e3a54076ea1f05aa75/examples/multi-language/python/addprefix.py#L18-L40). | ||||||||
|
|
||||||||
| ### Run with direct runner | ||||||||
|
|
||||||||
| In the following command, `input1` is a file containing lines of text: | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably worth calling out that the expansion service needs to be started first (here and below in the Dataflow section) |
||||||||
|
|
||||||||
| ``` | ||||||||
| $ python wordcount_external.py \ | ||||||||
| --runner DirectRunner \ | ||||||||
| --input <INPUT FILE> \ | ||||||||
| --output <OUTPUT FILE> \ | ||||||||
| --expansion_service_port <PORT> | ||||||||
| ``` | ||||||||
|
|
||||||||
| ### Run with Dataflow runner | ||||||||
|
|
||||||||
| The following script runs the multi-language pipeline on Dataflow, using example text from a Cloud Storage bucket. You'll need to adapt the script to your environment. | ||||||||
|
|
||||||||
| ``` | ||||||||
| #!/bin/bash | ||||||||
| export GCP_PROJECT=<project> | ||||||||
| export GCS_BUCKET=<bucket> | ||||||||
| export TEMP_LOCATION=gs://$GCS_BUCKET/tmp | ||||||||
| export GCP_REGION=<region> | ||||||||
| export JOB_NAME="wordcount-external-`date +%Y%m%d-%H%M%S`" | ||||||||
| export NUM_WORKERS="1" | ||||||||
|
|
||||||||
| gsutil rm gs://$GCS_BUCKET/wordcount-external/* | ||||||||
|
|
||||||||
| python wordcount_external.py \ | ||||||||
| --runner DataflowRunner \ | ||||||||
| --temp_location $TEMP_LOCATION \ | ||||||||
| --project $GCP_PROJECT \ | ||||||||
| --region $GCP_REGION \ | ||||||||
| --job_name $JOB_NAME \ | ||||||||
| --num_workers $NUM_WORKERS \ | ||||||||
| --input "gs://dataflow-samples/shakespeare/kinglear.txt" \ | ||||||||
| --output "gs://$GCS_BUCKET/wordcount-external/output" \ | ||||||||
| --expansion_service_port <PORT> | ||||||||
| ``` | ||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this apply to the whole section or just
13.1.1.2? Do we need to recommend away fromJavaExternalTransformfor cases where it works?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, should we update this section to recommend the new way (even if its just linking to the full doc) by default, and just link to the legacy page for <2.60.0 instead of leaving all the content here?