
[Batch Inference] AttributeError when using bentoml.batch.run_in_spark - 'APIMethod' object has no attribute 'input' #5524

@Arlen-Du

Description

Describe the bug

Overview

When attempting to use bentoml.batch.run_in_spark for batch inference, the job fails with AttributeError: 'APIMethod' object has no attribute 'input'. The same service works correctly via the REST API (see the client sketch below), indicating the issue is specific to the Spark batch backend.
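
For context, a minimal sketch of the REST call that succeeds against the same service. The URL and payload are illustrative, and the submit/get calls follow the BentoML task-client pattern, since predict is declared with @bentoml.task:

import numpy as np
import bentoml

# Connect to a running BentoServer (URL is illustrative).
client = bentoml.SyncHTTPClient("http://localhost:3000")

# predict is a @bentoml.task endpoint, so it is submitted as a task and
# the result is fetched once the task reports success.
task = client.predict.submit(input_data=np.array([[1, 2, 3, 4]]))
if task.get_status().value == "success":
    print(task.get())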

Environment

  • BentoML Version: 1.4.30
  • Python Version: 3.11
  • Spark Version: 3.4.3
  • Operating System: centos7

Expected Result

The run_in_spark function should successfully process the DataFrame and return a new DataFrame with prediction results.
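
Concretely, for the single-row input above, result.show() should print one pred column per the supplied output_schema; the value below is a placeholder, not an actual model output:

+----+
|pred|
+----+
|   0|
+----+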

Actual Result

The Spark job fails. The driver logs show AttributeError: 'APIMethod' object has no attribute 'input' originating from bentoml/_internal/batch/spark.py.

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/19 14:40:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+---+----+---+----+                                                             
| id|name|age|addr|
+---+----+---+----+
|  1|   2|  3|   4|
+---+----+---+----+

INFO: Starting production HTTP BentoServer from "demo2024:1.0" listening on http://localhost:45085 (Press CTRL+C to quit)
2025-12-19 14:40:49,493 INFO - Service demo2024:1.0 initialized
2025-12-19T14:40:51+0800 [INFO] [entry_service:demo2024:1] Service demo2024 initialized
WARNING: Client is deprecated and will be removed in BentoML 2.0, please use AsyncClient or SyncClient instead.
25/12/19 14:40:51 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/data/home/bgdata/miniconda3/envs/py311/lib/python3.11/site-packages/bentoml/_internal/batch/spark.py", line 86, in process
    func_input = inference_api.input.from_arrow(batch)
                 ^^^^^^^^^^^^^^^^^^^
AttributeError: 'APIMethod' object has no attribute 'input'

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:891)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:891)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
25/12/19 14:40:51 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1) (node3.kdp.com executor driver): org.apache.spark.api.python.PythonException: (same Python and JVM traceback as above)

25/12/19 14:40:51 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/data/home/bgdata/deploy/dataplatform-algo-worker/app/manager/task_manager.py", line 27, in <module>
    run_task()
  File "/data/home/bgdata/deploy/dataplatform-algo-worker/app/manager/task_manager.py", line 23, in run_task
    result.show()
  File "/data/home/bgdata/miniconda3/envs/py311/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 901, in show
    print(self._jdf.showString(n, 20, vertical))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/home/bgdata/miniconda3/envs/py311/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/data/home/bgdata/miniconda3/envs/py311/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/data/home/bgdata/miniconda3/envs/py311/lib/python3.11/site-packages/bentoml/_internal/batch/spark.py", line 86, in process
    func_input = inference_api.input.from_arrow(batch)
                 ^^^^^^^^^^^^^^^^^^^
AttributeError: 'APIMethod' object has no attribute 'input'
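
The failing lookup suggests that spark.py still expects the legacy InferenceAPI object, which exposed an IO descriptor on .input, whereas classes defined with the new @bentoml.service decorator produce APIMethod objects without that attribute. A small probe to confirm this from the reporter's environment (assumes service.py is importable; the expected values are inferred from the traceback, not verified against the BentoML source):

# Inspect the API object that run_in_spark will receive.
from service import service  # the @bentoml.service class from service.py

api = service.predict
print(type(api).__name__)     # expected: 'APIMethod'
print(hasattr(api, "input"))  # expected: False, matching the AttributeError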

Code Example

Service Code (service.py), built as the bento "demo2024:1.0":

# -*- coding: utf-8 -*-
import os
from http import HTTPStatus
import bentoml
import numpy as np
from bentoml.exceptions import BentoMLException
import logging

from app.utils.log_util import LogUtil

image = (bentoml.images.Image(base_image="python:3.13-slim", python_version="3.13")
         .python_packages("scikit-learn", "pandas", "bentoml"))

# Configure logging
logger = LogUtil("demo2024-1.0", log_path="/tmp/logs/algo-worker")

@bentoml.service(
    name="demo2024",
    resources={"cpu": "0.5", "memory": "1GB"},
    traffic={"timeout": 60},
    image=image,
)
class service:
    # Declare the model as a class variable
    bento_model = bentoml.models.BentoModel("demo2024:1.0")
    def __init__(self):
        logger.info("服务 demo2024:1.0} 初始化完成")
        self.model = bentoml.sklearn.load_model(self.bento_model)

    @bentoml.task
    def predict(self, input_data: np.ndarray):
        try:
            logger.info(f"Received prediction request, input data: {input_data.tolist()}")
            predict = self.model.predict(input_data)
            logger.info(f"Prediction finished, result: {predict.tolist()}")
            return predict.tolist()
        except Exception as e:
            logger.error(f"Model prediction failed: {e}")
            raise BentoMLException(f"Model prediction failed: {e}", error_code=HTTPStatus.BAD_REQUEST)

Task Code:

import bentoml
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

def run_task():
    spark_session = SparkSession.builder.master("local[*]").getOrCreate()

    # Create test data
    df = spark_session.sql("select 1 as id, 2 as name, 3 as age, 4 as addr")
    df.show()

    bento = bentoml.get("demo2024:1.0")

    result = bentoml.batch.run_in_spark(
        bento,
        df=df,
        spark=spark_session,
        api_name="predict",
        output_schema=StructType([
            StructField("pred", IntegerType(), True)
        ])
    )
    result.show()
    spark_session.stop()

if __name__ == '__main__':
    run_task()
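
As a stopgap until run_in_spark handles new-style services, the same scoring can be done with a plain mapInPandas UDF calling a running BentoServer over HTTP. A rough sketch, assuming predict is re-exposed as a synchronous @bentoml.api endpoint and a server is already up at an illustrative URL:

import pandas as pd
import bentoml

def score_partition(batches):
    # One HTTP client per partition (URL is illustrative).
    with bentoml.SyncHTTPClient("http://localhost:3000") as client:
        for pdf in batches:
            features = pdf[["id", "name", "age", "addr"]].to_numpy()
            preds = client.predict(input_data=features)
            yield pd.DataFrame({"pred": preds})

result = df.mapInPandas(score_partition, schema="pred int")
result.show()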


