File tree Expand file tree Collapse file tree 8 files changed +355
-5
lines changed
tests/Flow/ETL/Tests/Unit/Pipeline
lib/types/src/Flow/Types/Type/Native/String Expand file tree Collapse file tree 8 files changed +355
-5
lines changed Original file line number Diff line number Diff line change 44
55namespace Flow \ETL ;
66
7- use Flow \ETL \Pipeline \Stages ;
7+ use Flow \ETL \Pipeline \Segments ;
88
99/**
1010 * @internal
1111 */
1212final readonly class Pipeline
1313{
14- private Stages $ stages ;
14+ private Segments $ stages ;
1515
1616 public function __construct (private Extractor $ extractor )
1717 {
18- $ this ->stages = new Stages ();
18+ $ this ->stages = new Segments ();
1919 }
2020
2121 public function add (Transformer |Loader |Processor $ step ) : self
@@ -68,7 +68,7 @@ public function process(FlowContext $context) : \Generator
6868 /**
6969 * Get the pipeline stages.
7070 */
71- public function stages () : Stages
71+ public function stages () : Segments
7272 {
7373 return $ this ->stages ;
7474 }
Original file line number Diff line number Diff line change 1111 *
1212 * @internal
1313 */
14- final class Stages
14+ final class Segments
1515{
1616 private Segment $ currentSegment ;
1717
Original file line number Diff line number Diff line change @@ -154,6 +154,22 @@ public function merge(Definition $definition) : Definition
154154 );
155155 }
156156
157+ if ($ definition instanceof FloatDefinition) {
158+ return new FloatDefinition (
159+ $ this ->ref ,
160+ $ this ->nullable || $ definition ->isNullable (),
161+ $ this ->metadata ->merge ($ definition ->metadata ())
162+ );
163+ }
164+
165+ if ($ definition instanceof IntegerDefinition) {
166+ return new IntegerDefinition (
167+ $ this ->ref ,
168+ $ this ->nullable || $ definition ->isNullable (),
169+ $ this ->metadata ->merge ($ definition ->metadata ())
170+ );
171+ }
172+
157173 throw new RuntimeException (\sprintf (
158174 'Cannot merge %s with %s ' ,
159175 self ::class,
Original file line number Diff line number Diff line change @@ -146,6 +146,22 @@ public function merge(Definition $definition) : Definition
146146 );
147147 }
148148
149+ if ($ definition instanceof FloatDefinition) {
150+ return new FloatDefinition (
151+ $ this ->ref ,
152+ $ this ->nullable || $ definition ->isNullable (),
153+ $ this ->metadata ->merge ($ definition ->metadata ())
154+ );
155+ }
156+
157+ if ($ definition instanceof IntegerDefinition) {
158+ return new IntegerDefinition (
159+ $ this ->ref ,
160+ $ this ->nullable || $ definition ->isNullable (),
161+ $ this ->metadata ->merge ($ definition ->metadata ())
162+ );
163+ }
164+
149165 throw new RuntimeException (\sprintf (
150166 'Cannot merge %s with %s ' ,
151167 self ::class,
Original file line number Diff line number Diff line change @@ -146,6 +146,14 @@ public function merge(Definition $definition) : Definition
146146 );
147147 }
148148
149+ if ($ definition instanceof DateDefinition || $ definition instanceof DateTimeDefinition) {
150+ return new self (
151+ $ this ->ref ,
152+ $ this ->nullable || $ definition ->isNullable (),
153+ $ this ->metadata ->merge ($ definition ->metadata ())
154+ );
155+ }
156+
149157 if ($ definition instanceof StringDefinition) {
150158 return new StringDefinition (
151159 $ this ->ref ,
Original file line number Diff line number Diff line change @@ -146,6 +146,14 @@ public function merge(Definition $definition) : Definition
146146 );
147147 }
148148
149+ if ($ definition instanceof DateDefinition || $ definition instanceof DateTimeDefinition) {
150+ return new self (
151+ $ this ->ref ,
152+ $ this ->nullable || $ definition ->isNullable (),
153+ $ this ->metadata ->merge ($ definition ->metadata ())
154+ );
155+ }
156+
149157 if ($ definition instanceof StringDefinition) {
150158 return new StringDefinition (
151159 $ this ->ref ,
You can’t perform that action at this time.
0 commit comments