Skip to content

Commit 62fcf36

Browse files
authored
remove internal tasks operations from cloud public API (#11447)
1 parent 0919885 commit 62fcf36

File tree

11 files changed

+112
-242
lines changed

11 files changed

+112
-242
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10548,7 +10548,6 @@ dependencies = [
1054810548
"mimalloc",
1054910549
"numpy",
1055010550
"parking_lot",
10551-
"prost-types",
1055210551
"pyo3",
1055310552
"pyo3-build-config",
1055410553
"rand 0.8.5",

crates/store/re_protos/proto/rerun/v1alpha1/cloud.proto

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,6 @@ service RerunCloudService {
154154
// Query the status of submitted tasks
155155
rpc QueryTasks(QueryTasksRequest) returns (QueryTasksResponse) {}
156156

157-
// Fetch the output of a completed task
158-
rpc FetchTaskOutput(FetchTaskOutputRequest) returns (FetchTaskOutputResponse) {}
159-
160157
// Query the status of submitted tasks as soon as they are no longer pending
161158
rpc QueryTasksOnCompletion(QueryTasksOnCompletionRequest) returns (stream QueryTasksOnCompletionResponse) {}
162159

@@ -640,27 +637,6 @@ message DoGlobalMaintenanceResponse {}
640637

641638
// --- Tasks ---
642639

643-
// A task is a unit of work that can be submitted to the system
644-
message Task {
645-
// Unique identifier for the task
646-
rerun.common.v1alpha1.TaskId id = 1;
647-
// Type of the task
648-
string task_type = 2;
649-
// Task-type dependant data necessary to de-serialize the task
650-
bytes task_data = 3;
651-
}
652-
653-
// `SubmitTasksRequest` is the request message for submitting tasks
654-
message SubmitTasksRequest {
655-
repeated Task tasks = 1;
656-
}
657-
658-
// `SubmitTaskResponse` contains, for each submitted task
659-
// its submission outcome, encoded as a `RecordBatch`
660-
message SubmitTasksResponse {
661-
rerun.common.v1alpha1.DataframePart data = 1;
662-
}
663-
664640
// `QueryTasksRequest` is the request message for querying tasks status
665641
message QueryTasksRequest {
666642
// Empty queries for all tasks if the server allows it.
@@ -689,18 +665,6 @@ message QueryTasksOnCompletionResponse {
689665
rerun.common.v1alpha1.DataframePart data = 1;
690666
}
691667

692-
// `FetchTaskOutputRequest` is the request message for fetching task output
693-
message FetchTaskOutputRequest {
694-
// Unique identifier for the task
695-
rerun.common.v1alpha1.TaskId id = 1;
696-
}
697-
698-
/// `FetchTaskOutputResponse` is the response message for fetching task output
699-
message FetchTaskOutputResponse {
700-
// The output of the task, encoded as a record batch
701-
rerun.common.v1alpha1.DataframePart data = 1;
702-
}
703-
704668
// --- Catalog ---
705669

706670
// FindEntries

crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.ext.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::common::v1alpha1::{
2323
ComponentDescriptor, DataframePart, TaskId,
2424
ext::{DatasetHandle, IfDuplicateBehavior, PartitionId},
2525
};
26-
use crate::{TypeConversionError, missing_field};
26+
use crate::{TypeConversionError, invalid_field, missing_field};
2727

2828
// --- RegisterWithDatasetRequest ---
2929

@@ -1605,6 +1605,57 @@ impl From<ComponentColumnDescriptor> for crate::cloud::v1alpha1::IndexColumn {
16051605
}
16061606
}
16071607

1608+
// --- Tasks ---
1609+
1610+
pub struct QueryTasksOnCompletionRequest {
1611+
pub task_ids: Vec<TaskId>,
1612+
pub timeout: std::time::Duration,
1613+
}
1614+
1615+
pub struct QueryTasksRequest {
1616+
pub task_ids: Vec<TaskId>,
1617+
}
1618+
1619+
impl TryFrom<QueryTasksOnCompletionRequest>
1620+
for crate::cloud::v1alpha1::QueryTasksOnCompletionRequest
1621+
{
1622+
type Error = TypeConversionError;
1623+
1624+
fn try_from(
1625+
value: QueryTasksOnCompletionRequest,
1626+
) -> Result<crate::cloud::v1alpha1::QueryTasksOnCompletionRequest, Self::Error> {
1627+
if value.task_ids.is_empty() {
1628+
return Err(missing_field!(
1629+
crate::cloud::v1alpha1::QueryTasksOnCompletionRequest,
1630+
"task_ids"
1631+
));
1632+
}
1633+
let timeout: prost_types::Duration = value.timeout.try_into().map_err(|err| {
1634+
invalid_field!(
1635+
crate::cloud::v1alpha1::QueryTasksOnCompletionRequest,
1636+
"timeout",
1637+
err
1638+
)
1639+
})?;
1640+
Ok(Self {
1641+
ids: value.task_ids,
1642+
timeout: Some(timeout),
1643+
})
1644+
}
1645+
}
1646+
1647+
impl TryFrom<QueryTasksRequest> for crate::cloud::v1alpha1::QueryTasksRequest {
1648+
type Error = TypeConversionError;
1649+
1650+
fn try_from(
1651+
value: QueryTasksRequest,
1652+
) -> Result<crate::cloud::v1alpha1::QueryTasksRequest, Self::Error> {
1653+
Ok(Self {
1654+
ids: value.task_ids,
1655+
})
1656+
}
1657+
}
1658+
16081659
// --
16091660

16101661
#[cfg(test)]

crates/store/re_protos/src/v1alpha1/rerun.cloud.v1alpha1.rs

Lines changed: 0 additions & 157 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/store/re_redap_client/src/connection_client.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,16 @@ use re_protos::{
1111
CreateDatasetEntryRequest, DeleteEntryRequest, EntryFilter, EntryKind, FetchChunksRequest,
1212
FindEntriesRequest, GetDatasetManifestSchemaRequest, GetDatasetManifestSchemaResponse,
1313
GetPartitionTableSchemaRequest, GetPartitionTableSchemaResponse, QueryDatasetRequest,
14-
QueryDatasetResponse, ReadDatasetEntryRequest, ReadTableEntryRequest,
15-
RegisterWithDatasetResponse, ScanPartitionTableRequest, ScanPartitionTableResponse,
14+
QueryDatasetResponse, QueryTasksOnCompletionResponse, QueryTasksResponse,
15+
ReadDatasetEntryRequest, ReadTableEntryRequest, RegisterWithDatasetResponse,
16+
ScanPartitionTableRequest, ScanPartitionTableResponse,
1617
ext::{
1718
CreateDatasetEntryResponse, DataSource, DataSourceKind, DatasetDetails, DatasetEntry,
1819
EntryDetails, EntryDetailsUpdate, LanceTable, ProviderDetails as _,
19-
ReadDatasetEntryResponse, ReadTableEntryResponse, RegisterTableResponse,
20-
RegisterWithDatasetRequest, RegisterWithDatasetTaskDescriptor, TableEntry,
21-
UpdateDatasetEntryRequest, UpdateDatasetEntryResponse, UpdateEntryRequest,
22-
UpdateEntryResponse,
20+
QueryTasksOnCompletionRequest, QueryTasksRequest, ReadDatasetEntryResponse,
21+
ReadTableEntryResponse, RegisterTableResponse, RegisterWithDatasetRequest,
22+
RegisterWithDatasetTaskDescriptor, TableEntry, UpdateDatasetEntryRequest,
23+
UpdateDatasetEntryResponse, UpdateEntryRequest, UpdateEntryResponse,
2324
},
2425
rerun_cloud_service_client::RerunCloudServiceClient,
2526
},
@@ -32,7 +33,7 @@ use re_protos::{
3233
missing_field,
3334
};
3435

35-
use crate::{StreamEntryError, StreamError};
36+
use crate::{StreamEntryError, StreamError, StreamTasksError};
3637

3738
pub type FetchChunksResponseStream = std::pin::Pin<
3839
Box<
@@ -548,4 +549,34 @@ where
548549
.map(|entry| entry.name.clone())
549550
.collect())
550551
}
552+
553+
// -- Tasks API --
554+
pub async fn query_tasks_on_completion(
555+
&mut self,
556+
task_ids: Vec<TaskId>,
557+
timeout: std::time::Duration,
558+
) -> Result<tonic::Streaming<QueryTasksOnCompletionResponse>, StreamError> {
559+
let q = QueryTasksOnCompletionRequest { task_ids, timeout };
560+
let response = self
561+
.inner()
562+
.query_tasks_on_completion(tonic::Request::new(q.try_into()?))
563+
.await
564+
.map_err(|err| StreamTasksError::StreamingTaskResults(err.into()))?
565+
.into_inner();
566+
Ok(response)
567+
}
568+
569+
pub async fn query_tasks(
570+
&mut self,
571+
task_ids: Vec<TaskId>,
572+
) -> Result<QueryTasksResponse, StreamError> {
573+
let q = QueryTasksRequest { task_ids };
574+
let response = self
575+
.inner()
576+
.query_tasks(tonic::Request::new(q.try_into()?))
577+
.await
578+
.map_err(|err| StreamTasksError::StreamingTaskResults(err.into()))?
579+
.into_inner();
580+
Ok(response)
581+
}
551582
}

0 commit comments

Comments
 (0)