
[YAML] A Streaming Inference Pipeline - YouTube Comments Sentiment Analysis#35375

Merged
chamikaramj merged 13 commits into apache:master from charlespnh:yaml-streaming-inference
Jul 22, 2025

Conversation

@charlespnh (Contributor) commented Jun 19, 2025


Part of a larger effort #35069 and #35068 to add more examples involving Kafka and ML use cases.

The pipeline first reads the YouTube comments .csv dataset from a
GCS bucket and performs some clean-up before writing it to a Kafka
topic. The pipeline then reads from that Kafka topic and applies
various transformation logic before the RunInference transform performs
remote inference with the Vertex AI model handler and a DistilBERT
model deployed to a Vertex AI endpoint. The inference result is then
parsed and written to a BigQuery table.
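For readers, the overall shape of such a Beam YAML pipeline might look roughly like the sketch below. This is an illustrative placeholder only, not the exact configuration added in this PR: the topic, endpoint, table names, and the model handler config are assumptions.

```yaml
# Illustrative sketch only; see the files changed in this PR for the real pipeline.
pipeline:
  type: chain
  transforms:
    - type: ReadFromKafka
      config:
        topic: "youtube-comments"                  # placeholder topic
        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
        format: RAW
    - type: MapToFields
      name: ParseComment
      config:
        language: python
        fields:
          comment_text:
            callable: |
              def parse(row):
                return row.payload.decode('utf-8')
    - type: RunInference
      config:
        inference_tag: inference
        model_handler:
          type: VertexAIModelHandlerJSON           # assumed handler name
          config:
            endpoint_id: "YOUR_ENDPOINT_ID"
            project: "YOUR_PROJECT"
            location: "us-central1"
    - type: WriteToBigQuery
      config:
        table: "YOUR_PROJECT.YOUR_DATASET.comment_sentiment"
```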


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@charlespnh (Contributor, Author) commented Jun 19, 2025

To keep the example as self-contained as possible, I decided to have the pipeline both push the data from GCS to a Kafka topic and read from that same topic. It won't be a proper streaming pipeline, but the user doesn't have to do separate additional work to push the data to the Kafka topic before being able to run the example.

Also, this example doesn't work with Beam 2.65.0. During job submission the RunInference transform fails with an unexpected keyword argument env_vars, but this has recently been fixed: https://github.com/apache/beam/pull/35022/files#diff-00ce93be0981df61026196248c944f073d8ba1bdae9d8cb8bb2710e4fd3494b0L145. I've been testing the pipeline on the master branch, overriding with custom SDK harness containers.

@charlespnh (Contributor, Author):

cc @chamikaramj and @damccorm for feedbacks on whether this example makes sense

@github-actions (bot):

Assigning reviewers:

R: @jrmccluskey for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions bot commented Jul 1, 2025

Reminder, please take a look at this pr: @jrmccluskey

@charlespnh force-pushed the yaml-streaming-inference branch from 3863b50 to 47e4a8d on July 3, 2025, 20:57
@codecov bot commented Jul 5, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 56.53%. Comparing base (d5eea11) to head (2666307).
Report is 3 commits behind head on master.

Additional details and impacted files
```
@@             Coverage Diff              @@
##             master   #35375      +/-   ##
============================================
- Coverage     56.53%   56.53%   -0.01%
  Complexity     3319     3319
============================================
  Files          1199     1199
  Lines        183097   183099       +2
  Branches       3426     3426
============================================
- Hits         103519   103515       -4
- Misses        76279    76285       +6
  Partials       3299     3299
```

Flag: `python` — coverage 80.77% <100.00%> (-0.01%) ⬇️


@charlespnh (Contributor, Author):

Retest this please

@chamikaramj (Contributor) left a comment:

Thanks!

```python
names=['video_id', 'comment_text', 'likes', 'replies'],
on_bad_lines='skip',
converters={'likes': _to_int, 'replies': _to_int})
| beam.Filter(lambda row:
```
Contributor:

So we are filtering out records that have None in any field? Can we include such records with default values for missing fields, or some representation of None?

Contributor Author:

There are csv rows that look like this:

```
video_id,comment_text,likes,replies
XpVt6Z1Gjjo,I am always,happy0,0
```

I have the helper function _to_int that tries to convert the likes and replies columns when reading the csv file; in this case it converts to None. The following beam.Filter filters out that case. Even if we replaced None with some default values, I still don't think it makes sense to keep broken rows like this...

```python
# limitations under the License.
#

# The pipeline first reads the YouTube comments .csv dataset from GCS bucket
```
Contributor:

Can we make this part pluggable, so that someone who has a true Kafka topic with valid values can just use that to execute a true streaming pipeline?

Contributor Author:

You can introduce a conditional branch with jinja templatization [1] by adding, for example,

`{% if true_streaming == "true" %} ... {% endif %}`

... but testing is no longer straightforward with how the current test suite is set up (you have to start passing jinja variables in addition to this yaml pipeline)...

[1] https://beamsummit.org/slides/2024/BeamYAML_Advancedtopics.pdf
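As a hedged illustration, such a jinja-templatized branch in a Beam YAML pipeline could look roughly like this; the variable name and the transform configs are hypothetical placeholders, not the PR's actual code:

```yaml
pipeline:
  transforms:
    {% if true_streaming == "true" %}
    # Read from the user's own, already-populated Kafka topic.
    - type: ReadFromKafka
      config:
        topic: "{{ TOPIC }}"
        bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
    {% else %}
    # Self-contained mode: seed the example data from GCS instead.
    - type: ReadFromCsv
      config:
        path: "gs://YOUR_BUCKET/UScomments.csv"
    {% endif %}
```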

```yaml
bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
producer_config_updates:
  sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
    username={{ USERNAME }} \
```
Contributor:

We should update this to use secret managers [1] when they're available. @damccorm, is this at a state where we can try it (or is there an ETA)?

[1] https://docs.google.com/document/d/1Ng00Kw-vnG9kKd3RteC6Q5_YN8yDtXRDHkn570GPQrY/edit?tab=t.0#heading=h.c0uts5ftkk58

```yaml
auto_offset_reset_config: earliest
consumer_config:
  sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
    username={{ USERNAME }} \
```
Contributor:

Ditto regarding secret managers

```yaml
# comment string, e.g. emojis, ascii characters outside
# the common day-to-day English.
- type: MapToFields
  name: RemoveWeirdCharacters
```
Contributor:

Might users of the pipeline expect to see such characters? Can we preserve them using a different character encoding?

Contributor Author:

The model distilbert-base-uncased-finetuned-sst-2-english is trained on a text corpus that doesn't include emojis or non-printable ascii characters [1]. Otherwise we would get false positives, e.g. 😩 or §ÁĐ always gives a POSITIVE label. I don't think it's meaningful to keep these kinds of characters.

[1] https://www.ascii-code.com/
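As a hedged sketch of such a clean-up step (the exact regex and helper name in the PR may differ):

```python
import re


def remove_weird_characters(text):
  """Replace non-printable-ASCII characters with spaces, then collapse whitespace."""
  ascii_only = re.sub(r'[^\x20-\x7E]', ' ', text)
  return re.sub(r'\s+', ' ', ascii_only).strip()
```

Characters the model never saw in training (emojis, extended characters) are dropped entirely rather than risk skewing the sentiment label.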

```python
| beam.Map(lambda element: beam.Row(**element)))


@beam.ptransform.ptransform_fn
```
Contributor:

Is it possible to test with a mock model handler instead of completely mocking the transform here?

cc: @damccorm

```python
return test_spec


@YamlExamplesTestSuite.register_test_preprocessor(
```
Contributor:

Is it possible to add these as tests embedded in the YAML file under a `tests:` section, instead of implementing them in Python?

@derrickaw might know more.

Contributor Author:

See previous comment


These examples leverage the built-in `Enrichment` transform for performing
ML enrichments.
These examples include the built-in `Enrichment` transform for performing
Contributor:

Probably better to list the examples here since we'll add more.

Contributor Author:

Added



```shell
gcloud storage cp /path/to/UScomments.csv gs://YOUR_BUCKET/UScomments.csv
```

For setting up Kafka, an option is to use [Click to Deploy](
Contributor:

Might be good to introduce a script that sets up resources for executing the pipeline, if possible. Beam already has tests that start up a Docker-based Kafka in GCP and create BQ datasets.

Contributor Author:

Setting up Kafka with Click to Deploy only takes a few button clicks and doesn't require any manual setup/installation. I've also added a link to our existing Kafka pipeline example.

As for creating the BQ dataset, I've added a command line.

Comment on lines +179 to +180
```python
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.typehints.row_type import RowTypeConstraint
```
@derrickaw (Collaborator) commented Jul 16, 2025:

Let's put the imports at the top of the file.

Contributor Author:

Fixed

Comment on lines +877 to +908
```python
if pipeline := test_spec.get('pipeline', None):
  for transform in pipeline.get('transforms', []):
    if transform.get('type', '') == 'PyTransform' and transform.get(
        'name', '') == 'ReadFromGCS':
      transform['windowing'] = {'type': 'fixed', 'size': '30s'}

      file_name = 'youtube-comments.csv'
      local_path = env.input_file(file_name, INPUT_FILES[file_name])
      transform['config']['kwargs']['file_pattern'] = local_path

if pipeline := test_spec.get('pipeline', None):
  for transform in pipeline.get('transforms', []):
    if transform.get('type', '') == 'ReadFromKafka':
      config = transform['config']
      transform['type'] = 'ReadFromCsv'
      transform['config'] = {
          k: v
          for k, v in config.items() if k.startswith('__')
      }
      transform['config']['path'] = ""

      file_name = 'youtube-comments.csv'
      test_spec = replace_recursive(
          test_spec,
          transform['type'],
          'path',
          env.input_file(file_name, INPUT_FILES[file_name]))

if pipeline := test_spec.get('pipeline', None):
  for transform in pipeline.get('transforms', []):
    if transform.get('type', '') == 'RunInference':
      transform['type'] = 'TestRunInference'
```

Collaborator:

Do we need three if statements, or can we combine them into one loop that checks for each type of transform?

Contributor Author:

Yeah; because we're using replace_recursive, it returns a new test_spec reference, so you then need to get the new pipeline reference.
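For what it's worth, the simple type-swap cases (those not involving replace_recursive) could in principle share a single pass; a purely illustrative sketch, with hypothetical function and parameter names:

```python
def swap_transform_types(test_spec, replacements):
  """Swap transform types in-place in one pass over the pipeline.

  Only covers simple type swaps; steps that call replace_recursive
  return a new test_spec and would still need their own pass.
  """
  pipeline = test_spec.get('pipeline') or {}
  for transform in pipeline.get('transforms', []):
    new_type = replacements.get(transform.get('type', ''))
    if new_type:
      transform['type'] = new_type
  return test_spec
```

For example, `swap_transform_types(spec, {'RunInference': 'TestRunInference'})` rewrites only the matching transforms and leaves everything else untouched.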

@charlespnh (Contributor, Author) commented Jul 18, 2025

@chamikaramj (Contributor) left a comment:

Thanks. LGTM.

We can merge when Derrick's comments are addressed.

@chamikaramj (Contributor):

Retest this please

@chamikaramj (Contributor):

Please fix the lint failure.

```
conftest.py gen_protos.py setup.py
Running pylint...
************* Module apache_beam.yaml.examples.testing.examples_test
apache_beam/yaml/examples/testing/examples_test.py:863:0: C0301: Line too long (82/80) (line-too-long)
```

@chamikaramj (Contributor):

Also, the ML test suite seems to be green at HEAD?

https://github.com/apache/beam/actions/workflows/beam_PreCommit_Python_ML.yml

@charlespnh force-pushed the yaml-streaming-inference branch from 65f26c0 to 1475054 on July 22, 2025, 14:20
@charlespnh force-pushed the yaml-streaming-inference branch from 715c84a to 386641a on July 22, 2025, 17:12
@charlespnh (Contributor, Author):

Apologies for the super late update on this one... Tests have finally passed now.

Thanks for the review!

@chamikaramj chamikaramj merged commit 3415378 into apache:master Jul 22, 2025
91 checks passed