17
17
18
18
package org .apache .flink .streaming .api ;
19
19
20
+ import org .apache .flink .api .common .eventtime .WatermarkStrategy ;
20
21
import org .apache .flink .api .common .functions .FlatMapFunction ;
21
22
import org .apache .flink .api .common .functions .InvalidTypesException ;
22
23
import org .apache .flink .api .common .functions .MapFunction ;
23
24
import org .apache .flink .api .common .typeinfo .BasicTypeInfo ;
24
25
import org .apache .flink .api .common .typeinfo .TypeHint ;
25
26
import org .apache .flink .api .common .typeinfo .Types ;
27
+ import org .apache .flink .api .connector .source .Boundedness ;
28
+ import org .apache .flink .api .connector .source .ReaderOutput ;
29
+ import org .apache .flink .api .connector .source .Source ;
30
+ import org .apache .flink .api .connector .source .SourceReader ;
31
+ import org .apache .flink .api .connector .source .SourceReaderContext ;
32
+ import org .apache .flink .api .connector .source .SourceSplit ;
33
+ import org .apache .flink .api .connector .source .SplitEnumerator ;
34
+ import org .apache .flink .api .connector .source .SplitEnumeratorContext ;
26
35
import org .apache .flink .api .java .functions .KeySelector ;
36
+ import org .apache .flink .core .io .InputStatus ;
37
+ import org .apache .flink .core .io .SimpleVersionedSerializer ;
27
38
import org .apache .flink .streaming .api .datastream .DataStream ;
28
39
import org .apache .flink .streaming .api .datastream .SingleOutputStreamOperator ;
29
40
import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
30
41
import org .apache .flink .streaming .api .functions .co .CoFlatMapFunction ;
31
42
import org .apache .flink .streaming .api .functions .co .CoMapFunction ;
32
43
import org .apache .flink .streaming .api .functions .co .ProcessJoinFunction ;
33
- import org .apache .flink .streaming .api .functions .source .legacy .SourceFunction ;
34
44
import org .apache .flink .util .Collector ;
35
45
36
46
import org .junit .jupiter .api .Test ;
37
47
38
48
import java .time .Duration ;
49
+ import java .util .List ;
50
+ import java .util .concurrent .CompletableFuture ;
39
51
40
52
import static org .assertj .core .api .Assertions .assertThat ;
41
53
import static org .assertj .core .api .Assertions .assertThatThrownBy ;
@@ -51,7 +63,13 @@ class TypeFillTest {
51
63
void test () {
52
64
StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment ();
53
65
54
- assertThatThrownBy (() -> env .addSource (new TestSource <Integer >()).print ())
66
+ assertThatThrownBy (
67
+ () ->
68
+ env .fromSource (
69
+ new NoopSource <Integer >(),
70
+ WatermarkStrategy .noWatermarks (),
71
+ "NoopSource" )
72
+ .print ())
55
73
.isInstanceOf (InvalidTypesException .class );
56
74
57
75
DataStream <Long > source = env .fromSequence (1 , 10 );
@@ -110,7 +128,8 @@ void test() {
110
128
.print ())
111
129
.isInstanceOf (InvalidTypesException .class );
112
130
113
- env .addSource (new TestSource <Integer >()).returns (Integer .class );
131
+ env .fromSource (new NoopSource <Integer >(), WatermarkStrategy .noWatermarks (), "NoopSource" )
132
+ .returns (Integer .class );
114
133
source .map (new TestMap <Long , Long >()).returns (Long .class ).print ();
115
134
source .flatMap (new TestFlatMap <Long , Long >()).returns (new TypeHint <Long >() {}).print ();
116
135
source .connect (source )
@@ -157,16 +176,6 @@ public String map(Long value) throws Exception {
157
176
.isInstanceOf (IllegalStateException .class );
158
177
}
159
178
160
- private static class TestSource <T > implements SourceFunction <T > {
161
- private static final long serialVersionUID = 1L ;
162
-
163
- @ Override
164
- public void run (SourceContext <T > ctx ) throws Exception {}
165
-
166
- @ Override
167
- public void cancel () {}
168
- }
169
-
170
179
private static class TestMap <T , O > implements MapFunction <T , O > {
171
180
@ Override
172
181
public O map (T value ) throws Exception {
@@ -218,4 +227,129 @@ public void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out)
218
227
// nothing to do
219
228
}
220
229
}
230
+
231
+ private static class NoopSource <T > implements Source <T , SourceSplit , Void > {
232
+ private static final long serialVersionUID = 1L ;
233
+
234
+ @ Override
235
+ public Boundedness getBoundedness () {
236
+ return Boundedness .BOUNDED ;
237
+ }
238
+
239
+ @ Override
240
+ public SourceReader <T , SourceSplit > createReader (SourceReaderContext readerContext ) {
241
+ return new NoopSourceReader <>();
242
+ }
243
+
244
+ @ Override
245
+ public SplitEnumerator <SourceSplit , Void > createEnumerator (
246
+ SplitEnumeratorContext <SourceSplit > enumContext ) {
247
+ return new NoopSplitEnumerator (enumContext );
248
+ }
249
+
250
+ @ Override
251
+ public SplitEnumerator <SourceSplit , Void > restoreEnumerator (
252
+ SplitEnumeratorContext <SourceSplit > enumContext , Void checkpoint ) {
253
+ return createEnumerator (enumContext );
254
+ }
255
+
256
+ @ Override
257
+ public SimpleVersionedSerializer <SourceSplit > getSplitSerializer () {
258
+ return new NoopSplitSerializer ();
259
+ }
260
+
261
+ @ Override
262
+ public SimpleVersionedSerializer <Void > getEnumeratorCheckpointSerializer () {
263
+ return new SimpleVersionedSerializer <Void >() {
264
+ @ Override
265
+ public int getVersion () {
266
+ return 1 ;
267
+ }
268
+
269
+ @ Override
270
+ public byte [] serialize (Void obj ) {
271
+ return new byte [0 ];
272
+ }
273
+
274
+ @ Override
275
+ public Void deserialize (int version , byte [] serialized ) {
276
+ return null ;
277
+ }
278
+ };
279
+ }
280
+ }
281
+
282
+ private static class NoopSourceReader <T > implements SourceReader <T , SourceSplit > {
283
+ @ Override
284
+ public void start () {}
285
+
286
+ @ Override
287
+ public InputStatus pollNext (ReaderOutput <T > output ) {
288
+ return InputStatus .END_OF_INPUT ;
289
+ }
290
+
291
+ @ Override
292
+ public List <SourceSplit > snapshotState (long checkpointId ) {
293
+ return java .util .Collections .emptyList ();
294
+ }
295
+
296
+ @ Override
297
+ public void addSplits (List <SourceSplit > splits ) {}
298
+
299
+ @ Override
300
+ public void notifyNoMoreSplits () {}
301
+
302
+ @ Override
303
+ public CompletableFuture <Void > isAvailable () {
304
+ return CompletableFuture .completedFuture (null );
305
+ }
306
+
307
+ @ Override
308
+ public void close () {}
309
+ }
310
+
311
+ private static class NoopSplitEnumerator implements SplitEnumerator <SourceSplit , Void > {
312
+ private final SplitEnumeratorContext <SourceSplit > context ;
313
+
314
+ public NoopSplitEnumerator (SplitEnumeratorContext <SourceSplit > context ) {
315
+ this .context = context ;
316
+ }
317
+
318
+ @ Override
319
+ public void start () {}
320
+
321
+ @ Override
322
+ public void handleSplitRequest (int subtaskId , String requesterHostname ) {}
323
+
324
+ @ Override
325
+ public void addSplitsBack (List <SourceSplit > splits , int subtaskId ) {}
326
+
327
+ @ Override
328
+ public void addReader (int subtaskId ) {}
329
+
330
+ @ Override
331
+ public Void snapshotState (long checkpointId ) {
332
+ return null ;
333
+ }
334
+
335
+ @ Override
336
+ public void close () {}
337
+ }
338
+
339
+ private static class NoopSplitSerializer implements SimpleVersionedSerializer <SourceSplit > {
340
+ @ Override
341
+ public int getVersion () {
342
+ return 1 ;
343
+ }
344
+
345
+ @ Override
346
+ public byte [] serialize (SourceSplit split ) {
347
+ return new byte [0 ];
348
+ }
349
+
350
+ @ Override
351
+ public SourceSplit deserialize (int version , byte [] serialized ) {
352
+ return null ;
353
+ }
354
+ }
221
355
}
0 commit comments