4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -95,6 +95,7 @@ chroma-cli = { path = "rust/cli" }
chroma-jemalloc-pprof-server = { path = "rust/jemalloc-pprof-server" }
mdac = { path = "rust/mdac" }
s3heap = { path = "rust/s3heap" }
s3heap-service = { path = "rust/s3heap-service" }
wal3 = { path = "rust/wal3" }
worker = { path = "rust/worker" }

10 changes: 5 additions & 5 deletions go/pkg/sysdb/coordinator/task.go
@@ -123,7 +123,7 @@ func (s *Coordinator) CreateTask(ctx context.Context, req *coordinatorpb.CreateT
OperatorParams: paramsJSON,
CompletionOffset: 0,
LastRun: nil,
NextRun: nil, // Will be set to zero initially, scheduled by task scheduler
NextRun: &now,
MinRecordsForTask: int64(req.MinRecordsForTask),
CurrentAttempts: 0,
CreatedAt: now,
@@ -335,10 +335,10 @@ func (s *Coordinator) PeekScheduleByCollectionId(ctx context.Context, req *coord
for _, task := range tasks {
task_id := task.ID.String()
entry := &coordinatorpb.ScheduleEntry{
CollectionId: &task.InputCollectionID,
TaskId: &task_id,
TaskRunNonce: proto.String(task.NextNonce.String()),
WhenToRun: nil,
CollectionId: &task.InputCollectionID,
TaskId: &task_id,
TaskRunNonce: proto.String(task.NextNonce.String()),
WhenToRun: nil,
}
if task.NextRun != nil {
whenToRun := uint64(task.NextRun.UnixMilli())
1 change: 1 addition & 0 deletions rust/s3heap-service/Cargo.toml
@@ -22,6 +22,7 @@ chroma-sysdb = { workspace = true }
chroma-tracing = { workspace = true, features = ["grpc"] }
chroma-types = { workspace = true }
s3heap = { workspace = true }
uuid = { workspace = true }
wal3 = { workspace = true }

[dev-dependencies]
63 changes: 63 additions & 0 deletions rust/s3heap-service/README.md
@@ -0,0 +1,63 @@
# s3heap-service

The s3heap-service integrates with the task manager to trigger tasks no faster than a particular
cadence, with reasonable guarantees that writing data will cause a task to run.

This document refines the design of the heap-tender and heap service until it can be implemented
safely.

## Abstract: A heap and a sysdb.

At the most abstract level, we have a heap and the sysdb. An item is either in the heap or not in
the heap. For the sysdb, an item is not in the sysdb, in the sysdb and should be scheduled, or in
the sysdb and waiting for writes to trigger the next scheduled run.

> **Contributor:** Would be good to bullet point this entire sentence with one bullet per state.
>
> **Author:** Can I leave the prose? This is meant to match the table below and the prose is what I need more than a table or bullets. Happy to add redundancy so we all have what we need to understand.

That gives this chart:

| Heap State | Sysdb State |
|------------|-------------|
| Not in heap | Not in sysdb |
| Not in heap | In sysdb, should be scheduled |
| Not in heap | In sysdb, waiting for writes |
| In heap | Not in sysdb |
| In heap | In sysdb, should be scheduled |
| In heap | In sysdb, waiting for writes |

More abstractly, view it like this:

| | On Heap | Not On Heap |
|---------------------|------------|-------------|
| Not in sysdb | A_1 | A_2 |
| In sysdb, should be scheduled | B_1 | B_2 |
| In sysdb, waiting for writes | C_1 | C_2 |

When viewed like this, we can establish rules for state transitions in our system. Each operation
acts on either the sysdb or the heap, never both, because there is no transactionality between S3
and databases. Thus, a transition may jump to any row within the same column, or to another column
within the same row, but it can never change both row and column in a single step.
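
To make this rule concrete, here is a minimal sketch with hypothetical enums (none of these types
exist in this crate) that encodes the two axes and checks that a legal transition changes at most
one of them:

```rust
/// Hypothetical model of the heap axis; the real heap lives in S3.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum HeapState {
    OnHeap,
    NotOnHeap,
}

/// Hypothetical model of the sysdb axis.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum SysdbState {
    NotInSysdb,
    ShouldBeScheduled,
    WaitingForWrites,
}

/// One cell of the table above: a (sysdb, heap) pair.
type Cell = (SysdbState, HeapState);

/// A transition is legal only if it keeps the heap state (same column) or the
/// sysdb state (same row) unchanged; it can never change both in one step,
/// because there is no transactionality between S3 and the database.
fn transition_is_legal(from: Cell, to: Cell) -> bool {
    from.0 == to.0 || from.1 == to.1
}

fn main() {
    // NEW1: create a task in the sysdb without touching the heap -- legal.
    assert!(transition_is_legal(
        (SysdbState::NotInSysdb, HeapState::NotOnHeap),
        (SysdbState::ShouldBeScheduled, HeapState::NotOnHeap),
    ));
    // YES2: a completed job moves the task from "should be scheduled" to
    // "waiting for writes" while it stays on the heap -- legal.
    assert!(transition_is_legal(
        (SysdbState::ShouldBeScheduled, HeapState::OnHeap),
        (SysdbState::WaitingForWrites, HeapState::OnHeap),
    ));
    // Jumping from A_2 straight to B_1 would change both axes -- not legal.
    assert!(!transition_is_legal(
        (SysdbState::NotInSysdb, HeapState::NotOnHeap),
        (SysdbState::ShouldBeScheduled, HeapState::OnHeap),
    ));
}
```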

## State space diagram

The columns give the "from" state and the rows give the "to" state:

| To \ From | A_1  | A_2  | B_1  | B_2  | C_1  | C_2  |
|-----------|------|------|------|------|------|------|
| A_1       | -    | IMP1 | YES1 | X    | YES1 | X    |
| A_2       | GC1  | -    | X    | GC2  | X    | YES1 |
| B_1       | IMP2 | X    | -    | NEW2 | YES3 | X    |
| B_2       | X    | NEW1 | IMP3 | -    | X    | YES3 |
| C_1       | IMP2 | X    | YES2 | X    | -    | IMP4 |
| C_2       | X    | NO1  | X    | YES2 | IMP3 | -    |

- GC1: Item gets a perpetual "is-done" from the sysdb and transitions to A_2 (see the doneness sketch after this list).
- GC2: Garbage collection.
- NEW1: Create a new task in the sysdb.
- NEW2: Finish the new operation by priming the task and putting it on the heap.
- YES1: Task gets deleted from sysdb.
- YES2: This implies that we move from scheduled to waiting while the task is on the heap. This happens when a job completes and reads all data from the log.
- YES3: There was a write, the heap needed to schedule, so it picked a time and updated sysdb.
- NO1: This implies that the state transitioned from being not-in-sysdb to in-sysdb. A new task will always run straight away, so it should not be put into waiting state.
- IMP1: The item is not on the heap or in the database. First transition is to B_2 or C_2.
- IMP2: Task UUIDs are not re-used. Starting from A_1 implies the task was created and then put on the heap and subsequently removed from sysdb. There should be no means by which it reappears in the sysdb. Therefore this path is impossible.
- IMP3: We never take something off the heap until the sysdb is updated to reflect the job being done. Therefore we don't take this transition.
- IMP4: We don't add something to the heap until it has been scheduled.
- X: Impossible because the transition would change both the heap and the sysdb in a single step.
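
How "done" is decided without a cross-system transaction: the heap asks the sysdb (the `are_done`
path of `SysDbScheduler` below), and an entry whose task is gone from the sysdb, or whose nonce no
longer matches the sysdb's current run nonce, is treated as done and becomes eligible for garbage
collection (GC1/GC2). A minimal self-contained sketch of that comparison, using hypothetical
stand-in types rather than the real `Triggerable`/`ScheduleEntry`:

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for illustration only; the real code keys on
// Triggerable (collection + task) and uses UUID nonces from ScheduleEntry.
type TaskId = u64;
type Nonce = u64;

/// The sysdb's view: the nonce of the next scheduled run for each live task.
struct SysdbView {
    next_nonce: HashMap<TaskId, Nonce>,
}

impl SysdbView {
    /// A heap entry is "done" if its task is no longer in the sysdb, or if the
    /// sysdb has already moved on to a newer nonce than the one on the heap.
    fn is_done(&self, task: TaskId, heap_nonce: Nonce) -> bool {
        match self.next_nonce.get(&task) {
            None => true,
            Some(current) => *current != heap_nonce,
        }
    }
}

fn main() {
    let mut sysdb = SysdbView { next_nonce: HashMap::new() };
    sysdb.next_nonce.insert(1, 7);

    assert!(!sysdb.is_done(1, 7)); // nonces match: the run is still pending
    assert!(sysdb.is_done(1, 6)); // stale heap entry: the sysdb advanced the nonce
    assert!(sysdb.is_done(2, 3)); // task deleted from the sysdb entirely (YES1)
}
```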
69 changes: 62 additions & 7 deletions rust/s3heap-service/src/lib.rs
@@ -20,23 +20,77 @@ use chroma_types::chroma_proto::heap_tender_service_server::{
};
use chroma_types::chroma_proto::{HeapSummaryRequest, HeapSummaryResponse};
use chroma_types::{dirty_log_path_from_hostname, CollectionUuid, DirtyMarker, ScheduleEntry};
use s3heap::{Configuration, DummyScheduler, Error, HeapWriter, Schedule, Triggerable};
use s3heap::{heap_path_from_hostname, Configuration, HeapWriter, Schedule, Triggerable};
use wal3::{
Cursor, CursorName, CursorStore, CursorStoreOptions, LogPosition, LogReader, LogReaderOptions,
Witness,
};

mod scheduler;

pub use scheduler::SysDbScheduler;

/////////////////////////////////////////////// Error //////////////////////////////////////////////

/// Custom error type that can handle errors from multiple sources.
#[derive(Debug)]
pub enum Error {
/// Error from s3heap operations.
S3Heap(s3heap::Error),
/// Error from wal3 operations.
Wal3(wal3::Error),
/// Error from sysdb operations.
SysDb(chroma_sysdb::PeekScheduleError),
/// Error from JSON serialization/deserialization.
Json(serde_json::Error),
/// Internal error with a message.
Internal(String),
}

impl From<s3heap::Error> for Error {
fn from(e: s3heap::Error) -> Self {
Error::S3Heap(e)
}
}

impl From<wal3::Error> for Error {
fn from(e: wal3::Error) -> Self {
Error::Wal3(e)
}
}

impl From<chroma_sysdb::PeekScheduleError> for Error {
fn from(e: chroma_sysdb::PeekScheduleError) -> Self {
Error::SysDb(e)
}
}

impl From<serde_json::Error> for Error {
fn from(e: serde_json::Error) -> Self {
Error::Json(e)
}
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::S3Heap(e) => write!(f, "s3heap error: {}", e),
Error::Wal3(e) => write!(f, "wal3 error: {}", e),
Error::SysDb(e) => write!(f, "sysdb error: {}", e),
Error::Json(e) => write!(f, "json error: {}", e),
Error::Internal(msg) => write!(f, "internal error: {}", msg),
}
}
}

impl std::error::Error for Error {}
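
// Illustrative sketch (hypothetical helper, not defined in this crate): the
// `From` impls above let callers propagate underlying errors with `?`, which
// converts them into this crate's `Error` automatically. For example:
//
//     fn parse_marker(bytes: &[u8]) -> Result<serde_json::Value, Error> {
//         let value = serde_json::from_slice(bytes)?; // serde_json::Error -> Error::Json
//         Ok(value)
//     }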

///////////////////////////////////////////// constants ////////////////////////////////////////////

const DEFAULT_CONFIG_PATH: &str = "./chroma_config.yaml";

const CONFIG_PATH_ENV_VAR: &str = "CONFIG_PATH";

/// The path of the heap tended on behalf of the given hostname.
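/// For example, `heap_path_from_hostname("host-0")` returns `"heap/host-0"`.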
pub fn heap_path_from_hostname(hostname: &str) -> String {
format!("heap/{}", hostname)
}

/// The cursor name used by HeapTender to track its position in the dirty log.
pub static HEAP_TENDER_CURSOR_NAME: CursorName =
unsafe { CursorName::from_string_unchecked("heap_tender") };
@@ -295,8 +349,9 @@ impl Configurable<HeapTenderServerConfig> for HeapTenderServer {
"s3heap-tender".to_string(),
);
let heap_prefix = heap_path_from_hostname(&config.my_member_id);
let scheduler = Arc::new(DummyScheduler) as _;
let scheduler = Arc::new(SysDbScheduler::new(sysdb.clone())) as _;
let writer = HeapWriter::new(storage, heap_prefix, Arc::clone(&scheduler))
.await
.map_err(|e| -> Box<dyn chroma_error::ChromaError> { Box::new(e) })?;
let tender = Arc::new(HeapTender {
sysdb,
77 changes: 77 additions & 0 deletions rust/s3heap-service/src/scheduler.rs
@@ -0,0 +1,77 @@
use std::collections::HashMap;

use chroma_sysdb::SysDb;
use chroma_types::{CollectionUuid, ScheduleEntry};
use s3heap::{HeapScheduler, Schedule, Triggerable};
use uuid::Uuid;

/// Scheduler that integrates with SysDb to manage task scheduling.
pub struct SysDbScheduler {
sysdb: SysDb,
}

impl SysDbScheduler {
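/// Creates a new `SysDbScheduler` backed by the given sysdb client.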
pub fn new(sysdb: SysDb) -> SysDbScheduler {
Self { sysdb }
}
}

#[async_trait::async_trait]
impl HeapScheduler for SysDbScheduler {
async fn are_done(&self, items: &[(Triggerable, Uuid)]) -> Result<Vec<bool>, s3heap::Error> {
let collection_ids = items
.iter()
.map(|item| CollectionUuid(*item.0.partitioning.as_uuid()))
.collect::<Vec<_>>();
let schedules = self
.sysdb
.clone()
.peek_schedule_by_collection_id(&collection_ids)
.await
.map_err(|e| s3heap::Error::Internal(format!("sysdb error: {}", e)))?;
let mut by_triggerable: HashMap<Triggerable, ScheduleEntry> = HashMap::default();
for schedule in schedules.into_iter() {
by_triggerable.insert(
Triggerable {
partitioning: schedule.collection_id.0.into(),
scheduling: schedule.task_id.into(),
},
schedule,
);
}
let mut results = Vec::with_capacity(items.len());
for (triggerable, nonce) in items.iter() {
let Some(schedule) = by_triggerable.get(triggerable) else {
results.push(true);
continue;
};
results.push(schedule.task_run_nonce != *nonce);
}
Ok(results)
}

async fn get_schedules(&self, ids: &[Uuid]) -> Result<Vec<Schedule>, s3heap::Error> {
let collection_ids = ids.iter().cloned().map(CollectionUuid).collect::<Vec<_>>();
let schedules = self
.sysdb
.clone()
.peek_schedule_by_collection_id(&collection_ids)
.await
.map_err(|e| s3heap::Error::Internal(format!("sysdb error: {}", e)))?;
let mut results = Vec::new();
tracing::info!("schedules {schedules:?}");
for schedule in schedules.into_iter() {
if let Some(when_to_run) = schedule.when_to_run {
results.push(Schedule {
triggerable: Triggerable {
partitioning: schedule.collection_id.0.into(),
scheduling: schedule.task_id.into(),
},
nonce: schedule.task_run_nonce,
next_scheduled: when_to_run,
});
}
}
Ok(results)
}
}