Skip to content

Commit 79300b1

Browse files
authored
Workspace CRUD API with MongoDB persistence
* Workspace CRUD API with MongoDB persistence * Address Copilot review and CodeScene feedback
1 parent 60de0fb commit 79300b1

File tree

14 files changed

+969
-103
lines changed

14 files changed

+969
-103
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mstream"
3-
version = "0.46.0"
3+
version = "0.47.0"
44
edition = "2024"
55
authors = ["Simon Makarski"]
66

@@ -62,6 +62,7 @@ apache-avro = { version = "0.14", features = ["derive"] }
6262
tokio = { version = "1.0", features = ["full", "test-util"] }
6363
tempfile = "3.23.0"
6464
criterion = { version = "0.8", features = ["html_reports"] }
65+
tower = { version = "0.5.3", features = ["util"] }
6566

6667
[[bench]]
6768
name = "json_schema_bench"

README.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,50 @@ For MongoDB services, the `id` field is auto-assigned if omitted. For PubSub ser
517517

518518
Test suites persist test cases, assertions, and optionally a reference to the transform script they were authored against. Used by the Transform Studio to save and reload testing sessions.
519519

520+
### Workspaces
521+
522+
Workspaces persist a named development context — script reference, sample input, schema, and test suite — so that work-in-progress can be saved, shared, and restored later.
523+
524+
> **Workspaces vs Connectors (Pipelines)**
525+
>
526+
> A **connector** is a deployed, running pipeline: it binds a source → middlewares → sinks and processes live data. A **workspace** is a development-time concept: it captures what you're *working on* (which script, which sample input, which schema, which test suite) without running anything. When ready, a workspace can pre-fill a connector's configuration, but the two are independent records.
527+
528+
| Method | Endpoint | Description |
529+
|--------|----------|-------------|
530+
| `GET` | `/workspaces` | List saved workspaces (id + name) |
531+
| `GET` | `/workspaces/{id}` | Get a workspace by ID |
532+
| `POST` | `/workspaces` | Create a workspace |
533+
| `PUT` | `/workspaces/{id}` | Update a workspace |
534+
| `DELETE` | `/workspaces/{id}` | Delete a workspace |
535+
536+
#### Create Workspace
537+
538+
```
539+
POST /workspaces
540+
```
541+
542+
```json
543+
{
544+
"name": "PII Masking Workspace",
545+
"script": { "service_name": "udf-anonymizer", "resource": "mask_pii.rhai" },
546+
"input": "{\"name\": \"John\", \"email\": \"john@acme.com\"}",
547+
"schema": { "schema_id": "schema-1", "service_name": "etl-test-data" },
548+
"test_suite_id": "suite-1"
549+
}
550+
```
551+
552+
All fields except `name` are optional. The `id` is auto-assigned if omitted.
553+
554+
#### System Configuration
555+
556+
```toml
557+
[system.workspaces]
558+
service_name = "system-db"
559+
resource = "workspaces"
560+
```
561+
562+
Requires a MongoDB service. Without this configuration, workspaces use a noop store and workspace data is not persisted.
563+
520564
## Checkpoints
521565

522566
Checkpoints allow connectors to resume from their last processed position after a restart, preventing data loss or reprocessing.

mstream-config.toml.example

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ resource = "mstream-checkpoints"
7777
buffer_capacity = 10000 # Max entries in ring buffer (default: 10000)
7878
stream_preload_count = 100 # Entries sent on SSE connect (default: 100)
7979

80+
# Optional: persist Transform Studio workspaces to MongoDB
81+
[system.workspaces]
82+
service_name = "mongodb-source"
83+
resource = "workspaces"
84+
8085
# Connector Configurations
8186
[[connectors]]
8287
enabled = true

src/api/error.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use tracing::{error, warn};
88
use crate::{
99
api::types::Message, job_manager::error::JobManagerError,
1010
middleware::udf::rhai::RhaiMiddlewareError, schema::introspect::SchemaIntrospectError,
11-
testing::store::TestSuiteStoreError,
11+
testing::store::TestSuiteStoreError, workspace::WorkspaceStoreError,
1212
};
1313

1414
#[derive(Debug, thiserror::Error)]
@@ -33,6 +33,9 @@ pub enum ApiError {
3333

3434
#[error(transparent)]
3535
TestSuiteStore(#[from] TestSuiteStoreError),
36+
37+
#[error(transparent)]
38+
WorkspaceStore(#[from] WorkspaceStoreError),
3639
}
3740

3841
impl IntoResponse for ApiError {
@@ -46,6 +49,7 @@ impl IntoResponse for ApiError {
4649
ApiError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg),
4750
ApiError::Internal(msg) => (StatusCode::INTERNAL_SERVER_ERROR, msg),
4851
ApiError::TestSuiteStore(err) => err.response_data(),
52+
ApiError::WorkspaceStore(err) => err.response_data(),
4953
};
5054

5155
if status.is_server_error() {
@@ -136,6 +140,22 @@ impl ApiErrorDetails for TestSuiteStoreError {
136140
}
137141
}
138142

143+
impl ApiErrorDetails for WorkspaceStoreError {
144+
fn response_data(&self) -> (StatusCode, String) {
145+
match self {
146+
WorkspaceStoreError::NotFound(id) => (
147+
StatusCode::NOT_FOUND,
148+
format!("Workspace '{}' not found", id),
149+
),
150+
WorkspaceStoreError::MongoDb(e) => (
151+
StatusCode::INTERNAL_SERVER_ERROR,
152+
format!("Database error: {}", e),
153+
),
154+
WorkspaceStoreError::Other(msg) => (StatusCode::INTERNAL_SERVER_ERROR, msg.clone()),
155+
}
156+
}
157+
}
158+
139159
impl ApiErrorDetails for RhaiMiddlewareError {
140160
fn response_data(&self) -> (StatusCode, String) {
141161
match &self {
@@ -353,6 +373,29 @@ mod tests {
353373
}
354374
}
355375

376+
mod workspace_store_error_tests {
377+
use super::*;
378+
379+
#[tokio::test]
380+
async fn not_found_returns_404() {
381+
let error: ApiError = WorkspaceStoreError::NotFound("ws-123".to_string()).into();
382+
let (status, message) = extract_response(error.into_response()).await;
383+
384+
assert_eq!(status, StatusCode::NOT_FOUND);
385+
assert!(message.contains("ws-123"));
386+
}
387+
388+
#[tokio::test]
389+
async fn other_error_returns_500() {
390+
let error: ApiError =
391+
WorkspaceStoreError::Other("unexpected failure".to_string()).into();
392+
let (status, message) = extract_response(error.into_response()).await;
393+
394+
assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR);
395+
assert!(message.contains("unexpected failure"));
396+
}
397+
}
398+
356399
mod api_error_tests {
357400
use super::*;
358401

src/api/handler/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub mod test_gen;
77
pub mod test_suites;
88
pub mod transform;
99
pub mod validate;
10+
pub mod workspaces;
1011

1112
pub use jobs::{create_start_job, list_checkpoints, list_jobs, restart_job, stop_job};
1213
pub use schema::{fill_schema, get_resource_schema, schema_convert};
@@ -20,3 +21,6 @@ pub use test_gen::{transform_test_generate, transform_test_run};
2021
pub use test_suites::{delete_test_suite, get_test_suite, list_test_suites, save_test_suite};
2122
pub use transform::transform_run;
2223
pub use validate::{transform_completions, transform_validate};
24+
pub use workspaces::{
25+
delete_workspace, get_workspace, list_workspaces, save_workspace, update_workspace,
26+
};

src/api/handler/test_suites.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub async fn list_test_suites(
1414
State(state): State<AppState>,
1515
) -> Result<impl IntoResponse, ApiError> {
1616
let summaries = state
17+
.stores
1718
.test_suite_store
1819
.list()
1920
.await
@@ -28,6 +29,7 @@ pub async fn get_test_suite(
2829
Path(id): Path<String>,
2930
) -> Result<impl IntoResponse, ApiError> {
3031
let suite = state
32+
.stores
3133
.test_suite_store
3234
.get(&id)
3335
.await
@@ -47,6 +49,7 @@ pub async fn save_test_suite(
4749
suite.updated_at = chrono::Utc::now();
4850

4951
state
52+
.stores
5053
.test_suite_store
5154
.save(&suite)
5255
.await
@@ -67,6 +70,7 @@ pub async fn delete_test_suite(
6770
Path(id): Path<String>,
6871
) -> Result<impl IntoResponse, ApiError> {
6972
state
73+
.stores
7074
.test_suite_store
7175
.delete(&id)
7276
.await

0 commit comments

Comments
 (0)