
Commit 701aed1

Add grpc endpoint for layer table and cleanup helper objects

1 parent: 38e0195

5 files changed: +412 −49 lines

crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto

Lines changed: 38 additions & 0 deletions
```diff
@@ -77,6 +77,21 @@ service RerunCloudService {
   // This endpoint requires the standard dataset headers.
   rpc ScanPartitionTable(ScanPartitionTableRequest) returns (stream ScanPartitionTableResponse) {}
 
+  // Returns the schema of the layer table.
+  //
+  // To inspect the data of the layer table, which is guaranteed to match the schema returned by
+  // this endpoint, check out `ScanLayerTable`.
+  //
+  // This endpoint requires the standard dataset headers.
+  rpc GetLayerTableSchema(GetLayerTableSchemaRequest) returns (GetLayerTableSchemaResponse) {}
+
+  // Inspect the contents of the layer table.
+  //
+  // The data will follow the schema returned by `GetLayerTableSchema`.
+  //
+  // This endpoint requires the standard dataset headers.
+  rpc ScanLayerTable(ScanLayerTableRequest) returns (stream ScanLayerTableResponse) {}
+
   // Returns the schema of the dataset.
   //
   // This is the union of all the schemas from all the underlying partitions. It will contain all the indexes,
@@ -271,6 +286,29 @@ message ScanPartitionTableResponse {
   rerun.common.v1alpha1.DataframePart data = 1;
 }
 
+message GetLayerTableSchemaRequest {
+  reserved "dataset_id";
+}
+
+message GetLayerTableSchemaResponse {
+  rerun.common.v1alpha1.Schema schema = 1;
+}
+
+message ScanLayerTableRequest {
+  // A list of column names to be projected server-side.
+  //
+  // All of them if left empty.
+  repeated string columns = 3;
+
+  reserved "dataset_id";
+  reserved "scan_parameters";
+}
+
+message ScanLayerTableResponse {
+  // Layer metadata as arrow RecordBatch
+  rerun.common.v1alpha1.DataframePart data = 1;
+}
+
 message GetDatasetSchemaRequest {
   reserved 1;
   reserved "dataset_id";
```
crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs

Lines changed: 98 additions & 23 deletions
```diff
@@ -1,6 +1,8 @@
 use std::sync::Arc;
 
-use arrow::array::RecordBatchOptions;
+use arrow::array::{
+    FixedSizeBinaryArray, ListBuilder, RecordBatchOptions, StringBuilder, UInt64Array,
+};
 use arrow::{
     array::{Array, ArrayRef, RecordBatch, StringArray, TimestampNanosecondArray},
     datatypes::{DataType, Field, Schema, TimeUnit},
@@ -11,10 +13,9 @@ use re_chunk::TimelineName;
 use re_log_types::{EntityPath, EntryId, TimeInt};
 use re_sorbet::ComponentColumnDescriptor;
 
-use crate::cloud::v1alpha1::{EntryKind, QueryTasksResponse};
 use crate::cloud::v1alpha1::{
-    GetDatasetSchemaResponse, RegisterWithDatasetResponse, ScanPartitionTableResponse,
-    VectorDistanceMetric,
+    EntryKind, GetDatasetSchemaResponse, QueryTasksResponse, RegisterWithDatasetResponse,
+    ScanLayerTableResponse, ScanPartitionTableResponse, VectorDistanceMetric,
 };
 use crate::common::v1alpha1::ext::{DatasetHandle, IfDuplicateBehavior, PartitionId};
 use crate::common::v1alpha1::{ComponentDescriptor, DataframePart, TaskId};
@@ -1178,51 +1179,125 @@ pub struct RegisterWithDatasetTaskDescriptor {
 
 impl ScanPartitionTableResponse {
     pub const PARTITION_ID: &str = "rerun_partition_id";
-    pub const PARTITION_TYPE: &str = "rerun_partition_type";
+    pub const LAYERS: &str = "rerun_layers";
+    pub const LAST_UPDATED_AT: &str = "rerun_last_updated_at";
+    pub const NUM_CHUNKS: &str = "rerun_num_chunks";
+    pub const SIZE_BYTES: &str = "rerun_size_bytes";
+
+    pub fn schema() -> Schema {
+        Schema::new(vec![
+            Field::new(Self::PARTITION_ID, DataType::Utf8, false),
+            Field::new(
+                Self::LAYERS,
+                DataType::List(Arc::new(Field::new(Self::LAYERS, DataType::Utf8, false))),
+                false,
+            ),
+            Field::new(
+                Self::LAST_UPDATED_AT,
+                DataType::Timestamp(TimeUnit::Nanosecond, Some("utc".into())),
+                false,
+            ),
+            Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
+            Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
+        ])
+    }
+
+    /// Helper to simplify instantiation of the dataframe in [`Self::data`].
+    pub fn create_dataframe(
+        partition_ids: Vec<String>,
+        layers: Vec<Vec<String>>,
+        last_updated_at: Vec<i64>,
+        num_chunks: Vec<u64>,
+        size_bytes: Vec<u64>,
+    ) -> arrow::error::Result<RecordBatch> {
+        let row_count = partition_ids.len();
+        let schema = Arc::new(Self::schema());
+
+        let mut layers_builder = ListBuilder::new(StringBuilder::new());
+        for mut inner_vec in layers {
+            for layer_name in inner_vec.drain(..) {
+                layers_builder.values().append_value(layer_name)
+            }
+            layers_builder.append(true);
+        }
+
+        let columns: Vec<ArrayRef> = vec![
+            Arc::new(StringArray::from(partition_ids)),
+            Arc::new(layers_builder.finish()),
+            Arc::new(TimestampNanosecondArray::from(last_updated_at)),
+            Arc::new(UInt64Array::from(num_chunks)),
+            Arc::new(UInt64Array::from(size_bytes)),
+        ];
+
+        RecordBatch::try_new_with_options(
+            schema,
+            columns,
+            &RecordBatchOptions::default().with_row_count(Some(row_count)),
+        )
+    }
+
+    pub fn data(&self) -> Result<&DataframePart, TypeConversionError> {
+        Ok(self.data.as_ref().ok_or_else(|| {
+            missing_field!(crate::cloud::v1alpha1::ScanPartitionTableResponse, "data")
+        })?)
+    }
+}
+
+// --- ScanLayerTableResponse --
+
+impl ScanLayerTableResponse {
+    pub const LAYER_NAME: &str = "rerun_layer_name";
+    pub const PARTITION_ID: &str = "rerun_partition_id";
     pub const STORAGE_URL: &str = "rerun_storage_url";
+    pub const LAYER_TYPE: &str = "rerun_layer_type";
     pub const REGISTRATION_TIME: &str = "rerun_registration_time";
-    pub const PARTITION_MANIFEST_UPDATED_AT: &str = "rerun_partition_manifest_updated_at";
-    pub const PARTITION_MANIFEST_URL: &str = "rerun_partition_manifest_url";
+    pub const NUM_CHUNKS: &str = "rerun_num_chunks";
+    pub const SIZE_BYTES: &str = "rerun_size_bytes";
+    pub const SCHEMA_SHA256: &str = "rerun_schema_sha256";
 
     pub fn schema() -> Schema {
         Schema::new(vec![
+            Field::new(Self::LAYER_NAME, DataType::Utf8, false),
             Field::new(Self::PARTITION_ID, DataType::Utf8, false),
-            Field::new(Self::PARTITION_TYPE, DataType::Utf8, false),
             Field::new(Self::STORAGE_URL, DataType::Utf8, false),
+            Field::new(Self::LAYER_TYPE, DataType::Utf8, false),
             Field::new(
                 Self::REGISTRATION_TIME,
-                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                DataType::Timestamp(TimeUnit::Nanosecond, Some("utc".into())),
                 false,
             ),
-            Field::new(
-                Self::PARTITION_MANIFEST_UPDATED_AT,
-                DataType::Timestamp(TimeUnit::Nanosecond, None),
-                true,
-            ),
-            Field::new(Self::PARTITION_MANIFEST_URL, DataType::Utf8, true),
+            Field::new(Self::NUM_CHUNKS, DataType::UInt64, false),
+            Field::new(Self::SIZE_BYTES, DataType::UInt64, false),
+            Field::new(Self::SCHEMA_SHA256, DataType::FixedSizeBinary(32), false),
         ])
     }
 
     /// Helper to simplify instantiation of the dataframe in [`Self::data`].
     pub fn create_dataframe(
+        layer_names: Vec<String>,
         partition_ids: Vec<String>,
-        partition_types: Vec<String>,
         storage_urls: Vec<String>,
+        layer_types: Vec<String>,
         registration_times: Vec<i64>,
-        partition_manifest_updated_ats: Vec<Option<i64>>,
-        partition_manifest_urls: Vec<Option<String>>,
+        num_chunks: Vec<u64>,
+        size_bytes: Vec<u64>,
+        schema_sha256s: Vec<[u8; 32]>,
     ) -> arrow::error::Result<RecordBatch> {
         let row_count = partition_ids.len();
         let schema = Arc::new(Self::schema());
+
         let columns: Vec<ArrayRef> = vec![
+            Arc::new(StringArray::from(layer_names)),
             Arc::new(StringArray::from(partition_ids)),
-            Arc::new(StringArray::from(partition_types)),
             Arc::new(StringArray::from(storage_urls)),
+            Arc::new(StringArray::from(layer_types)),
             Arc::new(TimestampNanosecondArray::from(registration_times)),
-            Arc::new(TimestampNanosecondArray::from(
-                partition_manifest_updated_ats,
-            )),
-            Arc::new(StringArray::from(partition_manifest_urls)),
+            Arc::new(UInt64Array::from(num_chunks)),
+            Arc::new(UInt64Array::from(size_bytes)),
+            Arc::new(
+                FixedSizeBinaryArray::try_from_iter(schema_sha256s.into_iter())
+                    .expect("sizes of nested slices are guaranteed to match"),
+            ),
         ];
 
         RecordBatch::try_new_with_options(
```
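The two `create_dataframe` helpers make the column layouts above concrete: the partition table carries one row per partition with a list-typed `rerun_layers` column, while the layer table reads as one row per (partition, layer) pair with a fixed-size 32-byte schema hash. Below is a hedged sketch of how server code might call them; every sample value (IDs, URLs, layer type, counts, hash) is a made-up placeholder, not taken from this commit.

```rust
// Illustrative only: building the response batches with the new helpers.
// All values below are made-up placeholders.
use re_protos::cloud::v1alpha1::{ScanLayerTableResponse, ScanPartitionTableResponse};

fn example_batches() -> arrow::error::Result<()> {
    // One row per partition; `layers` is a list column, hence Vec<Vec<String>>.
    let partition_batch = ScanPartitionTableResponse::create_dataframe(
        vec!["partition-0".to_owned()],                     // partition_ids
        vec![vec!["base".to_owned(), "annot".to_owned()]],  // layers
        vec![1_700_000_000_000_000_000],                    // last_updated_at (ns since epoch, UTC)
        vec![42],                                           // num_chunks
        vec![1_048_576],                                    // size_bytes
    )?;
    assert_eq!(partition_batch.num_rows(), 1);

    // One row per (partition, layer) pair; `schema_sha256s` takes `[u8; 32]`
    // values, so `FixedSizeBinaryArray::try_from_iter` cannot fail on size.
    let layer_batch = ScanLayerTableResponse::create_dataframe(
        vec!["base".to_owned()],                     // layer_names
        vec!["partition-0".to_owned()],              // partition_ids
        vec!["s3://bucket/partition-0".to_owned()],  // storage_urls
        vec!["rrd".to_owned()],                      // layer_types (placeholder value)
        vec![1_700_000_000_000_000_000],             // registration_times
        vec![42],                                    // num_chunks
        vec![1_048_576],                             // size_bytes
        vec![[0u8; 32]],                             // schema_sha256s
    )?;
    assert_eq!(layer_batch.num_rows(), 1);

    Ok(())
}
```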
