Skip to content
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/yaml/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,13 @@ gcloud dataflow yaml run $JOB_NAME \

### ML

These examples leverage the built-in `Enrichment` transform for performing
ML enrichments.
Examples that include the built-in `Enrichment` transform for performing
ML enrichments:
- [bigquery_enrichment.yaml](transforms/ml/enrichment/bigquery_enrichment.yaml)
- [spanner_enrichment.yaml](transforms/ml/enrichment/spanner_enrichment.yaml)

Examples that include the `RunInference` transform for ML inference:
- [streaming_sentiment_analysis.yaml](transforms/ml/inference/streaming_sentiment_analysis.yaml)

More information can be found about aggregation transforms
[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
142 changes: 135 additions & 7 deletions sdks/python/apache_beam/yaml/examples/testing/examples_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import apache_beam as beam
from apache_beam import PCollection
from apache_beam.examples.snippets.util import assert_matches_stdout
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.utils import subprocess_server
from apache_beam.yaml import yaml_provider
from apache_beam.yaml import yaml_transform
Expand Down Expand Up @@ -120,14 +122,31 @@ def test_kafka_read(
bootstrap_servers,
auto_offset_reset_config,
consumer_config):
"""
This PTransform simulates the behavior of the ReadFromKafka transform
with the RAW format by simply using some fixed sample text data and
encode it to raw bytes.

Args:
pcoll: The input PCollection.
format: The format of the Kafka messages (e.g., 'RAW').
topic: The name of Kafka topic to read from.
bootstrap_servers: A list of Kafka bootstrap servers to connect to.
auto_offset_reset_config: A configuration for the auto offset reset
consumer_config: A dictionary containing additional consumer configurations

Returns:
A PCollection containing the sample text data in bytes.
"""

return (
pcoll | beam.Create(input_data.text_data().split('\n'))
| beam.Map(lambda element: beam.Row(payload=element.encode('utf-8'))))


@beam.ptransform.ptransform_fn
def test_pubsub_read(
pbegin,
pcoll,
topic: Optional[str] = None,
subscription: Optional[str] = None,
format: Optional[str] = None,
Expand All @@ -140,15 +159,58 @@ def test_pubsub_read(
pubsub_messages = input_data.pubsub_messages_data()

return (
pbegin
pcoll
| beam.Create([json.loads(msg.data) for msg in pubsub_messages])
| beam.Map(lambda element: beam.Row(**element)))


@beam.ptransform.ptransform_fn
def test_run_inference(pcoll, inference_tag, model_handler):
  """
  This PTransform simulates the behavior of the RunInference transform.

  Each input row gets a fake PredictionResult attached under
  ``inference_tag``: comments whose text contains 'happy' are labeled
  POSITIVE, everything else NEGATIVE. No model is loaded or invoked.
  NOTE(review): the match is case-sensitive — upstream transforms
  presumably lowercase 'comment_text' before this runs; verify.

  Args:
    pcoll: The input PCollection of rows carrying a 'comment_text' field.
    inference_tag: The tag to use for the returned inference.
    model_handler: A configuration for the respective ML model handler.
      Accepted (and ignored) so the signature matches RunInference.

  Returns:
    A PCollection containing the input rows augmented with the mocked
    inference results under ``inference_tag``.
  """
  def _fn(row):
    # Renamed from `input` to avoid shadowing the builtin.
    fields = row._asdict()

    row = {
        inference_tag: PredictionResult(
            fields['comment_text'],
            [{
                'label': 'POSITIVE'
                if 'happy' in fields['comment_text'] else 'NEGATIVE',
                'score': 0.95
            }]),
        **fields
    }

    return beam.Row(**row)

  # Rebuild the output row schema: the original user fields plus one
  # PredictionResult-shaped field keyed by the inference tag.
  user_type = RowTypeConstraint.from_user_type(pcoll.element_type.user_type)
  user_schema_fields = [(name, type(typ) if not isinstance(typ, type) else typ)
                        for (name,
                             typ) in user_type._fields] if user_type else []
  inference_output_type = RowTypeConstraint.from_fields([
      ('example', Any), ('inference', Any), ('model_id', Optional[str])
  ])
  schema = RowTypeConstraint.from_fields(
      user_schema_fields + [(str(inference_tag), inference_output_type)])

  return pcoll | beam.Map(_fn).with_output_types(schema)


TEST_PROVIDERS = {
'TestEnrichment': test_enrichment,
'TestReadFromKafka': test_kafka_read,
'TestReadFromPubSub': test_pubsub_read
'TestReadFromPubSub': test_pubsub_read,
'TestRunInference': test_run_inference
}
"""
Transforms not requiring inputs.
Expand Down Expand Up @@ -238,7 +300,12 @@ def test_yaml_example(self):
actual += list(transform.outputs.values())
check_output(expected)(actual)

if 'deps' in pipeline_spec_file:
def _python_deps_involved(spec_filename):
return any(
substr in spec_filename
for substr in ['deps', 'streaming_sentiment_analysis'])

if _python_deps_involved(pipeline_spec_file):
test_yaml_example = pytest.mark.no_xdist(test_yaml_example)
test_yaml_example = unittest.skipIf(
sys.platform == 'win32', "Github virtualenv permissions issues.")(
Expand Down Expand Up @@ -457,7 +524,9 @@ def _kafka_test_preprocessor(
'test_pubsub_to_iceberg_yaml',
'test_oracle_to_bigquery_yaml',
'test_mysql_to_bigquery_yaml',
'test_spanner_to_bigquery_yaml'
'test_spanner_to_bigquery_yaml',
'test_streaming_sentiment_analysis_yaml',
'test_enrich_spanner_with_bigquery_yaml'
])
def _io_write_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
Expand Down Expand Up @@ -782,9 +851,68 @@ def _db_io_read_test_processor(
return test_spec


@YamlExamplesTestSuite.register_test_preprocessor(
    'test_streaming_sentiment_analysis_yaml')
def _streaming_sentiment_analysis_test_preprocessor(
    test_spec: dict, expected: List[str], env: TestEnvironment):
  """
  Preprocessor for tests that involve the streaming sentiment analysis example.

  This preprocessor replaces several IO transforms and the RunInference
  transform.
  This allows the test to verify the pipeline's correctness without relying on
  external data sources and the model hosted on VertexAI.

  Args:
    test_spec: The dictionary representation of the YAML pipeline specification.
    expected: A list of strings representing the expected output of the
      pipeline.
    env: The TestEnvironment object providing utilities for creating temporary
      files.

  Returns:
    The modified test_spec dictionary with the ReadFromGCS, ReadFromKafka and
    RunInference transforms replaced by test-local equivalents.
  """
  # NOTE: replace_recursive returns a *new* test_spec, so the pipeline dict
  # must be re-fetched after each replacement — hence three separate loops
  # instead of one.

  # Point the GCS read at a local copy of the fixture, and add fixed
  # windowing so the streaming pipeline can complete in a test run.
  if pipeline := test_spec.get('pipeline', None):
    for transform in pipeline.get('transforms', []):
      if transform.get('type', '') == 'PyTransform' and transform.get(
          'name', '') == 'ReadFromGCS':
        transform['windowing'] = {'type': 'fixed', 'size': '30s'}

        file_name = 'youtube-comments.csv'
        local_path = env.input_file(file_name, INPUT_FILES[file_name])
        transform['config']['kwargs']['file_pattern'] = local_path

  # Swap the Kafka source for a CSV read of the same fixture, keeping only
  # the '__'-prefixed (framework-internal) config keys.
  if pipeline := test_spec.get('pipeline', None):
    for transform in pipeline.get('transforms', []):
      if transform.get('type', '') == 'ReadFromKafka':
        config = transform['config']
        transform['type'] = 'ReadFromCsv'
        transform['config'] = {
            k: v
            for k, v in config.items() if k.startswith('__')
        }
        transform['config']['path'] = ""

        file_name = 'youtube-comments.csv'
        test_spec = replace_recursive(
            test_spec,
            transform['type'],
            'path',
            env.input_file(file_name, INPUT_FILES[file_name]))

  # Replace remote Vertex AI inference with the local mock provider.
  if pipeline := test_spec.get('pipeline', None):
    for transform in pipeline.get('transforms', []):
      if transform.get('type', '') == 'RunInference':
        transform['type'] = 'TestRunInference'

  return test_spec


INPUT_FILES = {
'products.csv': input_data.products_csv(),
'kinglear.txt': input_data.text_data()
'kinglear.txt': input_data.text_data(),
'youtube-comments.csv': input_data.youtube_comments_csv()
}

INPUT_TABLES = {
Expand Down Expand Up @@ -819,7 +947,7 @@ def _db_io_read_test_processor(
'../transforms/io/*.yaml')).run()
MLTest = YamlExamplesTestSuite(
'MLExamplesTest', os.path.join(YAML_DOCS_DIR,
'../transforms/ml/*.yaml')).run()
'../transforms/ml/**/*.yaml')).run()

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/yaml/examples/testing/input_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ def products_csv():
])


def youtube_comments_csv():
  """Return a small CSV fixture of YouTube comments (header + 3 rows),
  including a non-ASCII comment to exercise encoding handling."""
  rows = (
      'video_id,comment_text,likes,replies',
      'XpVt6Z1Gjjo,I AM HAPPY,1,1',
      'XpVt6Z1Gjjo,I AM SAD,1,1',
      'XpVt6Z1Gjjo,§ÁĐ,1,1',
  )
  return '\n'.join(rows)


def spanner_orders_data():
return [{
'order_id': 1,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Streaming Sentiment Analysis

The example leverages the `RunInference` transform with Vertex AI
model handler [VertexAIModelHandlerJSON](
https://beam.apache.org/releases/pydoc/current/apache_beam.yaml.yaml_ml#apache_beam.yaml.yaml_ml.VertexAIModelHandlerJSONProvider),
in addition to Kafka IO to demonstrate an end-to-end example of a
streaming sentiment analysis pipeline. The dataset to perform
sentiment analysis on is the YouTube video comments and can be found
on Kaggle [here](
https://www.kaggle.com/datasets/datasnaek/youtube?select=UScomments.csv).

Download the dataset and copy over to a GCS bucket:
```sh
gcloud storage cp /path/to/UScomments.csv gs://YOUR_BUCKET/UScomments.csv
```

For setting up Kafka, an option is to use [Click to Deploy](
https://console.cloud.google.com/marketplace/details/click-to-deploy-images/kafka?)
to quickly launch a Kafka cluster on GCE. See [here](
../../../README.md#kafka) for more context around using Kafka
with Dataflow.

A hosted model on Vertex AI is needed before being able to use
the Vertex AI model handler. One of the current state-of-the-art
NLP models is HuggingFace's DistilBERT, a distilled version of
BERT model and is faster at inference. To deploy DistilBERT on
Vertex AI, run this [notebook](
https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/model_garden/model_garden_huggingface_pytorch_inference_deployment.ipynb) in Colab Enterprise.

BigQuery is the pipeline's sink for the inference result output.
A BigQuery dataset needs to exist first before the pipeline can
create/write to a table. Run the following command to create
a BigQuery dataset:

```sh
bq --location=us-central1 mk \
--dataset DATASET_ID
```
See also [here](
https://cloud.google.com/bigquery/docs/datasets) for more details on
how to create BigQuery datasets.

The pipeline first reads the YouTube comments .csv dataset from
GCS bucket and performs some clean-up before writing it to a Kafka
topic. The pipeline then reads from that Kafka topic and applies
various transformation logic before `RunInference` transform performs
remote inference with the Vertex AI model handler and DistilBERT
deployed to a Vertex AI endpoint. The inference result is then
parsed and written to a BigQuery table.

Run the pipeline (replace with appropriate variables in the command
below):

```sh
export PROJECT="$(gcloud config get-value project)"
export TEMP_LOCATION="gs://YOUR-BUCKET/tmp"
export REGION="us-central1"
export JOB_NAME="streaming-sentiment-analysis-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="3"

python -m apache_beam.yaml.main \
  --yaml_pipeline_file transforms/ml/inference/streaming_sentiment_analysis.yaml \
--runner DataflowRunner \
--temp_location $TEMP_LOCATION \
--project $PROJECT \
--region $REGION \
--num_workers $NUM_WORKERS \
--job_name $JOB_NAME \
  --jinja_variables '{ "GCS_PATH": "gs://YOUR-BUCKET/UScomments.csv",
"BOOTSTRAP_SERVERS": "BOOTSTRAP_IP_ADD:9092",
"TOPIC": "YOUR_TOPIC", "USERNAME": "KAFKA_USERNAME", "PASSWORD": "KAFKA_PASSWORD",
"ENDPOINT": "ENDPOINT_ID", "PROJECT": "PROJECT_ID", "LOCATION": "LOCATION",
"DATASET": "DATASET_ID", "TABLE": "TABLE_ID" }'
```
Loading
Loading