Skip to content

Commit c319992

Browse files
committed
fix unstable split
Signed-off-by: Teo Koon Peng <[email protected]>
1 parent 632dfc2 commit c319992

File tree

3 files changed

+20
-15
lines changed

3 files changed

+20
-15
lines changed

src/diagram.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ pub enum DiagramOperation {
262262
/// },
263263
/// "join": {
264264
/// "type": "join",
265-
/// "order": ["op1", "op2"],
265+
/// "inputs": ["op1", "op2"],
266266
/// "next": { "builtin": "terminate" }
267267
/// }
268268
/// }

src/diagram/join.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub struct JoinOp {
1818

1919
/// Controls the order of the resulting join. Each item must be an operation id of one of the
2020
/// incoming outputs.
21-
pub(super) order: Vec<SourceOperation>,
21+
pub(super) inputs: Vec<SourceOperation>,
2222

2323
/// Do not serialize before performing the join. If true, joins can only be done
2424
/// on outputs of the same type.
@@ -176,7 +176,7 @@ mod tests {
176176
},
177177
"join": {
178178
"type": "join",
179-
"order": ["op1", "op2"],
179+
"inputs": ["op1", "op2"],
180180
"next": "serialize_join_output",
181181
"no_serialize": true,
182182
},
@@ -197,6 +197,14 @@ mod tests {
197197
assert_eq!(result[1], 6);
198198
}
199199

200+
/// This test is to ensure that the order of split and join operations are stable.
201+
#[test]
202+
fn test_join_stress() {
203+
for _ in 1..20 {
204+
test_join();
205+
}
206+
}
207+
200208
#[test]
201209
fn test_empty_join() {
202210
let mut fixture = DiagramTestFixture::new();
@@ -241,7 +249,7 @@ mod tests {
241249
},
242250
"join": {
243251
"type": "join",
244-
"order": [],
252+
"inputs": [],
245253
"next": { "builtin": "terminate" },
246254
"no_serialize": true,
247255
},
@@ -297,7 +305,7 @@ mod tests {
297305
},
298306
"join": {
299307
"type": "join",
300-
"order": ["op1", "op2"],
308+
"inputs": ["op1", "op2"],
301309
"next": { "builtin": "terminate" },
302310
},
303311
}

src/diagram/workflow_builder.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ fn connect_vertex<'a>(
276276
.collect();
277277

278278
let mut ordered_outputs: Vec<DynOutput> = Vec::with_capacity(target.in_edges.len());
279-
for source_id in join_op.order.iter() {
279+
for source_id in join_op.inputs.iter() {
280280
let o = outputs
281281
.remove(source_id)
282282
.ok_or(DiagramError::OperationNotFound(source_id.to_string()))?;
@@ -415,7 +415,7 @@ fn connect_edge<'a>(
415415
}
416416
}
417417
DiagramOperation::Split(split_op) => {
418-
let outputs = if output.type_id == TypeId::of::<serde_json::Value>() {
418+
let mut outputs = if output.type_id == TypeId::of::<serde_json::Value>() {
419419
let chain = output.into_output::<serde_json::Value>()?.chain(builder);
420420
split_chain(chain, split_op)
421421
} else {
@@ -428,14 +428,11 @@ fn connect_edge<'a>(
428428
let reg = registry.get_registration(&origin.builder)?;
429429
reg.split(builder, output, split_op)
430430
}?;
431-
outputs
432-
.outputs
433-
.into_iter()
434-
.zip(target.out_edges.iter())
435-
.for_each(|((_, o), e)| {
436-
let out_edge = edges.get_mut(e).unwrap();
437-
out_edge.state = EdgeState::Ready { output: o, origin };
438-
});
431+
for e in &target.out_edges {
432+
let out_edge = edges.get_mut(e).unwrap();
433+
let output = outputs.outputs.remove(out_edge.target).unwrap();
434+
out_edge.state = EdgeState::Ready { output, origin };
435+
}
439436
if let Some(_) = &split_op.remaining {
440437
let out_edge = edges.get_mut(target.out_edges.last().unwrap()).unwrap();
441438
out_edge.state = EdgeState::Ready {

0 commit comments

Comments
 (0)