
Nested queries in Flotilla could collide on idx or hang (on Ray mode) #6547

@everySympathy


Describe the bug

Here, "nested query" means that a new DataFrame is created inside a UDF, i.e., one Daft query is started from inside the execution path of another Daft query.

There are two related problems in Flotilla when nested queries are involved.

  1. Flotilla currently uses DistributedPhysicalPlan.idx() to track in-flight plan state.
    idx() is only process-local, so the driver process and a Ray worker can each produce idx == "0" in a fresh process. Two different plans can then share the same key, and in practice this fails with a KeyError.

  2. A nested Flotilla runner can resolve back to the same named actor through get_if_exists=True.
    When that happens, the actor ends up waiting on work scheduled back onto itself, and the nested query can hang. See the Ray docs on named actors: https://docs.ray.io/en/latest/ray-core/actors/named-actors.html
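Problem (1) can be illustrated without Daft at all. In the sketch below (illustrative, not Daft code), a fresh interpreter stands in for a Ray worker process: each process starts its own counter at zero, so two processes independently hand out the same id:

```python
import itertools
import subprocess
import sys

# Process-local counter, analogous to a per-process idx():
# it restarts at 0 in every new process.
_counter = itertools.count()


def next_idx() -> str:
    return str(next(_counter))


if __name__ == "__main__":
    driver_idx = next_idx()  # "0" in this (driver) process
    # A fresh interpreter stands in for a Ray worker: its counter
    # also starts at 0, so it hands out the same id as the driver.
    worker_idx = subprocess.run(
        [sys.executable, "-c",
         "import itertools; print(next(itertools.count()))"],
        capture_output=True, text=True,
    ).stdout.strip()
    print("driver:", driver_idx, "worker:", worker_idx)  # both "0"
```

Any process-local scheme has this property; only a globally unique identifier avoids collisions across processes.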

To Reproduce

This error is hard to trigger through Daft's public API because it stems from internal implementation details, but the failure modes themselves are easy to understand. The scripts below are minimized internal reproducers for the two underlying failure modes:

Repro 1: process-local idx() can collide across processes

This is a minimized internal reproducer for the underlying Flotilla state collision.

Run this against the current implementation:

from __future__ import annotations

import tempfile
import traceback
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq
import ray

import daft
from daft.context import get_context
from daft.daft import DistributedPhysicalPlan, PyExecutionStats
from daft.runners.flotilla import RemoteFlotillaRunner


def build_plan(path: str, query_id: str) -> DistributedPhysicalPlan:
    cfg = get_context().daft_execution_config
    df = daft.read_parquet(path)
    builder = df._builder.optimize(cfg)
    return DistributedPhysicalPlan.from_logical_plan_builder(builder._builder, query_id, cfg)


@ray.remote
def build_remote_plan(path: str, query_id: str):
    plan = build_plan(path, query_id)
    return plan, plan.idx()


def drain_plan(runner, plan_id: str):
    values: list[str] = []
    while True:
        result = ray.get(runner.get_next_partition.remote(plan_id))
        if isinstance(result, PyExecutionStats):
            return values
        mp = ray.get(result.partition())
        values.extend(mp.to_pydict()["x"])


root = Path(tempfile.mkdtemp(prefix="daft-flotilla-repro-"))
driver_dir = root / "driver"
worker_dir = root / "worker"
driver_dir.mkdir()
worker_dir.mkdir()

pq.write_table(pa.table({"x": ["driver"]}), driver_dir / "part-0.parquet")
pq.write_table(pa.table({"x": ["worker"]}), worker_dir / "part-0.parquet")

ray.init(ignore_reinit_error=True, configure_logging=False, log_to_driver=False)
try:
    # a plan is built on the driver process
    driver_plan = build_plan(str(driver_dir), "driver-query")
    # the code below mocks a plan built on another process, in a Ray worker
    worker_plan, worker_idx = ray.get(build_remote_plan.remote(str(worker_dir), "worker-query"))

    print("driver idx:", driver_plan.idx())
    print("worker idx:", worker_idx)

    runner = RemoteFlotillaRunner.remote(None)
    ray.get(runner.run_plan.remote(driver_plan, {}))
    ray.get(runner.run_plan.remote(worker_plan, {}))

    print("first drain:", drain_plan(runner, driver_plan.idx()))
    # the error happens here: DistributedPhysicalPlan.idx() is used as the
    # runtime key, but it is not unique across processes
    print("second drain:", drain_plan(runner, worker_plan.idx()))
except Exception:
    traceback.print_exc()
finally:
    ray.shutdown()

Run it with:

DAFT_RUNNER=ray python repro_query_id.py

A typical result is:

driver idx: 0
worker idx: 0
first drain: ['worker']
Traceback (most recent call last):
  File "/data00/home/wangzheyan/las-Daft/11.py", line 62, in <module>
    print("second drain:", drain_plan(runner, worker_plan.idx()))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/home/wangzheyan/las-Daft/11.py", line 33, in drain_plan
    result = ray.get(runner.get_next_partition.remote(plan_id))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/home/wangzheyan/las-Daft/.venv/lib/python3.11/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/data00/home/wangzheyan/las-Daft/.venv/lib/python3.11/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/data00/home/wangzheyan/las-Daft/.venv/lib/python3.11/site-packages/ray/_private/worker.py", line 2972, in get
    values, debugger_breakpoint = worker.get_objects(
                                  ^^^^^^^^^^^^^^^^^^^
  File "/data00/home/wangzheyan/las-Daft/.venv/lib/python3.11/site-packages/ray/_private/worker.py", line 1031, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(KeyError): ray::RemoteFlotillaRunner.get_next_partition() (pid=2388433, ip=10.37.124.21, actor_id=2747a84c2f4c21cf0ad2094701000000, repr=<daft.runners.flotilla.RemoteFlotillaRunner object at 0x7fb217bdf0d0>)
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/home/wangzheyan/las-Daft/daft/runners/flotilla.py", line 396, in get_next_partition
    next_partition_ref = await self.curr_result_gens[plan_id].__anext__()
                               ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^
KeyError: '0'

The problem is that two different plans from different processes can both use "0" as the Flotilla runtime key. One plan then overwrites the other in curr_result_gens, and
after the surviving entry is drained and removed, a later lookup for "0" fails with KeyError('0').
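The overwrite-then-KeyError mechanics reduce to plain dict behavior. A minimal sketch (the names curr_result_gens, run_plan, and drain here are illustrative stand-ins, not Daft's actual internals):

```python
# Illustrative stand-in for the runner's plan-id -> result-generator map.
curr_result_gens = {}


def run_plan(plan_id, gen):
    # A second registration under the same key silently overwrites the first.
    curr_result_gens[plan_id] = gen


def drain(plan_id):
    # Drained entries are removed, so a later lookup for the same key fails.
    return list(curr_result_gens.pop(plan_id))


run_plan("0", iter(["driver"]))  # driver plan registers under "0"
run_plan("0", iter(["worker"]))  # worker plan overwrites it under the same "0"

print(drain("0"))  # ['worker'] -- the driver plan's results are gone
try:
    drain("0")     # second lookup for "0" now fails
except KeyError as e:
    print("KeyError:", e)
```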

Repro 2: nested runner can resolve back to the same actor and hang

This is a minimal Ray reproducer for the self-targeting actor pattern behind the nested-runner hang.

  from __future__ import annotations

  import ray

  NAMESPACE = "daft-repro"
  NAME = "flotilla-plan-runner"

  ray.init(ignore_reinit_error=True, configure_logging=False, log_to_driver=False)


  @ray.remote(num_cpus=0)
  class DemoRunner:
      def inner(self):
          return "ok"

      def outer(self):
          nested = DemoRunner.options(
              name=NAME,  # a nested runner should not reuse this name, but this mirrors how Daft currently resolves nested queries
              namespace=NAMESPACE,
              get_if_exists=True,
          ).remote()
          return ray.get(nested.inner.remote())


  runner = DemoRunner.options(
      name=NAME,
      namespace=NAMESPACE,
      get_if_exists=True,
  ).remote()

  try:
      print(ray.get(runner.outer.remote(), timeout=5))
  except Exception as e:
      print(type(e).__name__)
      print(e)
  finally:
      ray.shutdown()

Run it with:

  python repro_actor_name.py

The result is:

GetTimeoutError
Get timed out: some object(s) not ready.

A default Ray actor executes one method call at a time, so outer() blocks inside ray.get while inner() sits in the same actor's queue and can never be scheduled; without the timeout, the nested call would hang indefinitely.

Expected behavior

  • Flotilla should not use a process-local idx() as the identity for in-flight plans across processes.
  • A nested Flotilla runner should not resolve back to the actor that is already running the outer query.
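One possible direction, sketched here as a hypothetical (none of these names are Daft APIs): key in-flight plans by a globally unique id instead of idx(), and give each nested runner a unique actor name so get_if_exists=True cannot resolve back to the outer runner:

```python
import uuid


def make_plan_key() -> str:
    # A UUID is unique across processes, unlike a process-local idx()
    # counter, so two plans can never collide in the runner's state maps.
    return uuid.uuid4().hex


def nested_runner_name(base: str = "flotilla-plan-runner") -> str:
    # A per-query suffix keeps a nested runner from resolving back
    # (via get_if_exists=True) to the actor running the outer query.
    return f"{base}-{uuid.uuid4().hex}"


print(make_plan_key() != make_plan_key())  # True: keys never repeat
print(nested_runner_name())                # e.g. flotilla-plan-runner-3f2c...
```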

Component(s)

Distributed Runner (flotilla)

Additional context

skip_existing (#5931) is one real user-facing example of this (https://github.com/Eventual-Inc/Daft/pull/5931/changes#diff-c807f322bc143ad8ce42764686cffce06d87680a362dfd0cf500c53e79c0baf0R520). The outer query is the one the user starts; during its execution, skip_existing triggers an inner Daft query/collect to load existing keys. That inner query runs while the outer query is still executing inside Flotilla, so it exercises the nested-query path.

Labels

bug
