Skip to content

Commit 17c06e7

Browse files
committed
refactor(parallel): extract apply_to_mutable for reusability
Moves the parallel batch application logic (prepare/split/merge/post-process) from create_proposal into a new public method apply_to_mutable. This enables other code paths to reuse the parallel insertion logic without requiring a Parentable nodestore or producing an ImmutableProposal. No behavior change - create_proposal now delegates to apply_to_mutable.
1 parent 7f49d32 commit 17c06e7

File tree

1 file changed

+121
-102
lines changed

1 file changed

+121
-102
lines changed

firewood/src/merkle/parallel.rs

Lines changed: 121 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,125 @@ pub struct ParallelMerkle {
6464
}
6565

6666
impl ParallelMerkle {
67+
/// Applies a batch of operations to an existing mutable nodestore using the parallel
/// proposal pipeline (prepare, split, merge, post-process).
///
/// Unlike `create_proposal`, this neither requires a `Parentable` parent nodestore nor
/// produces an `ImmutableProposal`; it consumes and returns the mutable nodestore so
/// other code paths can reuse the parallel insertion logic.
///
/// # Errors
///
/// Returns a `CreateProposalError::FileIoError` if it encounters an error fetching nodes
/// from storage, a `CreateProposalError::SendError` if it is unable to send messages to
/// the workers, and a `CreateProposalError::InvalidConversionToPathComponent` if it is
/// unable to convert a u8 index into a path component.
pub fn apply_to_mutable(
    &mut self,
    mut mutable_nodestore: NodeStore<MutableProposal, FileBacked>,
    batch: impl IntoBatchIter,
    pool: &ThreadPool,
) -> Result<NodeStore<MutableProposal, FileBacked>, CreateProposalError> {
    // Prepare step: Force the root into a branch with no partial path in preparation for
    // performing parallel modifications to the trie.
    let mut root_branch = self.force_root(&mut mutable_nodestore)?;

    // Create a response channel the workers use to send messages back to the coordinator (us)
    let (response_sender, response_receiver) = mpsc::channel();

    // Split step: for each operation in the batch, send a request to the worker that is
    // responsible for the sub-trie corresponding to the operation's first nibble.
    for res in batch.into_batch_iter::<CreateProposalError>() {
        let op = res?;
        // Get the first nibble of the key to determine which worker to send the request to.
        //
        // Need to handle an empty key. Since the partial_path of the root must be empty, an
        // empty key should always be for the root node. There are 3 cases to consider.
        //
        // Insert: The main thread modifies the value of the root.
        //
        // Remove: The main thread removes any value at the root. However, it should not delete
        // the root node, which, if necessary later, will be done in post processing.
        //
        // Remove Prefix:
        // For a remove prefix, we would need to remove everything. We do this by sending
        // a remove prefix with an empty prefix to all of the children, then removing the
        // value of the root node.
        let mut key_nibbles = NibblesIterator::new(op.key().as_ref());
        let Some(first_path_component) = key_nibbles.next() else {
            // Empty key: handled inline by the coordinator rather than a worker.
            match &op {
                BatchOp::Put { key: _, value } => {
                    root_branch.value = Some(value.as_ref().into());
                }
                BatchOp::Delete { key: _ } => {
                    root_branch.value = None;
                }
                BatchOp::DeleteRange { prefix: _ } => {
                    // Calling remove prefix with an empty prefix is equivalent to a remove all.
                    if let Err(err) = self.remove_all_entries(&mut root_branch) {
                        // A send error is most likely due to a worker returning to the thread pool
                        // after it encountered a FileIoError. Try to find the FileIoError in the
                        // response channel and return that instead.
                        ParallelMerkle::find_fileio_error(&response_receiver)?;
                        return Err(err.into());
                    }
                }
            }
            continue; // Done with this empty key operation.
        };

        // Verify that the worker index taken from the first nibble is valid.
        let first_path_component = PathComponent::try_new(first_path_component)
            .ok_or(CreateProposalError::InvalidConversionToPathComponent)?;

        // Get the worker that is responsible for this nibble. The worker will be created if it
        // doesn't already exist. Each worker holds a clone of `response_sender`, which is why
        // the coordinator's copy must be dropped explicitly before the merge step below.
        let worker = self.worker(
            pool,
            &mut mutable_nodestore,
            &mut root_branch,
            first_path_component,
            response_sender.clone(),
        )?;

        // Send the current operation to the worker.
        // TODO: Currently the key from the BatchOp is copied to a Box<[u8]> before it is sent
        //       to the worker. It may be possible to send a nibble iterator instead of a
        //       Box<[u8]> to the worker if we use rayon scoped threads. This change would
        //       eliminate a memory copy but may require some code refactoring.
        if let Err(err) = match &op {
            BatchOp::Put { key: _, value } => worker.send(BatchOp::Put {
                key: op.key().as_ref().into(),
                value: value.as_ref().into(),
            }),
            BatchOp::Delete { key: _ } => worker.send(BatchOp::Delete {
                key: op.key().as_ref().into(),
            }),
            BatchOp::DeleteRange { prefix: _ } => worker.send(BatchOp::DeleteRange {
                prefix: op.key().as_ref().into(),
            }),
        } {
            // A send error is most likely due to a worker returning to the thread pool
            // after it encountered a FileIoError. Try to find the FileIoError in the
            // response channel and return that instead.
            ParallelMerkle::find_fileio_error(&response_receiver)?;
            return Err(err.into());
        }
    }

    // Drop the sender response channel from the parent thread.
    drop(response_sender);

    // Setting the workers to default will close the senders to the workers. This will cause the
    // workers to send back their responses. Once the last worker's sender clone is dropped,
    // the response channel disconnects, allowing the merge step below to terminate.
    self.workers = Children::default();

    // Merge step: Collect the results from the workers and merge them as children to the root.
    self.merge_children(response_receiver, &mut mutable_nodestore, &mut root_branch)?;

    // Post-process step: return the trie to its canonical form.
    *mutable_nodestore.root_mut() =
        self.postprocess_trie(&mut mutable_nodestore, root_branch)?;

    Ok(mutable_nodestore)
}
185+
67186
/// Force the root (if necessary) into a branch with no partial path to allow the clean
68187
/// separation of the trie into an array of subtries that can be operated on independently
69188
/// by the worker threads.
@@ -383,108 +502,8 @@ impl ParallelMerkle {
383502
pool: &ThreadPool,
384503
) -> Result<Arc<NodeStore<Arc<ImmutableProposal>, FileBacked>>, CreateProposalError> {
385504
// Create a mutable nodestore from the parent
386-
let mut mutable_nodestore = NodeStore::new(parent)?;
387-
388-
// Prepare step: Force the root into a branch with no partial path in preparation for
389-
// performing parallel modifications to the trie.
390-
let mut root_branch = self.force_root(&mut mutable_nodestore)?;
391-
392-
// Create a response channel the workers use to send messages back to the coordinator (us)
393-
let (response_sender, response_receiver) = mpsc::channel();
394-
395-
// Split step: for each operation in the batch, send a request to the worker that is
396-
// responsible for the sub-trie corresponding to the operation's first nibble.
397-
for res in batch.into_batch_iter::<CreateProposalError>() {
398-
let op = res?;
399-
// Get the first nibble of the key to determine which worker to send the request to.
400-
//
401-
// Need to handle an empty key. Since the partial_path of the root must be empty, an
402-
// empty key should always be for the root node. There are 3 cases to consider.
403-
//
404-
// Insert: The main thread modifies the value of the root.
405-
//
406-
// Remove: The main thread removes any value at the root. However, it should not delete
407-
// the root node, which, if necessary later, will be done in post processing.
408-
//
409-
// Remove Prefix:
410-
// For a remove prefix, we would need to remove everything. We do this by sending
411-
// a remove prefix with an empty prefix to all of the children, then removing the
412-
// value of the root node.
413-
let mut key_nibbles = NibblesIterator::new(op.key().as_ref());
414-
let Some(first_path_component) = key_nibbles.next() else {
415-
match &op {
416-
BatchOp::Put { key: _, value } => {
417-
root_branch.value = Some(value.as_ref().into());
418-
}
419-
BatchOp::Delete { key: _ } => {
420-
root_branch.value = None;
421-
}
422-
BatchOp::DeleteRange { prefix: _ } => {
423-
// Calling remove prefix with an empty prefix is equivalent to a remove all.
424-
if let Err(err) = self.remove_all_entries(&mut root_branch) {
425-
// A send error is most likely due to a worker returning to the thread pool
426-
// after it encountered a FileIoError. Try to find the FileIoError in the
427-
// response channel and return that instead.
428-
ParallelMerkle::find_fileio_error(&response_receiver)?;
429-
return Err(err.into());
430-
}
431-
}
432-
}
433-
continue; // Done with this empty key operation.
434-
};
435-
436-
// Verify that the worker index taken from the first nibble is valid.
437-
let first_path_component = PathComponent::try_new(first_path_component)
438-
.ok_or(CreateProposalError::InvalidConversionToPathComponent)?;
439-
440-
// Get the worker that is responsible for this nibble. The worker will be created if it
441-
// doesn't already exist.
442-
let worker = self.worker(
443-
pool,
444-
&mut mutable_nodestore,
445-
&mut root_branch,
446-
first_path_component,
447-
response_sender.clone(),
448-
)?;
449-
450-
// Send the current operation to the worker.
451-
// TODO: Currently the key from the BatchOp is copied to a Box<[u8]> before it is sent
452-
// to the worker. It may be possible to send a nibble iterator instead of a
453-
// Box<[u8]> to the worker if we use rayon scoped threads. This change would
454-
// eliminate a memory copy but may require some code refactoring.
455-
if let Err(err) = match &op {
456-
BatchOp::Put { key: _, value } => worker.send(BatchOp::Put {
457-
key: op.key().as_ref().into(),
458-
value: value.as_ref().into(),
459-
}),
460-
BatchOp::Delete { key: _ } => worker.send(BatchOp::Delete {
461-
key: op.key().as_ref().into(),
462-
}),
463-
BatchOp::DeleteRange { prefix: _ } => worker.send(BatchOp::DeleteRange {
464-
prefix: op.key().as_ref().into(),
465-
}),
466-
} {
467-
// A send error is most likely due to a worker returning to the thread pool
468-
// after it encountered a FileIoError. Try to find the FileIoError in the
469-
// response channel and return that instead.
470-
ParallelMerkle::find_fileio_error(&response_receiver)?;
471-
return Err(err.into());
472-
}
473-
}
474-
475-
// Drop the sender response channel from the parent thread.
476-
drop(response_sender);
477-
478-
// Setting the workers to default will close the senders to the workers. This will cause the
479-
// workers to send back their responses.
480-
self.workers = Children::default();
481-
482-
// Merge step: Collect the results from the workers and merge them as children to the root.
483-
self.merge_children(response_receiver, &mut mutable_nodestore, &mut root_branch)?;
484-
485-
// Post-process step: return the trie to its canonical form.
486-
*mutable_nodestore.root_mut() =
487-
self.postprocess_trie(&mut mutable_nodestore, root_branch)?;
505+
let mutable_nodestore = NodeStore::new(parent)?;
506+
let mutable_nodestore = self.apply_to_mutable(mutable_nodestore, batch, pool)?;
488507

489508
let immutable: Arc<NodeStore<Arc<ImmutableProposal>, FileBacked>> =
490509
Arc::new(mutable_nodestore.try_into()?);

0 commit comments

Comments
 (0)