Skip to content

Commit 3dde272

Browse files
authored
Python SDK: Add timeout_sec argument to flush (#11295)
1 parent 3f6a23a commit 3dde272

File tree

10 files changed

+65
-26
lines changed

10 files changed

+65
-26
lines changed

clippy.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ disallowed-methods = [
5252
{ path = "std::panic::catch_unwind", reason = "We compile with `panic = 'abort'`" },
5353
{ path = "std::thread::spawn", reason = "Use `std::thread::Builder` and name the thread" },
5454

55+
{ path = "std::time::Duration::from_secs_f32", reason = "Use try_from_secs_f32 instead to avoid panics" },
56+
{ path = "std::time::Duration::from_secs_f64", reason = "Use try_from_secs_f64 instead to avoid panics" },
57+
5558
{ path = "arrow::compute::concat", reason = "Use `re_arrow_util::arrow_util::concat_arrays` instead, which has better memory management" },
5659
{ path = "arrow::compute::filter", reason = "Use `re_arrow_util::arrow_util::filter_array` instead" },
5760
{ path = "arrow::compute::take", reason = "Use `re_arrow_util::arrow_util::take_array` instead" },

crates/store/re_chunk/src/batcher.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,13 @@ impl ChunkBatcherConfig {
248248
err: Box::new(err),
249249
})?;
250250

251-
new.flush_tick = Duration::from_secs_f64(flush_duration_secs);
251+
new.flush_tick = Duration::try_from_secs_f64(flush_duration_secs).map_err(|err| {
252+
ChunkBatcherError::ParseConfig {
253+
name: Self::ENV_FLUSH_TICK,
254+
value: s.clone(),
255+
err: Box::new(err),
256+
}
257+
})?;
252258
}
253259

254260
if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) {

crates/top/re_sdk/src/recording_stream.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2538,7 +2538,12 @@ impl RecordingStream {
25382538
/// - [`Self::reset_time`]
25392539
#[inline]
25402540
pub fn set_duration_secs(&self, timeline: impl Into<TimelineName>, secs: impl Into<f64>) {
2541-
self.set_time(timeline, std::time::Duration::from_secs_f64(secs.into()));
2541+
let secs = secs.into();
2542+
if let Ok(duration) = std::time::Duration::try_from_secs_f64(secs) {
2543+
self.set_time(timeline, duration);
2544+
} else {
2545+
re_log::error_once!("set_duration_secs: can't set time to {secs}");
2546+
}
25422547
}
25432548

25442549
/// Set a timestamp as seconds since Unix epoch (1970-01-01 00:00:00 UTC).

crates/viewer/re_ui/src/notifications.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ impl NotificationUi {
341341
}
342342

343343
fn base_ttl() -> Duration {
344-
Duration::from_secs_f64(4.0)
344+
Duration::from_secs(4)
345345
}
346346

347347
struct Toasts {
@@ -363,7 +363,9 @@ impl Toasts {
363363

364364
/// Shows and updates all toasts
365365
fn show(&self, egui_ctx: &egui::Context, notifications: &mut [Notification]) {
366-
let dt = Duration::from_secs_f32(egui_ctx.input(|i| i.unstable_dt));
366+
let dt = Duration::try_from_secs_f32(egui_ctx.input(|i| i.unstable_dt))
367+
.unwrap_or(std::time::Duration::from_millis(100));
368+
367369
let mut offset = egui::vec2(-8.0, 32.0);
368370

369371
let mut first_nonzero_ttl = None;

examples/rust/objectron/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,9 +310,9 @@ struct Args {
310310
per_frame_sleep: Option<Duration>,
311311
}
312312

313-
fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseFloatError> {
313+
fn parse_duration(arg: &str) -> anyhow::Result<std::time::Duration> {
314314
let seconds = arg.parse()?;
315-
Ok(std::time::Duration::from_secs_f64(seconds))
315+
Ok(std::time::Duration::try_from_secs_f64(seconds)?)
316316
}
317317

318318
fn run(rec: &rerun::RecordingStream, args: &Args) -> anyhow::Result<()> {

rerun_py/rerun_bindings/rerun_bindings.pyi

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1083,7 +1083,8 @@ def disconnect(recording: PyRecordingStream | None = None) -> None:
10831083
Subsequent log messages will be buffered and either sent on the next call to `connect_grpc` or `spawn`.
10841084
"""
10851085

1086-
def flush(blocking: bool, recording: PyRecordingStream | None = None) -> None:
1086+
# TODO(#11294): remove `blocking` argument and put the `*` first
1087+
def flush(blocking: bool = True, recording: PyRecordingStream | None = None, *, timeout_sec: float = 1e38) -> None:
10871088
"""Block until outstanding data has been flushed to the sink."""
10881089

10891090
#

rerun_py/rerun_sdk/rerun/recording_stream.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ def __exit__(
448448
exc_val: BaseException | None,
449449
exc_tb: TracebackType | None,
450450
) -> None:
451-
self.flush(blocking=True)
451+
self.flush()
452452

453453
current_recording = active_recording_stream.get(None)
454454

@@ -471,17 +471,22 @@ def __exit__(
471471
def to_native(self) -> bindings.PyRecordingStream:
472472
return self.inner
473473

474-
def flush(self, blocking: bool = True) -> None:
474+
# TODO(#11294): remove `blocking` argument
475+
def flush(self, blocking: bool = True, *, timeout_sec: float = 1e38) -> None:
475476
"""
476477
Initiates a flush the batching pipeline and optionally waits for it to propagate to the underlying file descriptor (if any).
477478
478479
Parameters
479480
----------
480481
blocking:
481482
If true, the flush will block until the flush is complete.
483+
timeout_sec:
484+
Wait at most this many seconds.
485+
If the timeout is reached, an error is raised.
486+
If set to zero, the flush will be started but not waited for.
482487
483488
"""
484-
bindings.flush(blocking, recording=self.to_native())
489+
bindings.flush(blocking=blocking, recording=self.to_native(), timeout_sec=timeout_sec)
485490

486491
def __del__(self) -> None: # type: ignore[no-untyped-def]
487492
recording = self.to_native()
@@ -491,7 +496,7 @@ def __del__(self) -> None: # type: ignore[no-untyped-def]
491496
#
492497
# See: https://github.com/rerun-io/rerun/issues/6223 for context on why this is necessary.
493498
if not recording.is_forked_child():
494-
bindings.flush(blocking=False, recording=recording) # NOLINT
499+
bindings.flush(timeout_sec=0.0, recording=recording) # NOLINT
495500

496501
# any free function taking a `RecordingStream` as the first argument can also be a method
497502
binary_stream = binary_stream

rerun_py/src/python_bridge.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,13 @@ impl DurationLike {
298298
fn into_duration(self) -> Duration {
299299
match self {
300300
Self::Int(i) => Duration::from_secs(i as u64),
301-
Self::Float(f) => Duration::from_secs_f64(f),
301+
Self::Float(f) => match duration_from_sec(f) {
302+
Ok(duration) => duration,
303+
Err(err) => {
304+
re_log::error_once!("{err}");
305+
Duration::ZERO
306+
}
307+
},
302308
Self::Duration(d) => d,
303309
}
304310
}
@@ -1374,7 +1380,7 @@ impl PyBinarySinkStorage {
13741380
// Release the GIL in case any flushing behavior needs to cleanup a python object.
13751381
py.allow_threads(|| -> PyResult<_> {
13761382
if flush {
1377-
let timeout = timeout_from_sec(flush_timeout_sec)?;
1383+
let timeout = duration_from_sec(flush_timeout_sec as _)?;
13781384
self.inner
13791385
.flush(timeout)
13801386
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
@@ -1405,7 +1411,7 @@ impl PyBinarySinkStorage {
14051411
fn flush(&self, py: Python<'_>, timeout_sec: f32) -> PyResult<()> {
14061412
// Release the GIL in case any flushing behavior needs to cleanup a python object.
14071413
py.allow_threads(|| -> PyResult<_> {
1408-
let timeout = timeout_from_sec(timeout_sec)?;
1414+
let timeout = duration_from_sec(timeout_sec as _)?;
14091415
self.inner
14101416
.flush(timeout)
14111417
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
@@ -1415,13 +1421,13 @@ impl PyBinarySinkStorage {
14151421
}
14161422
}
14171423

1418-
fn timeout_from_sec(seconds: f32) -> PyResult<Duration> {
1424+
fn duration_from_sec(seconds: f64) -> PyResult<Duration> {
14191425
if seconds.is_nan() {
1420-
Err(PyRuntimeError::new_err("timeout_sec must not be NaN"))
1426+
Err(PyRuntimeError::new_err("duration must not be NaN"))
14211427
} else if seconds < 0.0 {
1422-
Err(PyRuntimeError::new_err("timeout_sec must be non-negative"))
1428+
Err(PyRuntimeError::new_err("duration must be non-negative"))
14231429
} else {
1424-
Ok(Duration::try_from_secs_f32(seconds).unwrap_or(Duration::MAX))
1430+
Ok(Duration::try_from_secs_f64(seconds).unwrap_or(Duration::MAX))
14251431
}
14261432
}
14271433

@@ -1607,24 +1613,33 @@ fn disconnect(py: Python<'_>, recording: Option<&PyRecordingStream>) {
16071613
}
16081614

16091615
/// Block until outstanding data has been flushed to the sink.
1616+
// TODO(#11294): remove `blocking` argument and put the `*` first
16101617
#[pyfunction]
1611-
#[pyo3(signature = (blocking, recording=None))]
1612-
fn flush(py: Python<'_>, blocking: bool, recording: Option<&PyRecordingStream>) -> PyResult<()> {
1618+
#[pyo3(signature = (blocking = true, recording = None, *, timeout_sec = 1e38))] // Can't use infinity here because of python_check_signatures.py
1619+
fn flush(
1620+
py: Python<'_>,
1621+
blocking: bool,
1622+
recording: Option<&PyRecordingStream>,
1623+
timeout_sec: f32,
1624+
) -> PyResult<()> {
16131625
let Some(recording) = get_data_recording(recording) else {
16141626
return Ok(());
16151627
};
16161628

16171629
// Release the GIL in case any flushing behavior needs to cleanup a python object.
1618-
py.allow_threads(|| -> Result<(), SinkFlushError> {
1619-
if blocking {
1620-
recording.flush_blocking()?;
1630+
py.allow_threads(|| -> PyResult<()> {
1631+
if !blocking || timeout_sec == 0.0 {
1632+
recording
1633+
.flush_async()
1634+
.map_err(|err: SinkFlushError| PyRuntimeError::new_err(err.to_string()))?;
16211635
} else {
1622-
recording.flush_async()?;
1636+
recording
1637+
.flush_with_timeout(duration_from_sec(timeout_sec as _)?)
1638+
.map_err(|err: SinkFlushError| PyRuntimeError::new_err(err.to_string()))?;
16231639
}
16241640
flush_garbage_queue();
16251641
Ok(())
16261642
})
1627-
.map_err(|err: SinkFlushError| PyRuntimeError::new_err(err.to_string()))
16281643
}
16291644

16301645
// --- Components ---

rerun_py/tests/unit/test_batcher_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def test_flush_never() -> None:
104104
assert sz2 == sz1, "Expected the file size to stay the same"
105105
assert sz3 == sz2, "Expected the file size to stay the same"
106106

107-
rec.flush(blocking=True)
107+
rec.flush()
108108

109109
sz4 = os.stat(rec_path).st_size
110110

tests/rust/plot_dashboard_stress/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
//! pixi run rs-plot-dashboard --num-plots 10 --num-series-per-plot 5 --num-points-per-series 5000 --freq 1000
1313
//! ```
1414
15+
#![expect(clippy::disallowed_methods)]
16+
1517
use rerun::external::re_log;
1618

1719
#[derive(Debug, clap::ValueEnum, Clone)]

0 commit comments

Comments
 (0)