3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion benchmarking/tpch/data_generation.py
@@ -5,7 +5,10 @@
 import math
 import os
 import shlex
-import sqlite3
+try:
+    import sqlite3
+except ImportError:
+    sqlite3 = None
 import subprocess
 from glob import glob
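Because sqlite3 can now be None, SQLite-backed code paths in this module need a guard before use. A minimal sketch of such a check (the helper name and error message are illustrative, not from this PR):

    try:
        import sqlite3
    except ImportError:
        sqlite3 = None  # e.g. a Python build without the _sqlite3 extension

    def require_sqlite3():
        # Fail only when a SQLite-backed path is actually exercised.
        if sqlite3 is None:
            raise RuntimeError("sqlite3 is unavailable in this Python build")
        return sqlite3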
15 changes: 15 additions & 0 deletions daft/context.py
@@ -77,6 +77,21 @@ def _notify_optimization_start(self, query_id: str) -> None:
     def _notify_optimization_end(self, query_id: str, optimized_plan: str) -> None:
         self._ctx.notify_optimization_end(query_id, optimized_plan)

+    def _notify_exec_start(self, query_id: str, physical_plan: str) -> None:
+        self._ctx.notify_exec_start(query_id, physical_plan)
+
+    def _notify_exec_end(self, query_id: str) -> None:
+        self._ctx.notify_exec_end(query_id)
+
+    def _notify_exec_operator_start(self, query_id: str, node_id: int) -> None:
+        self._ctx.notify_exec_operator_start(query_id, node_id)
+
+    def _notify_exec_operator_end(self, query_id: str, node_id: int) -> None:
+        self._ctx.notify_exec_operator_end(query_id, node_id)
+
+    def _notify_exec_emit_stats(self, query_id: str, node_id: int, stats: dict[str, int]) -> None:
+        self._ctx.notify_exec_emit_stats(query_id, node_id, stats)
+
     def _notify_result_out(self, query_id: str, result: PartitionT) -> None:
         from daft.recordbatch.micropartition import MicroPartition
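These wrappers forward execution-lifecycle events to the underlying Rust context. The intended call order, mirroring how ray_runner.py uses the hooks later in this diff (ctx, query_id, the node id, and the stats keys below are placeholders):

    # One query's execution lifecycle, per operator in the physical plan.
    ctx._notify_exec_start(query_id, physical_plan_json)
    ctx._notify_exec_operator_start(query_id, node_id=0)
    ctx._notify_exec_emit_stats(query_id, node_id=0, stats={"rows_out": 1024})
    ctx._notify_exec_operator_end(query_id, node_id=0)
    ctx._notify_exec_end(query_id)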
19 changes: 18 additions & 1 deletion daft/daft/__init__.pyi
@@ -2016,6 +2016,7 @@ class DistributedPhysicalPlan:
     def num_partitions(self) -> int: ...
     def repr_ascii(self, simple: bool) -> str: ...
     def repr_mermaid(self, options: MermaidOptions) -> str: ...
+    def repr_json(self) -> str: ...

@@ -2216,8 +2217,18 @@ class QueryEndState(Enum):
 class PyQueryMetadata:
     output_schema: PySchema
     unoptimized_plan: str
+    runner: str
+    ray_dashboard_url: str | None
Contributor: Why emit the ray dashboard URL?

Contributor (Author): [screenshot] Here is a link to the ray task.

Contributor: Ah, makes sense. Good idea.

+    entrypoint: str | None

-    def __init__(self, output_schema: PySchema, unoptimized_plan: str) -> None: ...
+    def __init__(
+        self,
+        output_schema: PySchema,
+        unoptimized_plan: str,
+        runner: str,
+        ray_dashboard_url: str | None = None,
+        entrypoint: str | None = None,
+    ) -> None: ...

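For reference, a representative construction with the new signature; the Ray runner later in this diff passes the same five arguments positionally (the URL and entrypoint values here are illustrative):

    meta = PyQueryMetadata(
        output_schema._schema,
        builder.repr_json(),
        "Ray (Flotilla)",
        "http://127.0.0.1:8265/#/jobs/01000000",  # ray_dashboard_url, illustrative
        "python my_script.py --scale 10",  # entrypoint, illustrative
    )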
 class PyQueryResult:
     end_state: QueryEndState
@@ -2244,6 +2255,11 @@ class PyDaftContext:
     def notify_result_out(self, query_id: str, result: PartitionT) -> None: ...
     def notify_optimization_start(self, query_id: str) -> None: ...
     def notify_optimization_end(self, query_id: str, optimized_plan: str) -> None: ...
+    def notify_exec_start(self, query_id: str, physical_plan: str) -> None: ...
+    def notify_exec_end(self, query_id: str) -> None: ...
+    def notify_exec_operator_start(self, query_id: str, node_id: int) -> None: ...
+    def notify_exec_operator_end(self, query_id: str, node_id: int) -> None: ...
+    def notify_exec_emit_stats(self, query_id: str, node_id: int, stats: dict[str, int]) -> None: ...

 def set_runner_ray(
     address: str | None = None,

@@ -2256,6 +2272,7 @@
 def get_or_create_runner() -> Runner[PartitionT]: ...
 def get_or_infer_runner_type() -> str: ...
 def get_runner() -> Runner[PartitionT] | None: ...
 def get_context() -> PyDaftContext: ...
+def refresh_dashboard_subscriber() -> None: ...
 def build_type() -> str: ...
 def version() -> str: ...
 def refresh_logger() -> None: ...
2 changes: 1 addition & 1 deletion daft/gravitino/gravitino_catalog.py
@@ -7,7 +7,7 @@
 from urllib.parse import urlparse

 from daft.dependencies import requests
-from daft.io import AzureConfig, GravitinoConfig, IOConfig, S3Config
+from daft.io import AzureConfig, IOConfig, S3Config


 @dataclasses.dataclass(frozen=True)
2 changes: 0 additions & 2 deletions daft/io/__init__.py
@@ -3,7 +3,6 @@
 from daft.daft import (
     AzureConfig,
     GCSConfig,
-    GravitinoConfig,
     IOConfig,
     HTTPConfig,
     S3Config,

@@ -38,7 +37,6 @@
     "DataSource",
     "DataSourceTask",
     "GCSConfig",
-    "GravitinoConfig",
     "HTTPConfig",
     "HuggingFaceConfig",
     "IOConfig",
31 changes: 22 additions & 9 deletions daft/runners/flotilla.py
@@ -237,7 +237,18 @@ def try_autoscale(bundles: list[dict[str, int]]) -> None:
     num_cpus=0,
 )
 class RemoteFlotillaRunner:
-    def __init__(self) -> None:
+    def __init__(self, dashboard_url: str | None = None) -> None:
+        if dashboard_url:
+            os.environ["DAFT_DASHBOARD_URL"] = dashboard_url
+            try:
+                from daft.daft import refresh_dashboard_subscriber
+
+                refresh_dashboard_subscriber()
+            except ImportError:
+                pass
+            except Exception:
+                pass
+
         self.curr_plans: dict[str, DistributedPhysicalPlan] = {}
         self.curr_result_gens: dict[str, AsyncIterator[RayPartitionRef]] = {}
         self.plan_runner = DistributedPhysicalPlanRunner()
@@ -340,29 +351,31 @@ class FlotillaRunner:

     def __init__(self) -> None:
         head_node_id = get_head_node_id()
+        dashboard_url = os.environ.get("DAFT_DASHBOARD_URL")
         self.runner = RemoteFlotillaRunner.options(  # type: ignore
             name=get_flotilla_runner_actor_name(),
             namespace=FLOTILLA_RUNNER_NAMESPACE,
             get_if_exists=True,
+            runtime_env={"env_vars": {"DAFT_DASHBOARD_URL": dashboard_url}} if dashboard_url else None,
             scheduling_strategy=(
                 ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
                     node_id=head_node_id,
                     soft=False,
                 )
-                if head_node_id is not None
-                else "DEFAULT"
+                if head_node_id
+                else None
             ),
-        ).remote()
+        ).remote(dashboard_url=dashboard_url)

     def stream_plan(
         self,
         plan: DistributedPhysicalPlan,
-        partition_sets: dict[str, PartitionSet[ray.ObjectRef]],
+        partition_sets: dict[str, PartitionSet[RayMaterializedResult]],
     ) -> Iterator[RayMaterializedResult]:
-        self.runner.run_plan.remote(plan, partition_sets)
+        plan_id = plan.idx()
+        ray.get(self.runner.run_plan.remote(plan, partition_sets))
         while True:
-            materialized_result = ray.get(self.runner.get_next_partition.remote(plan_id))
-            if materialized_result is None:
+            result = ray.get(self.runner.get_next_partition.remote(plan_id))
+            if result is None:
                 break
-            yield materialized_result
+            yield result
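Note the dashboard URL is propagated twice: via runtime_env, which only takes effect when the actor is freshly created, and via the constructor argument, so an actor reused through get_if_exists=True still picks it up and can call refresh_dashboard_subscriber(). A standalone sketch of that pattern (the function name and refresh callback are illustrative):

    import os
    from typing import Callable

    def adopt_dashboard_url(dashboard_url: str | None, refresh: Callable[[], None]) -> None:
        # Export the URL before refreshing so the new subscriber reads it
        # from the environment; failures are swallowed because dashboard
        # wiring is best-effort and must never fail the query.
        if dashboard_url:
            os.environ["DAFT_DASHBOARD_URL"] = dashboard_url
            try:
                refresh()
            except Exception:
                pass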
17 changes: 16 additions & 1 deletion daft/runners/native_runner.py
@@ -96,12 +96,25 @@ def run_iter(
         output_schema = builder.schema()

         # Optimize the logical plan.
-        ctx._notify_query_start(query_id, PyQueryMetadata(output_schema._schema, builder.repr_json()))
+        import sys
Contributor: [P2] Import statements should be at the top of the file. Move `import sys` to the imports section at the beginning of the file (around line 3-4) per the project's import style guidelines.

Contributor: Agree.

entrypoint = "python " + " ".join(sys.argv)
ctx._notify_query_start(
query_id,
PyQueryMetadata(
output_schema._schema,
builder.repr_json(),
"Native (Swordfish)",
ray_dashboard_url=None,
entrypoint=entrypoint,
),
)
ctx._notify_optimization_start(query_id)
builder = builder.optimize(ctx.daft_execution_config)
ctx._notify_optimization_end(query_id, builder.repr_json())

plan = LocalPhysicalPlan.from_logical_plan_builder(builder._builder)

executor = NativeExecutor()
results_gen = executor.run(
plan,
Expand All @@ -112,8 +125,10 @@ def run_iter(
)

try:
total_rows = 0
Contributor: Why collect this?

Contributor (Author): This is debug code; I deleted something here.
             for result in results_gen:
                 ctx._notify_result_out(query_id, result.partition())
+                total_rows += len(result.partition())
                 yield result
         except KeyboardInterrupt as e:
             query_result = PyQueryResult(QueryEndState.Canceled, "Query canceled by the user.")
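As an aside, the entrypoint string is a best-effort reconstruction of the launch command from sys.argv, for example (values illustrative):

    import sys

    # If the process was started as `python train.py --scale 10`, then
    # sys.argv == ["train.py", "--scale", "10"] and the reconstructed
    # entrypoint is "python train.py --scale 10". Note that arguments
    # containing spaces are not re-quoted by this approach.
    entrypoint = "python " + " ".join(sys.argv)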
91 changes: 81 additions & 10 deletions daft/runners/ray_runner.py
@@ -1,6 +1,9 @@
 from __future__ import annotations

+import json
 import logging
+import os
+import sys
 import time
 import uuid
 from collections.abc import Generator, Iterable, Iterator

@@ -45,6 +48,9 @@
     FileFormatConfig,
     FileInfos,
     IOConfig,
+    PyQueryMetadata,
+    PyQueryResult,
+    QueryEndState,
 )
 from daft.datatype import DataType
 from daft.filesystem import glob_path_with_stats
@@ -548,19 +554,84 @@ def run_iter(
         ctx = get_context()
         query_id = str(uuid.uuid4())
         daft_execution_config = ctx.daft_execution_config
+        output_schema = builder.schema()

-        # Optimize the logical plan.
-        builder = builder.optimize(daft_execution_config)
-
-        distributed_plan = DistributedPhysicalPlan.from_logical_plan_builder(
-            builder._builder, query_id, daft_execution_config
-        )
+        # Notify query start
+        ray_dashboard_url = None
+        try:
+            ray_dashboard_url = ray.worker.get_dashboard_url()
+            if ray_dashboard_url:
+                if not ray_dashboard_url.startswith("http"):
+                    ray_dashboard_url = f"http://{ray_dashboard_url}"
+
+                # Try to append Job ID
+                try:
+                    job_id = ray.get_runtime_context().get_job_id()
+                    if job_id:
+                        ray_dashboard_url = f"{ray_dashboard_url}/#/jobs/{job_id}"
+                except Exception:
+                    pass
+        except Exception:
+            pass
+
+        entrypoint = "python " + " ".join(sys.argv)
+
+        ctx._notify_query_start(
+            query_id,
+            PyQueryMetadata(
+                output_schema._schema, builder.repr_json(), "Ray (Flotilla)", ray_dashboard_url, entrypoint
+            ),
+        )
-        if self.flotilla_plan_runner is None:
-            self.flotilla_plan_runner = FlotillaRunner()
+        ctx._notify_optimization_start(query_id)

-        yield from self.flotilla_plan_runner.stream_plan(
-            distributed_plan, self._part_set_cache.get_all_partition_sets()
-        )
+        # Log Dashboard URL if configured
+        dashboard_url = os.environ.get("DAFT_DASHBOARD_URL")
+        if dashboard_url:
+            print(f"Daft Dashboard: {dashboard_url}/query/{query_id}")
Contributor: Remove print.

Contributor (Author): Here, I changed print to logger, mainly to clearly show users how to access the dashboard. What do you think?
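A logger-based variant of that line, as the author describes (a sketch; the logger name and level are assumptions):

    import logging

    logger = logging.getLogger(__name__)

    # Same message as the print above, routed through the module logger.
    logger.info("Daft Dashboard: %s/query/%s", dashboard_url, query_id)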


+        try:
+            # Optimize the logical plan.
+            builder = builder.optimize(daft_execution_config)
+            ctx._notify_optimization_end(query_id, builder.repr_json())
+
+            distributed_plan = DistributedPhysicalPlan.from_logical_plan_builder(
+                builder._builder, query_id, daft_execution_config
+            )
+            physical_plan_json = distributed_plan.repr_json()
+            ctx._notify_exec_start(query_id, physical_plan_json)
+
+            if self.flotilla_plan_runner is None:
+                self.flotilla_plan_runner = FlotillaRunner()
+
+            total_rows = 0
+            for result in self.flotilla_plan_runner.stream_plan(
+                distributed_plan, self._part_set_cache.get_all_partition_sets()
+            ):
+                if result.metadata() is not None:
+                    total_rows += result.metadata().num_rows
+                yield result
+
+            # Mark all operators as finished to clean up the Dashboard UI before notify_exec_end
+            try:
+                plan_dict = json.loads(physical_plan_json)
+
+                def notify_end(node: dict[str, Any]) -> None:
+                    if "children" in node:
+                        for child in node["children"]:
+                            notify_end(child)
+                    if "id" in node:
+                        ctx._notify_exec_operator_end(query_id, node["id"])
+
+                notify_end(plan_dict)
+            except Exception:
+                pass
+
+            ctx._notify_exec_end(query_id)
+            ctx._notify_query_end(query_id, PyQueryResult(QueryEndState.Finished, ""))
+
+        except Exception as e:
+            ctx._notify_query_end(query_id, PyQueryResult(QueryEndState.Failed, str(e)))
+            raise

     def run_iter_tables(
         self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None
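The notify_end walk above assumes the physical plan JSON decodes to a tree of nodes, each carrying an integer id and a children list, roughly this shape (illustrative):

    plan_dict = {
        "id": 0,
        "children": [
            {"id": 1, "children": []},
            {"id": 2, "children": []},
        ],
    }
    # The traversal is post-order: each node's children are marked
    # finished before the node itself.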
2 changes: 1 addition & 1 deletion src/common/metrics/src/snapshot.rs
@@ -54,7 +54,7 @@ impl StatSnapshotImpl for SourceSnapshot {
         stats![
             CPU_US_KEY; Stat::Duration(Duration::from_micros(self.cpu_us)),
             ROWS_OUT_KEY; Stat::Count(self.rows_out),
-            "bytes read"; Stat::Bytes(self.bytes_read),
+            "bytes_read"; Stat::Bytes(self.bytes_read),
         ]
     }
3 changes: 2 additions & 1 deletion src/daft-context/Cargo.toml
@@ -15,7 +15,8 @@ log = {workspace = true}
 # For debug subscriber
 dashmap = {workspace = true}
 # Client for submitting to dashboard server
-reqwest = {workspace = true, default-features = false}
+reqwest = {workspace = true, default_features = false}
+uuid = {workspace = true, features = ["v4"]}

 [features]
 python = [