Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions crates/store/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub enum LogDataSource {
Stdin,

/// A `rerun://` URI pointing to a recording.
RedapDatasetPartition {
uri: re_uri::DatasetPartitionUri,
RedapDatasetSegment {
uri: re_uri::DatasetSegmentUri,

/// Switch to this recording once it has been loaded?
select_when_loaded: bool,
Expand Down Expand Up @@ -117,8 +117,8 @@ impl LogDataSource {
}
}

if let Ok(uri) = url.parse::<re_uri::DatasetPartitionUri>() {
Some(Self::RedapDatasetPartition {
if let Ok(uri) = url.parse::<re_uri::DatasetSegmentUri>() {
Some(Self::RedapDatasetSegment {
uri,
select_when_loaded: true,
})
Expand Down Expand Up @@ -233,7 +233,7 @@ impl LogDataSource {
Ok(rx)
}

Self::RedapDatasetPartition {
Self::RedapDatasetSegment {
uri,
select_when_loaded,
} => {
Expand All @@ -250,7 +250,7 @@ impl LogDataSource {

let connection_registry = connection_registry.clone();
let uri_clone = uri.clone();
let stream_partition = async move {
let stream_segment = async move {
let client = connection_registry
.client(uri_clone.origin.clone())
.await
Expand All @@ -262,7 +262,7 @@ impl LogDataSource {
};

spawn_future(async move {
if let Err(err) = stream_partition.await {
if let Err(err) = stream_segment.await {
re_log::warn!("Error while streaming: {}", re_error::format_ref(&err));
}
});
Expand Down Expand Up @@ -322,9 +322,12 @@ mod tests {
"www.foo.zip/blueprint.rbl",
];
let grpc = [
// segment_id (new)
"rerun://127.0.0.1:1234/dataset/1830B33B45B963E7774455beb91701ae/data?segment_id=sid",
"rerun://127.0.0.1:1234/dataset/1830B33B45B963E7774455beb91701ae/data?segment_id=sid&[email protected]",
"rerun+http://example.com/dataset/1830B33B45B963E7774455beb91701ae/data?segment_id=sid",
// partition_id (legacy, for backward compatibility)
"rerun://127.0.0.1:1234/dataset/1830B33B45B963E7774455beb91701ae/data?partition_id=pid",
"rerun://127.0.0.1:1234/dataset/1830B33B45B963E7774455beb91701ae/data?partition_id=pid&[email protected]",
"rerun+http://example.com/dataset/1830B33B45B963E7774455beb91701ae/data?partition_id=pid",
];

let proxy = [
Expand Down Expand Up @@ -360,12 +363,9 @@ mod tests {

for uri in grpc {
let data_source = LogDataSource::from_uri(file_source.clone(), uri);
if !matches!(
data_source,
Some(LogDataSource::RedapDatasetPartition { .. })
) {
if !matches!(data_source, Some(LogDataSource::RedapDatasetSegment { .. })) {
eprintln!(
"Expected {uri:?} to be categorized as readp dataset. Instead it got parsed as {data_source:?}"
"Expected {uri:?} to be categorized as redap dataset segment. Instead it got parsed as {data_source:?}"
);
failed = true;
}
Expand Down
6 changes: 3 additions & 3 deletions crates/store/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ pub enum EntityDbClass<'a> {
/// This is an official rerun example recording.
ExampleRecording,

/// This is a recording loaded from a remote dataset partition.
DatasetPartition(&'a re_uri::DatasetPartitionUri),
/// This is a recording loaded from a remote dataset segment.
DatasetSegment(&'a re_uri::DatasetSegmentUri),

/// This is a blueprint.
Blueprint,
Expand Down Expand Up @@ -293,7 +293,7 @@ impl EntityDb {
}

Some(SmartChannelSource::RedapGrpcStream { uri, .. }) => {
EntityDbClass::DatasetPartition(uri)
EntityDbClass::DatasetSegment(uri)
}

_ => EntityDbClass::LocalRecording,
Expand Down
6 changes: 3 additions & 3 deletions crates/store/re_redap_client/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ where
pub async fn stream_blueprint_and_segment_from_server(
mut client: ConnectionClient,
tx: re_smart_channel::Sender<DataSourceMessage>,
uri: re_uri::DatasetPartitionUri,
uri: re_uri::DatasetSegmentUri,
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> Result<(), ApiError> {
re_log::debug!("Loading {uri}…");
Expand Down Expand Up @@ -354,10 +354,10 @@ pub async fn stream_blueprint_and_segment_from_server(
re_log::debug!("No blueprint dataset found for {uri}");
}

let re_uri::DatasetPartitionUri {
let re_uri::DatasetSegmentUri {
origin: _,
dataset_id,
partition_id: segment_id,
segment_id,
time_range,
fragment,
} = uri;
Expand Down
6 changes: 3 additions & 3 deletions crates/store/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub enum SmartChannelSource {

/// The data is streaming in directly from a Rerun Data Platform server, over gRPC.
RedapGrpcStream {
uri: re_uri::DatasetPartitionUri,
uri: re_uri::DatasetSegmentUri,

/// Switch to this recording once it has been loaded?
select_when_loaded: bool,
Expand Down Expand Up @@ -146,7 +146,7 @@ impl SmartChannelSource {
// We only show things we know are very-soon-to-be recordings:
Self::File(path) => Some(path.to_string_lossy().into_owned()),
Self::RrdHttpStream { url, .. } => Some(url.clone()),
Self::RedapGrpcStream { uri, .. } => Some(uri.partition_id.clone()),
Self::RedapGrpcStream { uri, .. } => Some(uri.segment_id.clone()),

Self::RrdWebEventListener
| Self::JsChannel { .. }
Expand Down Expand Up @@ -240,7 +240,7 @@ pub enum SmartMessageSource {

/// A file on a Rerun Data Platform server, over `rerun://` gRPC interface.
RedapGrpcStream {
uri: re_uri::DatasetPartitionUri,
uri: re_uri::DatasetSegmentUri,

/// Switch to this recording once it has been loaded?
select_when_loaded: bool,
Expand Down
60 changes: 37 additions & 23 deletions crates/store/re_uri/src/endpoints/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,30 @@ use crate::{Error, Fragment, Origin, RedapUri, TimeSelection};
/// URI pointing at the data underlying a dataset.
///
/// Currently, the following format is supported:
/// `<origin>/dataset/$DATASET_ID/data?partition_id=$PARTITION_ID&time_range=$TIME_RANGE`
/// `<origin>/dataset/$DATASET_ID/data?segment_id=$SEGMENT_ID&time_range=$TIME_RANGE`
///
/// `partition_id` is currently mandatory, and `time_range` is optional.
/// `segment_id` is currently mandatory, and `time_range` is optional.
/// In the future we will add richer queries.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct DatasetPartitionUri {
pub struct DatasetSegmentUri {
pub origin: Origin,
pub dataset_id: re_tuid::Tuid,

// Query parameters: these affect what data is returned.
/// Currently mandatory.
pub partition_id: String,
pub segment_id: String,
pub time_range: Option<TimeSelection>,

// Fragment parameters: these affect what the viewer focuses on:
pub fragment: Fragment,
}

impl std::fmt::Display for DatasetPartitionUri {
impl std::fmt::Display for DatasetSegmentUri {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
origin,
dataset_id,
partition_id,
segment_id,
time_range,
fragment,
} = self;
Expand All @@ -37,7 +37,7 @@ impl std::fmt::Display for DatasetPartitionUri {

// ?query:
{
write!(f, "?partition_id={partition_id}")?;
write!(f, "?segment_id={segment_id}")?;
}
if let Some(time_range) = time_range {
write!(f, "&time_range={time_range}")?;
Expand All @@ -53,15 +53,21 @@ impl std::fmt::Display for DatasetPartitionUri {
}
}

impl DatasetPartitionUri {
impl DatasetSegmentUri {
pub fn new(origin: Origin, dataset_id: re_tuid::Tuid, url: &url::Url) -> Result<Self, Error> {
let mut partition_id = None;
let mut segment_id = None;
let mut legacy_partition_id = None;
let mut time_range = None;

for (key, value) in url.query_pairs() {
match key.as_ref() {
// Accept legacy `partition_id` query parameter.
"partition_id" => {
partition_id = Some(value.to_string());
legacy_partition_id = Some(value.to_string());
}

"segment_id" => {
segment_id = Some(value.to_string());
}
"time_range" => {
// `+` means whitespace in URLs.
Expand All @@ -75,8 +81,16 @@ impl DatasetPartitionUri {
}
}

let Some(partition_id) = partition_id else {
return Err(Error::MissingPartitionId);
let segment_id = match (segment_id, legacy_partition_id) {
(Some(s), None) | (None, Some(s)) => s,

(None, None) => {
return Err(Error::MissingSegmentId);
}

(Some(_), Some(_)) => {
return Err(Error::AmbiguousSegmentId);
}
};

let mut fragment = Fragment::default();
Expand All @@ -87,7 +101,7 @@ impl DatasetPartitionUri {
Ok(Self {
origin,
dataset_id,
partition_id,
segment_id,
time_range,
fragment,
})
Expand All @@ -96,9 +110,9 @@ impl DatasetPartitionUri {
/// Returns [`Self`] without any (optional) `?query` or `#fragment`.
pub fn without_query_and_fragment(mut self) -> Self {
let Self {
origin: _, // Mandatory
dataset_id: _, // Mandatory
partition_id: _, // Mandatory
origin: _, // Mandatory
dataset_id: _, // Mandatory
segment_id: _, // Mandatory
time_range,
fragment,
} = &mut self;
Expand All @@ -112,9 +126,9 @@ impl DatasetPartitionUri {
/// Returns [`Self`] without any (optional) `#fragment`.
pub fn without_fragment(mut self) -> Self {
let Self {
origin: _, // Mandatory
dataset_id: _, // Mandatory
partition_id: _, // Mandatory
origin: _, // Mandatory
dataset_id: _, // Mandatory
segment_id: _, // Mandatory
time_range: _,
fragment,
} = &mut self;
Expand All @@ -128,12 +142,12 @@ impl DatasetPartitionUri {
StoreId::new(
re_log_types::StoreKind::Recording,
self.dataset_id.to_string(),
self.partition_id.clone(),
self.segment_id.clone(),
)
}
}

impl std::str::FromStr for DatasetPartitionUri {
impl std::str::FromStr for DatasetSegmentUri {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Expand All @@ -148,7 +162,7 @@ impl std::str::FromStr for DatasetPartitionUri {
// --------------------------------

// Serialize as string:
impl serde::Serialize for DatasetPartitionUri {
impl serde::Serialize for DatasetSegmentUri {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
Expand All @@ -157,7 +171,7 @@ impl serde::Serialize for DatasetPartitionUri {
}
}

impl<'de> serde::Deserialize<'de> for DatasetPartitionUri {
impl<'de> serde::Deserialize<'de> for DatasetSegmentUri {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
Expand Down
9 changes: 7 additions & 2 deletions crates/store/re_uri/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ pub enum Error {
#[error("URL {url:?} cannot be loaded as a recording")]
CannotLoadUrlAsRecording { url: String },

#[error("Dataset data URL required a `?partition_id` query parameter")]
MissingPartitionId,
#[error("Dataset data URL requires a `?segment_id` query parameter")]
MissingSegmentId,

#[error(
"Dataset data URL cannot contain both `?segment_id` and legacy `?partition_id` query parameters"
)]
AmbiguousSegmentId,

#[error("Invalid TUID: {0}")]
InvalidTuid(<re_tuid::Tuid as FromStr>::Err),
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_uri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
//! "rerun+http://localhost:51234/proxy",
//!
//! // Links to recording on the dataplatform (optionally with timestamp).
//! "rerun://127.0.0.1:1234/dataset/1830B33B45B963E7774455beb91701ae/data?partition_id=pid&[email protected]",
//! "rerun://127.0.0.1:1234/dataset/1830B33B45B963E7774455beb91701ae/data?segment_id=sid&[email protected]",
//! ] {
//! assert!(uri.parse::<re_uri::RedapUri>().is_ok());
//! }
Expand All @@ -43,7 +43,7 @@ mod time_selection;

pub use self::{
endpoints::{
catalog::CatalogUri, dataset::DatasetPartitionUri, entry::EntryUri, proxy::ProxyUri,
catalog::CatalogUri, dataset::DatasetSegmentUri, entry::EntryUri, proxy::ProxyUri,
},
error::Error,
fragment::Fragment,
Expand Down
Loading