-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[ENH] Schedule tasks from the sysdb. #5562
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer ChecklistPlease leverage this checklist to ensure your code review is thorough before approving Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
|
Add Heap Scheduler Integration between sysdb and Heap Tender (Task Scheduling Orchestration) This PR introduces substantial orchestration between the Rust heap scheduling/compaction layer ( Key Changes• Introduced Affected Areas• This summary was automatically generated by @propel-code-bot |
// Can do both calls with a JOIN | ||
task, err := s.catalog.metaDomain.TaskDb(ctx).GetByName(req.InputCollectionId, req.TaskName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// If task not found, return empty response | ||
if task == nil { | ||
return nil, common.ErrTaskNotFound | ||
} | ||
|
||
// Look up operator name from operators table | ||
operator, err := s.catalog.metaDomain.OperatorDb(ctx).GetByID(task.OperatorID) | ||
if err != nil { | ||
log.Error("GetTaskByName: failed to get operator", zap.Error(err)) | ||
return nil, err | ||
} | ||
if operator == nil { | ||
log.Error("GetTaskByName: operator not found", zap.String("operator_id", task.OperatorID.String())) | ||
return nil, common.ErrOperatorNotFound | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[PerformanceOptimization]
You've correctly identified a potential optimization with the comment // Can do both calls with a JOIN
. To avoid making two separate database calls, you could implement a dedicated DAO method that performs a JOIN between the tasks
and operators
tables. This would improve efficiency and prevent potential N+1 query issues if this function is called in a loop in the future.
For example, you could add a method like GetByNameWithOperator
to your ITaskDb
interface:
// In dbmodel/task.go
type TaskAndOperator struct {
Task
OperatorName string `gorm:"column:operator_name"`
}
// In ITaskDb interface
GetByNameWithOperator(inputCollectionID string, taskName string) (*TaskAndOperator, error)
// In dao/task.go
func (s *taskDb) GetByNameWithOperator(inputCollectionID string, taskName string) (*dbmodel.TaskAndOperator, error) {
var result dbmodel.TaskAndOperator
err := s.db.Table("tasks").
Select("tasks.*, operators.operator_name").
Joins("JOIN operators ON tasks.operator_id = operators.operator_id").
Where("tasks.input_collection_id = ?", inputCollectionID).
Where("tasks.task_name = ?", taskName).
Where("tasks.is_deleted = ?", false).
First(&result).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, err
}
return &result, nil
}
Then GetTaskByName
in the coordinator can be simplified to a single database call.
Context for Agents
[**PerformanceOptimization**]
You've correctly identified a potential optimization with the comment `// Can do both calls with a JOIN`. To avoid making two separate database calls, you could implement a dedicated DAO method that performs a JOIN between the `tasks` and `operators` tables. This would improve efficiency and prevent potential N+1 query issues if this function is called in a loop in the future.
For example, you could add a method like `GetByNameWithOperator` to your `ITaskDb` interface:
```go
// In dbmodel/task.go
type TaskAndOperator struct {
Task
OperatorName string `gorm:"column:operator_name"`
}
// In ITaskDb interface
GetByNameWithOperator(inputCollectionID string, taskName string) (*TaskAndOperator, error)
// In dao/task.go
func (s *taskDb) GetByNameWithOperator(inputCollectionID string, taskName string) (*dbmodel.TaskAndOperator, error) {
var result dbmodel.TaskAndOperator
err := s.db.Table("tasks").
Select("tasks.*, operators.operator_name").
Joins("JOIN operators ON tasks.operator_id = operators.operator_id").
Where("tasks.input_collection_id = ?", inputCollectionID).
Where("tasks.task_name = ?", taskName).
Where("tasks.is_deleted = ?", false).
First(&result).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, err
}
return &result, nil
}
```
Then `GetTaskByName` in the coordinator can be simplified to a single database call.
File: go/pkg/sysdb/coordinator/task.go
Line: 146
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for this PR.
CREATE TABLE "public"."tasks" ( | ||
"task_id" uuid NOT NULL, | ||
"task_name" text NOT NULL, | ||
"tenant_id" text NOT NULL, | ||
"database_id" text NOT NULL, | ||
"input_collection_id" text NOT NULL, -- Keeping these as text instead of UUID until collections.id becomes a UUID | ||
"output_collection_name" text NOT NULL, | ||
"operator_id" uuid NOT NULL, | ||
"operator_params" jsonb NOT NULL, | ||
"completion_offset" bigint NOT NULL DEFAULT 0, | ||
"last_run" timestamp NULL DEFAULT NULL, | ||
"next_run" timestamp NULL DEFAULT NULL, | ||
"min_records_for_task" bigint NOT NULL DEFAULT 100, | ||
"current_attempts" integer NOT NULL DEFAULT 0, | ||
"is_alive" boolean NOT NULL DEFAULT true, | ||
"is_deleted" boolean NOT NULL DEFAULT false, | ||
"created_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, | ||
"updated_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, | ||
"task_template_parent" uuid NULL, | ||
"next_nonce" UUID NOT NULL, -- UUIDv7 | ||
"oldest_written_nonce" UUID DEFAULT NULL, -- UUIDv7 | ||
PRIMARY KEY ("task_id"), | ||
CONSTRAINT "unique_task_per_collection" UNIQUE ("input_collection_id", "task_name") | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The tasks
table defines an operator_id
column but lacks a foreign key constraint to the operators
table. This could lead to data integrity issues if an operator is deleted, leaving tasks with orphaned operator_id
references. This would cause GetTaskByName
to fail when it tries to look up the operator.
To ensure referential integrity, please consider adding a foreign key constraint.
Suggested Change
CREATE TABLE "public"."tasks" ( | |
"task_id" uuid NOT NULL, | |
"task_name" text NOT NULL, | |
"tenant_id" text NOT NULL, | |
"database_id" text NOT NULL, | |
"input_collection_id" text NOT NULL, -- Keeping these as text instead of UUID until collections.id becomes a UUID | |
"output_collection_name" text NOT NULL, | |
"operator_id" uuid NOT NULL, | |
"operator_params" jsonb NOT NULL, | |
"completion_offset" bigint NOT NULL DEFAULT 0, | |
"last_run" timestamp NULL DEFAULT NULL, | |
"next_run" timestamp NULL DEFAULT NULL, | |
"min_records_for_task" bigint NOT NULL DEFAULT 100, | |
"current_attempts" integer NOT NULL DEFAULT 0, | |
"is_alive" boolean NOT NULL DEFAULT true, | |
"is_deleted" boolean NOT NULL DEFAULT false, | |
"created_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, | |
"updated_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, | |
"task_template_parent" uuid NULL, | |
"next_nonce" UUID NOT NULL, -- UUIDv7 | |
"oldest_written_nonce" UUID DEFAULT NULL, -- UUIDv7 | |
PRIMARY KEY ("task_id"), | |
CONSTRAINT "unique_task_per_collection" UNIQUE ("input_collection_id", "task_name") | |
); | |
CREATE TABLE "public"."tasks" ( | |
"task_id" uuid NOT NULL, | |
"task_name" text NOT NULL, | |
"tenant_id" text NOT NULL, | |
"database_id" text NOT NULL, | |
"input_collection_id" text NOT NULL, -- Keeping these as text instead of UUID until collections.id becomes a UUID | |
"output_collection_name" text NOT NULL, | |
"operator_id" uuid NOT NULL, | |
"operator_params" jsonb NOT NULL, | |
"completion_offset" bigint NOT NULL DEFAULT 0, | |
"last_run" timestamp NULL DEFAULT NULL, | |
"next_run" timestamp NULL DEFAULT NULL, | |
"min_records_for_task" bigint NOT NULL DEFAULT 100, | |
"current_attempts" integer NOT NULL DEFAULT 0, | |
"is_alive" boolean NOT NULL DEFAULT true, | |
"is_deleted" boolean NOT NULL DEFAULT false, | |
"created_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, | |
"updated_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, | |
"task_template_parent" uuid NULL, | |
"next_nonce" UUID NOT NULL, -- UUIDv7 | |
"oldest_written_nonce" UUID DEFAULT NULL, -- UUIDv7 | |
PRIMARY KEY ("task_id"), | |
CONSTRAINT "unique_task_per_collection" UNIQUE ("input_collection_id", "task_name"), | |
CONSTRAINT "fk_tasks_operator_id" FOREIGN KEY ("operator_id") REFERENCES "public"."operators" ("operator_id") | |
); |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**BestPractice**]
The `tasks` table defines an `operator_id` column but lacks a foreign key constraint to the `operators` table. This could lead to data integrity issues if an operator is deleted, leaving tasks with orphaned `operator_id` references. This would cause `GetTaskByName` to fail when it tries to look up the operator.
To ensure referential integrity, please consider adding a foreign key constraint.
<details>
<summary>Suggested Change</summary>
```suggestion
CREATE TABLE "public"."tasks" (
"task_id" uuid NOT NULL,
"task_name" text NOT NULL,
"tenant_id" text NOT NULL,
"database_id" text NOT NULL,
"input_collection_id" text NOT NULL, -- Keeping these as text instead of UUID until collections.id becomes a UUID
"output_collection_name" text NOT NULL,
"operator_id" uuid NOT NULL,
"operator_params" jsonb NOT NULL,
"completion_offset" bigint NOT NULL DEFAULT 0,
"last_run" timestamp NULL DEFAULT NULL,
"next_run" timestamp NULL DEFAULT NULL,
"min_records_for_task" bigint NOT NULL DEFAULT 100,
"current_attempts" integer NOT NULL DEFAULT 0,
"is_alive" boolean NOT NULL DEFAULT true,
"is_deleted" boolean NOT NULL DEFAULT false,
"created_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"task_template_parent" uuid NULL,
"next_nonce" UUID NOT NULL, -- UUIDv7
"oldest_written_nonce" UUID DEFAULT NULL, -- UUIDv7
PRIMARY KEY ("task_id"),
CONSTRAINT "unique_task_per_collection" UNIQUE ("input_collection_id", "task_name"),
CONSTRAINT "fk_tasks_operator_id" FOREIGN KEY ("operator_id") REFERENCES "public"."operators" ("operator_id")
);
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
</details>
File: go/pkg/sysdb/metastore/db/migrations/20251001073000.sql
Line: 43
e8da43e
to
f96c1d8
Compare
5e54c6d
to
52da7a6
Compare
52da7a6
to
43edec4
Compare
fn camel_to_snake_case(s: &str) -> String { | ||
let mut result = String::new(); | ||
for (i, ch) in s.chars().enumerate() { | ||
if ch.is_uppercase() { | ||
if i > 0 { | ||
result.push('_'); | ||
} | ||
result.push(ch.to_ascii_lowercase()); | ||
} else { | ||
result.push(ch); | ||
} | ||
} | ||
result | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The current implementation of camel_to_snake_case
incorrectly handles acronyms in Go constant names. For example, a name like HTTPGetter
would be converted to h_t_t_p_getter
instead of the correct http_getter
. While this works for the current RecordCounter
constant, it will cause issues with future constants that use acronyms.
A more robust implementation should handle consecutive uppercase letters correctly.
Suggested Change
fn camel_to_snake_case(s: &str) -> String { | |
let mut result = String::new(); | |
for (i, ch) in s.chars().enumerate() { | |
if ch.is_uppercase() { | |
if i > 0 { | |
result.push('_'); | |
} | |
result.push(ch.to_ascii_lowercase()); | |
} else { | |
result.push(ch); | |
} | |
} | |
result | |
} | |
fn camel_to_snake_case(s: &str) -> String { | |
let mut result = String::new(); | |
let mut last_was_upper = false; | |
for (i, ch) in s.chars().enumerate() { | |
if ch.is_uppercase() { | |
if i > 0 && !last_was_upper { | |
result.push('_'); | |
} | |
result.push(ch.to_ascii_lowercase()); | |
last_was_upper = true; | |
} else { | |
result.push(ch); | |
last_was_upper = false; | |
} | |
} | |
result | |
} |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**BestPractice**]
The current implementation of `camel_to_snake_case` incorrectly handles acronyms in Go constant names. For example, a name like `HTTPGetter` would be converted to `h_t_t_p_getter` instead of the correct `http_getter`. While this works for the current `RecordCounter` constant, it will cause issues with future constants that use acronyms.
A more robust implementation should handle consecutive uppercase letters correctly.
<details>
<summary>Suggested Change</summary>
```suggestion
fn camel_to_snake_case(s: &str) -> String {
let mut result = String::new();
let mut last_was_upper = false;
for (i, ch) in s.chars().enumerate() {
if ch.is_uppercase() {
if i > 0 && !last_was_upper {
result.push('_');
}
result.push(ch.to_ascii_lowercase());
last_was_upper = true;
} else {
result.push(ch);
last_was_upper = false;
}
}
result
}
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
</details>
File: rust/types/operator_codegen.rs
Line: 148
43edec4
to
4007de1
Compare
chroma_sysdb::SysDbConfig::Grpc(_) => {} | ||
chroma_sysdb::SysDbConfig::Sqlite(_) => { | ||
panic!("Expected grpc sysdb config, got sqlite sysdb config") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Using panic!
for configuration errors can cause the service to crash abruptly on startup, which can be difficult to debug in production environments. Since this function already returns a Result
, it would be more robust to return an Err
variant for invalid configurations. This allows for graceful error handling and clearer error messages upon service startup failure.
Context for Agents
[**BestPractice**]
Using `panic!` for configuration errors can cause the service to crash abruptly on startup, which can be difficult to debug in production environments. Since this function already returns a `Result`, it would be more robust to return an `Err` variant for invalid configurations. This allows for graceful error handling and clearer error messages upon service startup failure.
File: rust/s3heap-service/src/lib.rs
Line: 282
4007de1
to
08647ab
Compare
go/pkg/sysdb/coordinator/task.go
Outdated
} | ||
|
||
// Mark a task run as complete and set the nonce for the next task run. | ||
func (s *Coordinator) DoneTask(ctx context.Context, req *coordinatorpb.DoneTaskRequest) (*coordinatorpb.DoneTaskResponse, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we put a verb in this method's name to make it clear that this modifies state?
func (suite *TaskDbTestSuite) TestTaskDb_GetByID() { | ||
taskID := uuid.New() | ||
operatorID := dbmodel.OperatorRecordCounter | ||
nextNonce, _ := uuid.NewV7() | ||
|
||
task := &dbmodel.Task{ | ||
ID: taskID, | ||
Name: "test-get-by-id-task", | ||
OperatorID: operatorID, | ||
InputCollectionID: "input_col_id", | ||
OutputCollectionName: "output_col_name", | ||
OperatorParams: "{}", | ||
TenantID: "tenant1", | ||
DatabaseID: "db1", | ||
MinRecordsForTask: 100, | ||
NextNonce: nextNonce, | ||
} | ||
|
||
err := suite.Db.Insert(task) | ||
suite.Require().NoError(err) | ||
|
||
retrieved, err := suite.Db.GetByID(taskID) | ||
suite.Require().NoError(err) | ||
suite.Require().NotNil(retrieved) | ||
suite.Require().Equal(task.ID, retrieved.ID) | ||
suite.Require().Equal(task.Name, retrieved.Name) | ||
suite.Require().Equal(task.OperatorID, retrieved.OperatorID) | ||
|
||
suite.db.Unscoped().Delete(&dbmodel.Task{}, "task_id = ?", task.ID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Resource leak: The test creates and inserts a task but uses Unscoped().Delete()
for cleanup, which bypasses GORM hooks and constraints. If the test fails before reaching cleanup, the task will remain in the database permanently. Use proper cleanup with defer
or t.Cleanup()
:
func (suite *TaskDbTestSuite) TestTaskDb_GetByID() {
// ... setup code ...
// Clean up immediately after insert, regardless of test outcome
suite.T().Cleanup(func() {
suite.db.Unscoped().Delete(&dbmodel.Task{}, "task_id = ?", task.ID)
})
err := suite.Db.Insert(task)
suite.Require().NoError(err)
// ... rest of test ...
}
Context for Agents
[**BestPractice**]
Resource leak: The test creates and inserts a task but uses `Unscoped().Delete()` for cleanup, which bypasses GORM hooks and constraints. If the test fails before reaching cleanup, the task will remain in the database permanently. Use proper cleanup with `defer` or `t.Cleanup()`:
```go
func (suite *TaskDbTestSuite) TestTaskDb_GetByID() {
// ... setup code ...
// Clean up immediately after insert, regardless of test outcome
suite.T().Cleanup(func() {
suite.db.Unscoped().Delete(&dbmodel.Task{}, "task_id = ?", task.ID)
})
err := suite.Db.Insert(task)
suite.Require().NoError(err)
// ... rest of test ...
}
```
File: go/pkg/sysdb/metastore/db/dao/task_test.go
Line: 343
return &task, nil | ||
} | ||
|
||
func (s *taskDb) FinishTask(taskID uuid.UUID, taskRunNonce uuid.UUID) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider generating the next nonce here instead of letting the caller supply it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that not what the code is doing?
|
||
task := &dbmodel.Task{ | ||
ID: taskID, | ||
Name: "test-done-task", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming inconsistency with FinishTask
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
|
||
task := &dbmodel.Task{ | ||
ID: taskID, | ||
Name: "test-done-task-wrong-nonce", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
reader: LogReader, | ||
cursor: CursorStore, | ||
_writer: HeapWriter, | ||
writer: HeapWriter, | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Missing error handling: If self.sysdb.peek_schedule_by_collection_id()
fails, the method returns early with ?
but doesn't log the failure or provide context about which collections caused the issue. This makes debugging production failures difficult. Add error logging:
let scheduled = self
.sysdb
.clone()
.peek_schedule_by_collection_id(&collection_ids)
.await
.map_err(|e| {
log::error!("Failed to peek schedule for collections {:?}: {}", collection_ids, e);
e
})?;
Context for Agents
[**BestPractice**]
Missing error handling: If `self.sysdb.peek_schedule_by_collection_id()` fails, the method returns early with `?` but doesn't log the failure or provide context about which collections caused the issue. This makes debugging production failures difficult. Add error logging:
```rust
let scheduled = self
.sysdb
.clone()
.peek_schedule_by_collection_id(&collection_ids)
.await
.map_err(|e| {
log::error!("Failed to peek schedule for collections {:?}: {}", collection_ids, e);
e
})?;
```
File: rust/s3heap-service/src/lib.rs
Line: 54
2741ea7
to
75ff5ab
Compare
75ff5ab
to
1de60cc
Compare
return &task, nil | ||
} | ||
|
||
func (s *taskDb) AdvanceTask(taskID uuid.UUID, taskRunNonce uuid.UUID) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI I'm going to remove/modify this later. The path that updates next_nonce also needs to transactionally update completion_offset.
Description of changes
This PR adds a scheduler that talks to the sysdb to pull tasks.
Test plan
CI
Migration plan
N/A
Observability plan
N/A
Documentation Changes
N/A