Skip to content

Commit 844aa0e

Browse files
authored
Generalise weak reference processing for other languages (#700)
This commit adds a language-neutral API to let the VM binding handle weak references and finalisers. Added VMRefClosure and VMRefForwarding work bucket stages to replace the Java-specific stages. Added a "sentinel" mechanism to execute a work packet when a bucket is drained, and prevent the GC from entering the next stage, making it possible to expand the transitive closure multiple times. It replaces GCWorkScheduler::closure_end. Extended the Collection::process_weak_refs method to allow the binding to trace objects during weak reference processing. Renamed Collection::vm_release to Collection::post_forwarding, and it is now executed in a dedicated work packet.
1 parent 541020b commit 844aa0e

File tree

9 files changed

+515
-105
lines changed

9 files changed

+515
-105
lines changed

src/memory_manager.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -900,9 +900,3 @@ pub fn add_work_packets<VM: VMBinding>(
900900
) {
901901
mmtk.scheduler.work_buckets[bucket].bulk_add(packets)
902902
}
903-
904-
/// Add a callback to be notified after the transitive closure is finished.
905-
/// The callback should return true if it add more work packets to the closure bucket.
906-
pub fn on_closure_end<VM: VMBinding>(mmtk: &'static MMTK<VM>, f: Box<dyn Send + Fn() -> bool>) {
907-
mmtk.scheduler.on_closure_end(f)
908-
}

src/plan/markcompact/global.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,6 @@ impl<VM: VMBinding> Plan for MarkCompact<VM> {
121121
scheduler.work_buckets[WorkBucketStage::PhantomRefClosure]
122122
.add(PhantomRefProcessing::<MarkingProcessEdges<VM>>::new());
123123

124-
// VM-specific weak ref processing
125-
scheduler.work_buckets[WorkBucketStage::WeakRefClosure]
126-
.add(VMProcessWeakRefs::<MarkingProcessEdges<VM>>::new());
127-
128124
use crate::util::reference_processor::RefForwarding;
129125
scheduler.work_buckets[WorkBucketStage::RefForwarding]
130126
.add(RefForwarding::<ForwardingProcessEdges<VM>>::new());
@@ -147,6 +143,17 @@ impl<VM: VMBinding> Plan for MarkCompact<VM> {
147143
.add(ForwardFinalization::<ForwardingProcessEdges<VM>>::new());
148144
}
149145

146+
// VM-specific weak ref processing
147+
scheduler.work_buckets[WorkBucketStage::VMRefClosure]
148+
.set_sentinel(Box::new(VMProcessWeakRefs::<MarkingProcessEdges<VM>>::new()));
149+
150+
// VM-specific weak ref forwarding
151+
scheduler.work_buckets[WorkBucketStage::VMRefForwarding]
152+
.add(VMForwardWeakRefs::<ForwardingProcessEdges<VM>>::new());
153+
154+
// VM-specific work after forwarding, possible to implement ref enququing.
155+
scheduler.work_buckets[WorkBucketStage::Release].add(VMPostForwarding::<VM>::default());
156+
150157
// Analysis GC work
151158
#[cfg(feature = "analysis")]
152159
{

src/scheduler/gc_work.rs

Lines changed: 188 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,9 @@ impl<C: GCWorkContext> Release<C> {
108108
impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
109109
fn do_work(&mut self, worker: &mut GCWorker<C::VM>, mmtk: &'static MMTK<C::VM>) {
110110
trace!("Release Global");
111+
111112
self.plan.base().gc_trigger.policy.on_gc_release(mmtk);
112113

113-
<C::VM as VMBinding>::VMCollection::vm_release();
114114
// We assume this is the only running work packet that accesses plan at the point of execution
115115
#[allow(clippy::cast_ref_to_mut)]
116116
let plan_mut: &mut C::PlanType = unsafe { &mut *(self.plan as *const _ as *mut _) };
@@ -252,24 +252,190 @@ impl<VM: VMBinding> GCWork<VM> for EndOfGC {
252252

253253
impl<VM: VMBinding> CoordinatorWork<VM> for EndOfGC {}
254254

255-
/// Delegate to the VM binding for reference processing.
255+
/// This implements `ObjectTracer` by forwarding the `trace_object` calls to the wrapped
256+
/// `ProcessEdgesWork` instance.
257+
struct ProcessEdgesWorkTracer<E: ProcessEdgesWork> {
258+
process_edges_work: E,
259+
stage: WorkBucketStage,
260+
}
261+
262+
impl<E: ProcessEdgesWork> ObjectTracer for ProcessEdgesWorkTracer<E> {
263+
/// Forward the `trace_object` call to the underlying `ProcessEdgesWork`,
264+
/// and flush as soon as the underlying buffer of `process_edges_work` is full.
265+
///
266+
/// This function is inlined because `trace_object` is probably the hottest function in MMTk.
267+
/// If this function is called in small closures, please profile the program and make sure the
268+
/// closure is inlined, too.
269+
#[inline(always)]
270+
fn trace_object(&mut self, object: ObjectReference) -> ObjectReference {
271+
let result = self.process_edges_work.trace_object(object);
272+
self.flush_if_full();
273+
result
274+
}
275+
}
276+
277+
impl<E: ProcessEdgesWork> ProcessEdgesWorkTracer<E> {
278+
#[inline(always)]
279+
fn flush_if_full(&mut self) {
280+
if self.process_edges_work.nodes.is_full() {
281+
self.flush();
282+
}
283+
}
284+
285+
pub fn flush_if_not_empty(&mut self) {
286+
if !self.process_edges_work.nodes.is_empty() {
287+
self.flush();
288+
}
289+
}
290+
291+
#[cold]
292+
fn flush(&mut self) {
293+
let next_nodes = self.process_edges_work.pop_nodes();
294+
assert!(!next_nodes.is_empty());
295+
let work_packet = self.process_edges_work.create_scan_work(next_nodes, false);
296+
let worker = self.process_edges_work.worker();
297+
worker.scheduler().work_buckets[self.stage].add(work_packet);
298+
}
299+
}
300+
301+
/// This type implements `ObjectTracerContext` by creating a temporary `ProcessEdgesWork` during
302+
/// the call to `with_tracer`, making use of its `trace_object` method. It then creates work
303+
/// packets using the methods of the `ProcessEdgesWork` and add the work packet into the given
304+
/// `stage`.
305+
struct ProcessEdgesWorkTracerContext<E: ProcessEdgesWork> {
306+
stage: WorkBucketStage,
307+
phantom_data: PhantomData<E>,
308+
}
309+
310+
impl<E: ProcessEdgesWork> Clone for ProcessEdgesWorkTracerContext<E> {
311+
fn clone(&self) -> Self {
312+
Self { ..*self }
313+
}
314+
}
315+
316+
impl<E: ProcessEdgesWork> ObjectTracerContext<E::VM> for ProcessEdgesWorkTracerContext<E> {
317+
type TracerType = ProcessEdgesWorkTracer<E>;
318+
319+
fn with_tracer<R, F>(&self, worker: &mut GCWorker<E::VM>, func: F) -> R
320+
where
321+
F: FnOnce(&mut Self::TracerType) -> R,
322+
{
323+
let mmtk = worker.mmtk;
324+
325+
// Prepare the underlying ProcessEdgesWork
326+
let mut process_edges_work = E::new(vec![], false, mmtk);
327+
// FIXME: This line allows us to omit the borrowing lifetime of worker.
328+
// We should refactor ProcessEdgesWork so that it uses `worker` locally, not as a member.
329+
process_edges_work.set_worker(worker);
330+
331+
// Cretae the tracer.
332+
let mut tracer = ProcessEdgesWorkTracer {
333+
process_edges_work,
334+
stage: self.stage,
335+
};
336+
337+
// The caller can use the tracer here.
338+
let result = func(&mut tracer);
339+
340+
// Flush the queued nodes.
341+
tracer.flush_if_not_empty();
342+
343+
result
344+
}
345+
}
346+
347+
/// Delegate to the VM binding for weak reference processing.
256348
///
257349
/// Some VMs (e.g. v8) do not have a Java-like global weak reference storage, and the
258350
/// processing of those weakrefs may be more complex. For such case, we delegate to the
259351
/// VM binding to process weak references.
260-
#[derive(Default)]
261-
pub struct VMProcessWeakRefs<E: ProcessEdgesWork>(PhantomData<E>);
352+
///
353+
/// NOTE: This will replace `{Soft,Weak,Phantom}RefProcessing` and `Finalization` in the future.
354+
pub struct VMProcessWeakRefs<E: ProcessEdgesWork> {
355+
phantom_data: PhantomData<E>,
356+
}
262357

263358
impl<E: ProcessEdgesWork> VMProcessWeakRefs<E> {
264359
pub fn new() -> Self {
265-
Self(PhantomData)
360+
Self {
361+
phantom_data: PhantomData,
362+
}
266363
}
267364
}
268365

269366
impl<E: ProcessEdgesWork> GCWork<E::VM> for VMProcessWeakRefs<E> {
270367
fn do_work(&mut self, worker: &mut GCWorker<E::VM>, _mmtk: &'static MMTK<E::VM>) {
271-
trace!("ProcessWeakRefs");
272-
<E::VM as VMBinding>::VMCollection::process_weak_refs(worker); // TODO: Pass a factory/callback to decide what work packet to create.
368+
trace!("VMProcessWeakRefs");
369+
370+
let stage = WorkBucketStage::VMRefClosure;
371+
372+
let need_to_repeat = {
373+
let tracer_factory = ProcessEdgesWorkTracerContext::<E> {
374+
stage,
375+
phantom_data: PhantomData,
376+
};
377+
<E::VM as VMBinding>::VMScanning::process_weak_refs(worker, tracer_factory)
378+
};
379+
380+
if need_to_repeat {
381+
// Schedule Self as the new sentinel so we'll call `process_weak_refs` again after the
382+
// current transitive closure.
383+
let new_self = Box::new(Self::new());
384+
385+
worker.scheduler().work_buckets[stage].set_sentinel(new_self);
386+
}
387+
}
388+
}
389+
390+
/// Delegate to the VM binding for forwarding weak references.
391+
///
392+
/// Some VMs (e.g. v8) do not have a Java-like global weak reference storage, and the
393+
/// processing of those weakrefs may be more complex. For such case, we delegate to the
394+
/// VM binding to process weak references.
395+
///
396+
/// NOTE: This will replace `RefForwarding` and `ForwardFinalization` in the future.
397+
pub struct VMForwardWeakRefs<E: ProcessEdgesWork> {
398+
phantom_data: PhantomData<E>,
399+
}
400+
401+
impl<E: ProcessEdgesWork> VMForwardWeakRefs<E> {
402+
pub fn new() -> Self {
403+
Self {
404+
phantom_data: PhantomData,
405+
}
406+
}
407+
}
408+
409+
impl<E: ProcessEdgesWork> GCWork<E::VM> for VMForwardWeakRefs<E> {
410+
fn do_work(&mut self, worker: &mut GCWorker<E::VM>, _mmtk: &'static MMTK<E::VM>) {
411+
trace!("VMForwardWeakRefs");
412+
413+
let stage = WorkBucketStage::VMRefForwarding;
414+
415+
let tracer_factory = ProcessEdgesWorkTracerContext::<E> {
416+
stage,
417+
phantom_data: PhantomData,
418+
};
419+
<E::VM as VMBinding>::VMScanning::forward_weak_refs(worker, tracer_factory)
420+
}
421+
}
422+
423+
/// This work packet calls `Collection::post_forwarding`.
424+
///
425+
/// NOTE: This will replace `RefEnqueue` in the future.
426+
///
427+
/// NOTE: Although this work packet runs in parallel with the `Release` work packet, it does not
428+
/// access the `Plan` instance.
429+
#[derive(Default)]
430+
pub struct VMPostForwarding<VM: VMBinding> {
431+
phantom_data: PhantomData<VM>,
432+
}
433+
434+
impl<VM: VMBinding> GCWork<VM> for VMPostForwarding<VM> {
435+
fn do_work(&mut self, worker: &mut GCWorker<VM>, _mmtk: &'static MMTK<VM>) {
436+
trace!("VMPostForwarding start");
437+
<VM as VMBinding>::VMCollection::post_forwarding(worker.tls);
438+
trace!("VMPostForwarding end");
273439
}
274440
}
275441

@@ -678,38 +844,22 @@ pub trait ScanObjectsWork<VM: VMBinding>: GCWork<VM> + Sized {
678844

679845
// If any object does not support edge-enqueuing, we process them now.
680846
if !scan_later.is_empty() {
681-
// We create an instance of E to use its `trace_object` method and its object queue.
682-
let mut process_edges_work = Self::E::new(vec![], false, mmtk);
683-
let mut closure = |object| process_edges_work.trace_object(object);
684-
685-
// Scan objects and trace their edges at the same time.
686-
for object in scan_later.iter().copied() {
687-
<VM as VMBinding>::VMScanning::scan_object_and_trace_edges(
688-
tls,
689-
object,
690-
&mut closure,
691-
);
692-
self.post_scan_object(object);
693-
}
694-
695-
// Create work packets to scan adjacent objects. We skip ProcessEdgesWork and create
696-
// object-scanning packets directly, because the edges are already traced.
697-
if !process_edges_work.nodes.is_empty() {
698-
let next_nodes = process_edges_work.nodes.take();
699-
let make_packet = |nodes| {
700-
let work_packet = self.make_another(nodes);
701-
memory_manager::add_work_packet(mmtk, WorkBucketStage::Closure, work_packet);
702-
};
703-
704-
// Divide the resulting nodes into appropriately sized packets.
705-
if next_nodes.len() <= Self::E::CAPACITY {
706-
make_packet(next_nodes);
707-
} else {
708-
for chunk in next_nodes.chunks(Self::E::CAPACITY) {
709-
make_packet(chunk.into());
710-
}
847+
let object_tracer_context = ProcessEdgesWorkTracerContext::<Self::E> {
848+
stage: WorkBucketStage::Closure,
849+
phantom_data: PhantomData,
850+
};
851+
852+
object_tracer_context.with_tracer(worker, |object_tracer| {
853+
// Scan objects and trace their edges at the same time.
854+
for object in scan_later.iter().copied() {
855+
<VM as VMBinding>::VMScanning::scan_object_and_trace_edges(
856+
tls,
857+
object,
858+
object_tracer,
859+
);
860+
self.post_scan_object(object);
711861
}
712-
}
862+
});
713863
}
714864
}
715865
}

0 commit comments

Comments
 (0)