diff --git a/Cargo.toml b/Cargo.toml index f96df1ec29..5b5c0685bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ portable-atomic = "1.4.3" probe = "0.5" regex = "1.7.0" rustversion = "1.0" +rayon-core = "=1.12.1" # We can remove this dependency when we use MSRV 1.80+ spin = "0.9.5" static_assertions = "1.1.0" strum = "0.27.1" diff --git a/src/global_state.rs b/src/global_state.rs index b5a78d9bbe..317caf0c81 100644 --- a/src/global_state.rs +++ b/src/global_state.rs @@ -49,6 +49,7 @@ pub struct GlobalState { pub(crate) malloc_bytes: AtomicUsize, /// This stores the live bytes and the used bytes (by pages) for each space in last GC. This counter is only updated in the GC release phase. pub(crate) live_bytes_in_last_gc: AtomicRefCell>, + pub(crate) concurrent_marking_threshold: AtomicUsize, } impl GlobalState { @@ -206,6 +207,7 @@ impl Default for GlobalState { #[cfg(feature = "malloc_counted_size")] malloc_bytes: AtomicUsize::new(0), live_bytes_in_last_gc: AtomicRefCell::new(HashMap::new()), + concurrent_marking_threshold: AtomicUsize::new(0), } } } diff --git a/src/lib.rs b/src/lib.rs index afe094885f..e1a7b627e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,8 @@ extern crate static_assertions; extern crate probe; mod mmtk; +use std::sync::atomic::AtomicUsize; + pub use mmtk::MMTKBuilder; pub(crate) use mmtk::MMAPPER; pub use mmtk::MMTK; @@ -51,3 +53,9 @@ pub mod vm; pub use crate::plan::{ AllocationSemantics, BarrierSelector, Mutator, MutatorContext, ObjectQueue, Plan, }; + +static NUM_CONCURRENT_TRACING_PACKETS: AtomicUsize = AtomicUsize::new(0); + +fn concurrent_marking_packets_drained() -> bool { + crate::NUM_CONCURRENT_TRACING_PACKETS.load(std::sync::atomic::Ordering::SeqCst) == 0 +} diff --git a/src/plan/barriers.rs b/src/plan/barriers.rs index 56c069c982..e81b3166eb 100644 --- a/src/plan/barriers.rs +++ b/src/plan/barriers.rs @@ -21,6 +21,7 @@ pub enum BarrierSelector { NoBarrier, /// Object remembering barrier is used. ObjectBarrier, + SATBBarrier, } impl BarrierSelector { @@ -45,6 +46,19 @@ impl BarrierSelector { pub trait Barrier: 'static + Send + Downcast { fn flush(&mut self) {} + /// Weak reference loading barrier. A mutator should call this when loading from a weak + /// reference field, for example, when executing `java.lang.ref.Reference.get()` in JVM, or + /// loading from a global weak table in CRuby. + /// + /// Note: Merely loading from a field holding weak reference into a local variable will create a + /// strong reference from the stack to the referent, changing its reachablilty from weakly + /// reachable to strongly reachable. Concurrent garbage collectors may need to handle such + /// events specially. See [SATBBarrier::load_weak_reference] for a concrete example. + /// + /// Arguments: + /// * `referent`: The referent object which the weak reference is pointing to. 
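// Illustrative binding-side usage of the weak-reference loading barrier described in the doc
// comment above. This is a hypothetical sketch, not part of this patch: `reference_get` and the
// way the referent slot is obtained are assumptions; only `Slot::load`, `Mutator::barrier` and
// `Barrier::load_weak_reference` come from mmtk-core.
fn reference_get<VM: VMBinding>(
    mutator: &mut Mutator<VM>,
    referent_slot: VM::VMSlot,
) -> Option<ObjectReference> {
    // Loading the referent makes it strongly reachable from the mutator's stack.
    let referent = referent_slot.load()?;
    // Tell the barrier, so a concurrent SATB collector can keep the referent alive this cycle.
    mutator.barrier.load_weak_reference(referent);
    Some(referent)
}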
+ fn load_weak_reference(&mut self, _referent: ObjectReference) {} + /// Subsuming barrier for object reference write fn object_reference_write( &mut self, @@ -92,6 +106,8 @@ pub trait Barrier: 'static + Send + Downcast { self.memory_region_copy_post(src, dst); } + fn object_reference_clone_pre(&mut self, _obj: ObjectReference) {} + /// Full pre-barrier for array copy fn memory_region_copy_pre(&mut self, _src: VM::VMMemorySlice, _dst: VM::VMMemorySlice) {} @@ -159,6 +175,10 @@ pub trait BarrierSemantics: 'static + Send { /// Object will probably be modified fn object_probable_write_slow(&mut self, _obj: ObjectReference) {} + + fn load_weak_reference(&mut self, _o: ObjectReference) {} + + fn object_reference_clone_pre(&mut self, _obj: ObjectReference) {} } /// Generic object barrier with a type argument defining it's slow-path behaviour. @@ -250,3 +270,82 @@ impl Barrier for ObjectBarrier { } } } + +pub struct SATBBarrier { + semantics: S, +} + +impl SATBBarrier { + pub fn new(semantics: S) -> Self { + Self { semantics } + } + fn object_is_unlogged(&self, object: ObjectReference) -> bool { + // unsafe { S::UNLOG_BIT_SPEC.load::(object, None) != 0 } + S::UNLOG_BIT_SPEC.load_atomic::(object, None, Ordering::SeqCst) != 0 + } +} + +impl Barrier for SATBBarrier { + fn flush(&mut self) { + self.semantics.flush(); + } + + fn load_weak_reference(&mut self, o: ObjectReference) { + self.semantics.load_weak_reference(o) + } + + fn object_reference_clone_pre(&mut self, obj: ObjectReference) { + self.semantics.object_reference_clone_pre(obj); + } + + fn object_probable_write(&mut self, obj: ObjectReference) { + self.semantics.object_probable_write_slow(obj); + } + + fn object_reference_write_pre( + &mut self, + src: ObjectReference, + slot: ::VMSlot, + target: Option, + ) { + if self.object_is_unlogged(src) { + self.semantics + .object_reference_write_slow(src, slot, target); + } + } + + fn object_reference_write_post( + &mut self, + _src: ObjectReference, + _slot: ::VMSlot, + _target: Option, + ) { + unimplemented!() + } + + fn object_reference_write_slow( + &mut self, + src: ObjectReference, + slot: ::VMSlot, + target: Option, + ) { + self.semantics + .object_reference_write_slow(src, slot, target); + } + + fn memory_region_copy_pre( + &mut self, + src: ::VMMemorySlice, + dst: ::VMMemorySlice, + ) { + self.semantics.memory_region_copy_slow(src, dst); + } + + fn memory_region_copy_post( + &mut self, + _src: ::VMMemorySlice, + _dst: ::VMMemorySlice, + ) { + unimplemented!() + } +} diff --git a/src/plan/concurrent/barrier.rs b/src/plan/concurrent/barrier.rs new file mode 100644 index 0000000000..a26ee50444 --- /dev/null +++ b/src/plan/concurrent/barrier.rs @@ -0,0 +1,152 @@ +use std::sync::atomic::Ordering; + +use super::{concurrent_marking_work::ProcessModBufSATB, Pause}; +use crate::plan::global::PlanTraceObject; +use crate::{ + plan::{barriers::BarrierSemantics, concurrent::global::ConcurrentPlan, VectorQueue}, + scheduler::WorkBucketStage, + util::ObjectReference, + vm::{ + slot::{MemorySlice, Slot}, + VMBinding, + }, + MMTK, +}; + +pub struct SATBBarrierSemantics + PlanTraceObject> { + mmtk: &'static MMTK, + satb: VectorQueue, + refs: VectorQueue, + plan: &'static P, +} + +impl + PlanTraceObject> SATBBarrierSemantics { + pub fn new(mmtk: &'static MMTK) -> Self { + Self { + mmtk, + satb: VectorQueue::default(), + refs: VectorQueue::default(), + plan: mmtk.get_plan().downcast_ref::
<P>
().unwrap(), + } + } + + fn slow(&mut self, _src: Option, _slot: VM::VMSlot, old: ObjectReference) { + self.satb.push(old); + if self.satb.is_full() { + self.flush_satb(); + } + } + + fn enqueue_node( + &mut self, + src: Option, + slot: VM::VMSlot, + _new: Option, + ) -> bool { + if let Some(old) = slot.load() { + self.slow(src, slot, old); + } + true + } + + /// Attempt to atomically log an object. + /// Returns true if the object is not logged previously. + fn log_object(&self, object: ObjectReference) -> bool { + Self::UNLOG_BIT_SPEC.store_atomic::(object, 0, None, Ordering::SeqCst); + true + } + + fn flush_satb(&mut self) { + if !self.satb.is_empty() { + if self.should_create_satb_packets() { + let satb = self.satb.take(); + let bucket = if self.plan.concurrent_work_in_progress() { + WorkBucketStage::Unconstrained + } else { + debug_assert_ne!(self.plan.current_pause(), Some(Pause::InitialMark)); + WorkBucketStage::Closure + }; + self.mmtk.scheduler.work_buckets[bucket].add(ProcessModBufSATB::::new(satb)); + } else { + let _ = self.satb.take(); + }; + } + } + + #[cold] + fn flush_weak_refs(&mut self) { + if !self.refs.is_empty() { + // debug_assert!(self.should_create_satb_packets()); + let nodes = self.refs.take(); + let bucket = if self.plan.concurrent_work_in_progress() { + WorkBucketStage::Unconstrained + } else { + debug_assert_ne!(self.plan.current_pause(), Some(Pause::InitialMark)); + WorkBucketStage::Closure + }; + self.mmtk.scheduler.work_buckets[bucket].add(ProcessModBufSATB::::new(nodes)); + } + } + + fn should_create_satb_packets(&self) -> bool { + self.plan.concurrent_work_in_progress() + || self.plan.current_pause() == Some(Pause::FinalMark) + } +} + +impl + PlanTraceObject> BarrierSemantics + for SATBBarrierSemantics +{ + type VM = VM; + + #[cold] + fn flush(&mut self) { + self.flush_satb(); + self.flush_weak_refs(); + } + + fn object_reference_write_slow( + &mut self, + src: ObjectReference, + _slot: ::VMSlot, + _target: Option, + ) { + self.object_probable_write_slow(src); + self.log_object(src); + } + + fn memory_region_copy_slow( + &mut self, + _src: ::VMMemorySlice, + dst: ::VMMemorySlice, + ) { + for s in dst.iter_slots() { + self.enqueue_node(None, s, None); + } + } + + /// Enqueue the referent during concurrent marking. + /// + /// Note: During concurrent marking, a collector based on snapshot-at-the-beginning (SATB) will + /// not reach objects that were weakly reachable at the time of `InitialMark`. But if a mutator + /// loads from a weak reference field during concurrent marking, it will make the referent + /// strongly reachable, yet the referent is still not part of the SATB. We must conservatively + /// enqueue the referent even though its reachability has not yet been established, otherwise it + /// (and its children) may be treated as garbage if it happened to be weakly reachable at the + /// time of `InitialMark`. 
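// Worked scenario for the comment above (illustrative; the variable names are hypothetical):
//
//   InitialMark:        `referent` is only weakly reachable, so it is not part of the SATB
//                       snapshot and no marking packet will ever reach it.
//   Concurrent marking: the mutator executes `let strong = weak_ref.get()`. The referent is now
//                       strongly reachable from the stack, but no heap slot was overwritten, so
//                       the deletion (write) barrier never fires.
//   FinalMark:          without this hook the referent was never traced and would be reclaimed
//                       while `strong` still points to it.
//
// `load_weak_reference` below closes that window: the referent is pushed onto `self.refs` and
// flushed as a `ProcessModBufSATB` packet, so it is traced even though it was not in the snapshot.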
+ fn load_weak_reference(&mut self, o: ObjectReference) { + if !self.plan.concurrent_work_in_progress() { + return; + } + self.refs.push(o); + if self.refs.is_full() { + self.flush_weak_refs(); + } + } + + fn object_probable_write_slow(&mut self, obj: ObjectReference) { + obj.iterate_fields::(|s| { + self.enqueue_node(Some(obj), s, None); + }); + } +} diff --git a/src/plan/concurrent/concurrent_marking_work.rs b/src/plan/concurrent/concurrent_marking_work.rs new file mode 100644 index 0000000000..6620b766d2 --- /dev/null +++ b/src/plan/concurrent/concurrent_marking_work.rs @@ -0,0 +1,288 @@ +use crate::plan::concurrent::global::ConcurrentPlan; +use crate::plan::concurrent::Pause; +use crate::plan::PlanTraceObject; +use crate::plan::VectorQueue; +use crate::policy::immix::TRACE_KIND_FAST; +use crate::scheduler::gc_work::{ScanObjects, SlotOf}; +use crate::util::ObjectReference; +use crate::vm::slot::Slot; +use crate::{ + plan::ObjectQueue, + scheduler::{gc_work::ProcessEdgesBase, GCWork, GCWorker, ProcessEdgesWork, WorkBucketStage}, + vm::*, + MMTK, +}; +use atomic::Ordering; +use std::ops::{Deref, DerefMut}; + +pub struct ConcurrentTraceObjects + PlanTraceObject> { + plan: &'static P, + // objects to mark and scan + objects: Option>, + // recursively generated objects + next_objects: VectorQueue, + worker: *mut GCWorker, +} + +impl + PlanTraceObject> + ConcurrentTraceObjects +{ + const SATB_BUFFER_SIZE: usize = 8192; + + pub fn new(objects: Vec, mmtk: &'static MMTK) -> Self { + let plan = mmtk.get_plan().downcast_ref::
<P>
().unwrap(); + let old_value = crate::NUM_CONCURRENT_TRACING_PACKETS.fetch_add(1, Ordering::SeqCst); + let new_value = old_value + 1; + probe!(mmtk, num_concurrent_tracing_packets_change, new_value); + + Self { + plan, + objects: Some(objects), + next_objects: VectorQueue::default(), + worker: std::ptr::null_mut(), + } + } + + pub fn worker(&self) -> &'static mut GCWorker { + debug_assert_ne!(self.worker, std::ptr::null_mut()); + unsafe { &mut *self.worker } + } + + #[cold] + fn flush(&mut self) { + if !self.next_objects.is_empty() { + let objects = self.next_objects.take(); + let worker = self.worker(); + let w = Self::new(objects, worker.mmtk); + worker.add_work(WorkBucketStage::Unconstrained, w); + } + } + + fn trace_object(&mut self, object: ObjectReference) -> ObjectReference { + let new_object = + self.plan + .trace_object::(self, object, self.worker()); + // No copying should happen. + debug_assert_eq!(object, new_object); + object + } + + fn trace_objects(&mut self, objects: &[ObjectReference]) { + for o in objects.iter() { + self.trace_object(*o); + } + } + + fn scan_and_enqueue(&mut self, object: ObjectReference) { + object.iterate_fields::(|s| { + let Some(t) = s.load() else { + return; + }; + + self.next_objects.push(t); + if self.next_objects.len() > Self::SATB_BUFFER_SIZE { + self.flush(); + } + }); + } +} + +impl + PlanTraceObject> ObjectQueue + for ConcurrentTraceObjects +{ + fn enqueue(&mut self, object: ObjectReference) { + debug_assert!( + object.to_raw_address().is_mapped(), + "Invalid obj {:?}: address is not mapped", + object + ); + self.scan_and_enqueue(object); + } +} + +unsafe impl + PlanTraceObject> Send + for ConcurrentTraceObjects +{ +} + +impl + PlanTraceObject> GCWork + for ConcurrentTraceObjects +{ + fn do_work(&mut self, worker: &mut GCWorker, _mmtk: &'static MMTK) { + self.worker = worker; + let mut num_objects = 0; + let mut num_next_objects = 0; + let mut iterations = 0; + // mark objects + if let Some(objects) = self.objects.take() { + self.trace_objects(&objects); + num_objects = objects.len(); + } + let pause_opt = self.plan.current_pause(); + if pause_opt == Some(Pause::FinalMark) || pause_opt.is_none() { + while !self.next_objects.is_empty() { + let pause_opt = self.plan.current_pause(); + if !(pause_opt == Some(Pause::FinalMark) || pause_opt.is_none()) { + break; + } + let next_objects = self.next_objects.take(); + self.trace_objects(&next_objects); + num_next_objects += next_objects.len(); + iterations += 1; + } + } + probe!( + mmtk, + concurrent_trace_objects, + num_objects, + num_next_objects, + iterations + ); + self.flush(); + + let old_value = crate::NUM_CONCURRENT_TRACING_PACKETS.fetch_sub(1, Ordering::SeqCst); + let new_value = old_value - 1; + probe!(mmtk, num_concurrent_tracing_packets_change, new_value); + } +} + +pub struct ProcessModBufSATB + PlanTraceObject> { + nodes: Option>, + _p: std::marker::PhantomData<(VM, P)>, +} + +unsafe impl + PlanTraceObject> Send + for ProcessModBufSATB +{ +} + +impl + PlanTraceObject> ProcessModBufSATB { + pub fn new(nodes: Vec) -> Self { + Self { + nodes: Some(nodes), + _p: std::marker::PhantomData, + } + } +} + +impl + PlanTraceObject> GCWork + for ProcessModBufSATB +{ + fn do_work(&mut self, worker: &mut GCWorker, mmtk: &'static MMTK) { + let mut w = if let Some(nodes) = self.nodes.take() { + if nodes.is_empty() { + return; + } + + ConcurrentTraceObjects::::new(nodes, mmtk) + } else { + return; + }; + GCWork::do_work(&mut w, worker, mmtk); + } +} + +pub struct ProcessRootSlots + PlanTraceObject> { + 
base: ProcessEdgesBase<VM>, + _p: std::marker::PhantomData<P>
, +} + +unsafe impl + PlanTraceObject> Send + for ProcessRootSlots +{ +} + +impl + PlanTraceObject> ProcessEdgesWork + for ProcessRootSlots +{ + type VM = VM; + type ScanObjectsWorkType = ScanObjects; + const OVERWRITE_REFERENCE: bool = false; + const SCAN_OBJECTS_IMMEDIATELY: bool = true; + + fn new( + slots: Vec>, + roots: bool, + mmtk: &'static MMTK, + bucket: WorkBucketStage, + ) -> Self { + debug_assert!(roots); + let base = ProcessEdgesBase::new(slots, roots, mmtk, bucket); + Self { + base, + _p: std::marker::PhantomData, + } + } + + fn flush(&mut self) {} + + fn trace_object(&mut self, _object: ObjectReference) -> ObjectReference { + unreachable!() + } + + fn process_slots(&mut self) { + let pause = self + .base + .plan() + .concurrent() + .unwrap() + .current_pause() + .unwrap(); + // No need to scan roots in the final mark + if pause == Pause::FinalMark { + return; + } + let mut root_objects = Vec::with_capacity(Self::CAPACITY); + if !self.slots.is_empty() { + let slots = std::mem::take(&mut self.slots); + for slot in slots { + if let Some(object) = slot.load() { + root_objects.push(object); + if root_objects.len() == Self::CAPACITY { + // create the packet + let worker = self.worker(); + let mmtk = self.mmtk(); + let w = ConcurrentTraceObjects::::new(root_objects.clone(), mmtk); + + match pause { + Pause::InitialMark => worker.scheduler().postpone(w), + _ => unreachable!(), + } + + root_objects.clear(); + } + } + } + if !root_objects.is_empty() { + let worker = self.worker(); + let w = ConcurrentTraceObjects::::new(root_objects.clone(), self.mmtk()); + + match pause { + Pause::InitialMark => worker.scheduler().postpone(w), + _ => unreachable!(), + } + } + } + } + + fn create_scan_work(&self, _nodes: Vec) -> Self::ScanObjectsWorkType { + unimplemented!() + } +} + +impl + PlanTraceObject> Deref + for ProcessRootSlots +{ + type Target = ProcessEdgesBase; + fn deref(&self) -> &Self::Target { + &self.base + } +} + +impl + PlanTraceObject> DerefMut + for ProcessRootSlots +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.base + } +} diff --git a/src/plan/concurrent/global.rs b/src/plan/concurrent/global.rs new file mode 100644 index 0000000000..a2b06f67c0 --- /dev/null +++ b/src/plan/concurrent/global.rs @@ -0,0 +1,7 @@ +use crate::plan::concurrent::Pause; +use crate::plan::Plan; + +pub trait ConcurrentPlan: Plan { + fn concurrent_work_in_progress(&self) -> bool; + fn current_pause(&self) -> Option; +} diff --git a/src/plan/concurrent/immix/gc_work.rs b/src/plan/concurrent/immix/gc_work.rs new file mode 100644 index 0000000000..6372105313 --- /dev/null +++ b/src/plan/concurrent/immix/gc_work.rs @@ -0,0 +1,25 @@ +use crate::plan::concurrent::immix::global::ConcurrentImmix; +use crate::policy::gc_work::{TraceKind, TRACE_KIND_TRANSITIVE_PIN}; +use crate::scheduler::gc_work::{PlanProcessEdges, UnsupportedProcessEdges}; +use crate::scheduler::ProcessEdgesWork; +use crate::vm::VMBinding; + +pub(super) struct ConcurrentImmixSTWGCWorkContext( + std::marker::PhantomData, +); +impl crate::scheduler::GCWorkContext + for ConcurrentImmixSTWGCWorkContext +{ + type VM = VM; + type PlanType = ConcurrentImmix; + type DefaultProcessEdges = PlanProcessEdges, KIND>; + type PinningProcessEdges = PlanProcessEdges, TRACE_KIND_TRANSITIVE_PIN>; +} +pub(super) struct ConcurrentImmixGCWorkContext(std::marker::PhantomData); + +impl crate::scheduler::GCWorkContext for ConcurrentImmixGCWorkContext { + type VM = E::VM; + type PlanType = ConcurrentImmix; + type DefaultProcessEdges = E; + type 
PinningProcessEdges = UnsupportedProcessEdges; +} diff --git a/src/plan/concurrent/immix/global.rs b/src/plan/concurrent/immix/global.rs new file mode 100644 index 0000000000..9d7dc4fcdd --- /dev/null +++ b/src/plan/concurrent/immix/global.rs @@ -0,0 +1,472 @@ +use crate::plan::concurrent::concurrent_marking_work::ProcessRootSlots; +use crate::plan::concurrent::global::ConcurrentPlan; +use crate::plan::concurrent::immix::gc_work::ConcurrentImmixGCWorkContext; +use crate::plan::concurrent::immix::gc_work::ConcurrentImmixSTWGCWorkContext; +use crate::plan::concurrent::Pause; +use crate::plan::global::BasePlan; +use crate::plan::global::CommonPlan; +use crate::plan::global::CreateGeneralPlanArgs; +use crate::plan::global::CreateSpecificPlanArgs; +use crate::plan::immix::mutator::ALLOCATOR_MAPPING; +use crate::plan::AllocationSemantics; +use crate::plan::Plan; +use crate::plan::PlanConstraints; +use crate::policy::immix::ImmixSpaceArgs; +use crate::policy::immix::TRACE_KIND_DEFRAG; +use crate::policy::immix::TRACE_KIND_FAST; +use crate::policy::space::Space; +use crate::scheduler::gc_work::Release; +use crate::scheduler::gc_work::StopMutators; +use crate::scheduler::gc_work::UnsupportedProcessEdges; +use crate::scheduler::*; +use crate::util::alloc::allocators::AllocatorSelector; +use crate::util::copy::*; +use crate::util::heap::gc_trigger::SpaceStats; +use crate::util::heap::VMRequest; +use crate::util::metadata::side_metadata::SideMetadataContext; +use crate::vm::VMBinding; +use crate::{policy::immix::ImmixSpace, util::opaque_pointer::VMWorkerThread}; +use std::sync::atomic::AtomicBool; + +use atomic::Atomic; +use atomic::Ordering; +use enum_map::EnumMap; + +use mmtk_macros::{HasSpaces, PlanTraceObject}; + +#[derive(Debug, Clone, Copy, bytemuck::NoUninit, PartialEq, Eq)] +#[repr(u8)] +enum GCCause { + Unknown, + FullHeap, + InitialMark, + FinalMark, +} + +#[derive(HasSpaces, PlanTraceObject)] +pub struct ConcurrentImmix { + #[post_scan] + #[space] + #[copy_semantics(CopySemantics::DefaultCopy)] + pub immix_space: ImmixSpace, + #[parent] + pub common: CommonPlan, + last_gc_was_defrag: AtomicBool, + current_pause: Atomic>, + previous_pause: Atomic>, + gc_cause: Atomic, + concurrent_marking_active: AtomicBool, +} + +/// The plan constraints for the immix plan. +pub const CONCURRENT_IMMIX_CONSTRAINTS: PlanConstraints = PlanConstraints { + // If we disable moving in Immix, this is a non-moving plan. + moves_objects: false, + // Max immix object size is half of a block. 
+ max_non_los_default_alloc_bytes: crate::policy::immix::MAX_IMMIX_OBJECT_SIZE, + needs_prepare_mutator: true, + barrier: crate::BarrierSelector::SATBBarrier, + needs_satb: true, + ..PlanConstraints::default() +}; + +impl Plan for ConcurrentImmix { + fn collection_required(&self, space_full: bool, _space: Option>) -> bool { + if self.base().collection_required(self, space_full) { + self.gc_cause.store(GCCause::FullHeap, Ordering::Release); + return true; + } + + let concurrent_marking_in_progress = self.concurrent_marking_in_progress(); + + if concurrent_marking_in_progress && crate::concurrent_marking_packets_drained() { + self.gc_cause.store(GCCause::FinalMark, Ordering::Release); + return true; + } + let threshold = self.get_total_pages() >> 1; + let concurrent_marking_threshold = self + .common + .base + .global_state + .concurrent_marking_threshold + .load(Ordering::Acquire); + if !concurrent_marking_in_progress && concurrent_marking_threshold > threshold { + debug_assert!(crate::concurrent_marking_packets_drained()); + debug_assert!(!self.concurrent_marking_in_progress()); + let prev_pause = self.previous_pause(); + debug_assert!(prev_pause.is_none() || prev_pause.unwrap() != Pause::InitialMark); + self.gc_cause.store(GCCause::InitialMark, Ordering::Release); + return true; + } + false + } + + fn last_collection_was_exhaustive(&self) -> bool { + self.immix_space + .is_last_gc_exhaustive(self.last_gc_was_defrag.load(Ordering::Relaxed)) + } + + fn constraints(&self) -> &'static PlanConstraints { + &CONCURRENT_IMMIX_CONSTRAINTS + } + + fn create_copy_config(&'static self) -> CopyConfig { + use enum_map::enum_map; + CopyConfig { + copy_mapping: enum_map! { + CopySemantics::DefaultCopy => CopySelector::Immix(0), + _ => CopySelector::Unused, + }, + space_mapping: vec![(CopySelector::Immix(0), &self.immix_space)], + constraints: &CONCURRENT_IMMIX_CONSTRAINTS, + } + } + + fn schedule_collection(&'static self, scheduler: &GCWorkScheduler) { + let pause = self.select_collection_kind(); + if pause == Pause::Full { + self.current_pause + .store(Some(Pause::Full), Ordering::SeqCst); + + Self::schedule_immix_full_heap_collection::< + ConcurrentImmix, + ConcurrentImmixSTWGCWorkContext, + ConcurrentImmixSTWGCWorkContext, + >(self, &self.immix_space, scheduler); + } else { + // Set current pause kind + self.current_pause.store(Some(pause), Ordering::SeqCst); + // Schedule work + match pause { + Pause::InitialMark => self.schedule_concurrent_marking_initial_pause(scheduler), + Pause::FinalMark => self.schedule_concurrent_marking_final_pause(scheduler), + _ => unreachable!(), + } + } + } + + fn get_allocator_mapping(&self) -> &'static EnumMap { + &ALLOCATOR_MAPPING + } + + fn prepare(&mut self, tls: VMWorkerThread) { + let pause = self.current_pause().unwrap(); + match pause { + Pause::Full => { + self.common.prepare(tls, true); + self.immix_space.prepare( + true, + Some(crate::policy::immix::defrag::StatsForDefrag::new(self)), + ); + } + Pause::InitialMark => { + // init prepare has to be executed first, otherwise, los objects will not be + // dealt with properly + self.common.initial_pause_prepare(); + self.immix_space.initial_pause_prepare(); + self.common.prepare(tls, true); + self.immix_space.prepare( + true, + Some(crate::policy::immix::defrag::StatsForDefrag::new(self)), + ); + } + Pause::FinalMark => (), + } + } + + fn release(&mut self, tls: VMWorkerThread) { + let pause = self.current_pause().unwrap(); + match pause { + Pause::InitialMark => (), + Pause::Full | Pause::FinalMark => { + 
self.immix_space.final_pause_release(); + self.common.final_pause_release(); + self.common.release(tls, true); + // release the collected region + self.immix_space.release(true); + } + } + // reset the concurrent marking page counting + self.common() + .base + .global_state + .concurrent_marking_threshold + .store(0, Ordering::Release); + } + + fn end_of_gc(&mut self, _tls: VMWorkerThread) { + self.last_gc_was_defrag + .store(self.immix_space.end_of_gc(), Ordering::Relaxed); + + let pause = self.current_pause().unwrap(); + if pause == Pause::InitialMark { + self.set_concurrent_marking_state(true); + } + self.previous_pause.store(Some(pause), Ordering::SeqCst); + self.current_pause.store(None, Ordering::SeqCst); + info!("{:?} end", pause); + } + + fn current_gc_may_move_object(&self) -> bool { + self.immix_space.in_defrag() + } + + fn get_collection_reserved_pages(&self) -> usize { + self.immix_space.defrag_headroom_pages() + } + + fn get_used_pages(&self) -> usize { + self.immix_space.reserved_pages() + self.common.get_used_pages() + } + + fn base(&self) -> &BasePlan { + &self.common.base + } + + fn base_mut(&mut self) -> &mut BasePlan { + &mut self.common.base + } + + fn common(&self) -> &CommonPlan { + &self.common + } + + fn notify_mutators_paused(&self, _scheduler: &GCWorkScheduler) { + use crate::vm::ActivePlan; + let pause = self.current_pause().unwrap(); + match pause { + Pause::Full => { + self.set_concurrent_marking_state(false); + } + Pause::InitialMark => { + debug_assert!( + !self.concurrent_marking_in_progress(), + "prev pause: {:?}", + self.previous_pause().unwrap() + ); + } + Pause::FinalMark => { + debug_assert!(self.concurrent_marking_in_progress()); + // Flush barrier buffers + for mutator in ::VMActivePlan::mutators() { + mutator.barrier.flush(); + } + self.set_concurrent_marking_state(false); + } + } + info!("{:?} start", pause); + } + + fn concurrent(&self) -> Option<&dyn ConcurrentPlan> { + Some(self) + } +} + +impl ConcurrentImmix { + pub fn new(args: CreateGeneralPlanArgs) -> Self { + use crate::vm::ObjectModel; + + let spec = crate::util::metadata::extract_side_metadata(&[ + *VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC, + ]); + + let plan_args = CreateSpecificPlanArgs { + global_args: args, + constraints: &CONCURRENT_IMMIX_CONSTRAINTS, + global_side_metadata_specs: SideMetadataContext::new_global_specs(&spec), + }; + Self::new_with_args( + plan_args, + ImmixSpaceArgs { + unlog_object_when_traced: false, + #[cfg(feature = "vo_bit")] + mixed_age: false, + never_move_objects: true, + }, + ) + } + + pub fn new_with_args( + mut plan_args: CreateSpecificPlanArgs, + space_args: ImmixSpaceArgs, + ) -> Self { + let immix = ConcurrentImmix { + immix_space: ImmixSpace::new( + plan_args.get_space_args("immix", true, false, VMRequest::discontiguous()), + space_args, + ), + common: CommonPlan::new(plan_args), + last_gc_was_defrag: AtomicBool::new(false), + current_pause: Atomic::new(None), + previous_pause: Atomic::new(None), + gc_cause: Atomic::new(GCCause::Unknown), + concurrent_marking_active: AtomicBool::new(false), + }; + + immix.verify_side_metadata_sanity(); + + immix + } + + /// Schedule a full heap immix collection. This method is used by immix/genimmix/stickyimmix + /// to schedule a full heap collection. A plan must call set_collection_kind and set_gc_status before this method. 
+ pub(crate) fn schedule_immix_full_heap_collection< + PlanType: Plan, + FastContext: GCWorkContext, + DefragContext: GCWorkContext, + >( + plan: &'static DefragContext::PlanType, + immix_space: &ImmixSpace, + scheduler: &GCWorkScheduler, + ) -> bool { + let in_defrag = immix_space.decide_whether_to_defrag( + plan.base().global_state.is_emergency_collection(), + true, + plan.base() + .global_state + .cur_collection_attempts + .load(Ordering::SeqCst), + plan.base().global_state.is_user_triggered_collection(), + *plan.base().options.full_heap_system_gc, + ); + + if in_defrag { + scheduler.schedule_common_work::(plan); + } else { + scheduler.schedule_common_work::(plan); + } + in_defrag + } + + fn select_collection_kind(&self) -> Pause { + let emergency = self.base().global_state.is_emergency_collection(); + let user_triggered = self.base().global_state.is_user_triggered_collection(); + let concurrent_marking_in_progress = self.concurrent_marking_in_progress(); + let concurrent_marking_packets_drained = crate::concurrent_marking_packets_drained(); + + if emergency || user_triggered { + return Pause::Full; + } else if !concurrent_marking_in_progress && concurrent_marking_packets_drained { + return Pause::InitialMark; + } else if concurrent_marking_in_progress && concurrent_marking_packets_drained { + return Pause::FinalMark; + } + + Pause::Full + } + + fn disable_unnecessary_buckets(&'static self, scheduler: &GCWorkScheduler, pause: Pause) { + if pause == Pause::InitialMark { + scheduler.work_buckets[WorkBucketStage::Closure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::WeakRefClosure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::FinalRefClosure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::PhantomRefClosure].set_as_disabled(); + } + scheduler.work_buckets[WorkBucketStage::TPinningClosure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::PinningRootsTrace].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::VMRefClosure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::VMRefForwarding].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::SoftRefClosure].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::CalculateForwarding].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::SecondRoots].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::RefForwarding].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::FinalizableForwarding].set_as_disabled(); + scheduler.work_buckets[WorkBucketStage::Compact].set_as_disabled(); + } + + pub(crate) fn schedule_concurrent_marking_initial_pause( + &'static self, + scheduler: &GCWorkScheduler, + ) { + use crate::scheduler::gc_work::{Prepare, StopMutators, UnsupportedProcessEdges}; + + self.disable_unnecessary_buckets(scheduler, Pause::InitialMark); + + scheduler.work_buckets[WorkBucketStage::Unconstrained].add_prioritized(Box::new( + StopMutators::>>::new_args( + Pause::InitialMark, + ), + )); + scheduler.work_buckets[WorkBucketStage::Prepare].add(Prepare::< + ConcurrentImmixGCWorkContext>, + >::new(self)); + } + + fn schedule_concurrent_marking_final_pause(&'static self, scheduler: &GCWorkScheduler) { + self.disable_unnecessary_buckets(scheduler, Pause::FinalMark); + + scheduler.work_buckets[WorkBucketStage::Unconstrained].add_prioritized(Box::new( + StopMutators::>>::new_args( + Pause::FinalMark, + ), + )); + + scheduler.work_buckets[WorkBucketStage::Release].add(Release::< + ConcurrentImmixGCWorkContext>, + >::new(self)); + + // Deal 
with weak ref and finalizers + // TODO: Check against schedule_common_work and see if we are still missing any work packet + type RefProcessingEdges = + crate::scheduler::gc_work::PlanProcessEdges, TRACE_KIND_FAST>; + // Reference processing + if !*self.base().options.no_reference_types { + use crate::util::reference_processor::{ + PhantomRefProcessing, SoftRefProcessing, WeakRefProcessing, + }; + scheduler.work_buckets[WorkBucketStage::SoftRefClosure] + .add(SoftRefProcessing::>::new()); + scheduler.work_buckets[WorkBucketStage::WeakRefClosure] + .add(WeakRefProcessing::::new()); + scheduler.work_buckets[WorkBucketStage::PhantomRefClosure] + .add(PhantomRefProcessing::::new()); + + use crate::util::reference_processor::RefEnqueue; + scheduler.work_buckets[WorkBucketStage::Release].add(RefEnqueue::::new()); + } + + // Finalization + if !*self.base().options.no_finalizer { + use crate::util::finalizable_processor::Finalization; + // finalization + scheduler.work_buckets[WorkBucketStage::FinalRefClosure] + .add(Finalization::>::new()); + } + } + + pub fn concurrent_marking_in_progress(&self) -> bool { + self.concurrent_marking_active.load(Ordering::Acquire) + } + + fn set_concurrent_marking_state(&self, active: bool) { + use crate::plan::global::HasSpaces; + use crate::vm::Collection; + + // Update the binding about concurrent marking + ::VMCollection::set_concurrent_marking_state(active); + + // Tell the spaces to allocate new objects as live + let allocate_object_as_live = active; + self.for_each_space(&mut |space: &dyn Space| { + space.set_allocate_as_live(allocate_object_as_live); + }); + + // Store the state. + self.concurrent_marking_active + .store(active, Ordering::SeqCst); + } + + fn previous_pause(&self) -> Option { + self.previous_pause.load(Ordering::SeqCst) + } +} + +impl ConcurrentPlan for ConcurrentImmix { + fn current_pause(&self) -> Option { + self.current_pause.load(Ordering::SeqCst) + } + + fn concurrent_work_in_progress(&self) -> bool { + self.concurrent_marking_in_progress() + } +} diff --git a/src/plan/concurrent/immix/mod.rs b/src/plan/concurrent/immix/mod.rs new file mode 100644 index 0000000000..0f55961897 --- /dev/null +++ b/src/plan/concurrent/immix/mod.rs @@ -0,0 +1,7 @@ +//! 
Plan: concurrent immix + +pub(in crate::plan) mod gc_work; +pub(in crate::plan) mod global; +pub(in crate::plan) mod mutator; + +pub use global::ConcurrentImmix; diff --git a/src/plan/concurrent/immix/mutator.rs b/src/plan/concurrent/immix/mutator.rs new file mode 100644 index 0000000000..b8074544fe --- /dev/null +++ b/src/plan/concurrent/immix/mutator.rs @@ -0,0 +1,87 @@ +use crate::plan::barriers::SATBBarrier; +use crate::plan::concurrent::barrier::SATBBarrierSemantics; +use crate::plan::concurrent::immix::ConcurrentImmix; +use crate::plan::mutator_context::create_allocator_mapping; +use crate::plan::mutator_context::create_space_mapping; + +use crate::plan::mutator_context::Mutator; +use crate::plan::mutator_context::MutatorBuilder; +use crate::plan::mutator_context::MutatorConfig; +use crate::plan::mutator_context::ReservedAllocators; +use crate::plan::AllocationSemantics; +use crate::util::alloc::allocators::AllocatorSelector; +use crate::util::alloc::ImmixAllocator; +use crate::util::opaque_pointer::{VMMutatorThread, VMWorkerThread}; +use crate::vm::VMBinding; +use crate::MMTK; +use enum_map::EnumMap; + +pub fn concurrent_immix_mutator_release( + mutator: &mut Mutator, + _tls: VMWorkerThread, +) { + let immix_allocator = unsafe { + mutator + .allocators + .get_allocator_mut(mutator.config.allocator_mapping[AllocationSemantics::Default]) + } + .downcast_mut::>() + .unwrap(); + immix_allocator.reset(); +} + +pub fn concurent_immix_mutator_prepare( + mutator: &mut Mutator, + _tls: VMWorkerThread, +) { + let immix_allocator = unsafe { + mutator + .allocators + .get_allocator_mut(mutator.config.allocator_mapping[AllocationSemantics::Default]) + } + .downcast_mut::>() + .unwrap(); + immix_allocator.reset(); +} + +pub(in crate::plan) const RESERVED_ALLOCATORS: ReservedAllocators = ReservedAllocators { + n_immix: 1, + ..ReservedAllocators::DEFAULT +}; + +lazy_static! 
{ + pub static ref ALLOCATOR_MAPPING: EnumMap = { + let mut map = create_allocator_mapping(RESERVED_ALLOCATORS, true); + map[AllocationSemantics::Default] = AllocatorSelector::Immix(0); + map + }; +} + +pub fn create_concurrent_immix_mutator( + mutator_tls: VMMutatorThread, + mmtk: &'static MMTK, +) -> Mutator { + let immix = mmtk + .get_plan() + .downcast_ref::>() + .unwrap(); + let config = MutatorConfig { + allocator_mapping: &ALLOCATOR_MAPPING, + space_mapping: Box::new({ + let mut vec = create_space_mapping(RESERVED_ALLOCATORS, true, immix); + vec.push((AllocatorSelector::Immix(0), &immix.immix_space)); + vec + }), + + prepare_func: &concurent_immix_mutator_prepare, + release_func: &concurrent_immix_mutator_release, + }; + + let builder = MutatorBuilder::new(mutator_tls, mmtk, config); + builder + .barrier(Box::new(SATBBarrier::new(SATBBarrierSemantics::< + VM, + ConcurrentImmix, + >::new(mmtk)))) + .build() +} diff --git a/src/plan/concurrent/mod.rs b/src/plan/concurrent/mod.rs new file mode 100644 index 0000000000..692ed99d03 --- /dev/null +++ b/src/plan/concurrent/mod.rs @@ -0,0 +1,25 @@ +pub mod barrier; +pub(super) mod concurrent_marking_work; +pub(super) mod global; + +pub mod immix; + +use bytemuck::NoUninit; + +#[repr(u8)] +#[derive(Debug, PartialEq, Eq, Copy, Clone, NoUninit)] +pub enum Pause { + Full = 1, + InitialMark, + FinalMark, +} + +unsafe impl bytemuck::ZeroableInOption for Pause {} + +unsafe impl bytemuck::PodInOption for Pause {} + +impl Default for Pause { + fn default() -> Self { + Self::Full + } +} diff --git a/src/plan/gc_requester.rs b/src/plan/gc_requester.rs index e3a8462f96..944558184d 100644 --- a/src/plan/gc_requester.rs +++ b/src/plan/gc_requester.rs @@ -30,6 +30,7 @@ impl GCRequester { // `GCWorkScheduler::request_schedule_collection` needs to hold a mutex to communicate // with GC workers, which is expensive for functions like `poll`. We use the atomic // flag `request_flag` to elide the need to acquire the mutex in subsequent calls. + probe!(mmtk, gcrequester_request); self.scheduler.request_schedule_collection(); } } diff --git a/src/plan/global.rs b/src/plan/global.rs index 8b6865f42c..0573ee4d82 100644 --- a/src/plan/global.rs +++ b/src/plan/global.rs @@ -58,6 +58,9 @@ pub fn create_mutator( PlanSelector::StickyImmix => { crate::plan::sticky::immix::mutator::create_stickyimmix_mutator(tls, mmtk) } + PlanSelector::ConcurrentImmix => { + crate::plan::concurrent::immix::mutator::create_concurrent_immix_mutator(tls, mmtk) + } PlanSelector::Compressor => { crate::plan::compressor::mutator::create_compressor_mutator(tls, mmtk) } @@ -94,6 +97,10 @@ pub fn create_plan( PlanSelector::StickyImmix => { Box::new(crate::plan::sticky::immix::StickyImmix::new(args)) as Box> } + PlanSelector::ConcurrentImmix => { + Box::new(crate::plan::concurrent::immix::ConcurrentImmix::new(args)) + as Box> + } PlanSelector::Compressor => { Box::new(crate::plan::compressor::Compressor::new(args)) as Box> } @@ -179,6 +186,12 @@ pub trait Plan: 'static + HasSpaces + Sync + Downcast { None } + fn concurrent( + &self, + ) -> Option<&dyn crate::plan::concurrent::global::ConcurrentPlan> { + None + } + /// Get the current run time options. fn options(&self) -> &Options { &self.base().options @@ -188,6 +201,9 @@ pub trait Plan: 'static + HasSpaces + Sync + Downcast { /// This defines what space this plan will allocate objects into for different semantics. fn get_allocator_mapping(&self) -> &'static EnumMap; + /// Called when all mutators are paused. This is called before prepare. 
+ fn notify_mutators_paused(&self, _scheduler: &GCWorkScheduler) {} + /// Prepare the plan before a GC. This is invoked in an initial step in the GC. /// This is invoked once per GC by one worker thread. `tls` is the worker thread that executes this method. fn prepare(&mut self, tls: VMWorkerThread); @@ -203,6 +219,7 @@ pub trait Plan: 'static + HasSpaces + Sync + Downcast { /// Inform the plan about the end of a GC. It is guaranteed that there is no further work for this GC. /// This is invoked once per GC by one worker thread. `tls` is the worker thread that executes this method. + // TODO: This is actually called at the end of a pause/STW, rather than the end of a GC. It should be renamed. fn end_of_gc(&mut self, _tls: VMWorkerThread); /// Notify the plan that an emergency collection will happen. The plan should try to free as much memory as possible. @@ -607,6 +624,14 @@ impl CommonPlan { + self.base.get_used_pages() } + pub fn initial_pause_prepare(&mut self) { + self.los.initial_pause_prepare(); + } + + pub fn final_pause_release(&mut self) { + self.los.final_pause_release(); + } + pub fn prepare(&mut self, tls: VMWorkerThread, full_heap: bool) { self.immortal.prepare(); self.los.prepare(full_heap); diff --git a/src/plan/mod.rs b/src/plan/mod.rs index e86a7fe590..062c5c9cb9 100644 --- a/src/plan/mod.rs +++ b/src/plan/mod.rs @@ -19,6 +19,7 @@ pub use barriers::BarrierSelector; pub(crate) mod gc_requester; mod global; +pub(crate) use concurrent::Pause; pub(crate) use global::create_gc_worker_context; pub(crate) use global::create_mutator; pub(crate) use global::create_plan; @@ -37,7 +38,7 @@ pub use plan_constraints::PlanConstraints; pub(crate) use plan_constraints::DEFAULT_PLAN_CONSTRAINTS; mod tracing; -pub use tracing::{ObjectQueue, ObjectsClosure, VectorObjectQueue, VectorQueue}; +pub use tracing::{ObjectQueue, ObjectsClosure, SlotIterator, VectorObjectQueue, VectorQueue}; /// Generational plans (with a copying nursery) mod generational; @@ -45,6 +46,7 @@ mod generational; mod sticky; mod compressor; +mod concurrent; mod immix; mod markcompact; mod marksweep; diff --git a/src/plan/plan_constraints.rs b/src/plan/plan_constraints.rs index 3110eb7538..6025020c10 100644 --- a/src/plan/plan_constraints.rs +++ b/src/plan/plan_constraints.rs @@ -45,6 +45,7 @@ pub struct PlanConstraints { /// `MutatorConfig::prepare_func`). Those plans can set this to `false` so that the /// `PrepareMutator` work packets will not be created at all. pub needs_prepare_mutator: bool, + pub needs_satb: bool, } impl PlanConstraints { @@ -67,6 +68,7 @@ impl PlanConstraints { barrier: BarrierSelector::NoBarrier, // If we use mark sweep as non moving space, we need to prepare mutator. See [`common_prepare_func`]. needs_prepare_mutator: cfg!(feature = "marksweep_as_nonmoving"), + needs_satb: false, } } } diff --git a/src/plan/tracing.rs b/src/plan/tracing.rs index eecd40cbaf..fac6ff0da8 100644 --- a/src/plan/tracing.rs +++ b/src/plan/tracing.rs @@ -1,10 +1,12 @@ //! This module contains code useful for tracing, //! i.e. visiting the reachable objects by traversing all or part of an object graph. +use std::marker::PhantomData; + use crate::scheduler::gc_work::{ProcessEdgesWork, SlotOf}; use crate::scheduler::{GCWorker, WorkBucketStage, EDGES_WORK_BUFFER_SIZE}; -use crate::util::ObjectReference; -use crate::vm::SlotVisitor; +use crate::util::{ObjectReference, VMThread, VMWorkerThread}; +use crate::vm::{Scanning, SlotVisitor, VMBinding}; /// This trait represents an object queue to enqueue objects during tracing. 
pub trait ObjectQueue { @@ -63,6 +65,16 @@ impl VectorQueue { } self.buffer.push(v); } + + /// Return the len of the queue + pub fn len(&self) -> usize { + self.buffer.len() + } + + /// Empty the queue + pub fn clear(&mut self) { + self.buffer.clear() + } } impl Default for VectorQueue { @@ -134,3 +146,59 @@ impl Drop for ObjectsClosure<'_, E> { self.flush(); } } + +struct SlotIteratorImpl { + f: F, + // should_discover_references: bool, + // should_claim_clds: bool, + // should_follow_clds: bool, + _p: PhantomData, +} + +impl SlotVisitor for SlotIteratorImpl { + fn visit_slot(&mut self, slot: VM::VMSlot) { + (self.f)(slot); + } +} + +pub struct SlotIterator { + _p: PhantomData, +} + +impl SlotIterator { + pub fn iterate( + o: ObjectReference, + // should_discover_references: bool, + // should_claim_clds: bool, + // should_follow_clds: bool, + f: impl FnMut(VM::VMSlot), + // klass: Option

, + ) { + let mut x = SlotIteratorImpl:: { + f, + // should_discover_references, + // should_claim_clds, + // should_follow_clds, + _p: PhantomData, + }; + // if let Some(klass) = klass { + // >::scan_object_with_klass( + // VMWorkerThread(VMThread::UNINITIALIZED), + // o, + // &mut x, + // klass, + // ); + // } else { + // >::scan_object( + // VMWorkerThread(VMThread::UNINITIALIZED), + // o, + // &mut x, + // ); + // } + >::scan_object( + VMWorkerThread(VMThread::UNINITIALIZED), + o, + &mut x, + ); + } +} diff --git a/src/policy/immix/immixspace.rs b/src/policy/immix/immixspace.rs index a8bcd54fda..7a051483d6 100644 --- a/src/policy/immix/immixspace.rs +++ b/src/policy/immix/immixspace.rs @@ -411,6 +411,24 @@ impl ImmixSpace { &self.scheduler } + pub fn initial_pause_prepare(&mut self) { + // make sure all allocated blocks have unlog bit set during initial mark + if let MetadataSpec::OnSide(side) = *VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC { + for chunk in self.chunk_map.all_chunks() { + side.bset_metadata(chunk.start(), Chunk::BYTES); + } + } + } + + pub fn final_pause_release(&mut self) { + // clear the unlog bit so that during normal mutator phase, stab barrier is effectively disabled (all objects are considered as logged and thus no slow path will be taken) + if let MetadataSpec::OnSide(side) = *VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC { + for chunk in self.chunk_map.all_chunks() { + side.bzero_metadata(chunk.start(), Chunk::BYTES); + } + } + } + pub fn prepare(&mut self, major_gc: bool, plan_stats: Option) { if major_gc { // Update mark_state @@ -572,6 +590,10 @@ impl ImmixSpace { self.chunk_map.set_allocated(block.chunk(), true); self.lines_consumed .fetch_add(Block::LINES, Ordering::SeqCst); + self.common() + .global_state + .concurrent_marking_threshold + .fetch_add(Block::PAGES, Ordering::Relaxed); Some(block) } @@ -598,6 +620,10 @@ impl ImmixSpace { self.lines_consumed.fetch_add(lines_delta, Ordering::SeqCst); block.init(copy); + self.common() + .global_state + .concurrent_marking_threshold + .fetch_add(Block::PAGES, Ordering::Relaxed); return Some(block); } else { return None; diff --git a/src/policy/immix/line.rs b/src/policy/immix/line.rs index 94036ecc65..f48ea7d271 100644 --- a/src/policy/immix/line.rs +++ b/src/policy/immix/line.rs @@ -1,6 +1,8 @@ +use std::ops::Range; + use super::block::Block; use crate::util::linear_scan::{Region, RegionIterator}; -use crate::util::metadata::side_metadata::SideMetadataSpec; +use crate::util::metadata::side_metadata::{address_to_meta_address, SideMetadataSpec}; use crate::{ util::{Address, ObjectReference}, vm::*, @@ -81,4 +83,14 @@ impl Line { } marked_lines } + + pub fn initialize_mark_table_as_marked(lines: Range) { + let meta = VM::VMObjectModel::LOCAL_MARK_BIT_SPEC.extract_side_spec(); + let start: *mut u8 = address_to_meta_address(meta, lines.start.start()).to_mut_ptr(); + let limit: *mut u8 = address_to_meta_address(meta, lines.end.start()).to_mut_ptr(); + unsafe { + let bytes = limit.offset_from(start) as usize; + std::ptr::write_bytes(start, 0xffu8, bytes); + } + } } diff --git a/src/policy/immix/mod.rs b/src/policy/immix/mod.rs index d5895e9470..7243832b3a 100644 --- a/src/policy/immix/mod.rs +++ b/src/policy/immix/mod.rs @@ -16,4 +16,4 @@ pub const BLOCK_ONLY: bool = false; /// Mark lines when scanning objects. /// Otherwise, do it at mark time. 
-pub const MARK_LINE_AT_SCAN_TIME: bool = true; +pub const MARK_LINE_AT_SCAN_TIME: bool = false; diff --git a/src/policy/largeobjectspace.rs b/src/policy/largeobjectspace.rs index abe7976082..3a60b19dad 100644 --- a/src/policy/largeobjectspace.rs +++ b/src/policy/largeobjectspace.rs @@ -9,6 +9,7 @@ use crate::util::alloc::allocator::AllocationOptions; use crate::util::constants::BYTES_IN_PAGE; use crate::util::heap::{FreeListPageResource, PageResource}; use crate::util::metadata; +use crate::util::object_enum::ClosureObjectEnumerator; use crate::util::object_enum::ObjectEnumerator; use crate::util::opaque_pointer::*; use crate::util::treadmill::TreadMill; @@ -59,6 +60,24 @@ impl SFT for LargeObjectSpace { true } fn initialize_object_metadata(&self, object: ObjectReference, alloc: bool) { + if self.should_allocate_as_live() { + VM::VMObjectModel::LOCAL_LOS_MARK_NURSERY_SPEC.store_atomic::( + object, + self.mark_state, + None, + Ordering::SeqCst, + ); + debug_assert!( + VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC.load_atomic::( + object, + None, + Ordering::Acquire + ) == 0 + ); + + self.treadmill.add_to_treadmill(object, false); + return; + } let old_value = VM::VMObjectModel::LOCAL_LOS_MARK_NURSERY_SPEC.load_atomic::( object, None, @@ -243,9 +262,28 @@ impl LargeObjectSpace { } } + pub fn initial_pause_prepare(&self) { + use crate::util::object_enum::ClosureObjectEnumerator; + + debug_assert!(self.treadmill.is_from_space_empty()); + debug_assert!(self.treadmill.is_nursery_empty()); + debug_assert!(self.common.needs_satb); + let mut enumator = ClosureObjectEnumerator::<_, VM>::new(|object| { + VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC.mark_as_unlogged::(object, Ordering::SeqCst); + }); + self.treadmill.enumerate_objects(&mut enumator); + } + + pub fn final_pause_release(&self) { + let mut enumator = ClosureObjectEnumerator::<_, VM>::new(|object| { + VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC.clear::(object, Ordering::SeqCst); + }); + self.treadmill.enumerate_objects(&mut enumator); + } + pub fn prepare(&mut self, full_heap: bool) { if full_heap { - debug_assert!(self.treadmill.is_from_space_empty()); + // debug_assert!(self.treadmill.is_from_space_empty()); self.mark_state = MARK_BIT - self.mark_state; } self.treadmill.flip(full_heap); @@ -259,6 +297,7 @@ impl LargeObjectSpace { self.sweep_large_pages(false); } } + // Allow nested-if for this function to make it clear that test_and_mark() is only executed // for the outer condition is met. 
#[allow(clippy::collapsible_if)] @@ -308,7 +347,7 @@ impl LargeObjectSpace { #[cfg(feature = "vo_bit")] crate::util::metadata::vo_bit::unset_vo_bit(object); // Clear log bits for dead objects to prevent a new nursery object having the unlog bit set - if self.common.needs_log_bit { + if self.common.needs_log_bit || self.common.needs_satb { VM::VMObjectModel::GLOBAL_LOG_BIT_SPEC.clear::(object, Ordering::SeqCst); } self.pr @@ -332,6 +371,10 @@ impl LargeObjectSpace { pages: usize, alloc_options: AllocationOptions, ) -> Address { + self.common() + .global_state + .concurrent_marking_threshold + .fetch_add(pages, Ordering::Relaxed); self.acquire(tls, pages, alloc_options) } @@ -391,6 +434,10 @@ impl LargeObjectSpace { ) & NURSERY_BIT == NURSERY_BIT } + + pub fn is_marked(&self, object: ObjectReference) -> bool { + self.test_mark_bit(object, self.mark_state) + } } fn get_super_page(cell: Address) -> Address { diff --git a/src/policy/space.rs b/src/policy/space.rs index e44874fe5b..f6f46e8197 100644 --- a/src/policy/space.rs +++ b/src/policy/space.rs @@ -34,6 +34,7 @@ use crate::util::memory::{self, HugePageSupport, MmapProtection, MmapStrategy}; use crate::vm::VMBinding; use std::marker::PhantomData; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::sync::Mutex; @@ -433,6 +434,18 @@ pub trait Space: 'static + SFT + Sync + Downcast { /// the execution time. For LOS, it will be cheaper to enumerate individual objects than /// scanning VO bits because it is sparse. fn enumerate_objects(&self, enumerator: &mut dyn ObjectEnumerator); + + fn set_allocate_as_live(&self, live: bool) { + self.common() + .allocate_as_live + .store(live, std::sync::atomic::Ordering::SeqCst); + } + + fn should_allocate_as_live(&self) -> bool { + self.common() + .allocate_as_live + .load(std::sync::atomic::Ordering::Acquire) + } } /// Print the VM map for a space. @@ -524,6 +537,7 @@ pub struct CommonSpace { /// This field equals to needs_log_bit in the plan constraints. // TODO: This should be a constant for performance. pub needs_log_bit: bool, + pub needs_satb: bool, /// A lock used during acquire() to make sure only one thread can allocate. 
pub acquire_lock: Mutex<()>, @@ -532,6 +546,8 @@ pub struct CommonSpace { pub global_state: Arc, pub options: Arc, + pub allocate_as_live: AtomicBool, + p: PhantomData, } @@ -594,6 +610,7 @@ impl CommonSpace { vm_map: args.plan_args.vm_map, mmapper: args.plan_args.mmapper, needs_log_bit: args.plan_args.constraints.needs_log_bit, + needs_satb: args.plan_args.constraints.needs_satb, gc_trigger: args.plan_args.gc_trigger, metadata: SideMetadataContext { global: args.plan_args.global_side_metadata_specs, @@ -602,6 +619,7 @@ impl CommonSpace { acquire_lock: Mutex::new(()), global_state: args.plan_args.global_state, options: args.plan_args.options.clone(), + allocate_as_live: AtomicBool::new(false), p: PhantomData, }; diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index 7e50c86aa3..d7070badd8 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -2,6 +2,7 @@ use super::work_bucket::WorkBucketStage; use super::*; use crate::global_state::GcStatus; use crate::plan::ObjectsClosure; +use crate::plan::Pause; use crate::plan::VectorObjectQueue; use crate::util::*; use crate::vm::slot::Slot; @@ -191,11 +192,24 @@ impl GCWork for ReleaseCollector { /// /// TODO: Smaller work granularity #[derive(Default)] -pub struct StopMutators(PhantomData); +pub struct StopMutators { + pause: Pause, + phantom: PhantomData, +} impl StopMutators { pub fn new() -> Self { - Self(PhantomData) + Self { + pause: Pause::Full, + phantom: PhantomData, + } + } + + pub fn new_args(pause: Pause) -> Self { + Self { + pause, + phantom: PhantomData, + } } } @@ -206,10 +220,17 @@ impl GCWork for StopMutators { ::VMCollection::stop_all_mutators(worker.tls, |mutator| { // TODO: The stack scanning work won't start immediately, as the `Prepare` bucket is not opened yet (the bucket is opened in notify_mutators_paused). // Should we push to Unconstrained instead? - mmtk.scheduler.work_buckets[WorkBucketStage::Prepare] - .add(ScanMutatorRoots::(mutator)); + + if self.pause != Pause::FinalMark { + mmtk.scheduler.work_buckets[WorkBucketStage::Prepare] + .add(ScanMutatorRoots::(mutator)); + } else { + mutator.flush(); + } }); trace!("stop_all_mutators end"); + mmtk.scheduler.set_in_gc_pause(true); + mmtk.get_plan().notify_mutators_paused(&mmtk.scheduler); mmtk.scheduler.notify_mutators_paused(mmtk); mmtk.scheduler.work_buckets[WorkBucketStage::Prepare].add(ScanVMSpecificRoots::::new()); } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 33e89be0fe..5006edbe1d 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -17,7 +17,7 @@ pub(crate) use scheduler::GCWorkScheduler; mod stat; mod work_counter; -mod work; +pub(crate) mod work; pub use work::GCWork; pub(crate) use work::GCWorkContext; diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index e849f9df07..7f9a771569 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -14,12 +14,14 @@ use crate::util::options::AffinityKind; use crate::vm::Collection; use crate::vm::VMBinding; use crate::Plan; -use crossbeam::deque::Steal; +use crossbeam::deque::{Injector, Steal}; use enum_map::{Enum, EnumMap}; use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; +type PostponeQueue = Injector>>; + pub struct GCWorkScheduler { /// Work buckets pub work_buckets: EnumMap>, @@ -29,6 +31,12 @@ pub struct GCWorkScheduler { pub(crate) worker_monitor: Arc, /// How to assign the affinity of each GC thread. Specified by the user. 
affinity: AffinityKind, + + pub(super) postponed_concurrent_work: + spin::RwLock>>>, + pub(super) postponed_concurrent_work_prioritized: + spin::RwLock>>>, + in_gc_pause: std::sync::atomic::AtomicBool, } // FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet. @@ -47,6 +55,8 @@ impl GCWorkScheduler { WorkBucket::new(active, worker_monitor.clone()) }); + work_buckets[WorkBucketStage::Unconstrained].enable_prioritized_queue(); + // Set the open condition of each bucket. { let first_stw_stage = WorkBucketStage::first_stw_stage(); @@ -75,9 +85,44 @@ impl GCWorkScheduler { worker_group, worker_monitor, affinity, + postponed_concurrent_work: spin::RwLock::new(crossbeam::deque::Injector::new()), + postponed_concurrent_work_prioritized: spin::RwLock::new( + crossbeam::deque::Injector::new(), + ), + in_gc_pause: std::sync::atomic::AtomicBool::new(false), }) } + pub fn postpone(&self, w: impl GCWork) { + self.postponed_concurrent_work.read().push(Box::new(w)) + } + + pub fn postpone_prioritized(&self, w: impl GCWork) { + self.postponed_concurrent_work_prioritized + .read() + .push(Box::new(w)) + } + + pub fn postpone_dyn(&self, w: Box>) { + self.postponed_concurrent_work.read().push(w) + } + + pub fn postpone_dyn_prioritized(&self, w: Box>) { + self.postponed_concurrent_work_prioritized.read().push(w) + } + + pub fn postpone_all(&self, ws: Vec>>) { + let postponed_concurrent_work = self.postponed_concurrent_work.read(); + ws.into_iter() + .for_each(|w| postponed_concurrent_work.push(w)); + } + + pub fn postpone_all_prioritized(&self, ws: Vec>>) { + let postponed_concurrent_work = self.postponed_concurrent_work_prioritized.read(); + ws.into_iter() + .for_each(|w| postponed_concurrent_work.push(w)); + } + pub fn num_workers(&self) -> usize { self.worker_group.as_ref().worker_count() } @@ -132,6 +177,7 @@ impl GCWorkScheduler { /// Add the `ScheduleCollection` packet. Called by the last parked worker. fn add_schedule_collection_packet(&self) { // We are still holding the mutex `WorkerMonitor::sync`. Do not notify now. + probe!(mmtk, add_schedule_collection_packet); self.work_buckets[WorkBucketStage::Unconstrained].add_no_notify(ScheduleCollection); } @@ -289,6 +335,7 @@ impl GCWorkScheduler { self.work_buckets.iter().for_each(|(id, bkt)| { if id != WorkBucketStage::Unconstrained { bkt.deactivate(); + bkt.set_as_enabled(); } }); } @@ -298,6 +345,7 @@ impl GCWorkScheduler { self.work_buckets.iter().for_each(|(id, bkt)| { if id != WorkBucketStage::Unconstrained && id != first_stw_stage { bkt.deactivate(); + bkt.set_as_enabled(); } }); } @@ -330,6 +378,18 @@ impl GCWorkScheduler { } } + pub(super) fn set_in_gc_pause(&self, in_gc_pause: bool) { + self.in_gc_pause + .store(in_gc_pause, std::sync::atomic::Ordering::SeqCst); + for wb in self.work_buckets.values() { + wb.set_in_concurrent(!in_gc_pause); + } + } + + pub fn in_concurrent(&self) -> bool { + !self.in_gc_pause.load(std::sync::atomic::Ordering::SeqCst) + } + /// Get a schedulable work packet without retry. fn poll_schedulable_work_once(&self, worker: &GCWorker) -> Steal>> { let mut should_retry = false; @@ -436,11 +496,20 @@ impl GCWorkScheduler { LastParkedResult::WakeAll } else { // GC finished. - self.on_gc_finished(worker); + let concurrent_work_scheduled = self.on_gc_finished(worker); // Clear the current goal goals.on_current_goal_completed(); - self.respond_to_requests(worker, goals) + + if concurrent_work_scheduled { + // It was the initial mark pause and scheduled concurrent work. 
diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs
index ab55093bad..1a207f0d9e 100644
--- a/src/scheduler/work_bucket.rs
+++ b/src/scheduler/work_bucket.rs
@@ -7,34 +7,35 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
 
 struct BucketQueue<VM: VMBinding> {
-    queue: Injector<Box<dyn GCWork<VM>>>,
+    // queue: Injector<Box<dyn GCWork<VM>>>,
+    queue: std::sync::RwLock<Injector<Box<dyn GCWork<VM>>>>,
 }
 
 impl<VM: VMBinding> BucketQueue<VM> {
     fn new() -> Self {
         Self {
-            queue: Injector::new(),
+            queue: std::sync::RwLock::new(Injector::new()),
         }
     }
 
     fn is_empty(&self) -> bool {
-        self.queue.is_empty()
+        self.queue.read().unwrap().is_empty()
     }
 
     fn steal_batch_and_pop(
         &self,
         dest: &Worker<Box<dyn GCWork<VM>>>,
     ) -> Steal<Box<dyn GCWork<VM>>> {
-        self.queue.steal_batch_and_pop(dest)
+        self.queue.read().unwrap().steal_batch_and_pop(dest)
     }
 
     fn push(&self, w: Box<dyn GCWork<VM>>) {
-        self.queue.push(w);
+        self.queue.read().unwrap().push(w);
     }
 
     fn push_all(&self, ws: Vec<Box<dyn GCWork<VM>>>) {
         for w in ws {
-            self.queue.push(w);
+            self.queue.read().unwrap().push(w);
         }
     }
 }
@@ -59,6 +60,8 @@ pub struct WorkBucket<VM: VMBinding> {
     /// recursively, such as ephemerons and Java-style SoftReference and finalizers. Sentinels
     /// can be used repeatedly to discover and process more such objects.
     sentinel: Mutex<Option<Box<dyn GCWork<VM>>>>,
+    in_concurrent: AtomicBool,
+    disable: AtomicBool,
 }
 
@@ -70,9 +73,53 @@ impl<VM: VMBinding> WorkBucket<VM> {
             monitor,
             can_open: None,
             sentinel: Mutex::new(None),
+            in_concurrent: AtomicBool::new(true),
+            disable: AtomicBool::new(false),
         }
     }
 
+    pub fn set_in_concurrent(&self, in_concurrent: bool) {
+        self.in_concurrent.store(in_concurrent, Ordering::SeqCst);
+    }
+
+    pub fn set_as_enabled(&self) {
+        self.disable.store(false, Ordering::SeqCst)
+    }
+
+    pub fn set_as_disabled(&self) {
+        self.disable.store(true, Ordering::SeqCst)
+    }
+
+    pub fn disabled(&self) -> bool {
+        self.disable.load(Ordering::Relaxed)
+    }
+
+    pub fn enable_prioritized_queue(&mut self) {
+        self.prioritized_queue = Some(BucketQueue::new());
+    }
+
+    pub fn replace_queue(
+        &self,
+        new_queue: Injector<Box<dyn GCWork<VM>>>,
+    ) -> Injector<Box<dyn GCWork<VM>>> {
+        let mut queue = self.queue.queue.write().unwrap();
+        std::mem::replace::<Injector<Box<dyn GCWork<VM>>>>(&mut queue, new_queue)
+    }
+
+    pub fn replace_queue_prioritized(
+        &self,
+        new_queue: Injector<Box<dyn GCWork<VM>>>,
+    ) -> Injector<Box<dyn GCWork<VM>>> {
+        let mut queue = self
+            .prioritized_queue
+            .as_ref()
+            .unwrap()
+            .queue
+            .write()
+            .unwrap();
+        std::mem::replace::<Injector<Box<dyn GCWork<VM>>>>(&mut queue, new_queue)
+    }
+
     fn notify_one_worker(&self) {
         // If the bucket is not activated, don't notify anyone.
         if !self.is_activated() {
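Wrapping each bucket queue's `Injector` in an `RwLock` is what lets `schedule_concurrent_packets` swap a whole queue of postponed packets into the Unconstrained bucket in one step: pushes and steals take the read lock, while `replace_queue` and `replace_queue_prioritized` take the write lock. A small sketch of that swap, written as if it lived inside the scheduler module so the relevant items are in scope:

// Illustrative sketch only -- not part of this patch. Assumes it sits inside
// src/scheduler, with WorkBucket, GCWork and VMBinding in scope.
use crossbeam::deque::Injector;

fn install_postponed_queue<VM: VMBinding>(
    bucket: &WorkBucket<VM>,
    postponed: Injector<Box<dyn GCWork<VM>>>,
) {
    // The write lock taken inside replace_queue blocks concurrent pushes and steals
    // for the duration of the swap, so no packet is lost or executed twice.
    let old = bucket.replace_queue(postponed);
    debug_assert!(old.is_empty());
}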
diff --git a/src/util/address.rs b/src/util/address.rs
index c87a5d3abb..45192157d0 100644
--- a/src/util/address.rs
+++ b/src/util/address.rs
@@ -465,6 +465,7 @@ mod tests {
     }
 }
 
+use crate::plan::SlotIterator;
 use crate::vm::VMBinding;
 
 /// `ObjectReference` represents address for an object. Compared with `Address`, operations allowed
@@ -699,6 +700,10 @@ impl ObjectReference {
     pub fn is_sane(self) -> bool {
         unsafe { SFT_MAP.get_unchecked(self.to_raw_address()) }.is_sane()
     }
+
+    pub fn iterate_fields<VM: VMBinding, F: FnMut(VM::VMSlot)>(self, f: F) {
+        SlotIterator::<VM>::iterate(self, f)
+    }
 }
 
 /// allows print Address as upper-case hex value
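`ObjectReference::iterate_fields` is a thin convenience wrapper over `SlotIterator`, used by the concurrent tracing code to visit an object's reference fields. A hedged usage sketch, assuming a binding type `MyVM: VMBinding` and the generic parameters as reconstructed above:

// Illustrative sketch only -- not part of this patch.
use crate::util::ObjectReference;
use crate::vm::VMBinding;

fn count_reference_fields<MyVM: VMBinding>(object: ObjectReference) -> usize {
    let mut n = 0;
    // The closure receives each reference field of `object` as a VM slot.
    object.iterate_fields::<MyVM, _>(|_slot| n += 1);
    n
}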
diff --git a/src/util/alloc/immix_allocator.rs b/src/util/alloc/immix_allocator.rs
index 807ddded90..597b1db4ff 100644
--- a/src/util/alloc/immix_allocator.rs
+++ b/src/util/alloc/immix_allocator.rs
@@ -265,6 +265,21 @@ impl<VM: VMBinding> ImmixAllocator<VM> {
                 // Update the hole-searching cursor to None.
                 Some(end_line)
             };
+            // Allocate as live: mark these lines and their mark table if concurrent marking is active.
+            if self.immix_space().should_allocate_as_live() {
+                let state = self
+                    .space
+                    .line_mark_state
+                    .load(std::sync::atomic::Ordering::Acquire);
+
+                for line in
+                    crate::util::linear_scan::RegionIterator::<Line>::new(start_line, end_line)
+                {
+                    line.mark(state);
+                }
+
+                Line::initialize_mark_table_as_marked::<VM>(start_line..end_line);
+            }
             return true;
         } else {
             // No more recyclable lines. Set the hole-searching cursor to None.
@@ -305,6 +320,20 @@ impl<VM: VMBinding> ImmixAllocator<VM> {
                 // Bulk clear stale line mark state
                 Line::MARK_TABLE
                     .bzero_metadata(block.start(), crate::policy::immix::block::Block::BYTES);
+                // Allocate as live: mark these lines and their mark table if concurrent marking is active.
+                if self.immix_space().should_allocate_as_live() {
+                    let state = self
+                        .space
+                        .line_mark_state
+                        .load(std::sync::atomic::Ordering::Acquire);
+                    for line in block.lines() {
+                        line.mark(state);
+                    }
+
+                    Line::initialize_mark_table_as_marked::<VM>(
+                        block.start_line()..block.end_line(),
+                    );
+                }
                 if self.request_for_large {
                     self.large_bump_pointer.cursor = block.start();
                     self.large_bump_pointer.limit = block.end();
diff --git a/src/util/options.rs b/src/util/options.rs
index 0beb79db97..8d8ae1160e 100644
--- a/src/util/options.rs
+++ b/src/util/options.rs
@@ -50,6 +50,8 @@ pub enum PlanSelector {
     Compressor,
     /// An Immix collector that uses a sticky mark bit to allow generational behaviors without a copying nursery.
     StickyImmix,
+    /// A concurrent, non-moving Immix collector using a snapshot-at-the-beginning (SATB) barrier.
+    ConcurrentImmix,
 }
 
 /// MMTk option for perf events
diff --git a/src/util/reference_processor.rs b/src/util/reference_processor.rs
index f997c16742..1b5acb4bfa 100644
--- a/src/util/reference_processor.rs
+++ b/src/util/reference_processor.rs
@@ -252,6 +252,11 @@ impl ReferenceProcessor {
 
     /// Inform the binding to enqueue the weak references whose referents were cleared in this GC.
     pub fn enqueue<VM: VMBinding>(&self, tls: VMWorkerThread) {
+        // We will acquire a lock below. If anyone tries to insert new weak refs, which acquires the same lock, a deadlock will occur.
+        // This does happen for OpenJDK with ConcurrentImmix, where a write barrier is triggered during the enqueueing of weak references,
+        // and the write barrier scans the objects and attempts to add new weak references.
+        // Disallow new candidates to prevent the deadlock.
+        self.disallow_new_candidate();
         let mut sync = self.sync.lock().unwrap();
 
         // This is the end of a GC. We do some assertions here to make sure our reference tables are correct.
diff --git a/src/vm/collection.rs b/src/vm/collection.rs
index 16e87eebe0..98121317b9 100644
--- a/src/vm/collection.rs
+++ b/src/vm/collection.rs
@@ -162,4 +162,7 @@ pub trait Collection<VM: VMBinding> {
     fn create_gc_trigger() -> Box<dyn GCTriggerPolicy<VM>> {
         unimplemented!()
     }
+
+    /// Inform the VM whether concurrent marking is currently active.
+    fn set_concurrent_marking_state(_active: bool) {}
 }
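The new `Collection::set_concurrent_marking_state` hook has an empty default body, so existing bindings are unaffected. A binding that supports ConcurrentImmix would typically record the flag somewhere its write-barrier fast paths can consult. The helper below is a hedged sketch; the static flag and its name are assumptions, and a binding's `Collection::set_concurrent_marking_state` implementation would simply forward to it.

// Illustrative sketch only -- not part of this patch.
use std::sync::atomic::{AtomicBool, Ordering};

/// A flag the binding's write-barrier fast paths could consult.
static CONCURRENT_MARKING_ACTIVE: AtomicBool = AtomicBool::new(false);

/// A binding's `Collection::set_concurrent_marking_state` could forward here.
pub fn set_concurrent_marking_state(active: bool) {
    CONCURRENT_MARKING_ACTIVE.store(active, Ordering::SeqCst);
}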
diff --git a/src/vm/tests/mock_tests/mock_test_allocator_info.rs b/src/vm/tests/mock_tests/mock_test_allocator_info.rs
index fc288e8041..a856278e94 100644
--- a/src/vm/tests/mock_tests/mock_test_allocator_info.rs
+++ b/src/vm/tests/mock_tests/mock_test_allocator_info.rs
@@ -29,6 +29,7 @@ pub fn test_allocator_info() {
         | PlanSelector::GenImmix
         | PlanSelector::MarkCompact
         | PlanSelector::Compressor
+        | PlanSelector::ConcurrentImmix
        | PlanSelector::StickyImmix => {
             // These plans all use bump pointer allocator.
             let AllocatorInfo::BumpPointer {
diff --git a/tools/tracing/timeline/capture.bt b/tools/tracing/timeline/capture.bt
index b076f1cd18..5fd3a072aa 100644
--- a/tools/tracing/timeline/capture.bt
+++ b/tools/tracing/timeline/capture.bt
@@ -124,4 +124,19 @@ usdt:$MMTK:mmtk:sweep_chunk {
     }
 }
 
+usdt:$MMTK:mmtk:concurrent_trace_objects {
+    printf("concurrent_trace_objects,meta,%d,%lu,%lu,%lu,%lu\n", tid, nsecs, arg0, arg1, arg2);
+}
+
+usdt:$MMTK:mmtk:gcrequester_request {
+    printf("gcrequester_request,i,%d,%lu\n", tid, nsecs);
+}
+
+usdt:$MMTK:mmtk:add_schedule_collection_packet {
+    printf("add_schedule_collection_packet,i,%d,%lu\n", tid, nsecs);
+}
+
+usdt:$MMTK:mmtk:num_concurrent_tracing_packets_change {
+    printf("num_concurrent_tracing_packets_change,C,%d,%lu,%lu\n", tid, nsecs, arg0);
+}
 // vim: ft=bpftrace ts=4 sw=4 sts=4 et
diff --git a/tools/tracing/timeline/visualize.py b/tools/tracing/timeline/visualize.py
index ca45845a44..7ad34df1cf 100755
--- a/tools/tracing/timeline/visualize.py
+++ b/tools/tracing/timeline/visualize.py
@@ -146,6 +146,15 @@ def enrich_event(self, name, ph, tid, ts, result, args):
                     "stage": int(args[0]),
                 }
 
+            case "gcrequester_request":
+                result["tid"] = 1
+
+            case "num_concurrent_tracing_packets_change":
+                result["name"] = "Concurrent tracing packets"
+                result["args"] |= {
+                    "number": int(args[0]),
+                }
+
             case _:
                 if self.enrich_event_extra is not None:
                     # Call ``enrich_event_extra`` in the extension script if defined.
@@ -241,6 +250,21 @@ def enrich_meta(self, name, tid, ts, gc, wp, args):
                     }
                 }
 
+            case "concurrent_trace_objects":
+                objects = int(args[0])
+                next_objects = int(args[1])
+                iterations = int(args[2])
+                total_objects = objects + next_objects
+                wp["args"] |= {
+                    # Put args in a group. See comments in "process_slots".
+                    "scan_objects": {
+                        "objects": objects,
+                        "next_objects": next_objects,
+                        "total_objects": total_objects,
+                        "iterations": iterations,
+                    }
+                }
+
             case "sweep_chunk":
                 wp["args"] |= {
                     "allocated_blocks": int(args[0]),
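For reference, the Rust side of the new timeline probes mirrors the existing `probe!` calls in this patch; the sketch below shows the counter probe that `visualize.py` renders as the "Concurrent tracing packets" track. The wrapper function is illustrative -- the real call sites live in the concurrent marking work packets, outside the hunks shown here.

// Illustrative sketch only -- not part of this patch. Within mmtk-core the probe!
// macro is already available through #[macro_use] extern crate probe.
use probe::probe;

fn record_num_concurrent_tracing_packets(n: usize) {
    // Matches the bpftrace attach point usdt:$MMTK:mmtk:num_concurrent_tracing_packets_change
    // added to capture.bt above.
    probe!(mmtk, num_concurrent_tracing_packets_change, n);
}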