Commit 96398f1

Merge pull request #8 from robtandy/rob.tandy/pull_arrow_flight
change prints to log output. Set up proper logging
2 parents: 1154320 + bae9329

12 files changed: +199, -52 lines

Cargo.lock

Lines changed: 103 additions & 0 deletions
Generated file; diff not rendered.

Cargo.toml

Lines changed: 3 additions & 6 deletions

```diff
@@ -38,6 +38,7 @@ bytesize = "1.3"
 datafusion = { version = "43.0", features = ["pyarrow", "avro"] }
 datafusion-python = { version = "43.1" }
 datafusion-proto = "43.0"
+env_logger = "0.11"
 futures = "0.3"
 glob = "0.3.1"
 local-ip-address = "0.6"
@@ -56,6 +57,7 @@ pyo3 = { version = "0.22.6", features = [
     "abi3-py38",
 ] }
 pyo3-async-runtimes = { version = "0.22", features = ["tokio-runtime"] }
+pyo3-pylogger = "0.3.0"
 rust_decimal = "1.36"
 tokio = { version = "1.40", features = [
     "macros",
@@ -83,11 +85,6 @@ tonic-build = { version = "0.8", default-features = false, features = [
 ] }
 url = "2"

-[dev-dependencies]
-#anyhow = "1.0.89"
-#pretty_assertions = "1.4.0"
-#regex = "1.11.0"
-
 [lib]
 name = "datafusion_ray"
 crate-type = ["cdylib", "rlib"]
@@ -105,4 +102,4 @@ debug = 0
 opt-level = 1

 [profile.dev.package."*"]
-opt-level = 3
+opt-level = 1
```

README.md

Lines changed: 8 additions & 0 deletions

```diff
@@ -100,10 +100,18 @@ RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/yo

 This will output a json file in the current directory with query timings.

+## Logging
+
+DataFusion Ray's logging output is determined by the `DATAFUSION_RAY_LOG_LEVEL` environment variable. The default log level is `WARN`. To change it, set the variable to one of `ERROR`, `WARN`, `INFO`, `DEBUG`, or `TRACE`.
+
+DataFusion Ray outputs logs from both Python and Rust. To handle this consistently, the Python logger for `datafusion_ray` is routed to Rust for logging. The `RUST_LOG` environment variable can be used to control Rust log output from crates other than `datafusion_ray`.
+
 ## Status

 - DataFusion Ray can execute all TPCH queries. Tested up to SF100.

 ## Known Issues

 - The DataFusion config setting, `datafusion.execution.parquet.pushdown_filters`, can produce incorrect results. We think this could be related to an issue with round trip physical plan serialization. At the moment, do not enable this setting, as it prevents physical plans from serializing correctly.
+
+  This should be resolved when we update to a DataFusion version that includes <https://github.com/apache/datafusion/pull/14465#event-16194180382>.
```
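
To make the new Logging section concrete, here is a minimal driver-side sketch (not part of this commit). It assumes the level must be chosen before `datafusion_ray` is imported, since `core.py` reads `DATAFUSION_RAY_LOG_LEVEL` at import time; the `RUST_LOG` value shown only illustrates the README's note about other Rust crates.

```python
# Minimal sketch (not part of this commit). DATAFUSION_RAY_LOG_LEVEL is read when
# datafusion_ray is imported (setup_logging() runs at module import in core.py),
# so it must be set before the import.
import os

os.environ["DATAFUSION_RAY_LOG_LEVEL"] = "DEBUG"  # ERROR, WARN, INFO, DEBUG, or TRACE
os.environ["RUST_LOG"] = "warn"  # per the README note: Rust crates other than datafusion_ray

from datafusion_ray import RayContext, runtime_env  # noqa: E402  the import triggers setup_logging()
```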

datafusion_ray/__init__.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -20,6 +20,6 @@
 except ImportError:
     import importlib_metadata

-from .core import RayContext, prettify
+from .core import RayContext, prettify, runtime_env

 __version__ = importlib_metadata.version(__name__)
```

datafusion_ray/core.py

Lines changed: 39 additions & 18 deletions

```diff
@@ -17,6 +17,8 @@


 from collections import defaultdict
+from logging import error, debug, info
+import os
 import pyarrow as pa
 import asyncio
 import ray
@@ -30,6 +32,26 @@
 )


+def setup_logging():
+    import logging
+
+    logging.addLevelName(5, "TRACE")
+
+    log_level = os.environ.get("DATAFUSION_RAY_LOG_LEVEL", "WARN").upper()
+
+    # this logger gets captured and routed to rust. See src/lib.rs
+    logging.getLogger("datafusion_ray").setLevel(log_level)
+
+
+setup_logging()
+
+_log_level = os.environ.get("DATAFUSION_RAY_LOG_LEVEL", "ERROR")
+runtime_env = {
+    "worker_process_setup_hook": setup_logging,
+    "env_vars": {"DATAFUSION_RAY_LOG_LEVEL": _log_level, "RAY_worker_niceness": "0"},
+}
+
+
 class RayDataFrame:
     def __init__(
         self,
@@ -76,21 +98,19 @@ def collect(self) -> list[pa.RecordBatch]:
         t1 = time.time()
         self.stages()
         t2 = time.time()
-        print(f"creating stages took {t2 -t1}s")
+        debug(f"creating stages took {t2 - t1}s")

         last_stage = max([stage.stage_id for stage in self._stages])
-        print("last stage is", last_stage)
+        debug(f"last stage is {last_stage}")

         self.create_ray_stages()
         t3 = time.time()
-        print(f"creating ray stage actors took {t3 -t2}s")
+        debug(f"creating ray stage actors took {t3 - t2}s")
         self.run_stages()

         addrs = ray.get(self.coord.get_stage_addrs.remote())
-        print("addrs", addrs)

         reader = self.df.read_final_stage(last_stage, addrs[last_stage][0])
-        print("called df execute, got reader")
         self._batches = list(reader)
         self.coord.all_done.remote()
         return self._batches
@@ -110,7 +130,7 @@ def create_ray_stages(self):
         for stage in self.stages():
             num_shadows = stage.num_shadow_partitions()
             if self.isolate_partitions and num_shadows:
-                print(f"stage {stage.stage_id} has {num_shadows} shadows")
+                debug(f"stage {stage.stage_id} has {num_shadows} shadows")
                 for shadow in range(num_shadows):
                     refs.append(
                         self.coord.new_stage.remote(
@@ -181,10 +201,10 @@ def __init__(
         self.stages_ready = False

     async def all_done(self):
-        print("calling stage all done")
+        debug("calling stage all done")
         refs = [stage.all_done.remote() for stage in self.stages.values()]
         ray.wait(refs, num_returns=len(refs))
-        print("done stage all done")
+        debug("done stage all done")

     async def new_stage(
         self,
@@ -195,7 +215,7 @@ async def new_stage(
         stage_key = (stage_id, shadow_partition)
         try:

-            print(f"creating new stage {stage_key} from bytes {len(plan_bytes)}")
+            debug(f"creating new stage {stage_key} from bytes {len(plan_bytes)}")
             stage = RayStage.options(
                 name=f"Stage: {stage_key}, query_id:{self.query_id}",
             ).remote(
@@ -207,15 +227,16 @@
             self.stages_started.append(stage.start_up.remote())

         except Exception as e:
-            print(
+            error(
                 f"RayQueryCoordinator[{self.query_id}] Unhandled Exception in new stage! {e}"
             )
             raise e

     async def wait_for_stages_ready(self):
+        # TODO: signal our doneness instead of loop
         while not self.stages_ready:
             await asyncio.sleep(0.1)
-            print("waiting for stages to be ready")
+            debug("waiting for stages to be ready")

     async def ensure_stages_ready(self):
         if not self.stages_ready:
@@ -231,10 +252,10 @@ async def get_stage_addrs(self) -> dict[int, list[str]]:
     async def sort_out_addresses(self):
         for stage_key, stage in self.stages.items():
             stage_id, shadow_partition = stage_key
-            print(f" getting stage addr for {stage_id},{shadow_partition}")
+            debug(f" getting stage addr for {stage_id},{shadow_partition}")
             self.stage_addrs[stage_id].append(await stage.addr.remote())

-        print(f"stage_addrs: {self.stage_addrs}")
+        debug(f"stage_addrs: {self.stage_addrs}")
         # now update all the stages with the addresses of peers such
         # that they can contact their child stages
         refs = []
@@ -245,14 +266,14 @@ async def sort_out_addresses(self):

     async def serve(self):
         await self.ensure_stages_ready()
-        print("running stages")
+        info("running stages")
         try:
             for stage_key, stage in self.stages.items():
-                print(f"starting serving of stage {stage_key}")
+                info(f"starting serving of stage {stage_key}")
                 stage.serve.remote()

         except Exception as e:
-            print(
+            error(
                 f"RayQueryCoordinator[{self.query_id}] Unhandled Exception in run stages! {e}"
             )
             raise e
@@ -284,7 +305,7 @@ def __init__(
                 shadow_partition,
             )
         except Exception as e:
-            print(
+            error(
                 f"StageService[{self.stage_id}{shadow}] Unhandled Exception in init: {e}!"
             )
             raise
@@ -303,4 +324,4 @@ async def set_stage_addrs(self, stage_addrs: dict[int, list[str]]):

     async def serve(self):
         await self.stage_service.serve()
-        print("StageService done serving")
+        info("StageService done serving")
```
