[ENH]: Add API endpoints for Task management #5579
Reviewer Checklist
Please leverage this checklist to ensure your code review is thorough before approving:
Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
Add API Endpoints for Task Management (Create/Remove Task)
This pull request introduces new API endpoints and supporting infrastructure for registering and removing asynchronous tasks that process collections in Chroma. It defines the client, HTTP, Rust, and Go backend interfaces for task creation and task deletion, including support in the protobuf definitions and coordinator layer. The implementation makes these endpoints available in both the Rust frontend and Python/JS clients, adds validation, and ensures correct propagation of errors. Extensive integration and property-based Python tests demonstrate correct task registration, error handling (e.g., duplicate tasks, bad operators), and removal logic.
Key Changes
• Added new REST endpoints and handlers on
Affected Areas
•
This summary was automatically generated by @propel-code-bot
Search, Key, K, Knn, Val
)
Are these from your editor? Or did you run the python formatter? I'm always suspicious of whitespace changes that would imply the formatter has changed or was not run.
Editor. I left them in because I thought it was good to get rid of trailing whitespace.
task_name: Unique name for this task instance
operator_name: Built-in operator name (e.g., "record_counter")
output_collection_name: Name of the collection where task output will be stored
params: Optional dictionary with operator-specific parameters
Is this blob just passed in to the operator as e.g. a JSON value? How does the operator get these?
They are stored in the task definition as a JSON string that a TaskRunner receives and passes into the operator it executes for that Task.
chromadb/api/fastapi.py
Outdated
"""Register a recurring task on a collection."""
import json

params_str = json.dumps(params) if params is not None else None
Why nested JSON? An alternative that seems cleaner (but may have a con you've thought of) would be to make params a JSON object (it has to be anyway to round trip through JSON).
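To make the trade-off concrete, here is a minimal sketch of the two request shapes being compared. The helper names `build_payload_nested` and `build_payload_object` and the `threshold` parameter are hypothetical and only illustrate the encoding difference; they are not part of the chromadb client.

```python
import json
from typing import Any, Dict, Optional


def build_payload_nested(task_name: str, params: Optional[Dict[str, Any]]) -> str:
    """Encode params as a JSON string inside the JSON body (double encoding)."""
    params_str = json.dumps(params) if params is not None else None
    return json.dumps({"task_name": task_name, "params": params_str})


def build_payload_object(task_name: str, params: Optional[Dict[str, Any]]) -> str:
    """Embed params directly as a JSON object (single encoding)."""
    return json.dumps({"task_name": task_name, "params": params})


params = {"threshold": 10}  # hypothetical operator parameter
nested = json.loads(build_payload_nested("count", params))
flat = json.loads(build_payload_object("count", params))

# The nested form needs a second decode before a field can be read.
nested_threshold = json.loads(nested["params"])["threshold"]
# The object form is readable after a single decode.
flat_threshold = flat["params"]["threshold"]
```

Both forms carry the same information; the object form simply avoids the extra encode and decode step on each side.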
from chromadb.errors import ChromaError, NotFoundError


def test_task_create_and_remove(basic_http_client: System) -> None:
This test would pass if I changed the impls of create task and remove task to simply return static objects, right? Can we assert anything about the task existing and then not existing?
)


def test_task_multiple_collections(basic_http_client: System) -> None:
What about the adjacent test_task_multiple_tasks, which tries to register multiple tasks on the same collection?
}: CreateTaskRequest,
) -> Result<CreateTaskResponse, AddTaskError> {
    // TODO: Make min_records_for_task configurable
    const DEFAULT_MIN_RECORDS_FOR_TASK: u64 = 100;
Can we do this in this PR? It's just a few lines.
rust/sysdb/src/sysdb.rs
Outdated
// This is a client-side parsing issue, not a creation failure.
let task_id =
    chroma_types::TaskUuid(uuid::Uuid::parse_str(&response.task_id).map_err(|e| {
        CreateTaskError::FailedToCreateTask(tonic::Status::internal(format!(
It feels very awkward to me that we're returning a TypedError that throws away all typing to wrap a tonic::Status that formats a string.
What about:
enum CreateTaskError {
    NaturalLanguageReasonForFailure {
        task_id: Uuid,
    }
}
As written it's a create task error (so it failed) where the reason it failed is because it failed.
rust/types/src/api_types.rs
Outdated
pub task_name: String,
pub operator_name: String,
pub output_collection_name: String,
pub params: Option<String>,
Following up on my other point: What if this were a serde_json::Value rather than a string for the user to parse?
let response = self.client.create_task(req).await?.into_inner();

// Parse the returned task_id - this should always succeed since the server generated it
// If this fails, it indicates a serious server bug or protocol corruption
let task_id = chroma_types::TaskUuid(
    uuid::Uuid::parse_str(&response.task_id).map_err(|e| {
        tracing::error!(
            task_id = %response.task_id,
            error = %e,
            "Server returned invalid task_id UUID - task was created but response is corrupt"
        );
        CreateTaskError::ServerReturnedInvalidData
    })?,
);

Ok(task_id)
}
[BestPractice]
The current error handling for `create_task` doesn't correctly propagate the `AlreadyExists` error code from the gRPC service. The `?` operator on line 1682 will convert any gRPC error status, including `AlreadyExists`, into a generic `CreateTaskError::FailedToCreateTask`. This causes the `ServiceBasedFrontend` to misinterpret it as an internal error instead of a specific `AlreadyExists` condition.
This is inconsistent with how `get_task_by_name` and `delete_task_by_name` handle specific error codes like `NotFound`. To fix this, you should match on the result of the gRPC call and explicitly handle the `AlreadyExists` status code.
Suggested Change
```suggestion
let response = self.client.create_task(req).await;
match response {
    Ok(resp) => {
        let inner = resp.into_inner();
        // Parse the returned task_id - this should always succeed since the server generated it
        // If this fails, it indicates a serious server bug or protocol corruption
        let task_id = chroma_types::TaskUuid(
            uuid::Uuid::parse_str(&inner.task_id).map_err(|e| {
                tracing::error!(
                    task_id = %inner.task_id,
                    error = %e,
                    "Server returned invalid task_id UUID - task was created but response is corrupt"
                );
                CreateTaskError::ServerReturnedInvalidData
            })?,
        );
        Ok(task_id)
    }
    Err(status) => {
        if status.code() == tonic::Code::AlreadyExists {
            Err(CreateTaskError::AlreadyExists)
        } else {
            Err(CreateTaskError::FailedToCreateTask(status))
        }
    }
}
```
File: rust/sysdb/src/sysdb.rs
Line: 1698
serde_json::Value::Null => Kind::NullValue(0),
serde_json::Value::Bool(b) => Kind::BoolValue(b),
serde_json::Value::Number(n) => {
    if let Some(f) = n.as_f64() {
        Kind::NumberValue(f)
    } else {
[CriticalError]
**Critical Bug: Potential task creation with corrupted parameters**
The JSON-to-protobuf conversion in `json_to_prost_value()` has a silent failure case:
```rust
serde_json::Value::Number(n) => {
    if let Some(f) = n.as_f64() {
        Kind::NumberValue(f)
    } else {
        Kind::NullValue(0) // Silent data corruption!
    }
}
```
If a JSON number can't be converted to f64 (e.g., very large integers), it silently becomes null instead of failing. This could cause tasks to be created with corrupted parameters, leading to runtime failures.
**Fix**: Return an error or use a more robust conversion:
```rust
serde_json::Value::Number(n) => {
    Kind::NumberValue(n.as_f64().ok_or_else(||
        ConversionError::InvalidNumber(n.to_string()))?)
}
```
File: rust/sysdb/src/sysdb.rs
Line: 51
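The "fail loudly instead of substituting null" idea can be sketched independently of the Rust code. The following Python analogue is an illustration only: `to_double_strict` and `InvalidNumber` are hypothetical names, and the behaviour of the real `json_to_prost_value()` may differ.

```python
import math
from typing import Union


class InvalidNumber(ValueError):
    """Raised when a number cannot be represented as a finite double."""


def to_double_strict(n: Union[int, float]) -> float:
    """Convert n to a float, raising instead of silently returning a null."""
    try:
        f = float(n)
    except OverflowError as exc:
        # Python raises OverflowError for integers too large for a double.
        raise InvalidNumber(f"{n} does not fit in a double") from exc
    if not math.isfinite(f):
        raise InvalidNumber(f"{n} is not a finite number")
    return f
```

With this shape, a caller that passes an unrepresentable value gets an error at task creation time rather than a task whose parameters were quietly replaced.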
  if existingTask != nil {
-     log.Info("CreateTask: task already exists, returning existing")
-     taskID = existingTask.ID
-     return nil
+     log.Error("CreateTask: task already exists", zap.String("task_name", req.Name))
+     return common.ErrTaskAlreadyExists
[BestPractice]
**Logic Bug: Incorrect parameter handling**
The `CreateTask` function has a logic error where it changes from returning existing tasks to rejecting duplicates, but the error handling pattern suggests it should still handle the case where a task might legitimately already exist:
```go
if existingTask != nil {
    log.Error("CreateTask: task already exists", zap.String("task_name", req.Name))
    return common.ErrTaskAlreadyExists // Always errors now
}
```
This change breaks idempotency. If this is intentional, the API should be documented as non-idempotent. If not, the original logic should be preserved for idempotent task creation.
File: go/pkg/sysdb/coordinator/task.go
Line: 42
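The two behaviours under discussion can be contrasted in a small sketch. This is a hypothetical in-memory model, not the coordinator code: the dictionary stands in for the task table, and the function and exception names are made up for illustration.

```python
from typing import Dict


class TaskAlreadyExists(Exception):
    """Raised by the strict variant when the name is already registered."""


def create_task_idempotent(tasks: Dict[str, str], name: str, task_id: str) -> str:
    """Return the existing ID if the name is taken; otherwise register task_id."""
    return tasks.setdefault(name, task_id)


def create_task_strict(tasks: Dict[str, str], name: str, task_id: str) -> str:
    """Reject duplicates, so a retry of the same request surfaces an error."""
    if name in tasks:
        raise TaskAlreadyExists(name)
    tasks[name] = task_id
    return task_id


registry: Dict[str, str] = {}
first = create_task_idempotent(registry, "count", "id-1")
retry = create_task_idempotent(registry, "count", "id-2")  # same ID as first
```

The idempotent variant makes client retries safe but can hide a name collision with different parameters; the strict variant surfaces collisions and leaves retry handling to the caller.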
res, err := s.coordinator.CreateTask(ctx, req)
if err != nil {
    log.Error("CreateTask failed", zap.Error(err))
[BestPractice]
For improved debuggability, consider adding more context to the error logs in this file. Including the task name when an operation fails can be very helpful. This pattern could be applied to `GetTaskByName` and `DeleteTask` as well.
For example:
File: go/pkg/sysdb/grpc/task_service.go
Line: 18
Description of changes
Summarize the changes made by this PR.
This is a duplicate of PR 5547 as that branch got corrupted.
Test plan
How are these changes tested?
pytest for python, yarn test for js, cargo test for rust
Migration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?