
Commit 5064af7

peterzhu2118 authored and nobu committed
[ruby/mmtk] Process obj_free candidates in parallel
Redoes commit 544770d, which seems to have been accidentally undone in b27d935.
1 parent: a0c483f

4 files changed: 127 additions, 44 deletions


gc/mmtk/mmtk.c

Lines changed: 22 additions & 1 deletion
@@ -747,6 +747,27 @@ rb_mmtk_alloc_fast_path(struct objspace *objspace, struct MMTk_ractor_cache *rac
     }
 }

+static bool
+obj_can_parallel_free_p(VALUE obj)
+{
+    switch (RB_BUILTIN_TYPE(obj)) {
+      case T_ARRAY:
+      case T_BIGNUM:
+      case T_COMPLEX:
+      case T_FLOAT:
+      case T_HASH:
+      case T_OBJECT:
+      case T_RATIONAL:
+      case T_REGEXP:
+      case T_STRING:
+      case T_STRUCT:
+      case T_SYMBOL:
+        return true;
+      default:
+        return false;
+    }
+}
+
 VALUE
 rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags, bool wb_protected, size_t alloc_size)
 {
@@ -783,7 +804,7 @@ rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags
     mmtk_post_alloc(ractor_cache->mutator, (void*)alloc_obj, alloc_size, MMTK_ALLOCATION_SEMANTICS_DEFAULT);

     // TODO: only add when object needs obj_free to be called
-    mmtk_add_obj_free_candidate(alloc_obj);
+    mmtk_add_obj_free_candidate(alloc_obj, obj_can_parallel_free_p((VALUE)alloc_obj));

     objspace->total_allocated_objects++;
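
The predicate routes only these builtin types to the per-worker parallel queues; any other type (for example T_DATA, whose dfree callback can run arbitrary code) still lands on the single non-parallel queue, which one work packet drains on its own, presumably because those obj_free paths are not known to be safe to run concurrently.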

gc/mmtk/mmtk.h

Lines changed: 1 addition & 1 deletion
@@ -123,7 +123,7 @@ void mmtk_post_alloc(MMTk_Mutator *mutator,
                      size_t bytes,
                      MMTk_AllocationSemantics semantics);

-void mmtk_add_obj_free_candidate(MMTk_ObjectReference object);
+void mmtk_add_obj_free_candidate(MMTk_ObjectReference object, bool can_parallel_free);

 void mmtk_declare_weak_references(MMTk_ObjectReference object);

gc/mmtk/src/api.rs

Lines changed: 8 additions & 3 deletions
@@ -198,7 +198,10 @@ pub unsafe extern "C" fn mmtk_init_binding(
     let mmtk_boxed = mmtk_init(&builder);
     let mmtk_static = Box::leak(Box::new(mmtk_boxed));

-    let binding = RubyBinding::new(mmtk_static, &binding_options, upcalls);
+    let mut binding = RubyBinding::new(mmtk_static, &binding_options, upcalls);
+    binding
+        .weak_proc
+        .init_parallel_obj_free_candidates(memory_manager::num_of_workers(binding.mmtk));

     crate::BINDING
         .set(binding)
@@ -296,8 +299,10 @@ pub unsafe extern "C" fn mmtk_post_alloc(

 // TODO: Replace with buffered mmtk_add_obj_free_candidates
 #[no_mangle]
-pub extern "C" fn mmtk_add_obj_free_candidate(object: ObjectReference) {
-    binding().weak_proc.add_obj_free_candidate(object)
+pub extern "C" fn mmtk_add_obj_free_candidate(object: ObjectReference, can_parallel_free: bool) {
+    binding()
+        .weak_proc
+        .add_obj_free_candidate(object, can_parallel_free)
 }

 // =============== Weak references ===============
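
The number of parallel candidate lists is fixed at startup to MMTk's GC worker count, so each ProcessParallelObjFreeCandidates packet (see weak_proc.rs below) gets a list of its own. The push side is a small, reusable pattern: a relaxed atomic counter picks the next shard, so concurrent mutators usually contend on different locks instead of one global list. A minimal standalone sketch, with illustrative names that are not part of the binding:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

// One lock-protected list per shard, plus a counter for round-robin placement.
struct ShardedList<T> {
    shards: Vec<Mutex<Vec<T>>>,
    counter: AtomicUsize,
}

impl<T> ShardedList<T> {
    fn new(num_shards: usize) -> Self {
        Self {
            shards: (0..num_shards).map(|_| Mutex::new(Vec::new())).collect(),
            counter: AtomicUsize::new(0),
        }
    }

    // Callable from many threads at once; Relaxed suffices because the
    // counter only spreads load and is not used for synchronization.
    fn push(&self, item: T) {
        let idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.shards.len();
        self.shards[idx].lock().unwrap().push(item);
    }
}

fn main() {
    let list = ShardedList::new(4);
    for i in 0..10u32 {
        list.push(i); // items land in shards 0, 1, 2, 3, 0, 1, ...
    }
    assert_eq!(*list.shards[0].lock().unwrap(), vec![0, 4, 8]);
}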

gc/mmtk/src/weak_proc.rs

Lines changed: 96 additions & 39 deletions
@@ -1,3 +1,5 @@
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
 use std::sync::Mutex;

 use mmtk::scheduler::GCWork;
@@ -11,10 +13,13 @@ use crate::upcalls;
 use crate::Ruby;

 pub struct WeakProcessor {
+    non_parallel_obj_free_candidates: Mutex<Vec<ObjectReference>>,
+    parallel_obj_free_candidates: Vec<Mutex<Vec<ObjectReference>>>,
+    parallel_obj_free_candidates_counter: AtomicUsize,
+
     /// Objects that needs `obj_free` called when dying.
     /// If it is a bottleneck, replace it with a lock-free data structure,
     /// or add candidates in batch.
-    obj_free_candidates: Mutex<Vec<ObjectReference>>,
     weak_references: Mutex<Vec<ObjectReference>>,
 }

@@ -27,32 +32,59 @@ impl Default for WeakProcessor {
 impl WeakProcessor {
     pub fn new() -> Self {
         Self {
-            obj_free_candidates: Mutex::new(Vec::new()),
+            non_parallel_obj_free_candidates: Mutex::new(Vec::new()),
+            parallel_obj_free_candidates: vec![Mutex::new(Vec::new())],
+            parallel_obj_free_candidates_counter: AtomicUsize::new(0),
             weak_references: Mutex::new(Vec::new()),
         }
     }

-    /// Add an object as a candidate for `obj_free`.
-    ///
-    /// Multiple mutators can call it concurrently, so it has `&self`.
-    pub fn add_obj_free_candidate(&self, object: ObjectReference) {
-        let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
-        obj_free_candidates.push(object);
+    pub fn init_parallel_obj_free_candidates(&mut self, num_workers: usize) {
+        debug_assert_eq!(self.parallel_obj_free_candidates.len(), 1);
+
+        for _ in 1..num_workers {
+            self.parallel_obj_free_candidates
+                .push(Mutex::new(Vec::new()));
+        }
     }

-    /// Add many objects as candidates for `obj_free`.
+    /// Add an object as a candidate for `obj_free`.
     ///
     /// Multiple mutators can call it concurrently, so it has `&self`.
-    pub fn add_obj_free_candidates(&self, objects: &[ObjectReference]) {
-        let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
-        for object in objects.iter().copied() {
-            obj_free_candidates.push(object);
+    pub fn add_obj_free_candidate(&self, object: ObjectReference, can_parallel_free: bool) {
+        if can_parallel_free {
+            // Newly allocated objects are placed in parallel_obj_free_candidates using
+            // round-robin. This may not be ideal for load balancing.
+            let idx = self
+                .parallel_obj_free_candidates_counter
+                .fetch_add(1, Ordering::Relaxed)
+                % self.parallel_obj_free_candidates.len();
+
+            self.parallel_obj_free_candidates[idx]
+                .lock()
+                .unwrap()
+                .push(object);
+        } else {
+            self.non_parallel_obj_free_candidates
+                .lock()
+                .unwrap()
+                .push(object);
         }
     }

     pub fn get_all_obj_free_candidates(&self) -> Vec<ObjectReference> {
-        let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
-        std::mem::take(obj_free_candidates.as_mut())
+        // let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
+        let mut all_obj_free_candidates = self
+            .non_parallel_obj_free_candidates
+            .lock()
+            .unwrap()
+            .to_vec();
+
+        for candidates_mutex in &self.parallel_obj_free_candidates {
+            all_obj_free_candidates.extend(candidates_mutex.lock().unwrap().to_vec());
+        }
+
+        std::mem::take(all_obj_free_candidates.as_mut())
     }

     pub fn add_weak_reference(&self, object: ObjectReference) {
@@ -65,7 +97,18 @@
         worker: &mut GCWorker<Ruby>,
         _tracer_context: impl ObjectTracerContext<Ruby>,
     ) {
-        worker.add_work(WorkBucketStage::VMRefClosure, ProcessObjFreeCandidates);
+        worker.add_work(
+            WorkBucketStage::VMRefClosure,
+            ProcessNonParallelObjFreeCanadidates {},
+        );
+
+        for index in 0..self.parallel_obj_free_candidates.len() {
+            worker.add_work(
+                WorkBucketStage::VMRefClosure,
+                ProcessParallelObjFreeCandidates { index },
+            );
+        }
+
         worker.add_work(WorkBucketStage::VMRefClosure, ProcessWeakReferences);

         worker.add_work(WorkBucketStage::Prepare, UpdateFinalizerObjIdTables);
@@ -82,36 +125,50 @@
     }
 }

-struct ProcessObjFreeCandidates;
+fn process_obj_free_candidates(obj_free_candidates: &mut Vec<ObjectReference>) {
+    // Process obj_free
+    let mut new_candidates = Vec::new();
+
+    for object in obj_free_candidates.iter().copied() {
+        if object.is_reachable() {
+            // Forward and add back to the candidate list.
+            let new_object = object.forward();
+            trace!("Forwarding obj_free candidate: {object} -> {new_object}");
+            new_candidates.push(new_object);
+        } else {
+            (upcalls().call_obj_free)(object);
+        }
+    }
+
+    *obj_free_candidates = new_candidates;
+}
+
+struct ProcessParallelObjFreeCandidates {
+    index: usize,
+}

-impl GCWork<Ruby> for ProcessObjFreeCandidates {
+impl GCWork<Ruby> for ProcessParallelObjFreeCandidates {
     fn do_work(&mut self, _worker: &mut GCWorker<Ruby>, _mmtk: &'static mmtk::MMTK<Ruby>) {
-        // If it blocks, it is a bug.
-        let mut obj_free_candidates = crate::binding()
-            .weak_proc
-            .obj_free_candidates
+        let mut obj_free_candidates = crate::binding().weak_proc.parallel_obj_free_candidates
+            [self.index]
             .try_lock()
-            .expect("It's GC time. No mutators should hold this lock at this time.");
-
-        let n_cands = obj_free_candidates.len();
+            .expect("Lock for parallel_obj_free_candidates should not be held");

-        debug!("Total: {n_cands} candidates");
+        process_obj_free_candidates(&mut obj_free_candidates);
+    }
+}

-        // Process obj_free
-        let mut new_candidates = Vec::new();
+struct ProcessNonParallelObjFreeCanadidates;

-        for object in obj_free_candidates.iter().copied() {
-            if object.is_reachable() {
-                // Forward and add back to the candidate list.
-                let new_object = object.forward();
-                trace!("Forwarding obj_free candidate: {object} -> {new_object}");
-                new_candidates.push(new_object);
-            } else {
-                (upcalls().call_obj_free)(object);
-            }
-        }
+impl GCWork<Ruby> for ProcessNonParallelObjFreeCanadidates {
+    fn do_work(&mut self, _worker: &mut GCWorker<Ruby>, _mmtk: &'static mmtk::MMTK<Ruby>) {
+        let mut obj_free_candidates = crate::binding()
+            .weak_proc
+            .non_parallel_obj_free_candidates
+            .try_lock()
+            .expect("Lock for non_parallel_obj_free_candidates should not be held");

-        *obj_free_candidates = new_candidates;
+        process_obj_free_candidates(&mut obj_free_candidates);
     }
 }
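
Why one work packet per shard: each ProcessParallelObjFreeCandidates { index } packet owns exactly one candidate list, so up to num_of_workers packets can run obj_free concurrently without sharing a lock, while the non-parallel list is still handled by a single packet. A minimal sketch of that drain step, using plain threads in place of MMTk's GC workers (drain_shards and the shard contents are illustrative, not the binding's API):

use std::sync::Mutex;
use std::thread;

fn drain_shards(shards: &[Mutex<Vec<u64>>]) {
    // One scoped thread per shard mirrors one GC work packet per shard.
    thread::scope(|s| {
        for (i, shard) in shards.iter().enumerate() {
            s.spawn(move || {
                // Mutators are stopped during GC, so try_lock is expected
                // to succeed, matching the .expect(...) in do_work above.
                let mut candidates = shard
                    .try_lock()
                    .expect("no mutator holds shard locks during GC");
                println!("worker {i}: processing {} candidates", candidates.len());
                candidates.clear(); // stand-in for calling obj_free on each one
            });
        }
    });
}

fn main() {
    let shards: Vec<Mutex<Vec<u64>>> = (0..4).map(|_| Mutex::new(vec![1, 2, 3])).collect();
    drain_shards(&shards);
}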
