
Commit 7d159c7

Implemented ML Pipeline Continuous new table rows RunInference (#37647)

* Implemented ML Pipeline Continuous new table rows RunInference
* Fix formatter, lint, and test for table row inference
* Fix formatting
* Fix pylint and RunInference singleton test
* Fix Pylint in dataflow_cost_benchmark and singleton assertion in base_test
* Fixed lint
* Fixed formatting
* Consolidate table row inference batch pipeline into main script and improve Pub/Sub utils
* Fix README newline and clean table row example docstring formatting

1 parent 12f00ac commit 7d159c7

File tree

20 files changed (+1658, -51 lines)

.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml

Lines changed: 24 additions & 1 deletion
```diff
@@ -92,6 +92,7 @@ jobs:
             ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
             ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
             ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt
+            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Table_Row_Inference_Base.txt
           # The env variables are created and populated in the test-arguments-action as "<github.job>_test_arguments_<argument_file_paths_index>"
       - name: get current time
         run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV
@@ -189,4 +190,26 @@ jobs:
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
             -PloadTest.requirementsTxtFile=apache_beam/ml/inference/torch_tests_requirements.txt \
-            '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} --job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt'
+            '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} --job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt'
+      - name: run Table Row Inference Sklearn Batch
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        timeout-minutes: 180
+        with:
+          gradle-command: :sdks:python:apache_beam:testing:load_tests:run
+          arguments: |
+            -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.table_row_inference_benchmark \
+            -Prunner=DataflowRunner \
+            -PpythonVersion=3.10 \
+            -PloadTest.requirementsTxtFile=apache_beam/ml/inference/table_row_inference_requirements.txt \
+            '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --autoscaling_algorithm=NONE --metrics_table=result_table_row_inference_batch --influx_measurement=result_table_row_inference_batch --mode=batch --input_file=gs://apache-beam-ml/testing/inputs/table_rows_100k_benchmark.jsonl --input_expand_factor=100 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_batch_outputs --job_name=benchmark-tests-table-row-inference-batch-${{env.NOW_UTC}}'
+      - name: run Table Row Inference Sklearn Stream
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        timeout-minutes: 180
+        with:
+          gradle-command: :sdks:python:apache_beam:testing:load_tests:run
+          arguments: |
+            -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.table_row_inference_benchmark \
+            -Prunner=DataflowRunner \
+            -PpythonVersion=3.10 \
+            -PloadTest.requirementsTxtFile=apache_beam/ml/inference/table_row_inference_requirements.txt \
+            '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --autoscaling_algorithm=THROUGHPUT_BASED --max_num_workers=20 --metrics_table=result_table_row_inference_stream --influx_measurement=result_table_row_inference_stream --mode=streaming --input_subscription=projects/apache-beam-testing/subscriptions/table_row_inference_benchmark --window_size_sec=60 --trigger_interval_sec=30 --timeout_ms=900000 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_stream_outputs --job_name=benchmark-tests-table-row-inference-stream-${{env.NOW_UTC}}'
```
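Both new steps pass `-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }}`, an env variable that the test-arguments-action builds from one of the flag files listed at the top of the workflow. A minimal stdlib sketch of reading such a flag file (the function name and exact semantics are assumptions about the action, not its real code):

```python
def parse_args_file(text: str) -> dict:
    """Collect --flag=value lines from a pipeline-options file,
    skipping blank lines and '#' comments (assumed semantics)."""
    opts = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        opts[key.lstrip("-")] = value
    return opts

sample = "--mode=batch\n# 100k lines x 100\n--num_workers=10\n"
print(parse_args_file(sample))  # {'mode': 'batch', 'num_workers': '10'}
```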
Lines changed: 40 additions & 0 deletions
```diff
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+--project=apache-beam-testing
+--region=us-central1
+--worker_machine_type=n1-standard-4
+--num_workers=10
+--disk_size_gb=50
+--autoscaling_algorithm=NONE
+--staging_location=gs://temp-storage-for-perf-tests/loadtests
+--temp_location=gs://temp-storage-for-perf-tests/loadtests
+--requirements_file=apache_beam/ml/inference/table_row_inference_requirements.txt
+--publish_to_big_query=true
+--metrics_dataset=beam_run_inference
+--metrics_table=result_table_row_inference_batch
+--input_options={}
+--influx_measurement=result_table_row_inference_batch
+--mode=batch
+--input_file=gs://apache-beam-ml/testing/inputs/table_rows_100k_benchmark.jsonl
+# 100k lines × 100 = 10M rows; use 1000 for 100M rows
+--input_expand_factor=100
+--model_path=gs://apache-beam-ml/models/sklearn_table_classifier.pkl
+--feature_columns=feature1,feature2,feature3,feature4,feature5
+--output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_batch_outputs
+--runner=DataflowRunner
+--experiments=use_runner_v2
```
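The `--input_expand_factor=100` flag above multiplies the 100k-line input file into 10M rows. A minimal stdlib sketch of how such an expansion could work (the helper name and the id-suffix scheme are assumptions, not the benchmark's actual code; in the pipeline this would typically sit behind a `beam.FlatMap`):

```python
import json

def expand_row(line: str, factor: int):
    """Emit `factor` copies of one JSONL table row, giving each copy a
    distinct id so downstream keys stay unique (assumed scheme)."""
    row = json.loads(line)
    for i in range(factor):
        copy = dict(row)
        copy["id"] = f"{row['id']}_{i}"
        yield copy

rows = list(expand_row('{"id": "row_1", "feature1": 1.5}', 100))
print(len(rows))  # 100 copies per input line
```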
Lines changed: 42 additions & 0 deletions
```diff
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+--project=apache-beam-testing
+--region=us-central1
+--worker_machine_type=n1-standard-4
+--num_workers=10
+--disk_size_gb=50
+--autoscaling_algorithm=THROUGHPUT_BASED
+--max_num_workers=20
+--staging_location=gs://temp-storage-for-perf-tests/loadtests
+--temp_location=gs://temp-storage-for-perf-tests/loadtests
+--requirements_file=apache_beam/ml/inference/table_row_inference_requirements.txt
+--publish_to_big_query=true
+--metrics_dataset=beam_run_inference
+--metrics_table=result_table_row_inference_stream
+--input_options={}
+--influx_measurement=result_table_row_inference_stream
+--mode=streaming
+--input_subscription=projects/apache-beam-testing/subscriptions/table_row_inference_benchmark
+--window_size_sec=60
+--trigger_interval_sec=30
+--timeout_ms=1800000
+--model_path=gs://apache-beam-ml/models/sklearn_table_classifier.pkl
+--feature_columns=feature1,feature2,feature3,feature4,feature5
+--output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_stream_outputs
+--runner=DataflowRunner
+--experiments=use_runner_v2
```
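The streaming variant windows its Pub/Sub input with `--window_size_sec=60` and fires every `--trigger_interval_sec=30`. The fixed-window boundary arithmetic applied to each element timestamp can be sketched as follows (a stdlib sketch mirroring the behavior of Beam's `FixedWindows`, not its implementation):

```python
def fixed_window(timestamp_sec: float, size_sec: int = 60):
    """Return the [start, end) fixed window containing a timestamp:
    windows are aligned multiples of the window size."""
    start = timestamp_sec - (timestamp_sec % size_sec)
    return (start, start + size_sec)

print(fixed_window(125.0))  # (120.0, 180.0)
```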

.test-infra/tools/refresh_looker_metrics.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -43,9 +43,10 @@
     ("82", ["263", "264", "265", "266", "267"]),  # PyTorch Sentiment Streaming DistilBERT base uncased
     ("85", ["268", "269", "270", "271", "272"]),  # PyTorch Sentiment Batch DistilBERT base uncased
     ("86", ["284", "285", "286", "287", "288"]),  # VLLM Batch Gemma
+    ("96", ["270", "304", "305", "353", "354"]),  # Table Row Inference Sklearn Batch
+    ("106", ["355", "356", "357", "358", "359"])  # Table Row Inference Sklearn Streaming
 ]
 
-
 def get_look(id: str) -> models.Look:
   look = next(iter(sdk.search_looks(id=id)), None)
   if not look:
```
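Each tuple pairs a Looker dashboard id with the look ids whose cached results it displays. A driver loop over that structure might look like this (the `refresh` callable is a hypothetical stand-in, not the Looker SDK API the script actually uses):

```python
# Hypothetical sketch: iterate (dashboard_id, look_ids) pairs and refresh
# each look. `refresh` is an injected stand-in, not sdk.search_looks().
DASHBOARD_LOOKS = [
    ("96", ["270", "304", "305", "353", "354"]),   # Table Row Inference Sklearn Batch
    ("106", ["355", "356", "357", "358", "359"]),  # Table Row Inference Sklearn Streaming
]

def refresh_all(pairs, refresh):
    """Call `refresh` on every look id; return (dashboard, look) pairs visited."""
    visited = []
    for dashboard_id, look_ids in pairs:
        for look_id in look_ids:
            refresh(look_id)  # e.g. re-run the look's query
            visited.append((dashboard_id, look_id))
    return visited
```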

sdks/python/apache_beam/examples/inference/README.md

Lines changed: 67 additions & 1 deletion
````diff
@@ -968,4 +968,70 @@ and produce the following result in your output file location:
 An emperor penguin is an adorable creature that lives in Antarctica.
 ```
----
+---
+## Table row inference
+
+[`table_row_inference.py`](./table_row_inference.py) contains an implementation of a RunInference pipeline that processes structured table rows from a file or Pub/Sub, runs ML inference while preserving the table schema, and writes results to BigQuery. It supports both batch (file input) and streaming (Pub/Sub) modes.
+
+### Prerequisites for table row inference
+
+Install dependencies (or use `apache_beam/ml/inference/table_row_inference_requirements.txt` from the `sdks/python` directory):
+
+```sh
+pip install apache-beam[gcp] scikit-learn google-cloud-pubsub
+```
+
+For streaming mode you need a Pub/Sub topic and subscription, a BigQuery dataset, and a GCS bucket for model and temp files.
+
+### Model and data for table row inference
+
+1. Create a scikit-learn model and sample data using the provided utilities:
+
+```sh
+python -m apache_beam.examples.inference.table_row_inference_utils --action=create_model --output_path=model.pkl --num_features=3
+python -m apache_beam.examples.inference.table_row_inference_utils --action=generate_data --output_path=input_data.jsonl --num_rows=1000 --num_features=3
+```
+
+2. Input data should be JSONL with an `id` field and feature columns, for example:
+
+```json
+{"id": "row_1", "feature1": 1.5, "feature2": 2.3, "feature3": 3.7}
+```
+
````
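A row in that shape maps naturally onto a keyed RunInference input: the `id` field becomes the key and the named feature columns become the model's input vector. A stdlib sketch of that split (the helper name is illustrative, not the example's actual code):

```python
import json

FEATURE_COLUMNS = ["feature1", "feature2", "feature3"]

def row_to_example(line: str):
    """Split one JSONL table row into (row id, ordered feature vector)."""
    row = json.loads(line)
    return row["id"], [float(row[c]) for c in FEATURE_COLUMNS]

key, vec = row_to_example('{"id": "row_1", "feature1": 1.5, "feature2": 2.3, "feature3": 3.7}')
print(key, vec)  # row_1 [1.5, 2.3, 3.7]
```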
````diff
+### Running `table_row_inference.py` (batch)
+
+To run the table row inference pipeline in batch mode locally:
+
+```sh
+python -m apache_beam.examples.inference.table_row_inference \
+  --mode=batch \
+  --input_file=input_data.jsonl \
+  --output_table=PROJECT:DATASET.predictions \
+  --model_path=model.pkl \
+  --feature_columns=feature1,feature2,feature3 \
+  --runner=DirectRunner
+```
+
````
````diff
+### Running `table_row_inference.py` (streaming)
+
+For streaming mode, use a Pub/Sub subscription and DataflowRunner. Set up a topic and subscription first, then run:
+
+```sh
+python -m apache_beam.examples.inference.table_row_inference \
+  --mode=streaming \
+  --input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \
+  --output_table=PROJECT:DATASET.predictions \
+  --model_path=gs://BUCKET/model.pkl \
+  --feature_columns=feature1,feature2,feature3 \
+  --runner=DataflowRunner \
+  --project=PROJECT \
+  --region=us-central1 \
+  --temp_location=gs://BUCKET/temp \
+  --staging_location=gs://BUCKET/staging
+```
+
+See the script for full pipeline options (window size, trigger interval, worker settings, etc.).
+
+Output is written to the BigQuery table with columns such as `row_key`, `prediction`, and the original input feature columns.
+
+---
````
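The output schema the README describes (`row_key`, `prediction`, plus the original feature columns) can be sketched as a simple row-assembly helper (names are illustrative; the real pipeline builds its BigQuery rows internally):

```python
def to_bq_row(row_key, prediction, features, feature_columns):
    """Assemble one BigQuery output row: key, model prediction, and the
    original feature columns (illustrative sketch, not the pipeline's code)."""
    row = {"row_key": row_key, "prediction": prediction}
    row.update(dict(zip(feature_columns, features)))
    return row

row = to_bq_row("row_1", 1, [1.5, 2.3, 3.7], ["feature1", "feature2", "feature3"])
print(row)  # {'row_key': 'row_1', 'prediction': 1, 'feature1': 1.5, 'feature2': 2.3, 'feature3': 3.7}
```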
