17
17
18
18
package org .apache .flink .streaming .api ;
19
19
20
+ import org .apache .flink .api .common .eventtime .WatermarkStrategy ;
20
21
import org .apache .flink .api .common .functions .FlatMapFunction ;
21
22
import org .apache .flink .api .common .functions .InvalidTypesException ;
22
23
import org .apache .flink .api .common .functions .MapFunction ;
30
31
import org .apache .flink .streaming .api .functions .co .CoFlatMapFunction ;
31
32
import org .apache .flink .streaming .api .functions .co .CoMapFunction ;
32
33
import org .apache .flink .streaming .api .functions .co .ProcessJoinFunction ;
33
- import org .apache .flink .streaming . api . functions . source .legacy . SourceFunction ;
34
+ import org .apache .flink .test . util . source .AbstractTestSource ;
34
35
import org .apache .flink .util .Collector ;
35
36
36
37
import org .junit .jupiter .api .Test ;
@@ -51,7 +52,13 @@ class TypeFillTest {
51
52
void test () {
52
53
StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment ();
53
54
54
- assertThatThrownBy (() -> env .addSource (new TestSource <Integer >()).print ())
55
+ assertThatThrownBy (
56
+ () ->
57
+ env .fromSource (
58
+ new TestSource <Integer >(),
59
+ WatermarkStrategy .noWatermarks (),
60
+ "" )
61
+ .print ())
55
62
.isInstanceOf (InvalidTypesException .class );
56
63
57
64
DataStream <Long > source = env .fromSequence (1 , 10 );
@@ -110,7 +117,8 @@ void test() {
110
117
.print ())
111
118
.isInstanceOf (InvalidTypesException .class );
112
119
113
- env .addSource (new TestSource <Integer >()).returns (Integer .class );
120
+ env .fromSource (new TestSource <Integer >(), WatermarkStrategy .noWatermarks (), "" )
121
+ .returns (Integer .class );
114
122
source .map (new TestMap <Long , Long >()).returns (Long .class ).print ();
115
123
source .flatMap (new TestFlatMap <Long , Long >()).returns (new TypeHint <Long >() {}).print ();
116
124
source .connect (source )
@@ -157,15 +165,7 @@ public String map(Long value) throws Exception {
157
165
.isInstanceOf (IllegalStateException .class );
158
166
}
159
167
160
- private static class TestSource <T > implements SourceFunction <T > {
161
- private static final long serialVersionUID = 1L ;
162
-
163
- @ Override
164
- public void run (SourceContext <T > ctx ) throws Exception {}
165
-
166
- @ Override
167
- public void cancel () {}
168
- }
168
+ private static class TestSource <T > extends AbstractTestSource <T > {}
169
169
170
170
private static class TestMap <T , O > implements MapFunction <T , O > {
171
171
@ Override
0 commit comments