diff --git a/docs/content/reference/migration/migration-0-28.md b/docs/content/reference/migration/migration-0-28.md index 770f19021cf4..7382230eb769 100644 --- a/docs/content/reference/migration/migration-0-28.md +++ b/docs/content/reference/migration/migration-0-28.md @@ -63,3 +63,30 @@ rotation and scale. Naturally, if any update to a transform always changes the same components, this does not cause any changes other than the simplification of not having to clear out all other components that may ever be set, thus reducing memory bloat both on send and query! + +## Python SDK: "partition" renamed to "segment" in catalog APIs + + + +In the `rerun.catalog` module, all APIs using "partition" terminology have been renamed to use "segment" instead. +The old APIs are deprecated and will be removed in a future release. + +| Old API | New API | +|---------|---------| +| `DatasetEntry.partition_ids()` | `DatasetEntry.segment_ids()` | +| `DatasetEntry.partition_table()` | `DatasetEntry.segment_table()` | +| `DatasetEntry.partition_url()` | `DatasetEntry.segment_url()` | +| `DatasetEntry.download_partition()` | `DatasetEntry.download_segment()` | +| `DatasetEntry.default_blueprint_partition_id()` | `DatasetEntry.default_blueprint_segment_id()` | +| `DatasetEntry.set_default_blueprint_partition_id()` | `DatasetEntry.set_default_blueprint_segment_id()` | +| `DataframeQueryView.filter_partition_id()` | `DataframeQueryView.filter_segment_id()` | + +The DataFusion utility functions in `rerun.utilities.datafusion.functions.url_generation` have also been renamed: + +| Old API | New API | +|---------|---------| +| `partition_url()` | `segment_url()` | +| `partition_url_udf()` | `segment_url_udf()` | +| `partition_url_with_timeref_udf()` | `segment_url_with_timeref_udf()` | + +The partition table columns have also been renamed from `rerun_partition_id` to `rerun_segment_id`. diff --git a/rerun_py/rerun_bindings/rerun_bindings.pyi b/rerun_py/rerun_bindings/rerun_bindings.pyi index 541747421bc5..a9eb8c5385a2 100644 --- a/rerun_py/rerun_bindings/rerun_bindings.pyi +++ b/rerun_py/rerun_bindings/rerun_bindings.pyi @@ -1305,17 +1305,17 @@ class DatasetEntryInternal: def blueprint_dataset_id(self) -> EntryId | None: ... def blueprint_dataset(self) -> DatasetEntryInternal | None: ... - def default_blueprint_partition_id(self) -> str | None: ... - def set_default_blueprint_partition_id(self, partition_id: str | None) -> None: ... + def default_blueprint_segment_id(self) -> str | None: ... + def set_default_blueprint_segment_id(self, segment_id: str | None) -> None: ... # --- - def partition_ids(self) -> list[str]: ... - def partition_table(self) -> DataFusionTable: ... + def segment_ids(self) -> list[str]: ... + def segment_table(self) -> DataFusionTable: ... def manifest(self) -> DataFusionTable: ... - def partition_url( + def segment_url( self, - partition_id: str, + segment_id: str, timeline: str | None = None, start: datetime | int | None = None, end: datetime | int | None = None, @@ -1329,7 +1329,7 @@ class DatasetEntryInternal: # --- - def download_partition(self, partition_id: str) -> Recording: ... + def download_segment(self, segment_id: str) -> Recording: ... def dataframe_query_view( self, *, @@ -1431,8 +1431,8 @@ class _IndexValuesLikeInternal: class DataframeQueryView: """View into a remote dataset acting as DataFusion table provider.""" - def filter_partition_id(self, partition_id: str, *args: Iterable[str]) -> Self: - """Filter by one or more partition ids. 
All partition ids are included if not specified.""" + def filter_segment_id(self, segment_id: str, *args: Iterable[str]) -> Self: + """Filter by one or more segment ids. All segment ids are included if not specified.""" def filter_range_sequence(self, start: int, end: int) -> Self: """ diff --git a/rerun_py/rerun_sdk/rerun/catalog/_entry.py b/rerun_py/rerun_sdk/rerun/catalog/_entry.py index b31884cc8c0a..05073d4e9bee 100644 --- a/rerun_py/rerun_sdk/rerun/catalog/_entry.py +++ b/rerun_py/rerun_sdk/rerun/catalog/_entry.py @@ -3,6 +3,8 @@ from abc import ABC from typing import TYPE_CHECKING, Any, Generic, TypeVar +from typing_extensions import deprecated + from rerun_bindings import DatasetEntryInternal, TableEntryInternal if TYPE_CHECKING: @@ -126,84 +128,115 @@ def blueprint_dataset(self) -> DatasetEntry | None: ds = self._internal.blueprint_dataset() return None if ds is None else DatasetEntry(ds) - def default_blueprint_partition_id(self) -> str | None: - """The default blueprint partition ID for this dataset, if any.""" + def default_blueprint_segment_id(self) -> str | None: + """The default blueprint segment ID for this dataset, if any.""" - return self._internal.default_blueprint_partition_id() + return self._internal.default_blueprint_segment_id() - def set_default_blueprint_partition_id(self, partition_id: str | None) -> None: + def set_default_blueprint_segment_id(self, segment_id: str | None) -> None: """ - Set the default blueprint partition ID for this dataset. + Set the default blueprint segment ID for this dataset. - Pass `None` to clear the bluprint. This fails if the change cannot be made to the remote server. + Pass `None` to clear the blueprint. This fails if the change cannot be made to the remote server. """ - return self._internal.set_default_blueprint_partition_id(partition_id) + return self._internal.set_default_blueprint_segment_id(segment_id) + + @deprecated("Use default_blueprint_segment_id() instead") + def default_blueprint_partition_id(self) -> str | None: + """The default blueprint partition ID for this dataset, if any.""" + return self.default_blueprint_segment_id() + + @deprecated("Use set_default_blueprint_segment_id() instead") + def set_default_blueprint_partition_id(self, partition_id: str | None) -> None: + """Set the default blueprint partition ID for this dataset.""" + return self.set_default_blueprint_segment_id(partition_id) def schema(self) -> Schema: """Return the schema of the data contained in the dataset.""" return self._internal.schema() + def segment_ids(self) -> list[str]: + """Returns a list of segment IDs for the dataset.""" + + return self._internal.segment_ids() + + @deprecated("Use segment_ids() instead") def partition_ids(self) -> list[str]: - """Returns a list of partitions IDs for the dataset.""" + """Returns a list of partition IDs for the dataset.""" + return self.segment_ids() + + def segment_table(self) -> DataFusionTable: + """Return the segment table as a Datafusion table provider.""" - return self._internal.partition_ids() + return self._internal.segment_table() + @deprecated("Use segment_table() instead") def partition_table(self) -> DataFusionTable: """Return the partition table as a Datafusion table provider.""" - - return self._internal.partition_table() + return self.segment_table() def manifest(self) -> DataFusionTable: """Return the dataset manifest as a Datafusion table provider.""" return self._internal.manifest() - def partition_url( # noqa: PLR0917 + def segment_url( # noqa: PLR0917 self, - partition_id: str, + 
segment_id: str, timeline: str | None = None, start: datetime | int | None = None, end: datetime | int | None = None, ) -> str: """ - Return the URL for the given partition. + Return the URL for the given segment. Parameters ---------- - partition_id: str - The ID of the partition to get the URL for. + segment_id: str + The ID of the segment to get the URL for. timeline: str | None The name of the timeline to display. start: int | datetime | None - The start time for the partition. + The start time for the segment. Integer for ticks, or datetime/nanoseconds for timestamps. end: int | datetime | None - The end time for the partition. + The end time for the segment. Integer for ticks, or datetime/nanoseconds for timestamps. Examples -------- # With ticks >>> start_tick, end_time = 0, 10 - >>> dataset.partition_url("some_id", "log_tick", start_tick, end_time) + >>> dataset.segment_url("some_id", "log_tick", start_tick, end_time) # With timestamps >>> start_time, end_time = datetime.now() - timedelta(seconds=4), datetime.now() - >>> dataset.partition_url("some_id", "real_time", start_time, end_time) + >>> dataset.segment_url("some_id", "real_time", start_time, end_time) Returns ------- str - The URL for the given partition. + The URL for the given segment. """ - return self._internal.partition_url(partition_id, timeline, start, end) + return self._internal.segment_url(segment_id, timeline, start, end) + + @deprecated("Use segment_url() instead") + def partition_url( # noqa: PLR0917 + self, + partition_id: str, + timeline: str | None = None, + start: datetime | int | None = None, + end: datetime | int | None = None, + ) -> str: + """Return the URL for the given partition.""" + return self.segment_url(partition_id, timeline, start, end) def register(self, recording_uri: str, *, recording_layer: str = "base", timeout_secs: int = 60) -> str: """ @@ -225,8 +258,8 @@ def register(self, recording_uri: str, *, recording_layer: str = "base", timeout Returns ------- - partition_id: str - The partition ID of the registered RRD. + segment_id: str + The segment ID of the registered RRD. """ @@ -281,10 +314,15 @@ def register_prefix(self, recordings_prefix: str, layer_name: str | None = None) return self._internal.register_prefix(recordings_prefix, layer_name=layer_name) + def download_segment(self, segment_id: str) -> Recording: + """Download a segment from the dataset.""" + + return self._internal.download_segment(segment_id) + + @deprecated("Use download_segment() instead") def download_partition(self, partition_id: str) -> Recording: """Download a partition from the dataset.""" - - return self._internal.download_partition(partition_id) + return self.download_segment(partition_id) def dataframe_query_view( self, @@ -310,7 +348,7 @@ def dataframe_query_view( monotonically increasing when data is sent from a single process. If `None` is passed as the index, the view will contain only static columns (among those - specified) and no index columns. It will also contain a single row per partition. + specified) and no index columns. It will also contain a single row per segment. 
Parameters ---------- diff --git a/rerun_py/rerun_sdk/rerun/utilities/datafusion/functions/url_generation.py b/rerun_py/rerun_sdk/rerun/utilities/datafusion/functions/url_generation.py index ee51c1198a64..b798071b90c7 100644 --- a/rerun_py/rerun_sdk/rerun/utilities/datafusion/functions/url_generation.py +++ b/rerun_py/rerun_sdk/rerun/utilities/datafusion/functions/url_generation.py @@ -4,6 +4,7 @@ import pyarrow as pa import pyarrow.compute +from typing_extensions import deprecated from rerun.error_utils import RerunMissingDependencyError @@ -17,33 +18,33 @@ HAS_DATAFUSION = False -def partition_url( +def segment_url( dataset: DatasetEntry, *, - partition_id_col: str | Expr | None = None, + segment_id_col: str | Expr | None = None, timestamp_col: str | Expr | None = None, timeline_name: str | None = None, ) -> Expr: """ - Compute the URL for a partition within a dataset. + Compute the URL for a segment within a dataset. This is a Rerun focused DataFusion function that will create a DataFusion - expression for the partition URL. + expression for the segment URL. - To manually invoke the underlying UDF, see `partition_url_udf` or - `partition_url_with_timeref_udf`. + To manually invoke the underlying UDF, see `segment_url_udf` or + `segment_url_with_timeref_udf`. Parameters ---------- dataset: The input Rerun Dataset. - partition_id_col: - The column containing the partition ID. If not provided, it will assume - a default value of `rerun_partition_id`. You may pass either a DataFusion + segment_id_col: + The column containing the segment ID. If not provided, it will assume + a default value of `rerun_segment_id`. You may pass either a DataFusion expression or a string column name. timestamp_col: If this parameter is passed in, generate a URL that will jump to a - specific timestamp within the partition. + specific timestamp within the segment. timeline_name: When used in combination with `timestamp_col`, this specifies which timeline to seek along. By default this will use the same string as timestamp_col. 
@@ -51,10 +52,10 @@ def partition_url( """ if not HAS_DATAFUSION: raise RerunMissingDependencyError("datafusion", "datafusion") - if partition_id_col is None: - partition_id_col = col("rerun_segment_id") - if isinstance(partition_id_col, str): - partition_id_col = col(partition_id_col) + if segment_id_col is None: + segment_id_col = col("rerun_segment_id") + if isinstance(segment_id_col, str): + segment_id_col = col(segment_id_col) if timestamp_col is not None: if timeline_name is None: @@ -63,44 +64,67 @@ def partition_url( if isinstance(timestamp_col, str): timestamp_col = col(timestamp_col) - inner_udf = partition_url_with_timeref_udf(dataset, timeline_name) - return inner_udf(partition_id_col, timestamp_col).alias("partition_url_with_timestamp") + inner_udf = segment_url_with_timeref_udf(dataset, timeline_name) + return inner_udf(segment_id_col, timestamp_col).alias("segment_url_with_timestamp") - inner_udf = partition_url_udf(dataset) - return inner_udf(partition_id_col).alias("partition_url") + inner_udf = segment_url_udf(dataset) + return inner_udf(segment_id_col).alias("segment_url") -def partition_url_udf(dataset: DatasetEntry) -> ScalarUDF: +@deprecated("Use segment_url() instead") +def partition_url( + dataset: DatasetEntry, + *, + partition_id_col: str | Expr | None = None, + timestamp_col: str | Expr | None = None, + timeline_name: str | None = None, +) -> Expr: + """Compute the URL for a partition within a dataset.""" + return segment_url( + dataset, + segment_id_col=partition_id_col, + timestamp_col=timestamp_col, + timeline_name=timeline_name, + ) + + +def segment_url_udf(dataset: DatasetEntry) -> ScalarUDF: """ - Create a UDF to the URL for a partition within a Dataset. + Create a UDF to the URL for a segment within a Dataset. This function will generate a UDF that expects one column of input, - a string containing the Partition ID. + a string containing the segment ID. """ if not HAS_DATAFUSION: raise RerunMissingDependencyError("datafusion", "datafusion") - def inner_udf(partition_id_arr: pa.Array) -> pa.Array: + def inner_udf(segment_id_arr: pa.Array) -> pa.Array: return pa.compute.binary_join_element_wise( - dataset.partition_url(""), - partition_id_arr, + dataset.segment_url(""), + segment_id_arr, "", # Required for join ) return udf(inner_udf, [pa.string()], pa.string(), "stable") -def partition_url_with_timeref_udf(dataset: DatasetEntry, timeline_name: str) -> ScalarUDF: +@deprecated("Use segment_url_udf() instead") +def partition_url_udf(dataset: DatasetEntry) -> ScalarUDF: + """Create a UDF to the URL for a partition within a Dataset.""" + return segment_url_udf(dataset) + + +def segment_url_with_timeref_udf(dataset: DatasetEntry, timeline_name: str) -> ScalarUDF: """ - Create a UDF to the URL for a partition within a Dataset with timestamp. + Create a UDF to the URL for a segment within a Dataset with timestamp. This function will generate a UDF that expects two columns of input, - a string containing the Partition ID and the timestamp in nanoseconds. + a string containing the segment ID and the timestamp in nanoseconds. """ if not HAS_DATAFUSION: raise RerunMissingDependencyError("datafusion", "datafusion") - def inner_udf(partition_id_arr: pa.Array, timestamp_arr: pa.Array) -> pa.Array: + def inner_udf(segment_id_arr: pa.Array, timestamp_arr: pa.Array) -> pa.Array: # The choice of `ceil_temporal` is important since this timestamp drives a cursor # selection. 
Due to Rerun latest-at semantics, in order for data from the provided # timestamp to be visible, the cursor must be set to a point in time which is @@ -113,11 +137,17 @@ def inner_udf(partition_id_arr: pa.Array, timestamp_arr: pa.Array) -> pa.Array: ) return pa.compute.binary_join_element_wise( - dataset.partition_url(""), - partition_id_arr, + dataset.segment_url(""), + segment_id_arr, f"#when={timeline_name}@", timestamp_us, "", # Required for join ) return udf(inner_udf, [pa.string(), pa.timestamp("ns")], pa.string(), "stable") + + +@deprecated("Use segment_url_with_timeref_udf() instead") +def partition_url_with_timeref_udf(dataset: DatasetEntry, timeline_name: str) -> ScalarUDF: + """Create a UDF to the URL for a partition within a Dataset with timestamp.""" + return segment_url_with_timeref_udf(dataset, timeline_name) diff --git a/rerun_py/src/catalog/dataframe_query.rs b/rerun_py/src/catalog/dataframe_query.rs index bb1c4f06f581..227349e874ac 100644 --- a/rerun_py/src/catalog/dataframe_query.rs +++ b/rerun_py/src/catalog/dataframe_query.rs @@ -35,10 +35,10 @@ pub struct PyDataframeQueryView { query_expression: QueryExpression, - /// Limit the query to these partition ids. + /// Limit the query to these segment ids. /// /// If empty, use the whole dataset. - partition_ids: Vec, + segment_ids: Vec, } impl PyDataframeQueryView { @@ -84,7 +84,7 @@ impl PyDataframeQueryView { sparse_fill_strategy: SparseFillStrategy::None, selection: None, }, - partition_ids: vec![], + segment_ids: vec![], }) } @@ -96,7 +96,7 @@ impl PyDataframeQueryView { let mut copy = Self { dataset: self.dataset.clone_ref(py), query_expression: self.query_expression.clone(), - partition_ids: self.partition_ids.clone(), + segment_ids: self.segment_ids.clone(), }; mutation_fn(&mut copy.query_expression); @@ -107,25 +107,25 @@ impl PyDataframeQueryView { #[pymethods] impl PyDataframeQueryView { - /// Filter by one or more partition ids. All partition ids are included if not specified. - #[pyo3(signature = (partition_id, *args))] - fn filter_partition_id<'py>( + /// Filter by one or more segment ids. All segment ids are included if not specified. + #[pyo3(signature = (segment_id, *args))] + fn filter_segment_id<'py>( &self, py: Python<'py>, - partition_id: String, + segment_id: String, args: &Bound<'py, PyTuple>, ) -> PyResult { - let mut partition_ids = vec![partition_id]; + let mut segment_ids = vec![segment_id]; for i in 0..args.len()? 
{ let item = args.get_item(i)?; - partition_ids.push(item.extract()?); + segment_ids.push(item.extract()?); } Ok(Self { dataset: self.dataset.clone_ref(py), query_expression: self.query_expression.clone(), - partition_ids, + segment_ids, }) } @@ -456,7 +456,7 @@ impl PyDataframeQueryView { py, dataset_id, &self_.query_expression, - self_.partition_ids.as_slice(), + self_.segment_ids.as_slice(), ) } @@ -466,10 +466,9 @@ impl PyDataframeQueryView { let dataset_line = indent::indent_all_by(1, format!("dataset={dataset_str},")); let query_line = indent::indent_all_by(1, format!("query_expression={query_expr_str},")); - let partition_line = - indent::indent_all_by(1, format!("partition_ids={:?}", self.partition_ids)); + let segment_line = indent::indent_all_by(1, format!("segment_ids={:?}", self.segment_ids)); - format!("DataframeQueryView(\n{dataset_line}\n{query_line}\n{partition_line}\n)") + format!("DataframeQueryView(\n{dataset_line}\n{query_line}\n{segment_line}\n)") } } @@ -498,7 +497,7 @@ impl PyDataframeQueryView { connection.connection_registry().clone(), dataset_id, &self.query_expression, - &self.partition_ids, + &self.segment_ids, #[cfg(not(target_arch = "wasm32"))] trace_headers_opt, ) diff --git a/rerun_py/src/catalog/dataset_entry.rs b/rerun_py/src/catalog/dataset_entry.rs index cafad4db8091..8a0edb68611a 100644 --- a/rerun_py/src/catalog/dataset_entry.rs +++ b/rerun_py/src/catalog/dataset_entry.rs @@ -132,8 +132,8 @@ impl PyDatasetEntryInternal { Some(Py::new(py, Self::new(client, dataset_entry))).transpose() } - /// The default blueprint partition ID for this dataset, if any. - fn default_blueprint_partition_id(self_: PyRef<'_, Self>) -> Option { + /// The default blueprint segment ID for this dataset, if any. + fn default_blueprint_segment_id(self_: PyRef<'_, Self>) -> Option { self_ .dataset_details .default_blueprint_segment @@ -141,19 +141,19 @@ impl PyDatasetEntryInternal { .map(ToString::to_string) } - /// Set the default blueprint partition ID for this dataset. + /// Set the default blueprint segment ID for this dataset. /// /// Pass `None` to clear the bluprint. This fails if the change cannot be made to the remote server. - #[pyo3(signature = (partition_id))] - fn set_default_blueprint_partition_id( + #[pyo3(signature = (segment_id))] + fn set_default_blueprint_segment_id( mut self_: PyRefMut<'_, Self>, py: Python<'_>, - partition_id: Option, + segment_id: Option, ) -> PyResult<()> { let connection = self_.client.borrow(py).connection().clone(); let mut dataset_details = self_.dataset_details.clone(); - dataset_details.default_blueprint_segment = partition_id.map(Into::into); + dataset_details.default_blueprint_segment = segment_id.map(Into::into); let result = connection.update_dataset(py, self_.entry_details.id, dataset_details)?; @@ -167,16 +167,16 @@ impl PyDatasetEntryInternal { Self::fetch_schema(&self_) } - /// Returns a list of partitions IDs for the dataset. - fn partition_ids(self_: PyRef<'_, Self>) -> PyResult> { + /// Returns a list of segment IDs for the dataset. + fn segment_ids(self_: PyRef<'_, Self>) -> PyResult> { let connection = self_.client.borrow(self_.py()).connection().clone(); connection.get_dataset_partition_ids(self_.py(), self_.entry_details.id) } - /// Return the partition table as a Datafusion table provider. + /// Return the segment table as a Datafusion table provider. 
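+    ///
+    /// The ID column of the returned table is `rerun_segment_id` (renamed from `rerun_partition_id`).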
#[instrument(skip_all)] - fn partition_table(self_: PyRef<'_, Self>) -> PyResult { + fn segment_table(self_: PyRef<'_, Self>) -> PyResult { let connection = self_.client.borrow(self_.py()).connection().clone(); let dataset_id = self_.entry_details.id; @@ -189,7 +189,7 @@ impl PyDatasetEntryInternal { Ok(PyDataFusionTable { client: self_.client.clone_ref(self_.py()), - name: format!("{}_partition_table", self_.entry_details.name), + name: format!("{}_segment_table", self_.entry_details.name), provider, }) } @@ -214,44 +214,44 @@ impl PyDatasetEntryInternal { }) } - /// Return the URL for the given partition. + /// Return the URL for the given segment. /// /// Parameters /// ---------- - /// partition_id: str - /// The ID of the partition to get the URL for. + /// segment_id: str + /// The ID of the segment to get the URL for. /// /// timeline: str | None /// The name of the timeline to display. /// /// start: int | datetime | None - /// The start time for the partition. + /// The start time for the segment. /// Integer for ticks, or datetime/nanoseconds for timestamps. /// /// end: int | datetime | None - /// The end time for the partition. + /// The end time for the segment. /// Integer for ticks, or datetime/nanoseconds for timestamps. /// /// Examples /// -------- /// # With ticks /// >>> start_tick, end_time = 0, 10 - /// >>> dataset.partition_url("some_id", "log_tick", start_tick, end_time) + /// >>> dataset.segment_url("some_id", "log_tick", start_tick, end_time) /// /// # With timestamps /// >>> start_time, end_time = datetime.now() - timedelta(seconds=4), datetime.now() - /// >>> dataset.partition_url("some_id", "real_time", start_time, end_time) + /// >>> dataset.segment_url("some_id", "real_time", start_time, end_time) /// /// Returns /// ------- /// str - /// The URL for the given partition. + /// The URL for the given segment. /// - #[pyo3(signature = (partition_id, timeline=None, start=None, end=None))] - fn partition_url( + #[pyo3(signature = (segment_id, timeline=None, start=None, end=None))] + fn segment_url( self_: PyRef<'_, Self>, py: Python<'_>, - partition_id: String, + segment_id: String, timeline: Option<&str>, start: Option>, end: Option>, @@ -288,7 +288,7 @@ impl PyDatasetEntryInternal { Ok(re_uri::DatasetSegmentUri { origin: connection.origin().clone(), dataset_id: self_.entry_details.id.id, - segment_id: partition_id, // Python API still uses partition_id for now + segment_id, time_range, //TODO(ab): add support for this @@ -315,8 +315,8 @@ impl PyDatasetEntryInternal { /// /// Returns /// ------- - /// partition_id: str - /// The partition ID of the registered RRD. + /// segment_id: str + /// The segment ID of the registered RRD. #[pyo3(signature = (recording_uri, *, recording_layer = "base".to_owned(), timeout_secs = 60))] #[pyo3( text_signature = "(self, /, recording_uri, *, recording_layer = 'base', timeout_secs = 60)" @@ -434,9 +434,9 @@ impl PyDatasetEntryInternal { )) } - /// Download a partition from the dataset. + /// Download a segment from the dataset. 
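+    ///
+    /// Fetches the segment's static and temporal chunks from the remote server and returns them as a `Recording`.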
#[instrument(skip(self_), err)] - fn download_partition(self_: PyRef<'_, Self>, partition_id: String) -> PyResult { + fn download_segment(self_: PyRef<'_, Self>, segment_id: String) -> PyResult { let catalog_client = self_.client.borrow(self_.py()); let connection = catalog_client.connection(); let dataset_id = self_.entry_details.id; @@ -447,7 +447,7 @@ impl PyDatasetEntryInternal { let response_stream = client .fetch_segment_chunks_by_query(re_redap_client::SegmentQueryParams { dataset_id, - segment_id: partition_id.clone().into(), + segment_id: segment_id.clone().into(), include_static_data: true, include_temporal_data: true, query: None, @@ -457,18 +457,18 @@ impl PyDatasetEntryInternal { let mut chunks_stream = fetch_chunks_response_to_chunk_and_segment_id(response_stream); - let store_id = StoreId::new(StoreKind::Recording, dataset_name, partition_id.clone()); + let store_id = StoreId::new(StoreKind::Recording, dataset_name, segment_id.clone()); let mut store = ChunkStore::new(store_id, Default::default()); while let Some(chunks) = chunks_stream.next().await { for chunk in chunks.map_err(to_py_err)? { - let (chunk, chunk_partition_id) = chunk; + let (chunk, chunk_segment_id) = chunk; - if Some(&partition_id) != chunk_partition_id.as_ref() { + if Some(&segment_id) != chunk_segment_id.as_ref() { re_log::warn!( - expected = partition_id, - got = chunk_partition_id, - "unexpected partition ID in chunk stream, this is a bug" + expected = segment_id, + got = chunk_segment_id, + "unexpected segment ID in chunk stream, this is a bug" ); } store diff --git a/rerun_py/tests/api_sandbox/rerun_draft/catalog.py b/rerun_py/tests/api_sandbox/rerun_draft/catalog.py index f20d5973474a..dd44d1fb3040 100644 --- a/rerun_py/tests/api_sandbox/rerun_draft/catalog.py +++ b/rerun_py/tests/api_sandbox/rerun_draft/catalog.py @@ -215,28 +215,28 @@ def register_blueprint(self, uri: str, set_default: bool = True) -> None: segment_id = blueprint_dataset.register(uri) if set_default: - self._inner.set_default_blueprint_partition_id(segment_id) + self._inner.set_default_blueprint_segment_id(segment_id) def blueprints(self) -> list[str]: """Lists all blueprints currently registered with this dataset.""" - return self._inner.blueprint_dataset().partition_ids() + return self._inner.blueprint_dataset().segment_ids() def set_default_blueprint(self, blueprint_name: str) -> None: """Set an already-registered blueprint as default for this dataset.""" - self._inner.set_default_blueprint_partition_id(blueprint_name) + self._inner.set_default_blueprint_segment_id(blueprint_name) def default_blueprint(self) -> str | None: """Return the name currently set blueprint.""" - return self._inner.default_blueprint_partition_id() + return self._inner.default_blueprint_segment_id() def schema(self) -> Schema: return Schema(self._inner.schema(), _LazyDatasetState()) def segment_ids(self) -> list[str]: - return self._inner.partition_ids() + return self._inner.segment_ids() def segment_table( self, join_meta: TableEntry | datafusion.DataFrame | None = None, join_key: str = "rerun_segment_id" @@ -254,7 +254,7 @@ def segment_url( start=None, end=None, ) -> str: - return self._inner.partition_url(segment_id, timeline, start, end) + return self._inner.segment_url(segment_id, timeline, start, end) def register(self, recording_uri: str | Sequence[str], *, layer_name: str | Sequence[str] = "base") -> Tasks: if isinstance(recording_uri, str): @@ -276,7 +276,7 @@ def register_prefix(self, recordings_prefix: str, layer_name: str | None = None) return 
Tasks(self._inner.register_prefix(recordings_prefix, layer_name)) def download_segment(self, segment_id: str) -> Any: - return self._inner.download_partition(segment_id) + return self._inner.download_segment(segment_id) def reader( self, @@ -501,19 +501,19 @@ def arrow_schema(self) -> pa.Schema: def segment_ids(self) -> list[str]: if self._lazy_state.filtered_segments is not None: - return [pid for pid in self._inner.partition_ids() if pid in self._lazy_state.filtered_segments] + return [pid for pid in self._inner.segment_ids() if pid in self._lazy_state.filtered_segments] else: - return self._inner.partition_ids() + return self._inner.segment_ids() def download_segment(self, segment_id: str) -> Any: - return self._inner.download_partition(segment_id) + return self._inner.download_segment(segment_id) def segment_table( self, join_meta: TableEntry | datafusion.DataFrame | None = None, join_key: str = "rerun_segment_id" ) -> datafusion.DataFrame: - # Get the partition table from the inner object + # Get the segment table from the inner object - partitions = self._inner.partition_table().df() + partitions = self._inner.segment_table().df() if self._lazy_state.filtered_segments is not None: ctx = datafusion.SessionContext() @@ -532,7 +532,7 @@ def segment_table( if isinstance(join_meta, TableEntry): join_meta = join_meta.reader() if join_key not in partitions.schema().names: - raise ValueError(f"Dataset partition table must contain join_key column '{join_key}'.") + raise ValueError(f"Dataset segment table must contain join_key column '{join_key}'.") if join_key not in join_meta.schema().names: raise ValueError(f"join_meta must contain join_key column '{join_key}'.") @@ -607,7 +607,7 @@ def reader( # Fake the intended behavior: index values are provided on a per-segment basis. If a segment is missing, # no rows are generated for it. 
- segments = self._lazy_state.filtered_segments or self._inner.partition_ids() + segments = self._lazy_state.filtered_segments or self._inner.segment_ids() df = None for segment in segments: @@ -616,7 +616,7 @@ def reader( else: index_values = np.array([], dtype=np.datetime64) - other_df = view.filter_partition_id(segment).using_index_values(index_values).df() + other_df = view.filter_segment_id(segment).using_index_values(index_values).df() if df is None: df = other_df @@ -640,7 +640,7 @@ def reader( return df else: if self._lazy_state.filtered_segments is not None: - view = view.filter_partition_id(*self._lazy_state.filtered_segments) + view = view.filter_segment_id(*self._lazy_state.filtered_segments) return view.df() diff --git a/rerun_py/tests/api_sandbox/test_current/test_dataframe_api.py b/rerun_py/tests/api_sandbox/test_current/test_dataframe_api.py index e77f89eec5dd..f82dfa207515 100644 --- a/rerun_py/tests/api_sandbox/test_current/test_dataframe_api.py +++ b/rerun_py/tests/api_sandbox/test_current/test_dataframe_api.py @@ -16,8 +16,8 @@ def test_dataframe_api_filter_partition_id(simple_dataset_prefix: Path) -> None: client = server.client() ds = client.get_dataset_entry(name="ds") - # Create a view with all partitions - view = ds.dataframe_query_view(index="timeline", contents="/**").filter_partition_id( + # Create a view with all segments + view = ds.dataframe_query_view(index="timeline", contents="/**").filter_segment_id( "simple_recording_0", "simple_recording_2" ) diff --git a/rerun_py/tests/api_sandbox/test_current/test_dataset_basics.py b/rerun_py/tests/api_sandbox/test_current/test_dataset_basics.py index f90763d765df..734800b19092 100644 --- a/rerun_py/tests/api_sandbox/test_current/test_dataset_basics.py +++ b/rerun_py/tests/api_sandbox/test_current/test_dataset_basics.py @@ -18,7 +18,7 @@ def test_dataset_basics(complex_dataset_prefix: Path) -> None: ds.register_prefix(complex_dataset_prefix.as_uri()) - partition_df = ds.partition_table().df() + partition_df = ds.segment_table().df() assert str(partition_df.schema()) == inline_snapshot("""\ rerun_segment_id: string not null diff --git a/rerun_py/tests/api_sandbox/test_current/test_polars_interop.py b/rerun_py/tests/api_sandbox/test_current/test_polars_interop.py index 79dce5d3ae07..89733124b786 100644 --- a/rerun_py/tests/api_sandbox/test_current/test_polars_interop.py +++ b/rerun_py/tests/api_sandbox/test_current/test_polars_interop.py @@ -76,7 +76,7 @@ def test_partition_table_to_polars(simple_dataset_prefix: Path) -> None: ds = client.create_dataset("my_dataset") ds.register_prefix(simple_dataset_prefix.as_uri()) - df = ds.partition_table().df().to_polars() + df = ds.segment_table().df().to_polars() assert pprint.pformat(df.schema) == inline_snapshot("""\ Schema([('rerun_segment_id', String), @@ -108,7 +108,7 @@ def test_dataframe_query_to_polars(simple_dataset_prefix: Path) -> None: ds = client.create_dataset("my_dataset") ds.register_prefix(simple_dataset_prefix.as_uri()) - view = ds.dataframe_query_view(index="timeline", contents="/**").filter_partition_id( + view = ds.dataframe_query_view(index="timeline", contents="/**").filter_segment_id( "simple_recording_0", "simple_recording_2" ) diff --git a/rerun_py/tests/e2e_redap_tests/test_blueprint_dataset.py b/rerun_py/tests/e2e_redap_tests/test_blueprint_dataset.py index 391f786ad1e7..9f76f62554ed 100644 --- a/rerun_py/tests/e2e_redap_tests/test_blueprint_dataset.py +++ b/rerun_py/tests/e2e_redap_tests/test_blueprint_dataset.py @@ -42,12 +42,12 @@ def 
test_configure_blueprint_dataset(entry_factory: EntryFactory, tmp_path: Path bds = ds.blueprint_dataset() assert bds is not None - blueprint_partition_id = bds.register(rbl_path.absolute().as_uri()) + blueprint_segment_id = bds.register(rbl_path.absolute().as_uri()) # Set our newly registered blueprint as default for our dataset - ds.set_default_blueprint_partition_id(blueprint_partition_id) + ds.set_default_blueprint_segment_id(blueprint_segment_id) # Uncomment this line for a chance to connect to this server using the viewer # input(f"Server running on {server.address()}. Press enter to continue…" - assert ds.default_blueprint_partition_id() == blueprint_partition_id + assert ds.default_blueprint_segment_id() == blueprint_segment_id diff --git a/rerun_py/tests/e2e_redap_tests/test_datafusion_utils.py b/rerun_py/tests/e2e_redap_tests/test_datafusion_utils.py index 83ae43f0acfb..9d5e31b7c301 100644 --- a/rerun_py/tests/e2e_redap_tests/test_datafusion_utils.py +++ b/rerun_py/tests/e2e_redap_tests/test_datafusion_utils.py @@ -11,7 +11,7 @@ def test_url_generation(readonly_test_dataset: DatasetEntry) -> None: from rerun.utilities.datafusion.functions import url_generation - udf = url_generation.partition_url_with_timeref_udf(readonly_test_dataset, "time_1") + udf = url_generation.segment_url_with_timeref_udf(readonly_test_dataset, "time_1") results = ( readonly_test_dataset.dataframe_query_view(index="time_1", contents="/**") diff --git a/rerun_py/tests/e2e_redap_tests/test_dataset_query.py b/rerun_py/tests/e2e_redap_tests/test_dataset_query.py index 069e5f802500..72a719b06cc7 100644 --- a/rerun_py/tests/e2e_redap_tests/test_dataset_query.py +++ b/rerun_py/tests/e2e_redap_tests/test_dataset_query.py @@ -83,7 +83,7 @@ def readonly_test_dataset_to_arrow_reader(readonly_test_dataset: DatasetEntry) - for rb in readonly_test_dataset.dataframe_query_view(index="time_1", contents="/**").to_arrow_reader(): assert rb.num_rows > 0 - for partition_batch in readonly_test_dataset.partition_table().to_arrow_reader(): + for partition_batch in readonly_test_dataset.segment_table().to_arrow_reader(): assert partition_batch.num_rows > 0 diff --git a/rerun_py/tests/e2e_redap_tests/test_partition_id.py b/rerun_py/tests/e2e_redap_tests/test_partition_id.py index 39dc9c34dcce..718fbc824342 100644 --- a/rerun_py/tests/e2e_redap_tests/test_partition_id.py +++ b/rerun_py/tests/e2e_redap_tests/test_partition_id.py @@ -9,17 +9,17 @@ def test_partition_ids(entry_factory: EntryFactory, resource_prefix: str, snapshot: SnapshotAssertion) -> None: - """Test that we can successfully collect information about partitions.""" + """Test that we can successfully collect information about segments.""" ds = entry_factory.create_dataset("test_dataset") tasks = ds.register_prefix(resource_prefix + "dataset") tasks.wait(timeout_secs=50) assert ( - ds.partition_table() + ds.segment_table() .df() .drop("rerun_storage_urls", "rerun_last_updated_at", "rerun_size_bytes") .sort("rerun_segment_id") == snapshot ) - assert sorted(ds.partition_ids()) == snapshot + assert sorted(ds.segment_ids()) == snapshot diff --git a/rerun_py/tests/unit/test_utilities_datafusion.py b/rerun_py/tests/unit/test_utilities_datafusion.py index 4718b3fc02b8..ac34296d528f 100644 --- a/rerun_py/tests/unit/test_utilities_datafusion.py +++ b/rerun_py/tests/unit/test_utilities_datafusion.py @@ -12,24 +12,24 @@ def test_smoke() -> None: """Just check that we can import the module.""" -def test_partition_url_import_normal() -> None: - """Check that we can import 
partition_url when datafusion is available.""" - from rerun.utilities.datafusion.functions.url_generation import partition_url +def test_segment_url_import_normal() -> None: + """Check that we can import segment_url when datafusion is available.""" + from rerun.utilities.datafusion.functions.url_generation import segment_url - assert partition_url is not None + assert segment_url is not None -def test_partition_url_without_datafusion() -> None: - """Check that calling partition_url raises RerunOptionalDependencyError when datafusion is unavailable.""" +def test_segment_url_without_datafusion() -> None: + """Check that calling segment_url raises RerunOptionalDependencyError when datafusion is unavailable.""" # Mock the import to make datafusion unavailable with patch.dict("sys.modules", {"datafusion": None}): importlib.reload(rerun.utilities.datafusion.functions.url_generation) # Import the module - this should work - from rerun.utilities.datafusion.functions.url_generation import partition_url + from rerun.utilities.datafusion.functions.url_generation import segment_url # Create a mock dataset for testing mock_dataset = Mock() # But calling the function should raise an error with pytest.raises(RerunMissingDependencyError, match="'datafusion' could not be imported"): - partition_url(mock_dataset) + segment_url(mock_dataset)
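
As a companion to the rename documented in `migration-0-28.md` above, the following is a minimal migration sketch in Python. It is illustrative only: it assumes an already-connected `rerun.catalog` client bound to `client`, an existing dataset named `"my_dataset"`, and an index/timeline called `"timeline"` (all hypothetical), and it only uses APIs that appear in the tables above. The deprecated `partition_*` counterparts continue to work until they are removed.

```python
# Minimal sketch of moving from the deprecated "partition" APIs to the new
# "segment" APIs. `client` is assumed to be an already-connected
# `rerun.catalog` client and "my_dataset" an existing dataset (hypothetical).
from rerun.utilities.datafusion.functions import url_generation

ds = client.get_dataset_entry(name="my_dataset")

# Previously: ds.partition_ids(), ds.partition_table(), ds.download_partition(...)
segment_ids = ds.segment_ids()          # list of segment ID strings
segment_df = ds.segment_table().df()    # ID column is now `rerun_segment_id`
recording = ds.download_segment(segment_ids[0])

# Previously: .filter_partition_id(...); "timeline" is a placeholder index name
view = ds.dataframe_query_view(index="timeline", contents="/**").filter_segment_id(segment_ids[0])

# Previously: url_generation.partition_url(ds, partition_id_col="rerun_partition_id")
# Requires the optional `datafusion` dependency.
url_expr = url_generation.segment_url(ds, segment_id_col="rerun_segment_id")
```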