-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipelineSimple.td
More file actions
56 lines (48 loc) · 1.7 KB
/
pipelineSimple.td
File metadata and controls
56 lines (48 loc) · 1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package pipelineSimple;
use pipelineSimple_types;
/**
* This design implements the following Spark query:
*
* Input: {timestamp, value}
* Output: {min_value, max_value, sum_value, avg_value}
*
* df.filter(col("value") >= 0)
* .groupBy("timestamp")
* .agg(
* min("value").as("min_value"),
* max("value").as("max_value"),
* sum("value").as("sum_value"),
* avg("value").as("avg_value")
* )
* .select("min_value", "max_value", "sum_value", "avg_value")
*
*/
#Interface for the non-negative filter: df.filter(col("value") >= 0). The template parameter in_t is the stream type carried by both ports, so the input and output always have the same type.#
streamlet NonNegativeFilter_interface<in_t: type> {
    // Both ports are typed by the template argument instead of a hard-coded
    // stream type, so in_t is actually used by the interface.
    std_in : in_t in;
    std_out : in_t out;
}
#Implementation of df.filter(col("value") >= 0). It binds NonNegativeFilter_interface with pipelineSimple_types.NumberGroup_stream as the type argument. The body is empty, so no behaviour is described in this file; presumably the filter logic is supplied externally (TODO confirm).#
impl NonNegativeFilter of NonNegativeFilter_interface<pipelineSimple_types.NumberGroup_stream> {}
#Interface for the agg function: takes a NumberGroup_stream on std_in and produces a Stats_stream on std_out. Both types are declared in pipelineSimple_types; presumably Stats_stream carries min_value, max_value, sum_value and avg_value (TODO confirm).#
streamlet Reducer_interface {
// Input port: the values to aggregate.
std_in : pipelineSimple_types.NumberGroup_stream in;
// Output port: the aggregation result.
std_out : pipelineSimple_types.Stats_stream out;
}
#Implementation of the agg function, bound to Reducer_interface. The body is empty, so no behaviour is described in this file; presumably the aggregation logic is supplied externally (TODO confirm).#
impl Reducer of Reducer_interface {}
#Top level interface of the pipeline: takes a NumberGroup_stream on std_in and produces a Stats_stream on std_out, i.e. the same port types as Reducer_interface.#
streamlet PipelineSimple_interface {
// Input port of the whole pipeline.
std_in : pipelineSimple_types.NumberGroup_stream in;
// Output port of the whole pipeline.
std_out : pipelineSimple_types.Stats_stream out;
}
#Top level implementation. It instantiates one NonNegativeFilter and one Reducer and chains them: self.std_in -> filter -> reducer -> self.std_out.#
impl PipelineSimple of PipelineSimple_interface {
// Instantiate the subcomponents: one filter stage and one aggregation stage
instance filter(NonNegativeFilter);
instance reducer(Reducer);
// Connect the subcomponents (source => sink)
// Pipeline input feeds the filter
self.std_in => filter.std_in;
// Filter output feeds the reducer
filter.std_out => reducer.std_in;
// Reducer output drives the pipeline output
reducer.std_out => self.std_out;
}