Skip to content

Commit 60170ff

Browse files
Poorvankbhatiafapaul
authored andcommitted
[FLINK-32695] [Tests] Replace NonSerializableTupleSource with DataGeneratorSource V2 API
1 parent fbc7d0a commit 60170ff

File tree

1 file changed

+57
-62
lines changed

1 file changed

+57
-62
lines changed

flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java

Lines changed: 57 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,32 @@
1818

1919
package org.apache.flink.test.streaming.api;
2020

21+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2122
import org.apache.flink.api.common.functions.FlatMapFunction;
2223
import org.apache.flink.api.common.functions.OpenContext;
24+
import org.apache.flink.api.common.typeinfo.TypeInformation;
25+
import org.apache.flink.api.common.typeinfo.Types;
26+
import org.apache.flink.api.connector.sink2.Sink;
27+
import org.apache.flink.api.connector.sink2.SinkWriter;
28+
import org.apache.flink.api.connector.sink2.WriterInitContext;
2329
import org.apache.flink.api.java.tuple.Tuple2;
30+
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
31+
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
32+
import org.apache.flink.connector.datagen.source.GeneratorFunction;
2433
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
2534
import org.apache.flink.streaming.api.datastream.DataStream;
2635
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2736
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
2837
import org.apache.flink.streaming.api.functions.async.ResultFuture;
2938
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
30-
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
31-
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
3239
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
3340
import org.apache.flink.util.Collector;
34-
import org.apache.flink.util.MathUtils;
3541

3642
import org.junit.Assert;
3743
import org.junit.Test;
3844

45+
import java.io.IOException;
46+
import java.io.Serializable;
3947
import java.util.ArrayList;
4048
import java.util.Collection;
4149
import java.util.Collections;
@@ -60,8 +68,22 @@ public void testAsyncWaitOperator() throws Exception {
6068

6169
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
6270

71+
// Create type information for Tuple2<Integer, NonSerializable>
72+
TypeInformation<Tuple2<Integer, NonSerializable>> tupleTypeInfo =
73+
new TupleTypeInfo<>(Types.INT, TypeInformation.of(NonSerializable.class));
74+
75+
// Create generator function for NonSerializable tuples
76+
GeneratorFunction<Long, Tuple2<Integer, NonSerializable>> generateNonSerializableTuple =
77+
index -> new Tuple2<>(index.intValue(), new NonSerializable(index.intValue()));
78+
79+
// Create DataGeneratorSource with the generator function
80+
DataGeneratorSource<Tuple2<Integer, NonSerializable>> source =
81+
new DataGeneratorSource<>(generateNonSerializableTuple, numElements, tupleTypeInfo);
82+
83+
// Create data stream using Source V2 API
6384
DataStream<Tuple2<Integer, NonSerializable>> input =
64-
env.addSource(new NonSerializableTupleSource(numElements));
85+
env.fromSource(source, WatermarkStrategy.noWatermarks(), "NonSerializable Source")
86+
.setParallelism(1);
6587

6688
AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function =
6789
new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
@@ -106,7 +128,7 @@ public void run() {
106128
final List<Integer> actualResult1 = new ArrayList<>(numElements);
107129
MemorySinkFunction.registerCollection(0, actualResult1);
108130

109-
orderedResult.addSink(sinkFunction1).setParallelism(1);
131+
orderedResult.sinkTo(sinkFunction1).setParallelism(1);
110132

111133
DataStream<Integer> unorderedResult =
112134
AsyncDataStream.unorderedWait(input, function, timeout, TimeUnit.MILLISECONDS, 2);
@@ -116,7 +138,7 @@ public void run() {
116138
final List<Integer> actualResult2 = new ArrayList<>(numElements);
117139
MemorySinkFunction.registerCollection(1, actualResult2);
118140

119-
unorderedResult.addSink(sinkFunction2);
141+
unorderedResult.sinkTo(sinkFunction2);
120142

121143
Collection<Integer> expected = new ArrayList<>(10);
122144

@@ -145,77 +167,50 @@ public NonSerializable(int value) {
145167
}
146168
}
147169

148-
private static class NonSerializableTupleSource
149-
implements SourceFunction<Tuple2<Integer, NonSerializable>> {
150-
private static final long serialVersionUID = 3949171986015451520L;
151-
private final int numElements;
170+
private static class MemorySinkFunction implements Sink<Integer> {
152171

153-
public NonSerializableTupleSource(int numElements) {
154-
this.numElements = numElements;
155-
}
172+
private final int key;
173+
private static final Map<Integer, Collection<Integer>> collections =
174+
new ConcurrentHashMap<>();
156175

157-
@Override
158-
public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
159-
for (int i = 0; i < numElements; i++) {
160-
ctx.collect(new Tuple2<>(i, new NonSerializable(i)));
161-
}
176+
public MemorySinkFunction(int key) {
177+
this.key = key;
162178
}
163179

164-
@Override
165-
public void cancel() {}
166-
}
167-
168-
private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {
169-
170-
private static final long serialVersionUID = -8110466235852024821L;
171-
private final int numElements;
172-
private final int numKeys;
173-
174-
public TupleSource(int numElements, int numKeys) {
175-
this.numElements = numElements;
176-
this.numKeys = numKeys;
180+
public static void registerCollection(int key, Collection<Integer> collection) {
181+
collections.put(key, collection);
177182
}
178183

179-
@Override
180-
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
181-
for (int i = 0; i < numElements; i++) {
182-
// keys '1' and '2' hash to different buckets
183-
Tuple2<Integer, Integer> result =
184-
new Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i);
185-
ctx.collect(result);
186-
}
184+
public static void clear() {
185+
collections.clear();
187186
}
188187

189188
@Override
190-
public void cancel() {}
191-
}
192-
193-
private static class MemorySinkFunction implements SinkFunction<Integer> {
194-
private static Map<Integer, Collection<Integer>> collections = new ConcurrentHashMap<>();
195-
196-
private static final long serialVersionUID = -8815570195074103860L;
197-
198-
private final int key;
199-
200-
public MemorySinkFunction(int key) {
201-
this.key = key;
189+
public SinkWriter<Integer> createWriter(WriterInitContext context) throws IOException {
190+
return new MemorySinkWriter(key);
202191
}
203192

204-
@Override
205-
public void invoke(Integer value) throws Exception {
206-
Collection<Integer> collection = collections.get(key);
193+
private static class MemorySinkWriter implements SinkWriter<Integer>, Serializable {
194+
private final int key;
195+
private final Collection<Integer> collection;
207196

208-
synchronized (collection) {
209-
collection.add(value);
197+
public MemorySinkWriter(int key) {
198+
this.key = key;
199+
this.collection = collections.get(key);
210200
}
211-
}
212201

213-
public static void registerCollection(int key, Collection<Integer> collection) {
214-
collections.put(key, collection);
215-
}
202+
@Override
203+
public void write(Integer element, Context context) {
204+
synchronized (collection) {
205+
collection.add(element);
206+
}
207+
}
216208

217-
public static void clear() {
218-
collections.clear();
209+
@Override
210+
public void flush(boolean endOfInput) {}
211+
212+
@Override
213+
public void close() {}
219214
}
220215
}
221216

0 commit comments

Comments
 (0)