
Spark read hangs indefinitely #1463

@danbaron63

Description


Environment

  • Running on AWS EKS.
  • Deployed using the Apache Spark Operator (not the kubeflow one).
  • Spark 3.5.7.
  • Using com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.43.1
  • Python 3.12

Problem

Reading from BigQuery using this connector causes Spark to hang indefinitely on a single task with no progress. According to INFORMATION_SCHEMA.JOBS, the query itself completed in seconds, yet the Spark job is still waiting for data hours later.

Gemini suggests this is related to the gRPC connection timing out silently while waiting for the query to execute (the query took about 135 seconds), after which Spark simply sits waiting for data that never arrives. However, I'm unsure whether this is just a hallucination.
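As an aside, while debugging we found it useful to turn a silent hang into a visible failure by bounding the blocking call with a timeout. This is a generic, hedged sketch (not connector-specific; `run_with_timeout` is our own helper, not a Spark or connector API):

```python
import concurrent.futures

def run_with_timeout(fn, timeout_s):
    """Run fn in a worker thread; raise concurrent.futures.TimeoutError
    instead of hanging forever if it does not finish in time."""
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        return pool.submit(fn).result(timeout=timeout_s)
    finally:
        # Don't wait for a stuck worker thread on the way out.
        pool.shutdown(wait=False)

print(run_with_timeout(lambda: "done", 5.0))  # -> done
```

Note the stuck worker thread itself is not killed; this only ensures the caller gets a TimeoutError it can act on rather than blocking indefinitely.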

Implementation

As this is proprietary code I cannot share it completely, but here are the key parts.

We have a query function which generates a query to send to BigQuery:

def generate_query(
    start: int | None,
    end: int | None,
    timestamp_field: str,
    field_names: list[str],
    entity_field_names: list[str],
    project_id: str,
    dataset: str,
    table: str,
) -> str:
    where_predicates = []
    if start is not None:
        logger.info("Filtering from start timestamp: %s", start)
        where_predicates.append(f"UNIX_MILLIS({timestamp_field}) >= {start}")
    if end is not None:
        logger.info("Filtering to end timestamp: %s", end)
        where_predicates.append(f"UNIX_MILLIS({timestamp_field}) <= {end}")
    where = f"WHERE {' AND '.join(where_predicates)}" if where_predicates else ""

    return f"""
    SELECT {",".join(field_names)} FROM
    (
      SELECT
        ROW_NUMBER() OVER (
            PARTITION BY {",".join(entity_field_names)} ORDER BY {timestamp_field} DESC
        ) AS _rn,
        {",".join(field_names)}
      FROM `{project_id}.{dataset}`.{table}
      {where}
    )
    WHERE _rn=1
    """
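As a standalone sanity check of the WHERE-clause construction above (field name and bounds here are hypothetical stand-ins for the real arguments):

```python
# Reproduces the predicate-joining logic from generate_query in isolation.
# "ts", 100 and 200 are hypothetical placeholders.
start, end = 100, 200
where_predicates = []
if start is not None:
    where_predicates.append(f"UNIX_MILLIS(ts) >= {start}")
if end is not None:
    where_predicates.append(f"UNIX_MILLIS(ts) <= {end}")
where = f"WHERE {' AND '.join(where_predicates)}" if where_predicates else ""
print(where)  # -> WHERE UNIX_MILLIS(ts) >= 100 AND UNIX_MILLIS(ts) <= 200
```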

We then send the query to spark like this:

query = generate_query(...)
df = spark.read.format("bigquery").option("viewsEnabled", "true").load(query)

# further processing in to a sink
df.foreachPartition(write...)

Actual query (redacted):

Here is the query; the column names and table location are redacted. Ultimately it just deduplicates over the primary keys, keeping the record with the latest timestamp per key:

SELECT REDACTED, timestamp_column FROM
    (
      SELECT
        ROW_NUMBER() OVER (
            PARTITION BY REDACTED ORDER BY timestamp_column DESC
        ) AS _rn,
        REDACTED, timestamp_column
      FROM `REDACTED.REDACTED`.REDACTED
      
    )
    WHERE _rn=1
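For clarity, the deduplication this ROW_NUMBER query performs can be sketched in plain Python (the rows below are hypothetical; ties on the timestamp are resolved arbitrarily, as with ROW_NUMBER):

```python
# Keep, per entity key, the row with the greatest timestamp -- the same
# effect as ROW_NUMBER() OVER (PARTITION BY entity ORDER BY ts DESC)
# followed by WHERE _rn = 1.
rows = [
    {"entity": "a", "ts": 1, "value": "stale"},
    {"entity": "a", "ts": 5, "value": "latest"},
    {"entity": "b", "ts": 3, "value": "only"},
]
latest = {}
for row in sorted(rows, key=lambda r: r["ts"]):
    latest[row["entity"]] = row  # later timestamps overwrite earlier ones
deduped = sorted(latest.values(), key=lambda r: r["entity"])
```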

Diagnostic information

Here's the Spark UI. In this case the task has currently been running for 30 minutes, but in other cases it has run for more than 3 hours.
[Image: Spark UI screenshot]

Thread dump of the executor that's hanging:

[Images: executor thread dump, 3 screenshots]

DAG:

[Image: DAG visualisation]

Logs (reverse order)

Driver logs:

"26/01/29 16:00:02 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on REDACTED:59428 (size: 23.3 KiB, free: 19.0 GiB)"
"26/01/29 16:00:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (REDACTED, executor 1, partition 0, PROCESS_LOCAL, 9130 bytes) "
"26/01/29 16:00:02 INFO BlockManagerMasterEndpoint: Registering block manager REDACTED with 19.0 GiB RAM, BlockManagerId(1, REDACTED, 59428, None)"
"26/01/29 16:00:02 INFO ExecutorMonitor: New executor 1 has registered (new total is 1)"
"26/01/29 16:00:02 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (REDACTED:37652) with ID 1,  ResourceProfileId 0"
"26/01/29 16:00:02 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: No executor found for REDACTED:37646"
"26/01/29 16:00:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources"
"26/01/29 15:59:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources"
"26/01/29 15:59:30 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources"
"26/01/29 15:59:15 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources"
"26/01/29 15:59:01 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script"
"26/01/29 15:59:01 INFO KubernetesClientUtils: Spark configuration files loaded from Some(/opt/spark/conf) : spark.kubernetes.namespace"
"26/01/29 15:59:01 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs"
"26/01/29 15:59:01 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1, known: 0, sharedSlotFromPendingPods: 2147483647."
"26/01/29 15:59:01 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1 for resource profile id: 0)"
"26/01/29 15:59:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0"
"26/01/29 15:59:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (PythonRDD[5] at foreachPartition at /usr/app/materialise.py:188) (first 15 tasks are for partitions Vector(0))"
"26/01/29 15:59:00 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1611"
"26/01/29 15:59:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on REDACTED.spark.svc:59429 (size: 23.3 KiB, free: 4.6 GiB)"
"26/01/29 15:59:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.3 KiB, free 4.6 GiB)"
"26/01/29 15:59:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 52.1 KiB, free 4.6 GiB)"
"26/01/29 15:58:59 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[5] at foreachPartition at /usr/app/materialise.py:188), which has no missing parents"
"26/01/29 15:58:59 INFO DAGScheduler: Missing parents: List()"
"26/01/29 15:58:59 INFO DAGScheduler: Parents of final stage: List()"
"26/01/29 15:58:59 INFO DAGScheduler: Final stage: ResultStage 0 (foreachPartition at /usr/app/materialise.py:188)"
"26/01/29 15:58:59 INFO DAGScheduler: Got job 0 (foreachPartition at /usr/app/materialise.py:188) with 1 output partitions"
"26/01/29 15:58:59 INFO SparkContext: Starting job: foreachPartition at /usr/app/materialise.py:188"
"26/01/29 15:58:59 INFO CodeGenerator: Code generated in 97.708746 ms"
"26/01/29 15:58:59 INFO BigQueryRDDFactory: Created read session for table 'REDACTED.REDACTED.REDACTED': projects/REDACTED/locations/REDACTED/sessions/REDACTED"
"26/01/29 15:58:59 INFO ReadSessionCreator: Received 1 partitions from the BigQuery Storage API for session projects/REDACTED/locations/REDACTED/sessions/REDACTED. Notice that the number of streams in actual may be lower than the requested number, depending on the amount parallelism that is reasonable for the table and the maximum amount of parallelism allowed by the system."
"26/01/29 15:58:59 INFO ReadSessionCreator: Read session:{""readSessionName"":""projects/REDACTED/locations/REDACTED/sessions/REDACTED"",""readSessionCreationStartTime"":""2026-01-29T15:58:58.428181676Z"",""readSessionCreationEndTime"":""2026-01-29T15:58:59.599529128Z"",""readSessionPrepDuration"":355,""readSessionCreationDuration"":815,""readSessionDuration"":1171}"
"26/01/29 15:58:58 INFO ReadSessionCreator: |creation a read session for table REDACTED.REDACTED.REDACTED, parameters: |selectedFields=[REDACTED],|filter=[None]|snapshotTimeMillis[None]"
"26/01/29 15:58:58 INFO BigQueryClientFactory: Channel pool size set to 8"
"26/01/29 15:58:58 INFO DirectBigQueryRelation: |Querying table REDACTED.REDACTED.REDACTED created from ""REDACTED(SEE QUERY)
"26/01/29 15:58:52 INFO BigQueryClient: running query [REDACTED(SEE QUERY)
"26/01/29 15:58:52 INFO BigQueryClient: DestinationTable is automatically generated"
"26/01/29 15:58:52 INFO SparkBigQueryConnectorModule: Registering cleanup jobs listener, should happen just once"
"26/01/29 15:58:51 INFO SharedState: Warehouse path is 'file:/opt/spark/work-dir/spark-warehouse'."
"26/01/29 15:58:51 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir."
"26/01/29 15:58:51 INFO KubernetesClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8"
"26/01/29 15:58:51 INFO ExecutorAllocationManager: Dynamic allocation is enabled without a shuffle service."
"26/01/29 15:58:51 INFO Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances"
"26/01/29 15:58:51 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, REDACTED, 59429, None)"
"26/01/29 15:58:51 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, REDACTED, 59429, None)"
"26/01/29 15:58:51 INFO BlockManagerMasterEndpoint: Registering block manager REDACTED:59429 with 4.6 GiB RAM, BlockManagerId(driver, REDACTED, 59429, None)"
"26/01/29 15:58:51 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, REDACTED, 59429, None)"
"26/01/29 15:58:51 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy"
"26/01/29 15:58:51 INFO NettyBlockTransferService: Server created on REDACTED REDACTED:59429"
"26/01/29 15:58:51 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59429."
"26/01/29 15:58:51 INFO KubernetesClientUtils: Spark configuration files loaded from Some(/opt/spark/conf) : spark.kubernetes.namespace"
"26/01/29 15:58:51 INFO Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances"
"26/01/29 15:58:50 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file"
"26/01/29 15:58:50 WARN SparkContext: File with 'local' scheme local:/usr/app/materialise.py is not supported to add to file server, since it is already available on every node."
"26/01/29 15:58:50 INFO SparkContext: Added JAR local:///usr/app/lib-all.jar at file:/usr/app/lib-all.jar with timestamp 1769702330259"
"26/01/29 15:58:50 INFO Utils: Successfully started service 'SparkUI' on port 4040."
"26/01/29 15:58:50 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI"
"26/01/29 15:58:50 INFO SparkEnv: Registering OutputCommitCoordinator"
"26/01/29 15:58:50 INFO MemoryStore: MemoryStore started with capacity 4.6 GiB"
"26/01/29 15:58:50 INFO DiskBlockManager: Created local directory at /var/data/REDACTED"
"26/01/29 15:58:50 INFO SparkEnv: Registering BlockManagerMasterHeartbeat"
"26/01/29 15:58:50 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up"
"26/01/29 15:58:50 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information"
"26/01/29 15:58:50 INFO SparkEnv: Registering BlockManagerMaster"
"26/01/29 15:58:50 INFO SparkEnv: Registering MapOutputTracker"
"26/01/29 15:58:50 INFO Utils: Successfully started service 'sparkDriver' on port 59427."
"26/01/29 15:58:50 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark; groups with view permissions: EMPTY; users with modify permissions: spark; groups with modify permissions: EMPTY"
"26/01/29 15:58:50 INFO SecurityManager: Changing modify acls groups to: "
"26/01/29 15:58:50 INFO SecurityManager: Changing view acls groups to: "
"26/01/29 15:58:50 INFO SecurityManager: Changing modify acls to: spark"
"26/01/29 15:58:50 INFO SecurityManager: Changing view acls to: spark"
"26/01/29 15:58:50 INFO ResourceProfileManager: Added ResourceProfile id: 0"
"26/01/29 15:58:50 INFO ResourceProfile: Limiting resource is cpus at 8 tasks per executor"
"26/01/29 15:58:50 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 8, script: , vendor: , memory -> name: memory, amount: 32768, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)"
"26/01/29 15:58:50 INFO SparkContext: Submitted application: "
"26/01/29 15:58:50 INFO ResourceUtils: =============================================================="
"26/01/29 15:58:50 INFO ResourceUtils: No custom resources configured for spark.driver."
"26/01/29 15:58:50 INFO ResourceUtils: =============================================================="
"26/01/29 15:58:50 INFO SparkContext: Java version 17.0.18"
"26/01/29 15:58:50 INFO SparkContext: OS info Linux, 6.12.63, amd64"
"26/01/29 15:58:50 INFO SparkContext: Running Spark version 3.5.7"
"26/01/29 15:58:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable"
"Files local:///usr/app/materialise.py from /usr/app/materialise.py to /opt/spark/work-dir/materialise.py"
"Files local:///usr/app/lib-all.jar from /usr/app/lib-all.jar to /opt/spark/work-dir/lib-all.jar"
