From fb12863da29378485846c5e92956786c89600d36 Mon Sep 17 00:00:00 2001 From: Tianle Qiu Date: Fri, 25 Jul 2025 03:07:44 +0000 Subject: [PATCH 01/18] WIP --- src/global_state.rs | 4 + src/lib.rs | 8 + src/plan/barriers.rs | 89 ++++ src/plan/concurrent/barrier.rs | 146 ++++++ .../concurrent/concurrent_marking_work.rs | 244 ++++++++++ src/plan/concurrent/immix/gc_work.rs | 25 + src/plan/concurrent/immix/global.rs | 440 ++++++++++++++++++ src/plan/concurrent/immix/mod.rs | 7 + src/plan/concurrent/immix/mutator.rs | 84 ++++ src/plan/concurrent/mod.rs | 23 + src/plan/global.rs | 23 + src/plan/mod.rs | 5 +- src/plan/tracing.rs | 77 ++- src/policy/immix/immixspace.rs | 33 ++ src/policy/immix/line.rs | 14 +- src/policy/largeobjectspace.rs | 53 +++ src/policy/space.rs | 4 + src/scheduler/gc_work.rs | 38 +- src/scheduler/mod.rs | 2 +- src/scheduler/scheduler.rs | 121 ++++- src/scheduler/work_bucket.rs | 63 ++- src/util/address.rs | 5 + src/util/alloc/immix_allocator.rs | 29 ++ src/util/options.rs | 2 + src/vm/collection.rs | 3 + 25 files changed, 1524 insertions(+), 18 deletions(-) create mode 100644 src/plan/concurrent/barrier.rs create mode 100644 src/plan/concurrent/concurrent_marking_work.rs create mode 100644 src/plan/concurrent/immix/gc_work.rs create mode 100644 src/plan/concurrent/immix/global.rs create mode 100644 src/plan/concurrent/immix/mod.rs create mode 100644 src/plan/concurrent/immix/mutator.rs create mode 100644 src/plan/concurrent/mod.rs diff --git a/src/global_state.rs b/src/global_state.rs index b5a78d9bbe..8abe617b49 100644 --- a/src/global_state.rs +++ b/src/global_state.rs @@ -49,6 +49,8 @@ pub struct GlobalState { pub(crate) malloc_bytes: AtomicUsize, /// This stores the live bytes and the used bytes (by pages) for each space in last GC. This counter is only updated in the GC release phase. pub(crate) live_bytes_in_last_gc: AtomicRefCell>, + pub(crate) concurrent_marking_active: AtomicBool, + pub(crate) concurrent_marking_threshold: AtomicUsize, } impl GlobalState { @@ -206,6 +208,8 @@ impl Default for GlobalState { #[cfg(feature = "malloc_counted_size")] malloc_bytes: AtomicUsize::new(0), live_bytes_in_last_gc: AtomicRefCell::new(HashMap::new()), + concurrent_marking_threshold: AtomicUsize::new(0), + concurrent_marking_active: AtomicBool::new(false), } } } diff --git a/src/lib.rs b/src/lib.rs index afe094885f..e1a7b627e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,8 @@ extern crate static_assertions; extern crate probe; mod mmtk; +use std::sync::atomic::AtomicUsize; + pub use mmtk::MMTKBuilder; pub(crate) use mmtk::MMAPPER; pub use mmtk::MMTK; @@ -51,3 +53,9 @@ pub mod vm; pub use crate::plan::{ AllocationSemantics, BarrierSelector, Mutator, MutatorContext, ObjectQueue, Plan, }; + +static NUM_CONCURRENT_TRACING_PACKETS: AtomicUsize = AtomicUsize::new(0); + +fn concurrent_marking_packets_drained() -> bool { + crate::NUM_CONCURRENT_TRACING_PACKETS.load(std::sync::atomic::Ordering::SeqCst) == 0 +} diff --git a/src/plan/barriers.rs b/src/plan/barriers.rs index 56c069c982..07152bbde7 100644 --- a/src/plan/barriers.rs +++ b/src/plan/barriers.rs @@ -21,6 +21,7 @@ pub enum BarrierSelector { NoBarrier, /// Object remembering barrier is used. 
ObjectBarrier, + SATBBarrier, } impl BarrierSelector { @@ -45,6 +46,9 @@ impl BarrierSelector { pub trait Barrier: 'static + Send + Downcast { fn flush(&mut self) {} + /// load referent from java.lang.Reference + fn load_reference(&mut self, _referent: ObjectReference) {} + /// Subsuming barrier for object reference write fn object_reference_write( &mut self, @@ -92,6 +96,8 @@ pub trait Barrier: 'static + Send + Downcast { self.memory_region_copy_post(src, dst); } + fn object_reference_clone_pre(&mut self, _obj: ObjectReference) {} + /// Full pre-barrier for array copy fn memory_region_copy_pre(&mut self, _src: VM::VMMemorySlice, _dst: VM::VMMemorySlice) {} @@ -159,6 +165,10 @@ pub trait BarrierSemantics: 'static + Send { /// Object will probably be modified fn object_probable_write_slow(&mut self, _obj: ObjectReference) {} + + fn load_reference(&mut self, _o: ObjectReference) {} + + fn object_reference_clone_pre(&mut self, _obj: ObjectReference) {} } /// Generic object barrier with a type argument defining it's slow-path behaviour. @@ -250,3 +260,82 @@ impl Barrier for ObjectBarrier { } } } + +pub struct SATBBarrier { + semantics: S, +} + +impl SATBBarrier { + pub fn new(semantics: S) -> Self { + Self { semantics } + } + fn object_is_unlogged(&self, object: ObjectReference) -> bool { + // unsafe { S::UNLOG_BIT_SPEC.load::(object, None) != 0 } + S::UNLOG_BIT_SPEC.load_atomic::(object, None, Ordering::SeqCst) != 0 + } +} + +impl Barrier for SATBBarrier { + fn flush(&mut self) { + self.semantics.flush(); + } + + fn load_reference(&mut self, o: ObjectReference) { + self.semantics.load_reference(o) + } + + fn object_reference_clone_pre(&mut self, obj: ObjectReference) { + self.semantics.object_reference_clone_pre(obj); + } + + fn object_probable_write(&mut self, obj: ObjectReference) { + self.semantics.object_probable_write_slow(obj); + } + + fn object_reference_write_pre( + &mut self, + src: ObjectReference, + slot: ::VMSlot, + target: Option, + ) { + if self.object_is_unlogged(src) { + self.semantics + .object_reference_write_slow(src, slot, target); + } + } + + fn object_reference_write_post( + &mut self, + _src: ObjectReference, + _slot: ::VMSlot, + _target: Option, + ) { + unimplemented!() + } + + fn object_reference_write_slow( + &mut self, + src: ObjectReference, + slot: ::VMSlot, + target: Option, + ) { + self.semantics + .object_reference_write_slow(src, slot, target); + } + + fn memory_region_copy_pre( + &mut self, + src: ::VMMemorySlice, + dst: ::VMMemorySlice, + ) { + self.semantics.memory_region_copy_slow(src, dst); + } + + fn memory_region_copy_post( + &mut self, + _src: ::VMMemorySlice, + _dst: ::VMMemorySlice, + ) { + unimplemented!() + } +} diff --git a/src/plan/concurrent/barrier.rs b/src/plan/concurrent/barrier.rs new file mode 100644 index 0000000000..cb76910147 --- /dev/null +++ b/src/plan/concurrent/barrier.rs @@ -0,0 +1,146 @@ +use std::sync::atomic::Ordering; + +use crate::{ + plan::{barriers::BarrierSemantics, concurrent::immix::global::ConcurrentImmix, VectorQueue}, + scheduler::WorkBucketStage, + util::ObjectReference, + vm::{ + slot::{MemorySlice, Slot}, + VMBinding, + }, + MMTK, +}; + +use super::{concurrent_marking_work::ProcessModBufSATB, Pause}; + +pub struct SATBBarrierSemantics { + mmtk: &'static MMTK, + satb: VectorQueue, + refs: VectorQueue, + immix: &'static ConcurrentImmix, +} + +impl SATBBarrierSemantics { + pub fn new(mmtk: &'static MMTK) -> Self { + Self { + mmtk, + satb: VectorQueue::default(), + refs: VectorQueue::default(), + immix: mmtk + 
.get_plan() + .downcast_ref::>() + .unwrap(), + } + } + + fn slow(&mut self, _src: Option, _slot: VM::VMSlot, old: ObjectReference) { + self.satb.push(old); + if self.satb.is_full() { + self.flush_satb(); + } + } + + fn enqueue_node( + &mut self, + src: Option, + slot: VM::VMSlot, + _new: Option, + ) -> bool { + if let Some(old) = slot.load() { + self.slow(src, slot, old); + } + true + } + + /// Attempt to atomically log an object. + /// Returns true if the object is not logged previously. + fn log_object(&self, object: ObjectReference) -> bool { + Self::UNLOG_BIT_SPEC.store_atomic::(object, 0, None, Ordering::SeqCst); + true + } + + fn flush_satb(&mut self) { + if !self.satb.is_empty() { + if self.should_create_satb_packets() { + let satb = self.satb.take(); + if let Some(pause) = self.immix.current_pause() { + debug_assert_ne!(pause, Pause::InitialMark); + self.mmtk.scheduler.work_buckets[WorkBucketStage::Closure] + .add(ProcessModBufSATB::new(satb)); + } else { + self.mmtk.scheduler.work_buckets[WorkBucketStage::Unconstrained] + .add(ProcessModBufSATB::new(satb)); + } + } else { + let _ = self.satb.take(); + }; + } + } + + #[cold] + fn flush_weak_refs(&mut self) { + if !self.refs.is_empty() { + // debug_assert!(self.should_create_satb_packets()); + let nodes = self.refs.take(); + if let Some(pause) = self.immix.current_pause() { + debug_assert_ne!(pause, Pause::InitialMark); + self.mmtk.scheduler.work_buckets[WorkBucketStage::Closure] + .add(ProcessModBufSATB::new(nodes)); + } else { + self.mmtk.scheduler.work_buckets[WorkBucketStage::Unconstrained] + .add(ProcessModBufSATB::new(nodes)); + } + } + } + + fn should_create_satb_packets(&self) -> bool { + self.immix.concurrent_marking_in_progress() + || self.immix.current_pause() == Some(Pause::FinalMark) + } +} + +impl BarrierSemantics for SATBBarrierSemantics { + type VM = VM; + + #[cold] + fn flush(&mut self) { + self.flush_satb(); + self.flush_weak_refs(); + } + + fn object_reference_write_slow( + &mut self, + src: ObjectReference, + _slot: ::VMSlot, + _target: Option, + ) { + self.object_probable_write_slow(src); + self.log_object(src); + } + + fn memory_region_copy_slow( + &mut self, + _src: ::VMMemorySlice, + dst: ::VMMemorySlice, + ) { + for s in dst.iter_slots() { + self.enqueue_node(None, s, None); + } + } + + fn load_reference(&mut self, o: ObjectReference) { + if !self.immix.concurrent_marking_in_progress() { + return; + } + self.refs.push(o); + if self.refs.is_full() { + self.flush_weak_refs(); + } + } + + fn object_probable_write_slow(&mut self, obj: ObjectReference) { + obj.iterate_fields::(|s| { + self.enqueue_node(Some(obj), s, None); + }); + } +} diff --git a/src/plan/concurrent/concurrent_marking_work.rs b/src/plan/concurrent/concurrent_marking_work.rs new file mode 100644 index 0000000000..6252b81eab --- /dev/null +++ b/src/plan/concurrent/concurrent_marking_work.rs @@ -0,0 +1,244 @@ +use crate::plan::concurrent::immix::global::ConcurrentImmix; +use crate::plan::concurrent::Pause; +use crate::plan::VectorQueue; +use crate::policy::gc_work::PolicyTraceObject; +use crate::policy::immix::TRACE_KIND_FAST; +use crate::policy::space::Space; +use crate::scheduler::gc_work::{ScanObjects, SlotOf}; +use crate::util::ObjectReference; +use crate::vm::slot::Slot; + +use crate::Plan; +use crate::{ + plan::ObjectQueue, + scheduler::{gc_work::ProcessEdgesBase, GCWork, GCWorker, ProcessEdgesWork, WorkBucketStage}, + vm::*, + MMTK, +}; +use atomic::Ordering; +use std::ops::{Deref, DerefMut}; + +pub struct ConcurrentTraceObjects { + plan: 
&'static ConcurrentImmix, + // objects to mark and scan + objects: Option>, + // recursively generated objects + next_objects: VectorQueue, + worker: *mut GCWorker, +} + +impl ConcurrentTraceObjects { + const SATB_BUFFER_SIZE: usize = 8192; + + pub fn new(objects: Vec, mmtk: &'static MMTK) -> Self { + let plan = mmtk + .get_plan() + .downcast_ref::>() + .unwrap(); + crate::NUM_CONCURRENT_TRACING_PACKETS.fetch_add(1, Ordering::SeqCst); + Self { + plan, + objects: Some(objects), + next_objects: VectorQueue::default(), + worker: std::ptr::null_mut(), + } + } + + pub fn worker(&self) -> &'static mut GCWorker { + debug_assert_ne!(self.worker, std::ptr::null_mut()); + unsafe { &mut *self.worker } + } + + #[cold] + fn flush(&mut self) { + if !self.next_objects.is_empty() { + let objects = self.next_objects.take(); + let worker = self.worker(); + let w = Self::new(objects, worker.mmtk); + worker.add_work(WorkBucketStage::Unconstrained, w); + } + } + + fn trace_object(&mut self, object: ObjectReference) -> ObjectReference { + if self.plan.immix_space.in_space(object) { + self.plan + .immix_space + .trace_object::(self, object, None, self.worker()); + } else { + self.plan.common().get_los().trace_object(self, object); + } + object + } + + fn trace_objects(&mut self, objects: &[ObjectReference]) { + for o in objects.iter() { + self.trace_object(*o); + } + } + + fn scan_and_enqueue(&mut self, object: ObjectReference) { + object.iterate_fields::(|s| { + let Some(t) = s.load() else { + return; + }; + + self.next_objects.push(t); + if self.next_objects.len() > Self::SATB_BUFFER_SIZE { + self.flush(); + } + }); + } +} + +impl ObjectQueue for ConcurrentTraceObjects { + fn enqueue(&mut self, object: ObjectReference) { + debug_assert!( + object.to_raw_address().is_mapped(), + "Invalid obj {:?}: address is not mapped", + object + ); + self.scan_and_enqueue(object); + } +} + +unsafe impl Send for ConcurrentTraceObjects {} + +impl GCWork for ConcurrentTraceObjects { + fn do_work(&mut self, worker: &mut GCWorker, _mmtk: &'static MMTK) { + self.worker = worker; + // mark objects + if let Some(objects) = self.objects.take() { + self.trace_objects(&objects) + } + let pause_opt = self.plan.current_pause(); + if pause_opt == Some(Pause::FinalMark) || pause_opt.is_none() { + let mut next_objects = vec![]; + while !self.next_objects.is_empty() { + let pause_opt = self.plan.current_pause(); + if !(pause_opt == Some(Pause::FinalMark) || pause_opt.is_none()) { + break; + } + next_objects.clear(); + self.next_objects.swap(&mut next_objects); + self.trace_objects(&next_objects); + } + } + self.flush(); + crate::NUM_CONCURRENT_TRACING_PACKETS.fetch_sub(1, Ordering::SeqCst); + } +} + +pub struct ProcessModBufSATB { + nodes: Option>, +} + +impl ProcessModBufSATB { + pub fn new(nodes: Vec) -> Self { + Self { nodes: Some(nodes) } + } +} + +impl GCWork for ProcessModBufSATB { + fn do_work(&mut self, worker: &mut GCWorker, mmtk: &'static MMTK) { + let mut w = if let Some(nodes) = self.nodes.take() { + if nodes.is_empty() { + return; + } + + ConcurrentTraceObjects::new(nodes, mmtk) + } else { + return; + }; + GCWork::do_work(&mut w, worker, mmtk); + } +} + +pub struct ProcessRootSlots { + base: ProcessEdgesBase, +} + +impl ProcessEdgesWork for ProcessRootSlots { + type VM = VM; + type ScanObjectsWorkType = ScanObjects; + const OVERWRITE_REFERENCE: bool = false; + const SCAN_OBJECTS_IMMEDIATELY: bool = true; + + fn new( + slots: Vec>, + roots: bool, + mmtk: &'static MMTK, + bucket: WorkBucketStage, + ) -> Self { + 
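        // ProcessRootSlots is only ever created for root-slot packets (asserted below);
        // process_slots() later turns these roots into ConcurrentTraceObjects packets that are
        // postponed until the InitialMark pause ends.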
debug_assert!(roots); + let base = ProcessEdgesBase::new(slots, roots, mmtk, bucket); + Self { base } + } + + fn flush(&mut self) {} + + fn trace_object(&mut self, _object: ObjectReference) -> ObjectReference { + unreachable!() + } + + fn process_slots(&mut self) { + let pause = self + .base + .plan() + .downcast_ref::>() + .unwrap() + .current_pause() + .unwrap(); + // No need to scan roots in the final mark + if pause == Pause::FinalMark { + return; + } + let mut root_objects = Vec::with_capacity(Self::CAPACITY); + if !self.slots.is_empty() { + let slots = std::mem::take(&mut self.slots); + for slot in slots { + if let Some(object) = slot.load() { + root_objects.push(object); + if root_objects.len() == Self::CAPACITY { + // create the packet + let worker = self.worker(); + let mmtk = self.mmtk(); + let w = ConcurrentTraceObjects::new(root_objects.clone(), mmtk); + + match pause { + Pause::InitialMark => worker.scheduler().postpone(w), + _ => unreachable!(), + } + + root_objects.clear(); + } + } + } + if !root_objects.is_empty() { + let worker = self.worker(); + let w = ConcurrentTraceObjects::new(root_objects.clone(), self.mmtk()); + + match pause { + Pause::InitialMark => worker.scheduler().postpone(w), + _ => unreachable!(), + } + } + } + } + + fn create_scan_work(&self, _nodes: Vec) -> Self::ScanObjectsWorkType { + unimplemented!() + } +} + +impl Deref for ProcessRootSlots { + type Target = ProcessEdgesBase; + fn deref(&self) -> &Self::Target { + &self.base + } +} + +impl DerefMut for ProcessRootSlots { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.base + } +} diff --git a/src/plan/concurrent/immix/gc_work.rs b/src/plan/concurrent/immix/gc_work.rs new file mode 100644 index 0000000000..6372105313 --- /dev/null +++ b/src/plan/concurrent/immix/gc_work.rs @@ -0,0 +1,25 @@ +use crate::plan::concurrent::immix::global::ConcurrentImmix; +use crate::policy::gc_work::{TraceKind, TRACE_KIND_TRANSITIVE_PIN}; +use crate::scheduler::gc_work::{PlanProcessEdges, UnsupportedProcessEdges}; +use crate::scheduler::ProcessEdgesWork; +use crate::vm::VMBinding; + +pub(super) struct ConcurrentImmixSTWGCWorkContext( + std::marker::PhantomData, +); +impl crate::scheduler::GCWorkContext + for ConcurrentImmixSTWGCWorkContext +{ + type VM = VM; + type PlanType = ConcurrentImmix; + type DefaultProcessEdges = PlanProcessEdges, KIND>; + type PinningProcessEdges = PlanProcessEdges, TRACE_KIND_TRANSITIVE_PIN>; +} +pub(super) struct ConcurrentImmixGCWorkContext(std::marker::PhantomData); + +impl crate::scheduler::GCWorkContext for ConcurrentImmixGCWorkContext { + type VM = E::VM; + type PlanType = ConcurrentImmix; + type DefaultProcessEdges = E; + type PinningProcessEdges = UnsupportedProcessEdges; +} diff --git a/src/plan/concurrent/immix/global.rs b/src/plan/concurrent/immix/global.rs new file mode 100644 index 0000000000..bb8d7f6879 --- /dev/null +++ b/src/plan/concurrent/immix/global.rs @@ -0,0 +1,440 @@ +use crate::plan::concurrent::concurrent_marking_work::ProcessRootSlots; +use crate::plan::concurrent::immix::gc_work::ConcurrentImmixGCWorkContext; +use crate::plan::concurrent::immix::gc_work::ConcurrentImmixSTWGCWorkContext; +use crate::plan::concurrent::Pause; +use crate::plan::global::BasePlan; +use crate::plan::global::CommonPlan; +use crate::plan::global::CreateGeneralPlanArgs; +use crate::plan::global::CreateSpecificPlanArgs; +use crate::plan::immix::mutator::ALLOCATOR_MAPPING; +use crate::plan::AllocationSemantics; +use crate::plan::Plan; +use crate::plan::PlanConstraints; +use 
crate::policy::immix::ImmixSpaceArgs; +use crate::policy::immix::TRACE_KIND_DEFRAG; +use crate::policy::immix::TRACE_KIND_FAST; +use crate::policy::space::Space; +use crate::scheduler::gc_work::Release; +use crate::scheduler::gc_work::StopMutators; +use crate::scheduler::gc_work::UnsupportedProcessEdges; +use crate::scheduler::*; +use crate::util::alloc::allocators::AllocatorSelector; +use crate::util::copy::*; +use crate::util::heap::gc_trigger::SpaceStats; +use crate::util::heap::VMRequest; +use crate::util::metadata::side_metadata::SideMetadataContext; +use crate::vm::VMBinding; +use crate::{policy::immix::ImmixSpace, util::opaque_pointer::VMWorkerThread}; +use std::sync::atomic::AtomicBool; + +use atomic::Atomic; +use atomic::Ordering; +use enum_map::EnumMap; + +use mmtk_macros::{HasSpaces, PlanTraceObject}; + +#[derive(Debug, Clone, Copy, bytemuck::NoUninit, PartialEq, Eq)] +#[repr(u8)] +enum GCCause { + Unknown, + FullHeap, + InitialMark, + FinalMark, +} + +#[derive(HasSpaces, PlanTraceObject)] +pub struct ConcurrentImmix { + #[post_scan] + #[space] + #[copy_semantics(CopySemantics::DefaultCopy)] + pub immix_space: ImmixSpace, + #[parent] + pub common: CommonPlan, + last_gc_was_defrag: AtomicBool, + current_pause: Atomic>, + previous_pause: Atomic>, + gc_cause: Atomic, +} + +/// The plan constraints for the immix plan. +pub const CONCURRENT_IMMIX_CONSTRAINTS: PlanConstraints = PlanConstraints { + // If we disable moving in Immix, this is a non-moving plan. + moves_objects: false, + // Max immix object size is half of a block. + max_non_los_default_alloc_bytes: crate::policy::immix::MAX_IMMIX_OBJECT_SIZE, + needs_prepare_mutator: true, + barrier: crate::BarrierSelector::SATBBarrier, + ..PlanConstraints::default() +}; + +impl Plan for ConcurrentImmix { + fn collection_required(&self, space_full: bool, _space: Option>) -> bool { + if self.base().collection_required(self, space_full) { + self.gc_cause.store(GCCause::FullHeap, Ordering::Release); + return true; + } + + let concurrent_marking_in_progress = self.concurrent_marking_in_progress(); + + if concurrent_marking_in_progress && crate::concurrent_marking_packets_drained() { + self.gc_cause.store(GCCause::FinalMark, Ordering::Release); + return true; + } + let threshold = self.get_total_pages() >> 1; + let concurrent_marking_threshold = self + .common + .base + .global_state + .concurrent_marking_threshold + .load(Ordering::Acquire); + if !concurrent_marking_in_progress && concurrent_marking_threshold > threshold { + debug_assert!(crate::concurrent_marking_packets_drained()); + debug_assert!(!self.concurrent_marking_in_progress()); + let prev_pause = self.previous_pause(); + debug_assert!(prev_pause.is_none() || prev_pause.unwrap() != Pause::InitialMark); + self.gc_cause.store(GCCause::InitialMark, Ordering::Release); + return true; + } + false + } + + fn last_collection_was_exhaustive(&self) -> bool { + self.immix_space + .is_last_gc_exhaustive(self.last_gc_was_defrag.load(Ordering::Relaxed)) + } + + fn constraints(&self) -> &'static PlanConstraints { + &CONCURRENT_IMMIX_CONSTRAINTS + } + + fn create_copy_config(&'static self) -> CopyConfig { + use enum_map::enum_map; + CopyConfig { + copy_mapping: enum_map! 
{ + CopySemantics::DefaultCopy => CopySelector::Immix(0), + _ => CopySelector::Unused, + }, + space_mapping: vec![(CopySelector::Immix(0), &self.immix_space)], + constraints: &CONCURRENT_IMMIX_CONSTRAINTS, + } + } + + fn schedule_collection(&'static self, scheduler: &GCWorkScheduler) { + self.current_pause + .store(Some(Pause::Full), Ordering::SeqCst); + + Self::schedule_immix_full_heap_collection::< + ConcurrentImmix, + ConcurrentImmixSTWGCWorkContext, + ConcurrentImmixSTWGCWorkContext, + >(self, &self.immix_space, scheduler); + } + + fn schedule_concurrent_collection(&'static self, scheduler: &GCWorkScheduler) { + let pause = self.select_collection_kind(); + if pause == Pause::Full { + self.current_pause + .store(Some(Pause::Full), Ordering::SeqCst); + + Self::schedule_immix_full_heap_collection::< + ConcurrentImmix, + ConcurrentImmixSTWGCWorkContext, + ConcurrentImmixSTWGCWorkContext, + >(self, &self.immix_space, scheduler); + } else { + // Set current pause kind + self.current_pause.store(Some(pause), Ordering::SeqCst); + // Schedule work + match pause { + Pause::InitialMark => self.schedule_concurrent_marking_initial_pause(scheduler), + Pause::FinalMark => self.schedule_concurrent_marking_final_pause(scheduler), + _ => unreachable!(), + } + } + } + + fn get_allocator_mapping(&self) -> &'static EnumMap { + &ALLOCATOR_MAPPING + } + + fn prepare(&mut self, tls: VMWorkerThread) { + let pause = self.current_pause().unwrap(); + match pause { + Pause::Full => { + self.common.prepare(tls, true); + self.immix_space.prepare( + true, + Some(crate::policy::immix::defrag::StatsForDefrag::new(self)), + ); + } + Pause::InitialMark => { + // init prepare has to be executed first, otherwise, los objects will not be + // dealt with properly + self.common.initial_pause_prepare(); + self.immix_space.initial_pause_prepare(); + self.common.prepare(tls, true); + self.immix_space.prepare( + true, + Some(crate::policy::immix::defrag::StatsForDefrag::new(self)), + ); + } + Pause::FinalMark => (), + } + } + + fn release(&mut self, tls: VMWorkerThread) { + let pause = self.current_pause().unwrap(); + match pause { + Pause::InitialMark => (), + Pause::Full | Pause::FinalMark => { + self.immix_space.final_pause_release(); + self.common.final_pause_release(); + self.common.release(tls, true); + // release the collected region + self.immix_space.release(true); + } + } + // reset the concurrent marking page counting + self.common() + .base + .global_state + .concurrent_marking_threshold + .store(0, Ordering::Release); + } + + fn end_of_gc(&mut self, _tls: VMWorkerThread) { + self.last_gc_was_defrag + .store(self.immix_space.end_of_gc(), Ordering::Relaxed); + } + + fn current_gc_may_move_object(&self) -> bool { + self.immix_space.in_defrag() + } + + fn get_collection_reserved_pages(&self) -> usize { + self.immix_space.defrag_headroom_pages() + } + + fn get_used_pages(&self) -> usize { + self.immix_space.reserved_pages() + self.common.get_used_pages() + } + + fn base(&self) -> &BasePlan { + &self.common.base + } + + fn base_mut(&mut self) -> &mut BasePlan { + &mut self.common.base + } + + fn common(&self) -> &CommonPlan { + &self.common + } + + fn gc_pause_start(&self, _scheduler: &GCWorkScheduler) { + use crate::vm::ActivePlan; + let pause = self.current_pause().unwrap(); + match pause { + Pause::Full => { + self.set_concurrent_marking_state(false); + } + Pause::InitialMark => { + debug_assert!( + !self.concurrent_marking_in_progress(), + "prev pause: {:?}", + self.previous_pause().unwrap() + ); + } + Pause::FinalMark 
=> { + debug_assert!(self.concurrent_marking_in_progress()); + // Flush barrier buffers + for mutator in ::VMActivePlan::mutators() { + mutator.barrier.flush(); + } + self.set_concurrent_marking_state(false); + } + } + println!("{:?} start", pause); + } + + fn gc_pause_end(&self) { + let pause = self.current_pause().unwrap(); + if pause == Pause::InitialMark { + self.set_concurrent_marking_state(true); + } + self.previous_pause.store(Some(pause), Ordering::SeqCst); + self.current_pause.store(None, Ordering::SeqCst); + println!("{:?} end", pause); + } +} + +impl ConcurrentImmix { + pub fn new(args: CreateGeneralPlanArgs) -> Self { + use crate::vm::ObjectModel; + + let spec = crate::util::metadata::extract_side_metadata(&[ + *VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC, + ]); + + let plan_args = CreateSpecificPlanArgs { + global_args: args, + constraints: &CONCURRENT_IMMIX_CONSTRAINTS, + global_side_metadata_specs: SideMetadataContext::new_global_specs(&spec), + }; + Self::new_with_args( + plan_args, + ImmixSpaceArgs { + unlog_object_when_traced: false, + #[cfg(feature = "vo_bit")] + mixed_age: false, + never_move_objects: true, + }, + ) + } + + pub fn new_with_args( + mut plan_args: CreateSpecificPlanArgs, + space_args: ImmixSpaceArgs, + ) -> Self { + let immix = ConcurrentImmix { + immix_space: ImmixSpace::new( + plan_args.get_space_args("immix", true, false, VMRequest::discontiguous()), + space_args, + ), + common: CommonPlan::new(plan_args), + last_gc_was_defrag: AtomicBool::new(false), + current_pause: Atomic::new(None), + previous_pause: Atomic::new(None), + gc_cause: Atomic::new(GCCause::Unknown), + }; + + immix.verify_side_metadata_sanity(); + + immix + } + + /// Schedule a full heap immix collection. This method is used by immix/genimmix/stickyimmix + /// to schedule a full heap collection. A plan must call set_collection_kind and set_gc_status before this method. 
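+    /// For `ConcurrentImmix` this path is only taken for the stop-the-world `Pause::Full`
+    /// collection; the `InitialMark` and `FinalMark` pauses are scheduled separately below.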
+ pub(crate) fn schedule_immix_full_heap_collection< + PlanType: Plan, + FastContext: GCWorkContext, + DefragContext: GCWorkContext, + >( + plan: &'static DefragContext::PlanType, + immix_space: &ImmixSpace, + scheduler: &GCWorkScheduler, + ) -> bool { + let in_defrag = immix_space.decide_whether_to_defrag( + plan.base().global_state.is_emergency_collection(), + true, + plan.base() + .global_state + .cur_collection_attempts + .load(Ordering::SeqCst), + plan.base().global_state.is_user_triggered_collection(), + *plan.base().options.full_heap_system_gc, + ); + + if in_defrag { + scheduler.schedule_common_work::(plan); + } else { + scheduler.schedule_common_work::(plan); + } + in_defrag + } + + fn select_collection_kind(&self) -> Pause { + let emergency = self.base().global_state.is_emergency_collection(); + let user_triggered = self.base().global_state.is_user_triggered_collection(); + let concurrent_marking_in_progress = self.concurrent_marking_in_progress(); + let concurrent_marking_packets_drained = crate::concurrent_marking_packets_drained(); + + if emergency || user_triggered { + return Pause::Full; + } else if !concurrent_marking_in_progress && concurrent_marking_packets_drained { + return Pause::InitialMark; + } else if concurrent_marking_in_progress && concurrent_marking_packets_drained { + return Pause::FinalMark; + } + + Pause::Full + } + + fn disable_unnecessary_buckets(&'static self, scheduler: &GCWorkScheduler, pause: Pause) { + if pause == Pause::InitialMark { + scheduler.work_buckets[WorkBucketStage::Closure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::WeakRefClosure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::FinalRefClosure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::PhantomRefClosure].set_as_disabled(); + } + scheduler.work_buckets[WorkBucketStage::TPinningClosure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::PinningRootsTrace].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::VMRefClosure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::VMRefForwarding].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::SoftRefClosure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::CalculateForwarding].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::SecondRoots].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::RefForwarding].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::FinalizableForwarding].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::Compact].set_as_disabled(); + } + + pub(crate) fn schedule_concurrent_marking_initial_pause( + &'static self, + scheduler: &GCWorkScheduler, + ) { + use crate::scheduler::gc_work::{Prepare, StopMutators, UnsupportedProcessEdges}; + + self.disable_unnecessary_buckets(scheduler, Pause::InitialMark); + + scheduler.work_buckets[WorkBucketStage::Unconstrained].add_prioritized(Box::new( + StopMutators::>>::new_args( + Pause::InitialMark, + ), + )); + scheduler.work_buckets[WorkBucketStage::Prepare].add(Prepare::< + ConcurrentImmixGCWorkContext>, + >::new(self)); + } + + fn schedule_concurrent_marking_final_pause(&'static self, scheduler: &GCWorkScheduler) { + self.disable_unnecessary_buckets(scheduler, Pause::FinalMark); + + scheduler.work_buckets[WorkBucketStage::Unconstrained].add_prioritized(Box::new( + StopMutators::>>::new_args( + Pause::FinalMark, + ), + )); + + scheduler.work_buckets[WorkBucketStage::Release].add(Release::< + ConcurrentImmixGCWorkContext>, + >::new(self)); + } + + pub 
fn concurrent_marking_in_progress(&self) -> bool { + self.common() + .base + .global_state + .concurrent_marking_active + .load(Ordering::Acquire) + } + + fn set_concurrent_marking_state(&self, active: bool) { + use crate::vm::Collection; + + ::VMCollection::set_concurrent_marking_state(active); + self.common() + .base + .global_state + .concurrent_marking_active + .store(active, Ordering::SeqCst); + } + + pub fn current_pause(&self) -> Option { + self.current_pause.load(Ordering::SeqCst) + } + + pub fn previous_pause(&self) -> Option { + self.previous_pause.load(Ordering::SeqCst) + } +} diff --git a/src/plan/concurrent/immix/mod.rs b/src/plan/concurrent/immix/mod.rs new file mode 100644 index 0000000000..0f55961897 --- /dev/null +++ b/src/plan/concurrent/immix/mod.rs @@ -0,0 +1,7 @@ +//! Plan: concurrent immix + +pub(in crate::plan) mod gc_work; +pub(in crate::plan) mod global; +pub(in crate::plan) mod mutator; + +pub use global::ConcurrentImmix; diff --git a/src/plan/concurrent/immix/mutator.rs b/src/plan/concurrent/immix/mutator.rs new file mode 100644 index 0000000000..1304e34ea0 --- /dev/null +++ b/src/plan/concurrent/immix/mutator.rs @@ -0,0 +1,84 @@ +use crate::plan::barriers::SATBBarrier; +use crate::plan::concurrent::barrier::SATBBarrierSemantics; +use crate::plan::concurrent::immix::ConcurrentImmix; +use crate::plan::mutator_context::create_allocator_mapping; +use crate::plan::mutator_context::create_space_mapping; + +use crate::plan::mutator_context::Mutator; +use crate::plan::mutator_context::MutatorBuilder; +use crate::plan::mutator_context::MutatorConfig; +use crate::plan::mutator_context::ReservedAllocators; +use crate::plan::AllocationSemantics; +use crate::util::alloc::allocators::AllocatorSelector; +use crate::util::alloc::ImmixAllocator; +use crate::util::opaque_pointer::{VMMutatorThread, VMWorkerThread}; +use crate::vm::VMBinding; +use crate::MMTK; +use enum_map::EnumMap; + +pub fn concurrent_immix_mutator_release( + mutator: &mut Mutator, + _tls: VMWorkerThread, +) { + let immix_allocator = unsafe { + mutator + .allocators + .get_allocator_mut(mutator.config.allocator_mapping[AllocationSemantics::Default]) + } + .downcast_mut::>() + .unwrap(); + immix_allocator.reset(); +} + +pub fn concurent_immix_mutator_prepare( + mutator: &mut Mutator, + _tls: VMWorkerThread, +) { + let immix_allocator = unsafe { + mutator + .allocators + .get_allocator_mut(mutator.config.allocator_mapping[AllocationSemantics::Default]) + } + .downcast_mut::>() + .unwrap(); + immix_allocator.reset(); +} + +pub(in crate::plan) const RESERVED_ALLOCATORS: ReservedAllocators = ReservedAllocators { + n_immix: 1, + ..ReservedAllocators::DEFAULT +}; + +lazy_static! 
{ + pub static ref ALLOCATOR_MAPPING: EnumMap = { + let mut map = create_allocator_mapping(RESERVED_ALLOCATORS, true); + map[AllocationSemantics::Default] = AllocatorSelector::Immix(0); + map + }; +} + +pub fn create_concurrent_immix_mutator( + mutator_tls: VMMutatorThread, + mmtk: &'static MMTK, +) -> Mutator { + let immix = mmtk + .get_plan() + .downcast_ref::>() + .unwrap(); + let config = MutatorConfig { + allocator_mapping: &ALLOCATOR_MAPPING, + space_mapping: Box::new({ + let mut vec = create_space_mapping(RESERVED_ALLOCATORS, true, immix); + vec.push((AllocatorSelector::Immix(0), &immix.immix_space)); + vec + }), + + prepare_func: &concurent_immix_mutator_prepare, + release_func: &concurrent_immix_mutator_release, + }; + + let builder = MutatorBuilder::new(mutator_tls, mmtk, config); + builder + .barrier(Box::new(SATBBarrier::new(SATBBarrierSemantics::new(mmtk)))) + .build() +} diff --git a/src/plan/concurrent/mod.rs b/src/plan/concurrent/mod.rs new file mode 100644 index 0000000000..94a582a4aa --- /dev/null +++ b/src/plan/concurrent/mod.rs @@ -0,0 +1,23 @@ +pub mod barrier; +pub mod concurrent_marking_work; +pub mod immix; + +use bytemuck::NoUninit; + +#[repr(u8)] +#[derive(Debug, PartialEq, Eq, Copy, Clone, NoUninit)] +pub enum Pause { + Full = 1, + InitialMark, + FinalMark, +} + +unsafe impl bytemuck::ZeroableInOption for Pause {} + +unsafe impl bytemuck::PodInOption for Pause {} + +impl Default for Pause { + fn default() -> Self { + Self::Full + } +} diff --git a/src/plan/global.rs b/src/plan/global.rs index 5a5bb38ab5..ddbed6c6f1 100644 --- a/src/plan/global.rs +++ b/src/plan/global.rs @@ -58,6 +58,9 @@ pub fn create_mutator( PlanSelector::StickyImmix => { crate::plan::sticky::immix::mutator::create_stickyimmix_mutator(tls, mmtk) } + PlanSelector::ConcurrentImmix => { + crate::plan::concurrent::immix::mutator::create_concurrent_immix_mutator(tls, mmtk) + } }) } @@ -91,6 +94,10 @@ pub fn create_plan( PlanSelector::StickyImmix => { Box::new(crate::plan::sticky::immix::StickyImmix::new(args)) as Box> } + PlanSelector::ConcurrentImmix => { + Box::new(crate::plan::concurrent::immix::ConcurrentImmix::new(args)) + as Box> + } }; // We have created Plan in the heap, and we won't explicitly move it. @@ -160,6 +167,11 @@ pub trait Plan: 'static + HasSpaces + Sync + Downcast { /// Schedule work for the upcoming GC. fn schedule_collection(&'static self, _scheduler: &GCWorkScheduler); + /// Schedule work for the upcoming concurrent GC. + fn schedule_concurrent_collection(&'static self, _scheduler: &GCWorkScheduler) { + self.schedule_collection(_scheduler); + } + /// Get the common plan. CommonPlan is included by most of MMTk GC plans. 
fn common(&self) -> &CommonPlan { panic!("Common Plan not handled!") @@ -331,6 +343,9 @@ pub trait Plan: 'static + HasSpaces + Sync + Downcast { space.verify_side_metadata_sanity(&mut side_metadata_sanity_checker); }) } + + fn gc_pause_start(&self, _scheduler: &GCWorkScheduler) {} + fn gc_pause_end(&self) {} } impl_downcast!(Plan assoc VM); @@ -601,6 +616,14 @@ impl CommonPlan { + self.base.get_used_pages() } + pub fn initial_pause_prepare(&mut self) { + self.los.initial_pause_prepare(); + } + + pub fn final_pause_release(&mut self) { + self.los.final_pause_release(); + } + pub fn prepare(&mut self, tls: VMWorkerThread, full_heap: bool) { self.immortal.prepare(); self.los.prepare(full_heap); diff --git a/src/plan/mod.rs b/src/plan/mod.rs index 74fcac2811..91e8a6a240 100644 --- a/src/plan/mod.rs +++ b/src/plan/mod.rs @@ -19,6 +19,7 @@ pub use barriers::BarrierSelector; pub(crate) mod gc_requester; mod global; +pub(crate) use concurrent::Pause; pub(crate) use global::create_gc_worker_context; pub(crate) use global::create_mutator; pub(crate) use global::create_plan; @@ -37,13 +38,15 @@ pub use plan_constraints::PlanConstraints; pub(crate) use plan_constraints::DEFAULT_PLAN_CONSTRAINTS; mod tracing; -pub use tracing::{ObjectQueue, ObjectsClosure, VectorObjectQueue, VectorQueue}; +pub use tracing::{ObjectQueue, ObjectsClosure, SlotIterator, VectorObjectQueue, VectorQueue}; /// Generational plans (with a copying nursery) mod generational; /// Sticky plans (using sticky marks for generational behaviors without a copying nursery) mod sticky; +mod concurrent; + mod immix; mod markcompact; mod marksweep; diff --git a/src/plan/tracing.rs b/src/plan/tracing.rs index eecd40cbaf..e9dad03de8 100644 --- a/src/plan/tracing.rs +++ b/src/plan/tracing.rs @@ -1,10 +1,12 @@ //! This module contains code useful for tracing, //! i.e. visiting the reachable objects by traversing all or part of an object graph. +use std::marker::PhantomData; + use crate::scheduler::gc_work::{ProcessEdgesWork, SlotOf}; use crate::scheduler::{GCWorker, WorkBucketStage, EDGES_WORK_BUFFER_SIZE}; -use crate::util::ObjectReference; -use crate::vm::SlotVisitor; +use crate::util::{ObjectReference, VMThread, VMWorkerThread}; +use crate::vm::{Scanning, SlotVisitor, VMBinding}; /// This trait represents an object queue to enqueue objects during tracing. pub trait ObjectQueue { @@ -63,6 +65,21 @@ impl VectorQueue { } self.buffer.push(v); } + + /// Return the len of the queue + pub fn len(&self) -> usize { + self.buffer.len() + } + + /// Replace what was in the queue with data in new_buffer + pub fn swap(&mut self, new_buffer: &mut Vec) { + std::mem::swap(&mut self.buffer, new_buffer) + } + + /// Empty the queue + pub fn clear(&mut self) { + self.buffer.clear() + } } impl Default for VectorQueue { @@ -134,3 +151,59 @@ impl Drop for ObjectsClosure<'_, E> { self.flush(); } } + +struct SlotIteratorImpl { + f: F, + // should_discover_references: bool, + // should_claim_clds: bool, + // should_follow_clds: bool, + _p: PhantomData, +} + +impl SlotVisitor for SlotIteratorImpl { + fn visit_slot(&mut self, slot: VM::VMSlot) { + (self.f)(slot); + } +} + +pub struct SlotIterator { + _p: PhantomData, +} + +impl SlotIterator { + pub fn iterate( + o: ObjectReference, + // should_discover_references: bool, + // should_claim_clds: bool, + // should_follow_clds: bool, + f: impl FnMut(VM::VMSlot), + // klass: Option
, + ) { + let mut x = SlotIteratorImpl:: { + f, + // should_discover_references, + // should_claim_clds, + // should_follow_clds, + _p: PhantomData, + }; + // if let Some(klass) = klass { + // >::scan_object_with_klass( + // VMWorkerThread(VMThread::UNINITIALIZED), + // o, + // &mut x, + // klass, + // ); + // } else { + // >::scan_object( + // VMWorkerThread(VMThread::UNINITIALIZED), + // o, + // &mut x, + // ); + // } + >::scan_object( + VMWorkerThread(VMThread::UNINITIALIZED), + o, + &mut x, + ); + } +} diff --git a/src/policy/immix/immixspace.rs b/src/policy/immix/immixspace.rs index a8bcd54fda..e205f3bc3d 100644 --- a/src/policy/immix/immixspace.rs +++ b/src/policy/immix/immixspace.rs @@ -203,6 +203,13 @@ impl Space for ImmixSpace { fn enumerate_objects(&self, enumerator: &mut dyn ObjectEnumerator) { object_enum::enumerate_blocks_from_chunk_map::(enumerator, &self.chunk_map); } + + fn concurrent_marking_active(&self) -> bool { + self.common() + .global_state + .concurrent_marking_active + .load(Ordering::Acquire) + } } impl crate::policy::gc_work::PolicyTraceObject for ImmixSpace { @@ -411,6 +418,24 @@ impl ImmixSpace { &self.scheduler } + pub fn initial_pause_prepare(&mut self) { + // make sure all allocated blocks have unlog bit set during initial mark + if let MetadataSpec::OnSide(side) = *VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC { + for chunk in self.chunk_map.all_chunks() { + side.bset_metadata(chunk.start(), Chunk::BYTES); + } + } + } + + pub fn final_pause_release(&mut self) { + // clear the unlog bit so that during normal mutator phase, stab barrier is effectively disabled (all objects are considered as logged and thus no slow path will be taken) + if let MetadataSpec::OnSide(side) = *VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC { + for chunk in self.chunk_map.all_chunks() { + side.bzero_metadata(chunk.start(), Chunk::BYTES); + } + } + } + pub fn prepare(&mut self, major_gc: bool, plan_stats: Option) { if major_gc { // Update mark_state @@ -572,6 +597,10 @@ impl ImmixSpace { self.chunk_map.set_allocated(block.chunk(), true); self.lines_consumed .fetch_add(Block::LINES, Ordering::SeqCst); + self.common() + .global_state + .concurrent_marking_threshold + .fetch_add(Block::PAGES, Ordering::Relaxed); Some(block) } @@ -598,6 +627,10 @@ impl ImmixSpace { self.lines_consumed.fetch_add(lines_delta, Ordering::SeqCst); block.init(copy); + self.common() + .global_state + .concurrent_marking_threshold + .fetch_add(Block::PAGES, Ordering::Relaxed); return Some(block); } else { return None; diff --git a/src/policy/immix/line.rs b/src/policy/immix/line.rs index 94036ecc65..f48ea7d271 100644 --- a/src/policy/immix/line.rs +++ b/src/policy/immix/line.rs @@ -1,6 +1,8 @@ +use std::ops::Range; + use super::block::Block; use crate::util::linear_scan::{Region, RegionIterator}; -use crate::util::metadata::side_metadata::SideMetadataSpec; +use crate::util::metadata::side_metadata::{address_to_meta_address, SideMetadataSpec}; use crate::{ util::{Address, ObjectReference}, vm::*, @@ -81,4 +83,14 @@ impl Line { } marked_lines } + + pub fn initialize_mark_table_as_marked(lines: Range) { + let meta = VM::VMObjectModel::LOCAL_MARK_BIT_SPEC.extract_side_spec(); + let start: *mut u8 = address_to_meta_address(meta, lines.start.start()).to_mut_ptr(); + let limit: *mut u8 = address_to_meta_address(meta, lines.end.start()).to_mut_ptr(); + unsafe { + let bytes = limit.offset_from(start) as usize; + std::ptr::write_bytes(start, 0xffu8, bytes); + } + } } diff --git a/src/policy/largeobjectspace.rs 
b/src/policy/largeobjectspace.rs index abe7976082..369b913444 100644 --- a/src/policy/largeobjectspace.rs +++ b/src/policy/largeobjectspace.rs @@ -9,6 +9,7 @@ use crate::util::alloc::allocator::AllocationOptions; use crate::util::constants::BYTES_IN_PAGE; use crate::util::heap::{FreeListPageResource, PageResource}; use crate::util::metadata; +use crate::util::object_enum::ClosureObjectEnumerator; use crate::util::object_enum::ObjectEnumerator; use crate::util::opaque_pointer::*; use crate::util::treadmill::TreadMill; @@ -59,6 +60,24 @@ impl SFT for LargeObjectSpace { true } fn initialize_object_metadata(&self, object: ObjectReference, alloc: bool) { + if self.concurrent_marking_active() { + VM::VMObjectModel::LOCAL_LOS_MARK_NURSERY_SPEC.store_atomic::( + object, + self.mark_state, + None, + Ordering::SeqCst, + ); + debug_assert!( + VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC.load_atomic::( + object, + None, + Ordering::Acquire + ) == 0 + ); + + self.treadmill.add_to_treadmill(object, false); + return; + } let old_value = VM::VMObjectModel::LOCAL_LOS_MARK_NURSERY_SPEC.load_atomic::( object, None, @@ -192,6 +211,13 @@ impl Space for LargeObjectSpace { fn enumerate_objects(&self, enumerator: &mut dyn ObjectEnumerator) { self.treadmill.enumerate_objects(enumerator); } + + fn concurrent_marking_active(&self) -> bool { + self.common() + .global_state + .concurrent_marking_active + .load(Ordering::Acquire) + } } use crate::scheduler::GCWorker; @@ -243,6 +269,24 @@ impl LargeObjectSpace { } } + pub fn initial_pause_prepare(&self) { + use crate::util::object_enum::ClosureObjectEnumerator; + + debug_assert!(self.treadmill.is_from_space_empty()); + debug_assert!(self.treadmill.is_nursery_empty()); + let mut enumator = ClosureObjectEnumerator::<_, VM>::new(|object| { + VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC.mark_as_unlogged::(object, Ordering::SeqCst); + }); + self.treadmill.enumerate_objects(&mut enumator); + } + + pub fn final_pause_release(&self) { + let mut enumator = ClosureObjectEnumerator::<_, VM>::new(|object| { + VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC.clear::(object, Ordering::SeqCst); + }); + self.treadmill.enumerate_objects(&mut enumator); + } + pub fn prepare(&mut self, full_heap: bool) { if full_heap { debug_assert!(self.treadmill.is_from_space_empty()); @@ -259,6 +303,7 @@ impl LargeObjectSpace { self.sweep_large_pages(false); } } + // Allow nested-if for this function to make it clear that test_and_mark() is only executed // for the outer condition is met. #[allow(clippy::collapsible_if)] @@ -332,6 +377,10 @@ impl LargeObjectSpace { pages: usize, alloc_options: AllocationOptions, ) -> Address { + self.common() + .global_state + .concurrent_marking_threshold + .fetch_add(pages, Ordering::Relaxed); self.acquire(tls, pages, alloc_options) } @@ -391,6 +440,10 @@ impl LargeObjectSpace { ) & NURSERY_BIT == NURSERY_BIT } + + pub fn is_marked(&self, object: ObjectReference) -> bool { + self.test_mark_bit(object, self.mark_state) + } } fn get_super_page(cell: Address) -> Address { diff --git a/src/policy/space.rs b/src/policy/space.rs index e44874fe5b..a0676d9da0 100644 --- a/src/policy/space.rs +++ b/src/policy/space.rs @@ -433,6 +433,10 @@ pub trait Space: 'static + SFT + Sync + Downcast { /// the execution time. For LOS, it will be cheaper to enumerate individual objects than /// scanning VO bits because it is sparse. fn enumerate_objects(&self, enumerator: &mut dyn ObjectEnumerator); + + fn concurrent_marking_active(&self) -> bool { + false + } } /// Print the VM map for a space. 
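// Illustrative binding-side sketch (not part of this patch): with the SATB barrier selected,
// a VM binding is expected to invoke the pre-barrier before every reference store so the old
// target can be enqueued while concurrent marking is active. The function and parameter names
// below are hypothetical; only `Mutator`, `ObjectReference`, `VMBinding` and
// `memory_manager::object_reference_write_pre` come from MMTk itself.
//
//     use mmtk::util::ObjectReference;
//     use mmtk::vm::VMBinding;
//     use mmtk::Mutator;
//
//     fn write_reference_field<VM: VMBinding>(
//         mutator: &mut Mutator<VM>,
//         src: ObjectReference,
//         slot: VM::VMSlot,
//         target: Option<ObjectReference>,
//     ) {
//         // SATB is a pre-barrier: snapshot the old value before the store happens.
//         mmtk::memory_manager::object_reference_write_pre(mutator, src, slot, target);
//         // ... the binding performs the actual field store here ...
//         // No post-barrier call is needed; SATBBarrier::object_reference_write_post is
//         // left unimplemented in this patch.
//     }
//
// Weak-reference loads (e.g. java.lang.Reference.get) should similarly pass the referent to
// the new Barrier::load_reference hook so it is retained by the SATB snapshot.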
diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index 7e50c86aa3..c80d12a5e5 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -2,6 +2,7 @@ use super::work_bucket::WorkBucketStage; use super::*; use crate::global_state::GcStatus; use crate::plan::ObjectsClosure; +use crate::plan::Pause; use crate::plan::VectorObjectQueue; use crate::util::*; use crate::vm::slot::Slot; @@ -29,7 +30,14 @@ impl GCWork for ScheduleCollection { mmtk.set_gc_status(GcStatus::GcPrepare); // Let the plan to schedule collection work - mmtk.get_plan().schedule_collection(worker.scheduler()); + if mmtk.is_user_triggered_collection() || is_emergency { + // user triggered collection is always stop-the-world + mmtk.get_plan().schedule_collection(worker.scheduler()); + } else { + // Let the plan to schedule collection work + mmtk.get_plan() + .schedule_concurrent_collection(worker.scheduler()); + } } } @@ -191,11 +199,24 @@ impl GCWork for ReleaseCollector { /// /// TODO: Smaller work granularity #[derive(Default)] -pub struct StopMutators(PhantomData); +pub struct StopMutators { + pause: Pause, + phantom: PhantomData, +} impl StopMutators { pub fn new() -> Self { - Self(PhantomData) + Self { + pause: Pause::Full, + phantom: PhantomData, + } + } + + pub fn new_args(pause: Pause) -> Self { + Self { + pause, + phantom: PhantomData, + } } } @@ -206,9 +227,16 @@ impl GCWork for StopMutators { ::VMCollection::stop_all_mutators(worker.tls, |mutator| { // TODO: The stack scanning work won't start immediately, as the `Prepare` bucket is not opened yet (the bucket is opened in notify_mutators_paused). // Should we push to Unconstrained instead? - mmtk.scheduler.work_buckets[WorkBucketStage::Prepare] - .add(ScanMutatorRoots::(mutator)); + + if self.pause != Pause::FinalMark { + mmtk.scheduler.work_buckets[WorkBucketStage::Prepare] + .add(ScanMutatorRoots::(mutator)); + } else { + mutator.flush(); + } }); + mmtk.scheduler.set_in_gc_pause(true); + mmtk.get_plan().gc_pause_start(&mmtk.scheduler); trace!("stop_all_mutators end"); mmtk.scheduler.notify_mutators_paused(mmtk); mmtk.scheduler.work_buckets[WorkBucketStage::Prepare].add(ScanVMSpecificRoots::::new()); diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 33e89be0fe..5006edbe1d 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -17,7 +17,7 @@ pub(crate) use scheduler::GCWorkScheduler; mod stat; mod work_counter; -mod work; +pub(crate) mod work; pub use work::GCWork; pub(crate) use work::GCWorkContext; diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index e849f9df07..6a331aed1f 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -14,7 +14,7 @@ use crate::util::options::AffinityKind; use crate::vm::Collection; use crate::vm::VMBinding; use crate::Plan; -use crossbeam::deque::Steal; +use crossbeam::deque::{Injector, Steal}; use enum_map::{Enum, EnumMap}; use std::collections::HashMap; use std::sync::Arc; @@ -29,6 +29,12 @@ pub struct GCWorkScheduler { pub(crate) worker_monitor: Arc, /// How to assign the affinity of each GC thread. Specified by the user. affinity: AffinityKind, + + pub(super) postponed_concurrent_work: + spin::RwLock>>>, + pub(super) postponed_concurrent_work_prioritized: + spin::RwLock>>>, + in_gc_pause: std::sync::atomic::AtomicBool, } // FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet. 
@@ -47,12 +53,25 @@ impl GCWorkScheduler { WorkBucket::new(active, worker_monitor.clone()) }); + work_buckets[WorkBucketStage::Unconstrained].enable_prioritized_queue(); + // Set the open condition of each bucket. { let first_stw_stage = WorkBucketStage::first_stw_stage(); let mut open_stages: Vec = vec![first_stw_stage]; let stages = (0..WorkBucketStage::LENGTH).map(WorkBucketStage::from_usize); for stage in stages { + { + if stage == WorkBucketStage::ConcurrentSentinel { + work_buckets[stage].set_open_condition( + move |scheduler: &GCWorkScheduler| { + scheduler.work_buckets[WorkBucketStage::Unconstrained].is_drained() + }, + ); + open_stages.push(stage); + continue; + } + } // Unconstrained is always open. // The first STW stage (Prepare) will be opened when the world stopped // (i.e. when all mutators are suspended). @@ -75,9 +94,44 @@ impl GCWorkScheduler { worker_group, worker_monitor, affinity, + postponed_concurrent_work: spin::RwLock::new(crossbeam::deque::Injector::new()), + postponed_concurrent_work_prioritized: spin::RwLock::new( + crossbeam::deque::Injector::new(), + ), + in_gc_pause: std::sync::atomic::AtomicBool::new(false), }) } + pub fn postpone(&self, w: impl GCWork) { + self.postponed_concurrent_work.read().push(Box::new(w)) + } + + pub fn postpone_prioritized(&self, w: impl GCWork) { + self.postponed_concurrent_work_prioritized + .read() + .push(Box::new(w)) + } + + pub fn postpone_dyn(&self, w: Box>) { + self.postponed_concurrent_work.read().push(w) + } + + pub fn postpone_dyn_prioritized(&self, w: Box>) { + self.postponed_concurrent_work_prioritized.read().push(w) + } + + pub fn postpone_all(&self, ws: Vec>>) { + let postponed_concurrent_work = self.postponed_concurrent_work.read(); + ws.into_iter() + .for_each(|w| postponed_concurrent_work.push(w)); + } + + pub fn postpone_all_prioritized(&self, ws: Vec>>) { + let postponed_concurrent_work = self.postponed_concurrent_work_prioritized.read(); + ws.into_iter() + .for_each(|w| postponed_concurrent_work.push(w)); + } + pub fn num_workers(&self) -> usize { self.worker_group.as_ref().worker_count() } @@ -289,6 +343,7 @@ impl GCWorkScheduler { self.work_buckets.iter().for_each(|(id, bkt)| { if id != WorkBucketStage::Unconstrained { bkt.deactivate(); + bkt.set_as_enabled(); } }); } @@ -298,6 +353,7 @@ impl GCWorkScheduler { self.work_buckets.iter().for_each(|(id, bkt)| { if id != WorkBucketStage::Unconstrained && id != first_stw_stage { bkt.deactivate(); + bkt.set_as_enabled(); } }); } @@ -330,6 +386,18 @@ impl GCWorkScheduler { } } + pub(super) fn set_in_gc_pause(&self, in_gc_pause: bool) { + self.in_gc_pause + .store(in_gc_pause, std::sync::atomic::Ordering::SeqCst); + for wb in self.work_buckets.values() { + wb.set_in_concurrent(!in_gc_pause); + } + } + + pub fn in_concurrent(&self) -> bool { + !self.in_gc_pause.load(std::sync::atomic::Ordering::SeqCst) + } + /// Get a schedulable work packet without retry. fn poll_schedulable_work_once(&self, worker: &GCWorker) -> Steal>> { let mut should_retry = false; @@ -524,6 +592,8 @@ impl GCWorkScheduler { let mmtk = worker.mmtk; + let (queue, pqueue) = self.schedule_postponed_concurrent_packets(); + // Tell GC trigger that GC ended - this happens before we resume mutators. mmtk.gc_trigger.policy.on_gc_end(mmtk); @@ -574,13 +644,17 @@ impl GCWorkScheduler { // reset the logging info at the end of each GC mmtk.slot_logger.reset(); } - + mmtk.get_plan().gc_pause_end(); // Reset the triggering information. 
mmtk.state.reset_collection_trigger(); // Set to NotInGC after everything, and right before resuming mutators. mmtk.set_gc_status(GcStatus::NotInGC); ::VMCollection::resume_mutators(worker.tls); + + self.set_in_gc_pause(false); + self.schedule_concurrent_packets(queue, pqueue); + self.debug_assert_all_buckets_deactivated(); } pub fn enable_stat(&self) { @@ -613,4 +687,47 @@ impl GCWorkScheduler { first_stw_bucket.activate(); self.worker_monitor.notify_work_available(true); } + + fn schedule_postponed_concurrent_packets( + &self, + ) -> (Injector>>, Injector>>) { + let mut queue = Injector::new(); + type Q = Injector>>; + std::mem::swap::>(&mut queue, &mut self.postponed_concurrent_work.write()); + + let mut pqueue = Injector::new(); + std::mem::swap::>( + &mut pqueue, + &mut self.postponed_concurrent_work_prioritized.write(), + ); + (queue, pqueue) + } + + pub(super) fn schedule_concurrent_packets( + &self, + queue: Injector>>, + pqueue: Injector>>, + ) { + // crate::MOVE_CONCURRENT_MARKING_TO_STW.store(false, Ordering::SeqCst); + // crate::PAUSE_CONCURRENT_MARKING.store(false, Ordering::SeqCst); + let mut notify = false; + if !queue.is_empty() { + let old_queue = self.work_buckets[WorkBucketStage::Unconstrained].swap_queue(queue); + debug_assert!(old_queue.is_empty()); + notify = true; + } + if !pqueue.is_empty() { + let old_queue = + self.work_buckets[WorkBucketStage::Unconstrained].swap_queue_prioritized(pqueue); + debug_assert!(old_queue.is_empty()); + notify = true; + } + if notify { + self.wakeup_all_concurrent_workers(); + } + } + + pub fn wakeup_all_concurrent_workers(&self) { + self.worker_monitor.notify_work_available(true); + } } diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index ab55093bad..563c43a9b5 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -7,34 +7,35 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; struct BucketQueue { - queue: Injector>>, + // queue: Injector>>, + queue: std::sync::RwLock>>>, } impl BucketQueue { fn new() -> Self { Self { - queue: Injector::new(), + queue: std::sync::RwLock::new(Injector::new()), } } fn is_empty(&self) -> bool { - self.queue.is_empty() + self.queue.read().unwrap().is_empty() } fn steal_batch_and_pop( &self, dest: &Worker>>, ) -> Steal>> { - self.queue.steal_batch_and_pop(dest) + self.queue.read().unwrap().steal_batch_and_pop(dest) } fn push(&self, w: Box>) { - self.queue.push(w); + self.queue.read().unwrap().push(w); } fn push_all(&self, ws: Vec>>) { for w in ws { - self.queue.push(w); + self.queue.read().unwrap().push(w); } } } @@ -59,6 +60,8 @@ pub struct WorkBucket { /// recursively, such as ephemerons and Java-style SoftReference and finalizers. Sentinels /// can be used repeatedly to discover and process more such objects. 
sentinel: Mutex>>>, + in_concurrent: AtomicBool, + disable: AtomicBool, } impl WorkBucket { @@ -70,9 +73,55 @@ impl WorkBucket { monitor, can_open: None, sentinel: Mutex::new(None), + in_concurrent: AtomicBool::new(true), + disable: AtomicBool::new(false), } } + pub fn set_in_concurrent(&self, in_concurrent: bool) { + self.in_concurrent.store(in_concurrent, Ordering::SeqCst); + } + + pub fn set_as_enabled(&self) { + self.disable.store(false, Ordering::SeqCst) + } + + pub fn set_as_disabled(&self) { + self.disable.store(true, Ordering::SeqCst) + } + + pub fn disabled(&self) -> bool { + self.disable.load(Ordering::Relaxed) + } + + pub fn enable_prioritized_queue(&mut self) { + self.prioritized_queue = Some(BucketQueue::new()); + } + + pub fn swap_queue( + &self, + mut new_queue: Injector>>, + ) -> Injector>> { + let mut queue = self.queue.queue.write().unwrap(); + std::mem::swap::>>>(&mut queue, &mut new_queue); + new_queue + } + + pub fn swap_queue_prioritized( + &self, + mut new_queue: Injector>>, + ) -> Injector>> { + let mut queue = self + .prioritized_queue + .as_ref() + .unwrap() + .queue + .write() + .unwrap(); + std::mem::swap::>>>(&mut queue, &mut new_queue); + new_queue + } + fn notify_one_worker(&self) { // If the bucket is not activated, don't notify anyone. if !self.is_activated() { @@ -240,6 +289,8 @@ impl WorkBucket { pub enum WorkBucketStage { /// This bucket is always open. Unconstrained, + Initial, + ConcurrentSentinel, /// Preparation work. Plans, spaces, GC workers, mutators, etc. should be prepared for GC at /// this stage. Prepare, diff --git a/src/util/address.rs b/src/util/address.rs index c87a5d3abb..45192157d0 100644 --- a/src/util/address.rs +++ b/src/util/address.rs @@ -465,6 +465,7 @@ mod tests { } } +use crate::plan::SlotIterator; use crate::vm::VMBinding; /// `ObjectReference` represents address for an object. Compared with `Address`, operations allowed @@ -699,6 +700,10 @@ impl ObjectReference { pub fn is_sane(self) -> bool { unsafe { SFT_MAP.get_unchecked(self.to_raw_address()) }.is_sane() } + + pub fn iterate_fields(self, f: F) { + SlotIterator::::iterate(self, f) + } } /// allows print Address as upper-case hex value diff --git a/src/util/alloc/immix_allocator.rs b/src/util/alloc/immix_allocator.rs index 807ddded90..bf910c1e33 100644 --- a/src/util/alloc/immix_allocator.rs +++ b/src/util/alloc/immix_allocator.rs @@ -265,6 +265,21 @@ impl ImmixAllocator { // Update the hole-searching cursor to None. Some(end_line) }; + // mark objects if concurrent marking is active + if self.immix_space().concurrent_marking_active() { + let state = self + .space + .line_mark_state + .load(std::sync::atomic::Ordering::Acquire); + + for line in + crate::util::linear_scan::RegionIterator::::new(start_line, end_line) + { + line.mark(state); + } + + Line::initialize_mark_table_as_marked::(start_line..end_line); + } return true; } else { // No more recyclable lines. Set the hole-searching cursor to None. 
@@ -305,6 +320,20 @@ impl ImmixAllocator { // Bulk clear stale line mark state Line::MARK_TABLE .bzero_metadata(block.start(), crate::policy::immix::block::Block::BYTES); + // mark objects if concurrent marking is active + if self.immix_space().concurrent_marking_active() { + let state = self + .space + .line_mark_state + .load(std::sync::atomic::Ordering::Acquire); + for line in block.lines() { + line.mark(state); + } + + Line::initialize_mark_table_as_marked::( + block.start_line()..block.end_line(), + ); + } if self.request_for_large { self.large_bump_pointer.cursor = block.start(); self.large_bump_pointer.limit = block.end(); diff --git a/src/util/options.rs b/src/util/options.rs index abfada69c8..6adf7d3b84 100644 --- a/src/util/options.rs +++ b/src/util/options.rs @@ -48,6 +48,8 @@ pub enum PlanSelector { MarkCompact, /// An Immix collector that uses a sticky mark bit to allow generational behaviors without a copying nursery. StickyImmix, + /// Concurrent non-moving immix using SATB + ConcurrentImmix, } /// MMTk option for perf events diff --git a/src/vm/collection.rs b/src/vm/collection.rs index 16e87eebe0..98121317b9 100644 --- a/src/vm/collection.rs +++ b/src/vm/collection.rs @@ -162,4 +162,7 @@ pub trait Collection { fn create_gc_trigger() -> Box> { unimplemented!() } + + /// Inform the VM of concurrent marking status + fn set_concurrent_marking_state(_active: bool) {} } From 9ac22bd52c2563fc93ae6f9b85304ecab02188e9 Mon Sep 17 00:00:00 2001 From: Tianle Qiu Date: Fri, 25 Jul 2025 05:11:20 +0000 Subject: [PATCH 02/18] minor --- src/plan/concurrent/immix/global.rs | 1 + src/plan/plan_constraints.rs | 2 ++ src/policy/immix/mod.rs | 2 +- src/policy/largeobjectspace.rs | 5 +++-- src/policy/space.rs | 2 ++ 5 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/plan/concurrent/immix/global.rs b/src/plan/concurrent/immix/global.rs index bb8d7f6879..4da77ec505 100644 --- a/src/plan/concurrent/immix/global.rs +++ b/src/plan/concurrent/immix/global.rs @@ -64,6 +64,7 @@ pub const CONCURRENT_IMMIX_CONSTRAINTS: PlanConstraints = PlanConstraints { max_non_los_default_alloc_bytes: crate::policy::immix::MAX_IMMIX_OBJECT_SIZE, needs_prepare_mutator: true, barrier: crate::BarrierSelector::SATBBarrier, + needs_satb: true, ..PlanConstraints::default() }; diff --git a/src/plan/plan_constraints.rs b/src/plan/plan_constraints.rs index 3110eb7538..6025020c10 100644 --- a/src/plan/plan_constraints.rs +++ b/src/plan/plan_constraints.rs @@ -45,6 +45,7 @@ pub struct PlanConstraints { /// `MutatorConfig::prepare_func`). Those plans can set this to `false` so that the /// `PrepareMutator` work packets will not be created at all. pub needs_prepare_mutator: bool, + pub needs_satb: bool, } impl PlanConstraints { @@ -67,6 +68,7 @@ impl PlanConstraints { barrier: BarrierSelector::NoBarrier, // If we use mark sweep as non moving space, we need to prepare mutator. See [`common_prepare_func`]. needs_prepare_mutator: cfg!(feature = "marksweep_as_nonmoving"), + needs_satb: false, } } } diff --git a/src/policy/immix/mod.rs b/src/policy/immix/mod.rs index d5895e9470..7243832b3a 100644 --- a/src/policy/immix/mod.rs +++ b/src/policy/immix/mod.rs @@ -16,4 +16,4 @@ pub const BLOCK_ONLY: bool = false; /// Mark lines when scanning objects. /// Otherwise, do it at mark time. 
-pub const MARK_LINE_AT_SCAN_TIME: bool = true; +pub const MARK_LINE_AT_SCAN_TIME: bool = false; diff --git a/src/policy/largeobjectspace.rs b/src/policy/largeobjectspace.rs index 369b913444..2563235e3a 100644 --- a/src/policy/largeobjectspace.rs +++ b/src/policy/largeobjectspace.rs @@ -274,6 +274,7 @@ impl LargeObjectSpace { debug_assert!(self.treadmill.is_from_space_empty()); debug_assert!(self.treadmill.is_nursery_empty()); + debug_assert!(self.common.needs_satb); let mut enumator = ClosureObjectEnumerator::<_, VM>::new(|object| { VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC.mark_as_unlogged::(object, Ordering::SeqCst); }); @@ -289,7 +290,7 @@ impl LargeObjectSpace { pub fn prepare(&mut self, full_heap: bool) { if full_heap { - debug_assert!(self.treadmill.is_from_space_empty()); + // debug_assert!(self.treadmill.is_from_space_empty()); self.mark_state = MARK_BIT - self.mark_state; } self.treadmill.flip(full_heap); @@ -353,7 +354,7 @@ impl LargeObjectSpace { #[cfg(feature = "vo_bit")] crate::util::metadata::vo_bit::unset_vo_bit(object); // Clear log bits for dead objects to prevent a new nursery object having the unlog bit set - if self.common.needs_log_bit { + if self.common.needs_log_bit || self.common.needs_satb { VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC.clear::(object, Ordering::SeqCst); } self.pr diff --git a/src/policy/space.rs b/src/policy/space.rs index a0676d9da0..80bbc2b424 100644 --- a/src/policy/space.rs +++ b/src/policy/space.rs @@ -528,6 +528,7 @@ pub struct CommonSpace { /// This field equals to needs_log_bit in the plan constraints. // TODO: This should be a constant for performance. pub needs_log_bit: bool, + pub needs_satb: bool, /// A lock used during acquire() to make sure only one thread can allocate. pub acquire_lock: Mutex<()>, @@ -598,6 +599,7 @@ impl CommonSpace { vm_map: args.plan_args.vm_map, mmapper: args.plan_args.mmapper, needs_log_bit: args.plan_args.constraints.needs_log_bit, + needs_satb: args.plan_args.constraints.needs_satb, gc_trigger: args.plan_args.gc_trigger, metadata: SideMetadataContext { global: args.plan_args.global_side_metadata_specs, From 100c04969bb6241339eaa45eba0e9b9ba219ebb3 Mon Sep 17 00:00:00 2001 From: Tianle Qiu Date: Mon, 28 Jul 2025 04:39:20 +0000 Subject: [PATCH 03/18] minor --- src/scheduler/scheduler.rs | 19 +++++++++++-------- .../mock_tests/mock_test_allocator_info.rs | 1 + 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 6a331aed1f..eb40544028 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -20,6 +20,8 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; +type PostponeQueue = Injector>>; + pub struct GCWorkScheduler { /// Work buckets pub work_buckets: EnumMap>, @@ -688,15 +690,16 @@ impl GCWorkScheduler { self.worker_monitor.notify_work_available(true); } - fn schedule_postponed_concurrent_packets( - &self, - ) -> (Injector>>, Injector>>) { + fn schedule_postponed_concurrent_packets(&self) -> (PostponeQueue, PostponeQueue) { let mut queue = Injector::new(); - type Q = Injector>>; - std::mem::swap::>(&mut queue, &mut self.postponed_concurrent_work.write()); + + std::mem::swap::>( + &mut queue, + &mut self.postponed_concurrent_work.write(), + ); let mut pqueue = Injector::new(); - std::mem::swap::>( + std::mem::swap::>( &mut pqueue, &mut self.postponed_concurrent_work_prioritized.write(), ); @@ -705,8 +708,8 @@ impl GCWorkScheduler { pub(super) fn schedule_concurrent_packets( &self, - queue: Injector>>, - 
pqueue: Injector>>, + queue: PostponeQueue, + pqueue: PostponeQueue, ) { // crate::MOVE_CONCURRENT_MARKING_TO_STW.store(false, Ordering::SeqCst); // crate::PAUSE_CONCURRENT_MARKING.store(false, Ordering::SeqCst); diff --git a/src/vm/tests/mock_tests/mock_test_allocator_info.rs b/src/vm/tests/mock_tests/mock_test_allocator_info.rs index fc288e8041..a856278e94 100644 --- a/src/vm/tests/mock_tests/mock_test_allocator_info.rs +++ b/src/vm/tests/mock_tests/mock_test_allocator_info.rs @@ -29,6 +29,7 @@ pub fn test_allocator_info() { | PlanSelector::GenImmix | PlanSelector::MarkCompact | PlanSelector::Compressor + | PlanSelector::ConcurrentImmix | PlanSelector::StickyImmix => { // These plans all use bump pointer allocator. let AllocatorInfo::BumpPointer { From 2bbb200c195ac372acaa862e95c67fd3094f58a7 Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Thu, 31 Jul 2025 02:17:19 +0000 Subject: [PATCH 04/18] Add ref/finalizer packets for final pause. Use log instead of println. --- src/plan/concurrent/immix/global.rs | 32 +++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/plan/concurrent/immix/global.rs b/src/plan/concurrent/immix/global.rs index 4da77ec505..ed52096212 100644 --- a/src/plan/concurrent/immix/global.rs +++ b/src/plan/concurrent/immix/global.rs @@ -255,7 +255,7 @@ impl Plan for ConcurrentImmix { self.set_concurrent_marking_state(false); } } - println!("{:?} start", pause); + info!("{:?} start", pause); } fn gc_pause_end(&self) { @@ -265,7 +265,7 @@ impl Plan for ConcurrentImmix { } self.previous_pause.store(Some(pause), Ordering::SeqCst); self.current_pause.store(None, Ordering::SeqCst); - println!("{:?} end", pause); + info!("{:?} end", pause); } } @@ -410,6 +410,34 @@ impl ConcurrentImmix { scheduler.work_buckets[WorkBucketStage::Release].add(Release::< ConcurrentImmixGCWorkContext>, >::new(self)); + + // Deal with weak ref and finalizers + // TODO: Check against schedule_common_work and see if we are still missing any work packet + type RefProcessingEdges = + crate::scheduler::gc_work::PlanProcessEdges, TRACE_KIND_FAST>; + // Reference processing + if !*self.base().options.no_reference_types { + use crate::util::reference_processor::{ + PhantomRefProcessing, SoftRefProcessing, WeakRefProcessing, + }; + scheduler.work_buckets[WorkBucketStage::SoftRefClosure] + .add(SoftRefProcessing::>::new()); + scheduler.work_buckets[WorkBucketStage::WeakRefClosure] + .add(WeakRefProcessing::::new()); + scheduler.work_buckets[WorkBucketStage::PhantomRefClosure] + .add(PhantomRefProcessing::::new()); + + use crate::util::reference_processor::RefEnqueue; + scheduler.work_buckets[WorkBucketStage::Release].add(RefEnqueue::::new()); + } + + // Finalization + if !*self.base().options.no_finalizer { + use crate::util::finalizable_processor::{Finalization, ForwardFinalization}; + // finalization + scheduler.work_buckets[WorkBucketStage::FinalRefClosure] + .add(Finalization::>::new()); + } } pub fn concurrent_marking_in_progress(&self) -> bool { From 90f451886da0cceb0c87d770ac9294192cda7355 Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Thu, 31 Jul 2025 05:35:52 +0000 Subject: [PATCH 05/18] schedule_concurrent_packets before resuming mutators --- src/scheduler/scheduler.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index eb40544028..5b5448c1d8 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -650,13 +650,13 @@ impl GCWorkScheduler { // Reset the triggering 
information. mmtk.state.reset_collection_trigger(); - // Set to NotInGC after everything, and right before resuming mutators. - mmtk.set_gc_status(GcStatus::NotInGC); - ::VMCollection::resume_mutators(worker.tls); - self.set_in_gc_pause(false); self.schedule_concurrent_packets(queue, pqueue); self.debug_assert_all_buckets_deactivated(); + + // Set to NotInGC after everything, and right before resuming mutators. + mmtk.set_gc_status(GcStatus::NotInGC); + ::VMCollection::resume_mutators(worker.tls); } pub fn enable_stat(&self) { From 521adea97dd394a2c46c6aa07a5abb05f83791f3 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Thu, 31 Jul 2025 16:55:02 +0800 Subject: [PATCH 06/18] eBPF tracing tools for concurrent Immix --- .../concurrent/concurrent_marking_work.rs | 19 ++++++++++++--- src/plan/gc_requester.rs | 1 + src/scheduler/scheduler.rs | 1 + tools/tracing/timeline/capture.bt | 15 ++++++++++++ tools/tracing/timeline/visualize.py | 24 +++++++++++++++++++ 5 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/plan/concurrent/concurrent_marking_work.rs b/src/plan/concurrent/concurrent_marking_work.rs index 6252b81eab..113776d9f2 100644 --- a/src/plan/concurrent/concurrent_marking_work.rs +++ b/src/plan/concurrent/concurrent_marking_work.rs @@ -35,7 +35,10 @@ impl ConcurrentTraceObjects { .get_plan() .downcast_ref::>() .unwrap(); - crate::NUM_CONCURRENT_TRACING_PACKETS.fetch_add(1, Ordering::SeqCst); + let old_value = crate::NUM_CONCURRENT_TRACING_PACKETS.fetch_add(1, Ordering::SeqCst); + let new_value = old_value + 1; + probe!(mmtk, num_concurrent_tracing_packets_change, new_value); + Self { plan, objects: Some(objects), @@ -106,9 +109,13 @@ unsafe impl Send for ConcurrentTraceObjects {} impl GCWork for ConcurrentTraceObjects { fn do_work(&mut self, worker: &mut GCWorker, _mmtk: &'static MMTK) { self.worker = worker; + let mut num_objects = 0; + let mut num_next_objects = 0; + let mut iterations = 0; // mark objects if let Some(objects) = self.objects.take() { - self.trace_objects(&objects) + self.trace_objects(&objects); + num_objects = objects.len(); } let pause_opt = self.plan.current_pause(); if pause_opt == Some(Pause::FinalMark) || pause_opt.is_none() { @@ -121,10 +128,16 @@ impl GCWork for ConcurrentTraceObjects { next_objects.clear(); self.next_objects.swap(&mut next_objects); self.trace_objects(&next_objects); + num_next_objects += next_objects.len(); + iterations += 1; } } + probe!(mmtk, concurrent_trace_objects, num_objects, num_next_objects, iterations); self.flush(); - crate::NUM_CONCURRENT_TRACING_PACKETS.fetch_sub(1, Ordering::SeqCst); + + let old_value = crate::NUM_CONCURRENT_TRACING_PACKETS.fetch_sub(1, Ordering::SeqCst); + let new_value = old_value - 1; + probe!(mmtk, num_concurrent_tracing_packets_change, new_value); } } diff --git a/src/plan/gc_requester.rs b/src/plan/gc_requester.rs index e3a8462f96..944558184d 100644 --- a/src/plan/gc_requester.rs +++ b/src/plan/gc_requester.rs @@ -30,6 +30,7 @@ impl GCRequester { // `GCWorkScheduler::request_schedule_collection` needs to hold a mutex to communicate // with GC workers, which is expensive for functions like `poll`. We use the atomic // flag `request_flag` to elide the need to acquire the mutex in subsequent calls. 
+ probe!(mmtk, gcrequester_request); self.scheduler.request_schedule_collection(); } } diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 5b5448c1d8..2f37702104 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -188,6 +188,7 @@ impl GCWorkScheduler { /// Add the `ScheduleCollection` packet. Called by the last parked worker. fn add_schedule_collection_packet(&self) { // We are still holding the mutex `WorkerMonitor::sync`. Do not notify now. + probe!(mmtk, add_schedule_collection_packet); self.work_buckets[WorkBucketStage::Unconstrained].add_no_notify(ScheduleCollection); } diff --git a/tools/tracing/timeline/capture.bt b/tools/tracing/timeline/capture.bt index b076f1cd18..5fd3a072aa 100644 --- a/tools/tracing/timeline/capture.bt +++ b/tools/tracing/timeline/capture.bt @@ -124,4 +124,19 @@ usdt:$MMTK:mmtk:sweep_chunk { } } +usdt:$MMTK:mmtk:concurrent_trace_objects { + printf("concurrent_trace_objects,meta,%d,%lu,%lu,%lu,%lu\n", tid, nsecs, arg0, arg1, arg2); +} + +usdt:$MMTK:mmtk:gcrequester_request { + printf("gcrequester_request,i,%d,%lu\n", tid, nsecs); +} + +usdt:$MMTK:mmtk:add_schedule_collection_packet { + printf("add_schedule_collection_packet,i,%d,%lu\n", tid, nsecs); +} + +usdt:$MMTK:mmtk:num_concurrent_tracing_packets_change { + printf("num_concurrent_tracing_packets_change,C,%d,%lu,%lu\n", tid, nsecs, arg0); +} // vim: ft=bpftrace ts=4 sw=4 sts=4 et diff --git a/tools/tracing/timeline/visualize.py b/tools/tracing/timeline/visualize.py index ca45845a44..7ad34df1cf 100755 --- a/tools/tracing/timeline/visualize.py +++ b/tools/tracing/timeline/visualize.py @@ -146,6 +146,15 @@ def enrich_event(self, name, ph, tid, ts, result, args): "stage": int(args[0]), } + case "gcrequester_request": + result["tid"] = 1 + + case "num_concurrent_tracing_packets_change": + result["name"] = "Concurrent tracing packets" + result["args"] |= { + "number": int(args[0]), + } + case _: if self.enrich_event_extra is not None: # Call ``enrich_event_extra`` in the extension script if defined. @@ -241,6 +250,21 @@ def enrich_meta(self, name, tid, ts, gc, wp, args): } } + case "concurrent_trace_objects": + objects = int(args[0]) + next_objects = int(args[1]) + iterations = int(args[2]) + total_objects = objects + next_objects + wp["args"] |= { + # Put args in a group. See comments in "process_slots". + "scan_objects": { + "objects": objects, + "next_objects": next_objects, + "total_objects": total_objects, + "iterations": iterations, + } + } + case "sweep_chunk": wp["args"] |= { "allocated_blocks": int(args[0]), From 4f14b2fc0c49d143866a6d9a2e0c21af5184c9e1 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Thu, 31 Jul 2025 17:46:00 +0800 Subject: [PATCH 07/18] Wake up workers immediately for concurrent work If concurrent work packets are scheduled during a GC, the last parked worker now wakes up all GC workers to process the concurrent packets instead of going to sleep. 
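As a rough sketch of the resulting last-parked-worker flow (types and names here are simplified for illustration; the actual logic is in the scheduler diff below):

    // Sketch only: the decision made once all buckets are drained.
    // `concurrent_work_scheduled` corresponds to the value returned by
    // `on_gc_finished` in the diff below.
    enum LastParkedResult {
        /// Wake every worker so they start draining the concurrent packets
        /// that were just moved into the Unconstrained bucket.
        WakeAll,
        /// Nothing concurrent was scheduled: fall back to the normal idle path.
        Respond,
    }

    fn after_gc(concurrent_work_scheduled: bool) -> LastParkedResult {
        if concurrent_work_scheduled {
            // InitialMark pause: concurrent marking should proceed immediately.
            LastParkedResult::WakeAll
        } else {
            // STW GC or FinalMark pause: park, or respond to the next goal.
            LastParkedResult::Respond
        }
    }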
--- src/scheduler/scheduler.rs | 44 ++++++++++++++++++------------------ src/scheduler/work_bucket.rs | 2 -- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 2f37702104..2bec746f68 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -63,17 +63,6 @@ impl GCWorkScheduler { let mut open_stages: Vec = vec![first_stw_stage]; let stages = (0..WorkBucketStage::LENGTH).map(WorkBucketStage::from_usize); for stage in stages { - { - if stage == WorkBucketStage::ConcurrentSentinel { - work_buckets[stage].set_open_condition( - move |scheduler: &GCWorkScheduler| { - scheduler.work_buckets[WorkBucketStage::Unconstrained].is_drained() - }, - ); - open_stages.push(stage); - continue; - } - } // Unconstrained is always open. // The first STW stage (Prepare) will be opened when the world stopped // (i.e. when all mutators are suspended). @@ -507,11 +496,20 @@ impl GCWorkScheduler { LastParkedResult::WakeAll } else { // GC finished. - self.on_gc_finished(worker); + let concurrent_work_scheduled = self.on_gc_finished(worker); // Clear the current goal goals.on_current_goal_completed(); - self.respond_to_requests(worker, goals) + + if concurrent_work_scheduled { + // It was the initial mark pause and scheduled concurrent work. + // Wake up all GC workers to do concurrent work. + LastParkedResult::WakeAll + } else { + // It was an STW GC or the final mark pause of a concurrent GC. + // Respond to another goal. + self.respond_to_requests(worker, goals) + } } } WorkerGoal::StopForFork => { @@ -584,7 +582,9 @@ impl GCWorkScheduler { } /// Called when GC has finished, i.e. when all work packets have been executed. - fn on_gc_finished(&self, worker: &GCWorker) { + /// + /// Return `true` if any concurrent work packets have been scheduled. + fn on_gc_finished(&self, worker: &GCWorker) -> bool { // All GC workers must have parked by now. debug_assert!(!self.worker_group.has_designated_work()); debug_assert!(self.all_buckets_empty()); @@ -652,12 +652,14 @@ impl GCWorkScheduler { mmtk.state.reset_collection_trigger(); self.set_in_gc_pause(false); - self.schedule_concurrent_packets(queue, pqueue); + let concurrent_work_scheduled = self.schedule_concurrent_packets(queue, pqueue); self.debug_assert_all_buckets_deactivated(); // Set to NotInGC after everything, and right before resuming mutators. 
mmtk.set_gc_status(GcStatus::NotInGC); ::VMCollection::resume_mutators(worker.tls); + + concurrent_work_scheduled } pub fn enable_stat(&self) { @@ -711,24 +713,22 @@ impl GCWorkScheduler { &self, queue: PostponeQueue, pqueue: PostponeQueue, - ) { + ) -> bool { // crate::MOVE_CONCURRENT_MARKING_TO_STW.store(false, Ordering::SeqCst); // crate::PAUSE_CONCURRENT_MARKING.store(false, Ordering::SeqCst); - let mut notify = false; + let mut concurrent_work_scheduled = false; if !queue.is_empty() { let old_queue = self.work_buckets[WorkBucketStage::Unconstrained].swap_queue(queue); debug_assert!(old_queue.is_empty()); - notify = true; + concurrent_work_scheduled = true; } if !pqueue.is_empty() { let old_queue = self.work_buckets[WorkBucketStage::Unconstrained].swap_queue_prioritized(pqueue); debug_assert!(old_queue.is_empty()); - notify = true; - } - if notify { - self.wakeup_all_concurrent_workers(); + concurrent_work_scheduled = true; } + concurrent_work_scheduled } pub fn wakeup_all_concurrent_workers(&self) { diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index 563c43a9b5..cf9fce7ca4 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -289,8 +289,6 @@ impl WorkBucket { pub enum WorkBucketStage { /// This bucket is always open. Unconstrained, - Initial, - ConcurrentSentinel, /// Preparation work. Plans, spaces, GC workers, mutators, etc. should be prepared for GC at /// this stage. Prepare, From 865e24cd1cfe825d39b5a8be8a4d39bf66301966 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Thu, 31 Jul 2025 17:48:11 +0800 Subject: [PATCH 08/18] Fix clippy warnings and formatting --- src/plan/concurrent/concurrent_marking_work.rs | 8 +++++++- src/plan/concurrent/immix/global.rs | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/plan/concurrent/concurrent_marking_work.rs b/src/plan/concurrent/concurrent_marking_work.rs index 113776d9f2..ff8cc77187 100644 --- a/src/plan/concurrent/concurrent_marking_work.rs +++ b/src/plan/concurrent/concurrent_marking_work.rs @@ -132,7 +132,13 @@ impl GCWork for ConcurrentTraceObjects { iterations += 1; } } - probe!(mmtk, concurrent_trace_objects, num_objects, num_next_objects, iterations); + probe!( + mmtk, + concurrent_trace_objects, + num_objects, + num_next_objects, + iterations + ); self.flush(); let old_value = crate::NUM_CONCURRENT_TRACING_PACKETS.fetch_sub(1, Ordering::SeqCst); diff --git a/src/plan/concurrent/immix/global.rs b/src/plan/concurrent/immix/global.rs index ed52096212..93228c64d0 100644 --- a/src/plan/concurrent/immix/global.rs +++ b/src/plan/concurrent/immix/global.rs @@ -433,7 +433,7 @@ impl ConcurrentImmix { // Finalization if !*self.base().options.no_finalizer { - use crate::util::finalizable_processor::{Finalization, ForwardFinalization}; + use crate::util::finalizable_processor::Finalization; // finalization scheduler.work_buckets[WorkBucketStage::FinalRefClosure] .add(Finalization::>::new()); From 7b132722a0ac94ed4325fb9f89e5d17cd6848da8 Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Fri, 1 Aug 2025 03:46:25 +0000 Subject: [PATCH 09/18] Move concurrent_marking_active to the plan. Add a flag allocate_as_live for spaces. 
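In outline, the allocate-as-live flag works as sketched below (hypothetical struct name; in the patch the flag lives in CommonSpace and is flipped from set_concurrent_marking_state):

    use std::sync::atomic::{AtomicBool, Ordering};

    // Each space carries a flag; the plan flips it when concurrent marking
    // starts or stops, and allocation paths consult it so that objects
    // allocated during concurrent marking are born marked ("live").
    struct SpaceLiveness {
        allocate_as_live: AtomicBool,
    }

    impl SpaceLiveness {
        fn set_allocate_as_live(&self, live: bool) {
            self.allocate_as_live.store(live, Ordering::SeqCst);
        }

        fn should_allocate_as_live(&self) -> bool {
            self.allocate_as_live.load(Ordering::Acquire)
        }
    }

    // Called for every space when the concurrent marking state changes.
    fn on_marking_state_change(spaces: &[SpaceLiveness], active: bool) {
        for space in spaces {
            space.set_allocate_as_live(active);
        }
    }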
--- src/global_state.rs | 2 -- src/plan/concurrent/immix/global.rs | 23 ++++++++++++++--------- src/policy/immix/immixspace.rs | 7 ------- src/policy/largeobjectspace.rs | 9 +-------- src/policy/space.rs | 16 ++++++++++++++-- src/util/alloc/immix_allocator.rs | 4 ++-- 6 files changed, 31 insertions(+), 30 deletions(-) diff --git a/src/global_state.rs b/src/global_state.rs index 8abe617b49..317caf0c81 100644 --- a/src/global_state.rs +++ b/src/global_state.rs @@ -49,7 +49,6 @@ pub struct GlobalState { pub(crate) malloc_bytes: AtomicUsize, /// This stores the live bytes and the used bytes (by pages) for each space in last GC. This counter is only updated in the GC release phase. pub(crate) live_bytes_in_last_gc: AtomicRefCell>, - pub(crate) concurrent_marking_active: AtomicBool, pub(crate) concurrent_marking_threshold: AtomicUsize, } @@ -209,7 +208,6 @@ impl Default for GlobalState { malloc_bytes: AtomicUsize::new(0), live_bytes_in_last_gc: AtomicRefCell::new(HashMap::new()), concurrent_marking_threshold: AtomicUsize::new(0), - concurrent_marking_active: AtomicBool::new(false), } } } diff --git a/src/plan/concurrent/immix/global.rs b/src/plan/concurrent/immix/global.rs index 93228c64d0..4424a2f900 100644 --- a/src/plan/concurrent/immix/global.rs +++ b/src/plan/concurrent/immix/global.rs @@ -54,6 +54,7 @@ pub struct ConcurrentImmix { current_pause: Atomic>, previous_pause: Atomic>, gc_cause: Atomic, + concurrent_marking_active: AtomicBool, } /// The plan constraints for the immix plan. @@ -307,6 +308,7 @@ impl ConcurrentImmix { current_pause: Atomic::new(None), previous_pause: Atomic::new(None), gc_cause: Atomic::new(GCCause::Unknown), + concurrent_marking_active: AtomicBool::new(false), }; immix.verify_side_metadata_sanity(); @@ -441,21 +443,24 @@ impl ConcurrentImmix { } pub fn concurrent_marking_in_progress(&self) -> bool { - self.common() - .base - .global_state - .concurrent_marking_active - .load(Ordering::Acquire) + self.concurrent_marking_active.load(Ordering::Acquire) } fn set_concurrent_marking_state(&self, active: bool) { + use crate::plan::global::HasSpaces; use crate::vm::Collection; + // Update the binding about concurrent marking ::VMCollection::set_concurrent_marking_state(active); - self.common() - .base - .global_state - .concurrent_marking_active + + // Tell the spaces to allocate new objects as live + let allocate_object_as_live = active; + self.for_each_space(&mut |space: &dyn Space| { + space.set_allocate_as_live(allocate_object_as_live); + }); + + // Store the state. 
+ self.concurrent_marking_active .store(active, Ordering::SeqCst); } diff --git a/src/policy/immix/immixspace.rs b/src/policy/immix/immixspace.rs index e205f3bc3d..7a051483d6 100644 --- a/src/policy/immix/immixspace.rs +++ b/src/policy/immix/immixspace.rs @@ -203,13 +203,6 @@ impl Space for ImmixSpace { fn enumerate_objects(&self, enumerator: &mut dyn ObjectEnumerator) { object_enum::enumerate_blocks_from_chunk_map::(enumerator, &self.chunk_map); } - - fn concurrent_marking_active(&self) -> bool { - self.common() - .global_state - .concurrent_marking_active - .load(Ordering::Acquire) - } } impl crate::policy::gc_work::PolicyTraceObject for ImmixSpace { diff --git a/src/policy/largeobjectspace.rs b/src/policy/largeobjectspace.rs index 2563235e3a..3a60b19dad 100644 --- a/src/policy/largeobjectspace.rs +++ b/src/policy/largeobjectspace.rs @@ -60,7 +60,7 @@ impl SFT for LargeObjectSpace { true } fn initialize_object_metadata(&self, object: ObjectReference, alloc: bool) { - if self.concurrent_marking_active() { + if self.should_allocate_as_live() { VM::VMObjectModel::LOCAL_LOS_MARK_NURSERY_SPEC.store_atomic::( object, self.mark_state, @@ -211,13 +211,6 @@ impl Space for LargeObjectSpace { fn enumerate_objects(&self, enumerator: &mut dyn ObjectEnumerator) { self.treadmill.enumerate_objects(enumerator); } - - fn concurrent_marking_active(&self) -> bool { - self.common() - .global_state - .concurrent_marking_active - .load(Ordering::Acquire) - } } use crate::scheduler::GCWorker; diff --git a/src/policy/space.rs b/src/policy/space.rs index 80bbc2b424..f6f46e8197 100644 --- a/src/policy/space.rs +++ b/src/policy/space.rs @@ -34,6 +34,7 @@ use crate::util::memory::{self, HugePageSupport, MmapProtection, MmapStrategy}; use crate::vm::VMBinding; use std::marker::PhantomData; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::sync::Mutex; @@ -434,8 +435,16 @@ pub trait Space: 'static + SFT + Sync + Downcast { /// scanning VO bits because it is sparse. 
fn enumerate_objects(&self, enumerator: &mut dyn ObjectEnumerator); - fn concurrent_marking_active(&self) -> bool { - false + fn set_allocate_as_live(&self, live: bool) { + self.common() + .allocate_as_live + .store(live, std::sync::atomic::Ordering::SeqCst); + } + + fn should_allocate_as_live(&self) -> bool { + self.common() + .allocate_as_live + .load(std::sync::atomic::Ordering::Acquire) } } @@ -537,6 +546,8 @@ pub struct CommonSpace { pub global_state: Arc, pub options: Arc, + pub allocate_as_live: AtomicBool, + p: PhantomData, } @@ -608,6 +619,7 @@ impl CommonSpace { acquire_lock: Mutex::new(()), global_state: args.plan_args.global_state, options: args.plan_args.options.clone(), + allocate_as_live: AtomicBool::new(false), p: PhantomData, }; diff --git a/src/util/alloc/immix_allocator.rs b/src/util/alloc/immix_allocator.rs index bf910c1e33..597b1db4ff 100644 --- a/src/util/alloc/immix_allocator.rs +++ b/src/util/alloc/immix_allocator.rs @@ -266,7 +266,7 @@ impl ImmixAllocator { Some(end_line) }; // mark objects if concurrent marking is active - if self.immix_space().concurrent_marking_active() { + if self.immix_space().should_allocate_as_live() { let state = self .space .line_mark_state @@ -321,7 +321,7 @@ impl ImmixAllocator { Line::MARK_TABLE .bzero_metadata(block.start(), crate::policy::immix::block::Block::BYTES); // mark objects if concurrent marking is active - if self.immix_space().concurrent_marking_active() { + if self.immix_space().should_allocate_as_live() { let state = self .space .line_mark_state From 1db4743e6fe3ec837c2a1b0c3c1f3944e2b825a8 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Fri, 1 Aug 2025 14:42:25 +0800 Subject: [PATCH 10/18] Rename load_reference to load_weak_reference We avoid using the term "reference" to refer to weak reference objects as in JVM. We explicitly mention "weak reference" instead. Also added comments to clarify the purpose of that barrier function. --- src/plan/barriers.rs | 20 +++++++++++++++----- src/plan/concurrent/barrier.rs | 11 ++++++++++- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/plan/barriers.rs b/src/plan/barriers.rs index 07152bbde7..e81b3166eb 100644 --- a/src/plan/barriers.rs +++ b/src/plan/barriers.rs @@ -46,8 +46,18 @@ impl BarrierSelector { pub trait Barrier: 'static + Send + Downcast { fn flush(&mut self) {} - /// load referent from java.lang.Reference - fn load_reference(&mut self, _referent: ObjectReference) {} + /// Weak reference loading barrier. A mutator should call this when loading from a weak + /// reference field, for example, when executing `java.lang.ref.Reference.get()` in JVM, or + /// loading from a global weak table in CRuby. + /// + /// Note: Merely loading from a field holding weak reference into a local variable will create a + /// strong reference from the stack to the referent, changing its reachablilty from weakly + /// reachable to strongly reachable. Concurrent garbage collectors may need to handle such + /// events specially. See [SATBBarrier::load_weak_reference] for a concrete example. + /// + /// Arguments: + /// * `referent`: The referent object which the weak reference is pointing to. 
+ fn load_weak_reference(&mut self, _referent: ObjectReference) {} /// Subsuming barrier for object reference write fn object_reference_write( @@ -166,7 +176,7 @@ pub trait BarrierSemantics: 'static + Send { /// Object will probably be modified fn object_probable_write_slow(&mut self, _obj: ObjectReference) {} - fn load_reference(&mut self, _o: ObjectReference) {} + fn load_weak_reference(&mut self, _o: ObjectReference) {} fn object_reference_clone_pre(&mut self, _obj: ObjectReference) {} } @@ -280,8 +290,8 @@ impl Barrier for SATBBarrier { self.semantics.flush(); } - fn load_reference(&mut self, o: ObjectReference) { - self.semantics.load_reference(o) + fn load_weak_reference(&mut self, o: ObjectReference) { + self.semantics.load_weak_reference(o) } fn object_reference_clone_pre(&mut self, obj: ObjectReference) { diff --git a/src/plan/concurrent/barrier.rs b/src/plan/concurrent/barrier.rs index cb76910147..716ea1144e 100644 --- a/src/plan/concurrent/barrier.rs +++ b/src/plan/concurrent/barrier.rs @@ -128,7 +128,16 @@ impl BarrierSemantics for SATBBarrierSemantics { } } - fn load_reference(&mut self, o: ObjectReference) { + /// Enqueue the referent during concurrent marking. + /// + /// Note: During concurrent marking, a collector based on snapshot-at-the-beginning (SATB) will + /// not reach objects that were weakly reachable at the time of `InitialMark`. But if a mutator + /// loads from a weak reference field during concurrent marking, it will make the referent + /// strongly reachable, yet the referent is still not part of the SATB. We must conservatively + /// enqueue the referent even though its reachability has not yet been established, otherwise it + /// (and its children) may be treated as garbage if it happened to be weakly reachable at the + /// time of `InitialMark`. + fn load_weak_reference(&mut self, o: ObjectReference) { if !self.immix.concurrent_marking_in_progress() { return; } From 5c3870467022c50461c7b2ade45a351d0aa427a6 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Fri, 1 Aug 2025 17:15:02 +0800 Subject: [PATCH 11/18] Replace `swap` with `replace` and `take` Clippy on the stable version of Rust seems to erroniously consider `std::mem::swap(&mut a, &mut lg)` as swapping with a temporary, where `lg` is a lock guard. We may fix the warning by writing `&mut *lg`, but the current use cases can all be expressed more concisely using `std::mem::replace` and `std::mem::take`. 
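A minimal standalone example of the rewrite, with hypothetical field and element types (only the standard-library calls matter):

    use std::sync::RwLock;

    // The old code did `std::mem::swap(&mut queue, &mut lg)` where `lg` was an
    // RwLock write guard, which trips clippy's swap_with_temporary lint on
    // stable. `take` and `replace` express the same thing without the second
    // `&mut`.
    struct Postponed {
        queue: RwLock<Vec<u32>>,
    }

    impl Postponed {
        // Drain the queue, leaving an empty one behind (mem::take uses Default).
        fn drain(&self) -> Vec<u32> {
            std::mem::take(&mut *self.queue.write().unwrap())
        }

        // Install a new queue and hand back the old one, mirroring replace_queue.
        fn replace(&self, new: Vec<u32>) -> Vec<u32> {
            std::mem::replace(&mut *self.queue.write().unwrap(), new)
        }
    }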
See: https://rust-lang.github.io/rust-clippy/master/index.html#swap_with_temporary --- src/plan/concurrent/concurrent_marking_work.rs | 4 +--- src/plan/tracing.rs | 5 ----- src/scheduler/scheduler.rs | 18 ++++-------------- src/scheduler/work_bucket.rs | 14 ++++++-------- 4 files changed, 11 insertions(+), 30 deletions(-) diff --git a/src/plan/concurrent/concurrent_marking_work.rs b/src/plan/concurrent/concurrent_marking_work.rs index ff8cc77187..660f0e69af 100644 --- a/src/plan/concurrent/concurrent_marking_work.rs +++ b/src/plan/concurrent/concurrent_marking_work.rs @@ -119,14 +119,12 @@ impl GCWork for ConcurrentTraceObjects { } let pause_opt = self.plan.current_pause(); if pause_opt == Some(Pause::FinalMark) || pause_opt.is_none() { - let mut next_objects = vec![]; while !self.next_objects.is_empty() { let pause_opt = self.plan.current_pause(); if !(pause_opt == Some(Pause::FinalMark) || pause_opt.is_none()) { break; } - next_objects.clear(); - self.next_objects.swap(&mut next_objects); + let next_objects = self.next_objects.take(); self.trace_objects(&next_objects); num_next_objects += next_objects.len(); iterations += 1; diff --git a/src/plan/tracing.rs b/src/plan/tracing.rs index e9dad03de8..fac6ff0da8 100644 --- a/src/plan/tracing.rs +++ b/src/plan/tracing.rs @@ -71,11 +71,6 @@ impl VectorQueue { self.buffer.len() } - /// Replace what was in the queue with data in new_buffer - pub fn swap(&mut self, new_buffer: &mut Vec) { - std::mem::swap(&mut self.buffer, new_buffer) - } - /// Empty the queue pub fn clear(&mut self) { self.buffer.clear() diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 2bec746f68..fa22f216aa 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -694,18 +694,8 @@ impl GCWorkScheduler { } fn schedule_postponed_concurrent_packets(&self) -> (PostponeQueue, PostponeQueue) { - let mut queue = Injector::new(); - - std::mem::swap::>( - &mut queue, - &mut self.postponed_concurrent_work.write(), - ); - - let mut pqueue = Injector::new(); - std::mem::swap::>( - &mut pqueue, - &mut self.postponed_concurrent_work_prioritized.write(), - ); + let queue = std::mem::take(&mut *self.postponed_concurrent_work.write()); + let pqueue = std::mem::take(&mut *self.postponed_concurrent_work_prioritized.write()); (queue, pqueue) } @@ -718,13 +708,13 @@ impl GCWorkScheduler { // crate::PAUSE_CONCURRENT_MARKING.store(false, Ordering::SeqCst); let mut concurrent_work_scheduled = false; if !queue.is_empty() { - let old_queue = self.work_buckets[WorkBucketStage::Unconstrained].swap_queue(queue); + let old_queue = self.work_buckets[WorkBucketStage::Unconstrained].replace_queue(queue); debug_assert!(old_queue.is_empty()); concurrent_work_scheduled = true; } if !pqueue.is_empty() { let old_queue = - self.work_buckets[WorkBucketStage::Unconstrained].swap_queue_prioritized(pqueue); + self.work_buckets[WorkBucketStage::Unconstrained].replace_queue_prioritized(pqueue); debug_assert!(old_queue.is_empty()); concurrent_work_scheduled = true; } diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index cf9fce7ca4..1a207f0d9e 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -98,18 +98,17 @@ impl WorkBucket { self.prioritized_queue = Some(BucketQueue::new()); } - pub fn swap_queue( + pub fn replace_queue( &self, - mut new_queue: Injector>>, + new_queue: Injector>>, ) -> Injector>> { let mut queue = self.queue.queue.write().unwrap(); - std::mem::swap::>>>(&mut queue, &mut new_queue); - new_queue + 
std::mem::replace::>>>(&mut queue, new_queue) } - pub fn swap_queue_prioritized( + pub fn replace_queue_prioritized( &self, - mut new_queue: Injector>>, + new_queue: Injector>>, ) -> Injector>> { let mut queue = self .prioritized_queue @@ -118,8 +117,7 @@ impl WorkBucket { .queue .write() .unwrap(); - std::mem::swap::>>>(&mut queue, &mut new_queue); - new_queue + std::mem::replace::>>>(&mut queue, new_queue) } fn notify_one_worker(&self) { From a95e94ad928c50fc5bbc9169ad0c268e7a1c431d Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Tue, 5 Aug 2025 00:24:05 +0000 Subject: [PATCH 12/18] Remove schedule_concurrent_collection --- src/plan/concurrent/immix/global.rs | 11 ----------- src/plan/global.rs | 5 ----- src/scheduler/gc_work.rs | 9 +-------- 3 files changed, 1 insertion(+), 24 deletions(-) diff --git a/src/plan/concurrent/immix/global.rs b/src/plan/concurrent/immix/global.rs index 4424a2f900..79d7fbd127 100644 --- a/src/plan/concurrent/immix/global.rs +++ b/src/plan/concurrent/immix/global.rs @@ -122,17 +122,6 @@ impl Plan for ConcurrentImmix { } fn schedule_collection(&'static self, scheduler: &GCWorkScheduler) { - self.current_pause - .store(Some(Pause::Full), Ordering::SeqCst); - - Self::schedule_immix_full_heap_collection::< - ConcurrentImmix, - ConcurrentImmixSTWGCWorkContext, - ConcurrentImmixSTWGCWorkContext, - >(self, &self.immix_space, scheduler); - } - - fn schedule_concurrent_collection(&'static self, scheduler: &GCWorkScheduler) { let pause = self.select_collection_kind(); if pause == Pause::Full { self.current_pause diff --git a/src/plan/global.rs b/src/plan/global.rs index 0b12bb8566..8e3b6db884 100644 --- a/src/plan/global.rs +++ b/src/plan/global.rs @@ -173,11 +173,6 @@ pub trait Plan: 'static + HasSpaces + Sync + Downcast { /// Schedule work for the upcoming GC. fn schedule_collection(&'static self, _scheduler: &GCWorkScheduler); - /// Schedule work for the upcoming concurrent GC. - fn schedule_concurrent_collection(&'static self, _scheduler: &GCWorkScheduler) { - self.schedule_collection(_scheduler); - } - /// Get the common plan. CommonPlan is included by most of MMTk GC plans. fn common(&self) -> &CommonPlan { panic!("Common Plan not handled!") diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index c80d12a5e5..ad895ec53f 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -30,14 +30,7 @@ impl GCWork for ScheduleCollection { mmtk.set_gc_status(GcStatus::GcPrepare); // Let the plan to schedule collection work - if mmtk.is_user_triggered_collection() || is_emergency { - // user triggered collection is always stop-the-world - mmtk.get_plan().schedule_collection(worker.scheduler()); - } else { - // Let the plan to schedule collection work - mmtk.get_plan() - .schedule_concurrent_collection(worker.scheduler()); - } + mmtk.get_plan().schedule_collection(worker.scheduler()); } } From eacc9599d3cad5968fd06131d4639ebf9622ae64 Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Tue, 5 Aug 2025 01:43:29 +0000 Subject: [PATCH 13/18] Rename gc_pause_start. 
Merge gc_pause_end with end_of_gc --- src/plan/concurrent/immix/global.rs | 20 +++++++++----------- src/plan/global.rs | 7 ++++--- src/scheduler/gc_work.rs | 4 ++-- src/scheduler/scheduler.rs | 1 - 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/src/plan/concurrent/immix/global.rs b/src/plan/concurrent/immix/global.rs index 79d7fbd127..c377a9fafb 100644 --- a/src/plan/concurrent/immix/global.rs +++ b/src/plan/concurrent/immix/global.rs @@ -196,6 +196,14 @@ impl Plan for ConcurrentImmix { fn end_of_gc(&mut self, _tls: VMWorkerThread) { self.last_gc_was_defrag .store(self.immix_space.end_of_gc(), Ordering::Relaxed); + + let pause = self.current_pause().unwrap(); + if pause == Pause::InitialMark { + self.set_concurrent_marking_state(true); + } + self.previous_pause.store(Some(pause), Ordering::SeqCst); + self.current_pause.store(None, Ordering::SeqCst); + info!("{:?} end", pause); } fn current_gc_may_move_object(&self) -> bool { @@ -222,7 +230,7 @@ impl Plan for ConcurrentImmix { &self.common } - fn gc_pause_start(&self, _scheduler: &GCWorkScheduler) { + fn notify_mutators_paused(&self, _scheduler: &GCWorkScheduler) { use crate::vm::ActivePlan; let pause = self.current_pause().unwrap(); match pause { @@ -247,16 +255,6 @@ impl Plan for ConcurrentImmix { } info!("{:?} start", pause); } - - fn gc_pause_end(&self) { - let pause = self.current_pause().unwrap(); - if pause == Pause::InitialMark { - self.set_concurrent_marking_state(true); - } - self.previous_pause.store(Some(pause), Ordering::SeqCst); - self.current_pause.store(None, Ordering::SeqCst); - info!("{:?} end", pause); - } } impl ConcurrentImmix { diff --git a/src/plan/global.rs b/src/plan/global.rs index 8e3b6db884..2d3f2a3638 100644 --- a/src/plan/global.rs +++ b/src/plan/global.rs @@ -195,6 +195,9 @@ pub trait Plan: 'static + HasSpaces + Sync + Downcast { /// This defines what space this plan will allocate objects into for different semantics. fn get_allocator_mapping(&self) -> &'static EnumMap; + /// Called when all mutators are paused. This is called before prepare. + fn notify_mutators_paused(&self, _scheduler: &GCWorkScheduler) {} + /// Prepare the plan before a GC. This is invoked in an initial step in the GC. /// This is invoked once per GC by one worker thread. `tls` is the worker thread that executes this method. fn prepare(&mut self, tls: VMWorkerThread); @@ -210,6 +213,7 @@ pub trait Plan: 'static + HasSpaces + Sync + Downcast { /// Inform the plan about the end of a GC. It is guaranteed that there is no further work for this GC. /// This is invoked once per GC by one worker thread. `tls` is the worker thread that executes this method. + // TODO: This is actually called at the end of a pause/STW, rather than the end of a GC. It should be renamed. fn end_of_gc(&mut self, _tls: VMWorkerThread); /// Notify the plan that an emergency collection will happen. The plan should try to free as much memory as possible. 
@@ -344,9 +348,6 @@ pub trait Plan: 'static + HasSpaces + Sync + Downcast { space.verify_side_metadata_sanity(&mut side_metadata_sanity_checker); }) } - - fn gc_pause_start(&self, _scheduler: &GCWorkScheduler) {} - fn gc_pause_end(&self) {} } impl_downcast!(Plan assoc VM); diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index ad895ec53f..d7070badd8 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -228,9 +228,9 @@ impl GCWork for StopMutators { mutator.flush(); } }); - mmtk.scheduler.set_in_gc_pause(true); - mmtk.get_plan().gc_pause_start(&mmtk.scheduler); trace!("stop_all_mutators end"); + mmtk.scheduler.set_in_gc_pause(true); + mmtk.get_plan().notify_mutators_paused(&mmtk.scheduler); mmtk.scheduler.notify_mutators_paused(mmtk); mmtk.scheduler.work_buckets[WorkBucketStage::Prepare].add(ScanVMSpecificRoots::::new()); } diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index fa22f216aa..7f9a771569 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -647,7 +647,6 @@ impl GCWorkScheduler { // reset the logging info at the end of each GC mmtk.slot_logger.reset(); } - mmtk.get_plan().gc_pause_end(); // Reset the triggering information. mmtk.state.reset_collection_trigger(); From fe29529297163f406d43c06af5a271fc946399e2 Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Wed, 6 Aug 2025 05:21:49 +0000 Subject: [PATCH 14/18] Disallow new weak reference before ref enqueue --- src/util/reference_processor.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/util/reference_processor.rs b/src/util/reference_processor.rs index f997c16742..1b5acb4bfa 100644 --- a/src/util/reference_processor.rs +++ b/src/util/reference_processor.rs @@ -252,6 +252,11 @@ impl ReferenceProcessor { /// Inform the binding to enqueue the weak references whose referents were cleared in this GC. pub fn enqueue(&self, tls: VMWorkerThread) { + // We will acquire a lock below. If anyone tries to insert new weak refs which will acquire the same lock, a deadlock will occur. + // This does happen for OpenJDK with ConcurrentImmix where a write barrier is triggered during the enqueueing of weak references, + // and the write barrier scans the objects and attempts to add new weak references. + // Disallow new candidates to prevent the deadlock. + self.disallow_new_candidate(); let mut sync = self.sync.lock().unwrap(); // This is the end of a GC. We do some assertions here to make sure our reference tables are correct. From 26c2a54621db7cfc2bc38349f38fc9d8f2d7020b Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Wed, 13 Aug 2025 05:37:30 +0000 Subject: [PATCH 15/18] Introduce ConcurrentPlan. Make ConcurrentTraceObjects trace objects in all spaces. 
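A reduced sketch of the trait boundary being introduced (the full version in the diff below also requires PlanTraceObject and is generic over the VM binding):

    // Barrier and marking packets only need to ask the plan two questions,
    // so they can be written once for any concurrent plan instead of being
    // tied to ConcurrentImmix.
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum Pause {
        Full,
        InitialMark,
        FinalMark,
    }

    trait ConcurrentPlan {
        fn concurrent_work_in_progress(&self) -> bool;
        fn current_pause(&self) -> Option<Pause>;
    }

    // Example consumer: the SATB flush path picks a bucket based on whether
    // concurrent marking is still running.
    fn satb_flush_bucket<P: ConcurrentPlan>(plan: &P) -> &'static str {
        if plan.concurrent_work_in_progress() {
            "Unconstrained" // marking is running: feed the concurrent workers
        } else {
            "Closure" // inside a pause: process during the closure phase
        }
    }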
--- src/plan/concurrent/barrier.rs | 51 +++++----- .../concurrent/concurrent_marking_work.rs | 95 ++++++++++++------- src/plan/concurrent/global.rs | 7 ++ src/plan/concurrent/immix/global.rs | 17 +++- src/plan/concurrent/immix/mutator.rs | 5 +- src/plan/concurrent/mod.rs | 4 +- src/plan/global.rs | 6 ++ 7 files changed, 114 insertions(+), 71 deletions(-) create mode 100644 src/plan/concurrent/global.rs diff --git a/src/plan/concurrent/barrier.rs b/src/plan/concurrent/barrier.rs index 716ea1144e..991a626dc9 100644 --- a/src/plan/concurrent/barrier.rs +++ b/src/plan/concurrent/barrier.rs @@ -1,7 +1,9 @@ use std::sync::atomic::Ordering; +use super::{concurrent_marking_work::ProcessModBufSATB, Pause}; +use crate::plan::global::PlanTraceObject; use crate::{ - plan::{barriers::BarrierSemantics, concurrent::immix::global::ConcurrentImmix, VectorQueue}, + plan::{barriers::BarrierSemantics, concurrent::global::ConcurrentPlan, VectorQueue}, scheduler::WorkBucketStage, util::ObjectReference, vm::{ @@ -11,25 +13,20 @@ use crate::{ MMTK, }; -use super::{concurrent_marking_work::ProcessModBufSATB, Pause}; - -pub struct SATBBarrierSemantics { +pub struct SATBBarrierSemantics + PlanTraceObject> { mmtk: &'static MMTK, satb: VectorQueue, refs: VectorQueue, - immix: &'static ConcurrentImmix, + plan: &'static P, } -impl SATBBarrierSemantics { +impl + PlanTraceObject> SATBBarrierSemantics { pub fn new(mmtk: &'static MMTK) -> Self { Self { mmtk, satb: VectorQueue::default(), refs: VectorQueue::default(), - immix: mmtk - .get_plan() - .downcast_ref::>() - .unwrap(), + plan: mmtk.get_plan().downcast_ref::
<P>
().unwrap(), } } @@ -63,14 +60,12 @@ impl SATBBarrierSemantics { if !self.satb.is_empty() { if self.should_create_satb_packets() { let satb = self.satb.take(); - if let Some(pause) = self.immix.current_pause() { - debug_assert_ne!(pause, Pause::InitialMark); - self.mmtk.scheduler.work_buckets[WorkBucketStage::Closure] - .add(ProcessModBufSATB::new(satb)); + let bucket = if self.plan.concurrent_work_in_progress() { + WorkBucketStage::Unconstrained } else { - self.mmtk.scheduler.work_buckets[WorkBucketStage::Unconstrained] - .add(ProcessModBufSATB::new(satb)); - } + WorkBucketStage::Closure + }; + self.mmtk.scheduler.work_buckets[bucket].add(ProcessModBufSATB::::new(satb)); } else { let _ = self.satb.take(); }; @@ -82,24 +77,24 @@ impl SATBBarrierSemantics { if !self.refs.is_empty() { // debug_assert!(self.should_create_satb_packets()); let nodes = self.refs.take(); - if let Some(pause) = self.immix.current_pause() { - debug_assert_ne!(pause, Pause::InitialMark); - self.mmtk.scheduler.work_buckets[WorkBucketStage::Closure] - .add(ProcessModBufSATB::new(nodes)); + let bucket = if self.plan.concurrent_work_in_progress() { + WorkBucketStage::Unconstrained } else { - self.mmtk.scheduler.work_buckets[WorkBucketStage::Unconstrained] - .add(ProcessModBufSATB::new(nodes)); - } + WorkBucketStage::Closure + }; + self.mmtk.scheduler.work_buckets[bucket].add(ProcessModBufSATB::::new(nodes)); } } fn should_create_satb_packets(&self) -> bool { - self.immix.concurrent_marking_in_progress() - || self.immix.current_pause() == Some(Pause::FinalMark) + self.plan.concurrent_work_in_progress() + || self.plan.current_pause() == Some(Pause::FinalMark) } } -impl BarrierSemantics for SATBBarrierSemantics { +impl + PlanTraceObject> BarrierSemantics + for SATBBarrierSemantics +{ type VM = VM; #[cold] @@ -138,7 +133,7 @@ impl BarrierSemantics for SATBBarrierSemantics { /// (and its children) may be treated as garbage if it happened to be weakly reachable at the /// time of `InitialMark`. 
fn load_weak_reference(&mut self, o: ObjectReference) { - if !self.immix.concurrent_marking_in_progress() { + if !self.plan.concurrent_work_in_progress() { return; } self.refs.push(o); diff --git a/src/plan/concurrent/concurrent_marking_work.rs b/src/plan/concurrent/concurrent_marking_work.rs index 660f0e69af..cacdeb62d6 100644 --- a/src/plan/concurrent/concurrent_marking_work.rs +++ b/src/plan/concurrent/concurrent_marking_work.rs @@ -1,14 +1,11 @@ -use crate::plan::concurrent::immix::global::ConcurrentImmix; +use crate::plan::concurrent::global::ConcurrentPlan; use crate::plan::concurrent::Pause; +use crate::plan::PlanTraceObject; use crate::plan::VectorQueue; -use crate::policy::gc_work::PolicyTraceObject; use crate::policy::immix::TRACE_KIND_FAST; -use crate::policy::space::Space; use crate::scheduler::gc_work::{ScanObjects, SlotOf}; use crate::util::ObjectReference; use crate::vm::slot::Slot; - -use crate::Plan; use crate::{ plan::ObjectQueue, scheduler::{gc_work::ProcessEdgesBase, GCWork, GCWorker, ProcessEdgesWork, WorkBucketStage}, @@ -18,8 +15,8 @@ use crate::{ use atomic::Ordering; use std::ops::{Deref, DerefMut}; -pub struct ConcurrentTraceObjects { - plan: &'static ConcurrentImmix, +pub struct ConcurrentTraceObjects + PlanTraceObject> { + plan: &'static P, // objects to mark and scan objects: Option>, // recursively generated objects @@ -27,14 +24,13 @@ pub struct ConcurrentTraceObjects { worker: *mut GCWorker, } -impl ConcurrentTraceObjects { +impl + PlanTraceObject> + ConcurrentTraceObjects +{ const SATB_BUFFER_SIZE: usize = 8192; pub fn new(objects: Vec, mmtk: &'static MMTK) -> Self { - let plan = mmtk - .get_plan() - .downcast_ref::>() - .unwrap(); + let plan = mmtk.get_plan().downcast_ref::
<P>
().unwrap(); let old_value = crate::NUM_CONCURRENT_TRACING_PACKETS.fetch_add(1, Ordering::SeqCst); let new_value = old_value + 1; probe!(mmtk, num_concurrent_tracing_packets_change, new_value); @@ -63,14 +59,8 @@ impl ConcurrentTraceObjects { } fn trace_object(&mut self, object: ObjectReference) -> ObjectReference { - if self.plan.immix_space.in_space(object) { - self.plan - .immix_space - .trace_object::(self, object, None, self.worker()); - } else { - self.plan.common().get_los().trace_object(self, object); - } - object + self.plan + .trace_object::(self, object, self.worker()) } fn trace_objects(&mut self, objects: &[ObjectReference]) { @@ -93,7 +83,9 @@ impl ConcurrentTraceObjects { } } -impl ObjectQueue for ConcurrentTraceObjects { +impl + PlanTraceObject> ObjectQueue + for ConcurrentTraceObjects +{ fn enqueue(&mut self, object: ObjectReference) { debug_assert!( object.to_raw_address().is_mapped(), @@ -104,9 +96,14 @@ impl ObjectQueue for ConcurrentTraceObjects { } } -unsafe impl Send for ConcurrentTraceObjects {} +unsafe impl + PlanTraceObject> Send + for ConcurrentTraceObjects +{ +} -impl GCWork for ConcurrentTraceObjects { +impl + PlanTraceObject> GCWork + for ConcurrentTraceObjects +{ fn do_work(&mut self, worker: &mut GCWorker, _mmtk: &'static MMTK) { self.worker = worker; let mut num_objects = 0; @@ -145,24 +142,35 @@ impl GCWork for ConcurrentTraceObjects { } } -pub struct ProcessModBufSATB { +pub struct ProcessModBufSATB + PlanTraceObject> { nodes: Option>, + _p: std::marker::PhantomData<(VM, P)>, +} + +unsafe impl + PlanTraceObject> Send + for ProcessModBufSATB +{ } -impl ProcessModBufSATB { +impl + PlanTraceObject> ProcessModBufSATB { pub fn new(nodes: Vec) -> Self { - Self { nodes: Some(nodes) } + Self { + nodes: Some(nodes), + _p: std::marker::PhantomData, + } } } -impl GCWork for ProcessModBufSATB { +impl + PlanTraceObject> GCWork + for ProcessModBufSATB +{ fn do_work(&mut self, worker: &mut GCWorker, mmtk: &'static MMTK) { let mut w = if let Some(nodes) = self.nodes.take() { if nodes.is_empty() { return; } - ConcurrentTraceObjects::new(nodes, mmtk) + ConcurrentTraceObjects::::new(nodes, mmtk) } else { return; }; @@ -170,11 +178,19 @@ impl GCWork for ProcessModBufSATB { } } -pub struct ProcessRootSlots { +pub struct ProcessRootSlots + PlanTraceObject> { base: ProcessEdgesBase, + _p: std::marker::PhantomData
<P>
, +} + +unsafe impl + PlanTraceObject> Send + for ProcessRootSlots +{ } -impl ProcessEdgesWork for ProcessRootSlots { +impl + PlanTraceObject> ProcessEdgesWork + for ProcessRootSlots +{ type VM = VM; type ScanObjectsWorkType = ScanObjects; const OVERWRITE_REFERENCE: bool = false; @@ -188,7 +204,10 @@ impl ProcessEdgesWork for ProcessRootSlots { ) -> Self { debug_assert!(roots); let base = ProcessEdgesBase::new(slots, roots, mmtk, bucket); - Self { base } + Self { + base, + _p: std::marker::PhantomData, + } } fn flush(&mut self) {} @@ -201,7 +220,7 @@ impl ProcessEdgesWork for ProcessRootSlots { let pause = self .base .plan() - .downcast_ref::>() + .concurrent() .unwrap() .current_pause() .unwrap(); @@ -219,7 +238,7 @@ impl ProcessEdgesWork for ProcessRootSlots { // create the packet let worker = self.worker(); let mmtk = self.mmtk(); - let w = ConcurrentTraceObjects::new(root_objects.clone(), mmtk); + let w = ConcurrentTraceObjects::::new(root_objects.clone(), mmtk); match pause { Pause::InitialMark => worker.scheduler().postpone(w), @@ -232,7 +251,7 @@ impl ProcessEdgesWork for ProcessRootSlots { } if !root_objects.is_empty() { let worker = self.worker(); - let w = ConcurrentTraceObjects::new(root_objects.clone(), self.mmtk()); + let w = ConcurrentTraceObjects::::new(root_objects.clone(), self.mmtk()); match pause { Pause::InitialMark => worker.scheduler().postpone(w), @@ -247,14 +266,18 @@ impl ProcessEdgesWork for ProcessRootSlots { } } -impl Deref for ProcessRootSlots { +impl + PlanTraceObject> Deref + for ProcessRootSlots +{ type Target = ProcessEdgesBase; fn deref(&self) -> &Self::Target { &self.base } } -impl DerefMut for ProcessRootSlots { +impl + PlanTraceObject> DerefMut + for ProcessRootSlots +{ fn deref_mut(&mut self) -> &mut Self::Target { &mut self.base } diff --git a/src/plan/concurrent/global.rs b/src/plan/concurrent/global.rs new file mode 100644 index 0000000000..a2b06f67c0 --- /dev/null +++ b/src/plan/concurrent/global.rs @@ -0,0 +1,7 @@ +use crate::plan::concurrent::Pause; +use crate::plan::Plan; + +pub trait ConcurrentPlan: Plan { + fn concurrent_work_in_progress(&self) -> bool; + fn current_pause(&self) -> Option; +} diff --git a/src/plan/concurrent/immix/global.rs b/src/plan/concurrent/immix/global.rs index c377a9fafb..ff6a21a092 100644 --- a/src/plan/concurrent/immix/global.rs +++ b/src/plan/concurrent/immix/global.rs @@ -1,4 +1,5 @@ use crate::plan::concurrent::concurrent_marking_work::ProcessRootSlots; +use crate::plan::concurrent::global::ConcurrentPlan; use crate::plan::concurrent::immix::gc_work::ConcurrentImmixGCWorkContext; use crate::plan::concurrent::immix::gc_work::ConcurrentImmixSTWGCWorkContext; use crate::plan::concurrent::Pause; @@ -378,7 +379,7 @@ impl ConcurrentImmix { self.disable_unnecessary_buckets(scheduler, Pause::InitialMark); scheduler.work_buckets[WorkBucketStage::Unconstrained].add_prioritized(Box::new( - StopMutators::>>::new_args( + StopMutators::>>::new_args( Pause::InitialMark, ), )); @@ -391,7 +392,7 @@ impl ConcurrentImmix { self.disable_unnecessary_buckets(scheduler, Pause::FinalMark); scheduler.work_buckets[WorkBucketStage::Unconstrained].add_prioritized(Box::new( - StopMutators::>>::new_args( + StopMutators::>>::new_args( Pause::FinalMark, ), )); @@ -451,11 +452,17 @@ impl ConcurrentImmix { .store(active, Ordering::SeqCst); } - pub fn current_pause(&self) -> Option { + fn previous_pause(&self) -> Option { + self.previous_pause.load(Ordering::SeqCst) + } +} + +impl ConcurrentPlan for ConcurrentImmix { + fn current_pause(&self) -> 
Option { self.current_pause.load(Ordering::SeqCst) } - pub fn previous_pause(&self) -> Option { - self.previous_pause.load(Ordering::SeqCst) + fn concurrent_work_in_progress(&self) -> bool { + self.concurrent_marking_in_progress() } } diff --git a/src/plan/concurrent/immix/mutator.rs b/src/plan/concurrent/immix/mutator.rs index 1304e34ea0..b8074544fe 100644 --- a/src/plan/concurrent/immix/mutator.rs +++ b/src/plan/concurrent/immix/mutator.rs @@ -79,6 +79,9 @@ pub fn create_concurrent_immix_mutator( let builder = MutatorBuilder::new(mutator_tls, mmtk, config); builder - .barrier(Box::new(SATBBarrier::new(SATBBarrierSemantics::new(mmtk)))) + .barrier(Box::new(SATBBarrier::new(SATBBarrierSemantics::< + VM, + ConcurrentImmix, + >::new(mmtk)))) .build() } diff --git a/src/plan/concurrent/mod.rs b/src/plan/concurrent/mod.rs index 94a582a4aa..692ed99d03 100644 --- a/src/plan/concurrent/mod.rs +++ b/src/plan/concurrent/mod.rs @@ -1,5 +1,7 @@ pub mod barrier; -pub mod concurrent_marking_work; +pub(super) mod concurrent_marking_work; +pub(super) mod global; + pub mod immix; use bytemuck::NoUninit; diff --git a/src/plan/global.rs b/src/plan/global.rs index 2d3f2a3638..b09ff0e23b 100644 --- a/src/plan/global.rs +++ b/src/plan/global.rs @@ -186,6 +186,12 @@ pub trait Plan: 'static + HasSpaces + Sync + Downcast { None } + fn concurrent( + &self, + ) -> Option<&dyn crate::plan::concurrent::global::ConcurrentPlan> { + None + } + /// Get the current run time options. fn options(&self) -> &Options { &self.base().options From 7487c1be0572c045d552da64d99e47c858a42f8b Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Wed, 13 Aug 2025 01:55:11 +0000 Subject: [PATCH 16/18] Fix rayon-core version for MSRV --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index f96df1ec29..5b5c0685bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ portable-atomic = "1.4.3" probe = "0.5" regex = "1.7.0" rustversion = "1.0" +rayon-core = "=1.12.1" # We can remove this dependency when we use MSRV 1.80+ spin = "0.9.5" static_assertions = "1.1.0" strum = "0.27.1" From 6bb03c08bd5f98a36e0be8f879bf1f8e973cf1e2 Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Thu, 14 Aug 2025 04:13:06 +0000 Subject: [PATCH 17/18] More assertions and minor fix. 
--- src/plan/concurrent/barrier.rs | 2 ++ src/plan/concurrent/concurrent_marking_work.rs | 8 ++++++-- src/plan/concurrent/immix/global.rs | 4 ++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/plan/concurrent/barrier.rs b/src/plan/concurrent/barrier.rs index 991a626dc9..09e65276e6 100644 --- a/src/plan/concurrent/barrier.rs +++ b/src/plan/concurrent/barrier.rs @@ -63,6 +63,7 @@ impl + PlanTraceObject> SATBBarrie let bucket = if self.plan.concurrent_work_in_progress() { WorkBucketStage::Unconstrained } else { + debug_assert_eq!(self.plan.current_pause(), Some(Pause::FinalMark)); WorkBucketStage::Closure }; self.mmtk.scheduler.work_buckets[bucket].add(ProcessModBufSATB::::new(satb)); @@ -80,6 +81,7 @@ impl + PlanTraceObject> SATBBarrie let bucket = if self.plan.concurrent_work_in_progress() { WorkBucketStage::Unconstrained } else { + debug_assert_eq!(self.plan.current_pause(), Some(Pause::FinalMark)); WorkBucketStage::Closure }; self.mmtk.scheduler.work_buckets[bucket].add(ProcessModBufSATB::::new(nodes)); diff --git a/src/plan/concurrent/concurrent_marking_work.rs b/src/plan/concurrent/concurrent_marking_work.rs index cacdeb62d6..6620b766d2 100644 --- a/src/plan/concurrent/concurrent_marking_work.rs +++ b/src/plan/concurrent/concurrent_marking_work.rs @@ -59,8 +59,12 @@ impl + PlanTraceObject> } fn trace_object(&mut self, object: ObjectReference) -> ObjectReference { - self.plan - .trace_object::(self, object, self.worker()) + let new_object = + self.plan + .trace_object::(self, object, self.worker()); + // No copying should happen. + debug_assert_eq!(object, new_object); + object } fn trace_objects(&mut self, objects: &[ObjectReference]) { diff --git a/src/plan/concurrent/immix/global.rs b/src/plan/concurrent/immix/global.rs index ff6a21a092..9d7dc4fcdd 100644 --- a/src/plan/concurrent/immix/global.rs +++ b/src/plan/concurrent/immix/global.rs @@ -256,6 +256,10 @@ impl Plan for ConcurrentImmix { } info!("{:?} start", pause); } + + fn concurrent(&self) -> Option<&dyn ConcurrentPlan> { + Some(self) + } } impl ConcurrentImmix { From ad41d7e67629d72a6918cc02e498b053bd91f8b1 Mon Sep 17 00:00:00 2001 From: Yi Lin Date: Thu, 14 Aug 2025 05:14:30 +0000 Subject: [PATCH 18/18] Wrong assertions --- src/plan/concurrent/barrier.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/plan/concurrent/barrier.rs b/src/plan/concurrent/barrier.rs index 09e65276e6..a26ee50444 100644 --- a/src/plan/concurrent/barrier.rs +++ b/src/plan/concurrent/barrier.rs @@ -63,7 +63,7 @@ impl + PlanTraceObject> SATBBarrie let bucket = if self.plan.concurrent_work_in_progress() { WorkBucketStage::Unconstrained } else { - debug_assert_eq!(self.plan.current_pause(), Some(Pause::FinalMark)); + debug_assert_ne!(self.plan.current_pause(), Some(Pause::InitialMark)); WorkBucketStage::Closure }; self.mmtk.scheduler.work_buckets[bucket].add(ProcessModBufSATB::::new(satb)); @@ -81,7 +81,7 @@ impl + PlanTraceObject> SATBBarrie let bucket = if self.plan.concurrent_work_in_progress() { WorkBucketStage::Unconstrained } else { - debug_assert_eq!(self.plan.current_pause(), Some(Pause::FinalMark)); + debug_assert_ne!(self.plan.current_pause(), Some(Pause::InitialMark)); WorkBucketStage::Closure }; self.mmtk.scheduler.work_buckets[bucket].add(ProcessModBufSATB::::new(nodes));