
Commit 67e5b87

[ENH]: Refactor compactor
1 parent 0605458 commit 67e5b87

20 files changed: +1979 additions, -3879 deletions

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

rust/worker/src/compactor/compaction_manager.rs

Lines changed: 47 additions & 233 deletions
Large diffs are not rendered by default.

rust/worker/src/compactor/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@ mod compaction_manager;
 pub(crate) mod config;
 mod scheduler;
 mod scheduler_policy;
-mod tasks;
 mod types;
 
 pub(crate) use compaction_manager::*;

rust/worker/src/compactor/scheduler.rs

Lines changed: 9 additions & 157 deletions
@@ -1,26 +1,18 @@
-use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
-use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
 use chroma_config::assignment::assignment_policy::AssignmentPolicy;
 use chroma_log::{CollectionInfo, CollectionRecord, Log};
 use chroma_memberlist::memberlist_provider::Memberlist;
-use chroma_storage::Storage;
 use chroma_sysdb::{GetCollectionsOptions, SysDb};
 use chroma_types::{CollectionUuid, JobId};
 use figment::providers::Env;
 use figment::Figment;
-use mdac::{Scorecard, ScorecardGuard};
-use s3heap_service::SysDbScheduler;
 use serde::Deserialize;
-use tracing::Level;
 use uuid::Uuid;
 
-use crate::compactor::compaction_manager::ExecutionMode;
 use crate::compactor::scheduler_policy::SchedulerPolicy;
-use crate::compactor::tasks::{FunctionHeapReader, SchedulableFunction};
 use crate::compactor::types::CompactionJob;
 
 #[derive(Debug, Clone)]
@@ -71,16 +63,12 @@ impl FailedJob {
 
 struct InProgressJob {
     expires_at: SystemTime,
-    // dead because RAII-style drop protection
-    #[allow(dead_code)]
-    guard: Option<ScorecardGuard>,
 }
 
 impl InProgressJob {
-    fn new(job_expiry_seconds: u64, guard: Option<ScorecardGuard>) -> Self {
+    fn new(job_expiry_seconds: u64) -> Self {
         Self {
             expires_at: SystemTime::now() + Duration::from_secs(job_expiry_seconds),
-            guard,
         }
     }
 
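
For reference, a minimal sketch of InProgressJob as it reads after this hunk, assembled from the context and added lines above (the import comes from the header hunk; the expiry check and other surrounding items are elided):

use std::time::{Duration, SystemTime};

struct InProgressJob {
    expires_at: SystemTime,
}

impl InProgressJob {
    fn new(job_expiry_seconds: u64) -> Self {
        Self {
            // The job counts as expired once this instant has passed.
            expires_at: SystemTime::now() + Duration::from_secs(job_expiry_seconds),
        }
    }
}
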
@@ -90,7 +78,6 @@ impl InProgressJob {
 }
 
 pub(crate) struct Scheduler {
-    mode: ExecutionMode,
     my_member_id: String,
     log: Log,
     sysdb: SysDb,
@@ -110,9 +97,6 @@ pub(crate) struct Scheduler {
     dead_jobs: HashSet<JobId>,
     max_failure_count: u8,
     metrics: SchedulerMetrics,
-    tasks: FunctionHeapReader,
-    func_queue: Vec<SchedulableFunction>,
-    scorecard: Arc<Scorecard<'static>>,
 }
 
 #[derive(Deserialize, Debug)]
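
Combining this hunk with the previous one, the trimmed Scheduler state reads roughly as below. Only fields visible in this diff are shown; the elided middle fields (policy, queues, memberlist, and so on) are assumptions about unchanged code:

pub(crate) struct Scheduler {
    my_member_id: String,
    log: Log,
    sysdb: SysDb,
    // ... fields unchanged by this commit elided ...
    dead_jobs: HashSet<JobId>,
    max_failure_count: u8,
    metrics: SchedulerMetrics,
}
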
@@ -123,11 +107,9 @@ struct RunTimeConfig {
 impl Scheduler {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
-        mode: ExecutionMode,
         my_ip: String,
         log: Log,
         sysdb: SysDb,
-        storage: Storage,
         policy: Box<dyn SchedulerPolicy>,
         max_concurrent_jobs: usize,
         min_compaction_size: usize,
@@ -136,17 +118,7 @@ impl Scheduler {
         job_expiry_seconds: u64,
         max_failure_count: u8,
     ) -> Scheduler {
-        let heap_scheduler =
-            Arc::new(SysDbScheduler::new(sysdb.clone())) as Arc<dyn s3heap::HeapScheduler>;
-        let tasks = FunctionHeapReader::new(storage, heap_scheduler);
-        let scorecard = Arc::new(Scorecard::new(
-            &(),
-            vec![],
-            128.try_into().expect("128 is not zero"),
-        ));
-
         Scheduler {
-            mode,
             my_member_id: my_ip,
             log,
             sysdb,
@@ -166,9 +138,6 @@ impl Scheduler {
             max_failure_count,
             dead_jobs: HashSet::new(),
             metrics: SchedulerMetrics::default(),
-            tasks,
-            func_queue: Vec::with_capacity(max_concurrent_jobs),
-            scorecard,
         }
     }
 
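
Taken together, the constructor hunks leave Scheduler::new without the ExecutionMode and Storage arguments and without the heap-reader or scorecard setup. A sketch of the resulting constructor; the parameters and field initializers not shown in this diff are elided as assumptions about unchanged code:

impl Scheduler {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        my_ip: String,
        log: Log,
        sysdb: SysDb,
        policy: Box<dyn SchedulerPolicy>,
        max_concurrent_jobs: usize,
        min_compaction_size: usize,
        // ... parameters unchanged by this commit elided ...
        job_expiry_seconds: u64,
        max_failure_count: u8,
    ) -> Scheduler {
        Scheduler {
            my_member_id: my_ip,
            log,
            sysdb,
            // ... field initializers unchanged by this commit elided ...
            max_failure_count,
            dead_jobs: HashSet::new(),
            metrics: SchedulerMetrics::default(),
        }
    }
}
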
@@ -428,7 +397,7 @@ impl Scheduler {
     fn add_in_progress(&mut self, collection_id: CollectionUuid) {
         self.in_progress_jobs.insert(
             collection_id.into(),
-            InProgressJob::new(self.job_expiry_seconds, None),
+            InProgressJob::new(self.job_expiry_seconds),
         );
     }
 
@@ -501,132 +470,26 @@ impl Scheduler {
     pub(crate) async fn schedule(&mut self) {
         // For now, we clear the job queue every time, assuming we will not have any pending jobs running
         self.job_queue.clear();
-        self.func_queue.clear();
 
         if self.memberlist.is_none() || self.memberlist.as_ref().unwrap().is_empty() {
             tracing::error!("Memberlist is not set or empty. Cannot schedule compaction jobs.");
             return;
         }
 
-        match self.mode {
-            ExecutionMode::Compaction => {
-                // Recompute disabled list.
-                self.recompute_disabled_collections();
-                let collections = self.get_collections_with_new_data().await;
-                if collections.is_empty() {
-                    return;
-                }
-                let collection_records = self.verify_and_enrich_collections(collections).await;
-                self.schedule_internal(collection_records).await;
-            }
-            ExecutionMode::AttachedFunction => {
-                let tasks = self
-                    .tasks
-                    .get_tasks_scheduled_for_execution(
-                        s3heap::Limits::default().with_items(self.max_concurrent_jobs),
-                    )
-                    .await;
-                self.schedule_tasks(tasks).await;
-            }
-        }
-    }
-
-    pub(crate) async fn schedule_tasks(&mut self, funcs: Vec<SchedulableFunction>) {
-        let members = self.memberlist.as_ref().unwrap();
-        let members_as_string = members
-            .iter()
-            .map(|member| member.member_id.clone())
-            .collect();
-        self.assignment_policy.set_members(members_as_string);
-        for func in funcs {
-            let result = self
-                .assignment_policy
-                .assign_one(func.collection_id.0.to_string().as_str());
-            if result.is_err() {
-                tracing::error!(
-                    "Failed to assign func {} for collection {} to member: {}",
-                    func.task_id,
-                    func.collection_id,
-                    result.err().unwrap()
-                );
-                continue;
-            }
-            let member = result.unwrap();
-            if member != self.my_member_id {
-                continue;
-            }
-
-            let failure_count = self
-                .failing_jobs
-                .get(&func.collection_id.into())
-                .map(|job| job.failure_count())
-                .unwrap_or(0);
-
-            if failure_count >= self.max_failure_count {
-                tracing::warn!(
-                    "Job for collection {} failed more than {} times, moving this to dead jobs and skipping function for it",
-                    func.collection_id,
-                    self.max_failure_count
-                );
-                self.kill_job(func.task_id.into());
-                continue;
-            }
-
-            if self.disabled_collections.contains(&func.collection_id)
-                || self.dead_jobs.contains(&func.collection_id.into())
-            {
-                tracing::info!(
-                    "Ignoring collection: {:?} because it disabled",
-                    func.collection_id
-                );
-                continue;
-            }
-            if let Entry::Vacant(entry) = self.in_progress_jobs.entry(func.task_id.into()) {
-                let result = self
-                    .sysdb
-                    .get_collections(GetCollectionsOptions {
-                        collection_id: Some(func.collection_id),
-                        ..Default::default()
-                    })
-                    .await;
-                match result {
-                    Ok(collections) => {
-                        if collections.is_empty() {
-                            self.deleted_collections.insert(func.collection_id);
-                            continue;
-                        }
-                        let tags = ["op:function", &format!("tenant:{}", collections[0].tenant)];
-                        let guard = self.scorecard.track(&tags).map(|ticket| {
-                            ScorecardGuard::new(Arc::clone(&self.scorecard), Some(ticket))
-                        });
-                        if let Some(guard) = guard {
-                            entry.insert(InProgressJob::new(self.job_expiry_seconds, Some(guard)));
-                            self.func_queue.push(func);
-                        } else {
-                            tracing::event!(
-                                Level::INFO,
-                                name = "not scheduling function because scorecard",
-                                collection_id =? func.collection_id,
-                                tenant =? collections[0].tenant,
-                            );
-                        }
-                    }
-                    Err(err) => {
-                        tracing::error!("Error: {:?}", err);
-                    }
-                }
-            }
+        // Recompute disabled list.
+        self.recompute_disabled_collections();
+        let collections = self.get_collections_with_new_data().await;
+        if collections.is_empty() {
+            return;
         }
+        let collection_records = self.verify_and_enrich_collections(collections).await;
+        self.schedule_internal(collection_records).await;
     }
 
     pub(crate) fn get_jobs(&self) -> impl Iterator<Item = &CompactionJob> {
         self.job_queue.iter()
     }
 
-    pub(crate) fn get_tasks_scheduled_for_execution(&self) -> &Vec<SchedulableFunction> {
-        &self.func_queue
-    }
-
     pub(crate) fn set_memberlist(&mut self, memberlist: Memberlist) {
         self.memberlist = Some(memberlist);
     }
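
After this hunk, schedule() no longer branches on an execution mode, and the scorecard-gated schedule_tasks path along with its get_tasks_scheduled_for_execution accessor is gone. The resulting method, assembled from the added and context lines above:

// Inside impl Scheduler, after the refactor:
pub(crate) async fn schedule(&mut self) {
    // For now, we clear the job queue every time, assuming we will not have any pending jobs running
    self.job_queue.clear();

    if self.memberlist.is_none() || self.memberlist.as_ref().unwrap().is_empty() {
        tracing::error!("Memberlist is not set or empty. Cannot schedule compaction jobs.");
        return;
    }

    // Recompute disabled list.
    self.recompute_disabled_collections();
    let collections = self.get_collections_with_new_data().await;
    if collections.is_empty() {
        return;
    }
    let collection_records = self.verify_and_enrich_collections(collections).await;
    self.schedule_internal(collection_records).await;
}
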
@@ -645,16 +508,11 @@ mod tests {
     use chroma_config::assignment::assignment_policy::RendezvousHashingAssignmentPolicy;
     use chroma_log::in_memory_log::{InMemoryLog, InternalLogRecord};
     use chroma_memberlist::memberlist_provider::Member;
-    use chroma_storage::s3_client_for_test_with_new_bucket;
     use chroma_sysdb::TestSysDb;
     use chroma_types::{Collection, LogRecord, Operation, OperationRecord};
 
-    use crate::compactor::compaction_manager::ExecutionMode;
-
     #[tokio::test]
     async fn test_k8s_integration_scheduler() {
-        let storage = s3_client_for_test_with_new_bucket().await;
-
         let mut log = Log::InMemory(InMemoryLog::new());
         let in_memory_log = match log {
             Log::InMemory(ref mut in_memory_log) => in_memory_log,
@@ -751,11 +609,9 @@ mod tests {
         assignment_policy.set_members(vec![my_member.member_id.clone()]);
 
         let mut scheduler = Scheduler::new(
-            ExecutionMode::Compaction,
             my_member.member_id.clone(),
             log,
             sysdb.clone(),
-            storage,
             scheduler_policy,
             max_concurrent_jobs,
             1,
@@ -885,8 +741,6 @@ mod tests {
     #[tokio::test]
     #[should_panic(expected = "is less than offset")]
     async fn test_k8s_integration_scheduler_panic() {
-        let storage = s3_client_for_test_with_new_bucket().await;
-
         let mut log = Log::InMemory(InMemoryLog::new());
         let in_memory_log = match log {
             Log::InMemory(ref mut in_memory_log) => in_memory_log,
@@ -1008,11 +862,9 @@ mod tests {
         assignment_policy.set_members(vec![my_member.member_id.clone()]);
 
         let mut scheduler = Scheduler::new(
-            ExecutionMode::Compaction,
             my_member.member_id.clone(),
             log,
             sysdb.clone(),
-            storage,
             scheduler_policy,
             max_concurrent_jobs,
             1,
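
Both test call sites now build the scheduler without the mode and storage arguments. A sketch of the updated call; the trailing arguments are elided because they do not appear in this diff:

let mut scheduler = Scheduler::new(
    my_member.member_id.clone(),
    log,
    sysdb.clone(),
    scheduler_policy,
    max_concurrent_jobs,
    1,
    // ... remaining arguments unchanged by this commit ...
);
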
