@@ -2,9 +2,10 @@ use std::any::TypeId;
2
2
3
3
use schemars:: JsonSchema ;
4
4
use serde:: { Deserialize , Serialize } ;
5
+ use smallvec:: SmallVec ;
5
6
use tracing:: debug;
6
7
7
- use crate :: { Builder , IterBufferable , Output } ;
8
+ use crate :: { Builder , IterBufferable } ;
8
9
9
10
use super :: { DiagramError , DynOutput , NodeRegistry , SerializeMessage } ;
10
11
@@ -40,18 +41,22 @@ where
40
41
debug ! ( "join outputs {:?}" , outputs) ;
41
42
42
43
if outputs. is_empty ( ) {
43
- // do a empty join
44
- return Ok ( ( [ ] as [ Output < ( ) > ; 0 ] )
45
- . join_vec :: < 4 > ( builder)
46
- . output ( )
47
- . into ( ) ) ;
44
+ // do a empty join, in practice, this branch is never ran because [`WorkflowBuilder`]
45
+ // should error out if there is an empty join.
46
+ return Err ( DiagramError :: EmptyJoin ) ;
48
47
}
49
48
50
49
let first_type = outputs[ 0 ] . type_id ;
51
50
52
51
let outputs = outputs
53
52
. into_iter ( )
54
53
. map ( |o| {
54
+ // joins is only supported for outputs of the same type. This is because joins of
55
+ // different types produces a tuple and we cannot output a tuple as we don't
56
+ // know the number and order of join inputs at compile time.
57
+ // A workaround is to serialize them all the `serde_json::Value` or convert them to `Box<dyn Any>`.
58
+ // But the problem with `Box<dyn Any>` is that we can't convert it back to the original type,
59
+ // so nodes need to take a request of `JoinOutput<Box<dyn Any>>`.
55
60
if o. type_id != first_type {
56
61
Err ( DiagramError :: TypeMismatch )
57
62
} else {
@@ -61,15 +66,21 @@ where
61
66
. collect :: < Result < Vec < _ > , _ > > ( ) ?;
62
67
63
68
// we don't know the number of items at compile time, so we just use a sensible number.
69
+ // NOTE: Be sure to update `JoinOutput` if this changes.
64
70
Ok ( outputs. join_vec :: < 4 > ( builder) . output ( ) . into ( ) )
65
71
}
66
72
73
+ /// The resulting type of a `join` operation. Nodes receiving a join output must have request
74
+ /// of this type. Note that the join output is NOT serializable. If you would like to serialize it,
75
+ /// convert it to a `Vec` first.
76
+ pub type JoinOutput < T > = SmallVec < [ T ; 4 ] > ;
77
+
67
78
#[ cfg( test) ]
68
79
mod tests {
69
80
use serde_json:: json;
70
- use smallvec:: SmallVec ;
71
81
use test_log:: test;
72
82
83
+ use super :: * ;
73
84
use crate :: { diagram:: testing:: DiagramTestFixture , Diagram , DiagramError , JsonPosition } ;
74
85
75
86
#[ test]
@@ -86,8 +97,8 @@ mod tests {
86
97
|builder, _config : ( ) | builder. create_map_block ( get_split_value) ,
87
98
) ;
88
99
89
- fn serialize_join_output ( small_vec : SmallVec < [ i64 ; 4 ] > ) -> serde_json:: Value {
90
- serde_json:: to_value ( small_vec ) . unwrap ( )
100
+ fn serialize_join_output ( join_output : JoinOutput < i64 > ) -> serde_json:: Value {
101
+ serde_json:: to_value ( join_output ) . unwrap ( )
91
102
}
92
103
93
104
fixture
0 commit comments