File tree Expand file tree Collapse file tree 1 file changed +3
-23
lines changed
Expand file tree Collapse file tree 1 file changed +3
-23
lines changed Original file line number Diff line number Diff line change @@ -65,42 +65,22 @@ def execute(self):
6565
6666
6767class PipelineSpec (object ):
68- readers = []
69- filters = []
70- writer = None
68+ stages = []
7169
7270 def __init__ (self , other = None ):
7371 if other is not None :
74- self .readers = copy .copy (other .readers )
75- self .filters = copy .copy (other .filters )
76- self .writer = other .writer
72+ self .stages = copy .copy (other .stages )
7773
7874 @property
7975 def spec (self ):
8076 return {
8177 "pipeline" : [stage .spec for stage in self .stages ]
8278 }
8379
84- @property
85- def stages (self ):
86- if self .writer is not None :
87- return chain (self .readers , self .filters , [self .writer ])
88- else :
89- return chain (self .readers , self .filters )
90-
9180 def add_stage (self , stage ):
9281 assert isinstance (stage , StageSpec ), "Expected StageSpec"
93- if stage .prefix == "writers" :
94- if self .writer is not None :
95- warnings .warn ("Currently assigned output stage will be replaced." )
96- self .writer = stage
97- elif stage .prefix == "readers" :
98- self .readers .append (stage )
99- elif stage .prefix == "filters" :
100- self .filters .append (stage )
101- else :
102- warnings .warn ("Unknown stage type. Skipping." )
10382
83+ self .stages .append (stage )
10484 return self
10585
10686 def __str__ (self ):
You can’t perform that action at this time.
0 commit comments