Skip to content

Commit 96558df

Browse files
authored
Merge pull request #520 from juripetersen/fix-flink-distributed
Fix flink distributed
2 parents 9882b86 + a5e9633 commit 96558df

33 files changed

+327
-61
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1058,7 +1058,7 @@
10581058

10591059
<!--RAT files-->
10601060
<exclude>**/apache-rat-0.13/**</exclude>
1061-
1061+
10621062
<!--Javadoc files-->
10631063
<exclude>**/docs/**</exclude>
10641064

wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/wayangplan/Operator.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityPusher;
2727
import org.apache.wayang.core.platform.Platform;
2828

29+
import java.io.Serializable;
30+
2931
import java.lang.reflect.Field;
3032
import java.util.Arrays;
3133
import java.util.Collection;
@@ -57,7 +59,7 @@
5759
* provided <i>before</i> the regular data.</li>
5860
* </ol>
5961
*/
60-
public interface Operator {
62+
public interface Operator extends Serializable {
6163

6264
/**
6365
* @return the number of {@link InputSlot}s of this instance; inclusive of broadcast {@link InputSlot}s
@@ -463,6 +465,13 @@ default boolean isElementary() {
463465
return true;
464466
}
465467

468+
/**
469+
* @return whether this is a conversion operator
470+
*/
471+
default boolean isConversion() {
472+
return false;
473+
}
474+
466475
/**
467476
* This method is part of the visitor pattern and calls the appropriate visit method on {@code visitor}.
468477
*/
@@ -641,5 +650,15 @@ default Collection<String> getEstimationContextProperties() {
641650
return properties;
642651
}
643652

653+
/*
654+
* Collects all in and outputs of this operator instance and connects
655+
* the operator given as parameter to them, effectively rendering this
656+
* instance useless
657+
*
658+
* @param operator the operator to replace this one with
659+
*/
660+
default void replaceWith(Operator operator) {
661+
InputSlot.stealConnections(this, operator);
662+
OutputSlot.stealConnections(this, operator);
663+
}
644664
}
645-

wayang-platforms/wayang-flink/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@
4949
<artifactId>wayang-basic</artifactId>
5050
<version>1.0.1-SNAPSHOT</version>
5151
</dependency>
52+
<dependency>
53+
<groupId>org.apache.wayang</groupId>
54+
<artifactId>wayang-api-sql</artifactId>
55+
<version>1.0.1-SNAPSHOT</version>
56+
</dependency>
5257
<dependency>
5358
<groupId>org.apache.wayang</groupId>
5459
<artifactId>wayang-java</artifactId>
@@ -62,6 +67,11 @@
6267
<groupId>org.apache.hadoop</groupId>
6368
<artifactId>hadoop-hdfs</artifactId>
6469
</dependency>
70+
<dependency>
71+
<groupId>org.apache.flink</groupId>
72+
<artifactId>flink-core</artifactId>
73+
<version>${flink.version}</version>
74+
</dependency>
6575
<!-- depencies of flink -->
6676
<dependency>
6777
<groupId>org.apache.flink</groupId>
@@ -111,5 +121,15 @@
111121
<artifactId>commons-math3</artifactId>
112122
<version>3.4.1</version>
113123
</dependency>
124+
<dependency>
125+
<groupId>com.esotericsoftware</groupId>
126+
<artifactId>kryo</artifactId>
127+
<version>5.6.2</version>
128+
</dependency>
129+
<dependency>
130+
<groupId>io.altoo</groupId>
131+
<artifactId>akka-kryo-serialization_2.12</artifactId>
132+
<version>2.5.2</version>
133+
</dependency>
114134
</dependencies>
115135
</project>

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/FunctionCompiler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ public <I, O> MapPartitionFunction<I, O> compile(MapPartitionsDescriptor<I, O> d
137137
return new MapPartitionFunction<I, O>() {
138138
@Override
139139
public void mapPartition(Iterable<I> iterable, Collector<O> collector) throws Exception {
140-
System.out.println(collector.getClass());
141140
Iterable<O> out = function.apply(iterable);
142141
for(O element: out){
143142
collector.collect(element);

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/KeySelectorFunction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,12 @@
2323
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
2424
import org.apache.wayang.core.function.TransformationDescriptor;
2525

26-
import java.io.Serializable;
2726
import java.util.function.Function;
2827

2928
/**
30-
* Wrapper for {@Link KeySelector}
29+
* Wrapper for {@link KeySelector}
3130
*/
32-
public class KeySelectorFunction<T, K> implements KeySelector<T, K>, ResultTypeQueryable<K>, Serializable {
31+
public class KeySelectorFunction<T, K> implements KeySelector<T, K>, ResultTypeQueryable<K> {
3332

3433
public Function<T, K> impl;
3534

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/execution/FlinkExecutor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.wayang.flink.compiler.FunctionCompiler;
3636
import org.apache.wayang.flink.operators.FlinkExecutionOperator;
3737
import org.apache.wayang.flink.platform.FlinkPlatform;
38+
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
3839

3940
import java.util.Arrays;
4041
import java.util.Collection;
@@ -76,7 +77,7 @@ public FlinkExecutor(FlinkPlatform flinkPlatform, Job job) {
7677
this.platform = flinkPlatform;
7778
this.flinkContextReference = this.platform.getFlinkContext(job);
7879
this.fee = this.flinkContextReference.get();
79-
this.numDefaultPartitions = (int)this.getConfiguration().getLongProperty("wayang.flink.paralelism");
80+
this.numDefaultPartitions = (int) this.getConfiguration().getLongProperty("wayang.flink.parallelism");
8081
this.fee.setParallelism(this.numDefaultPartitions);
8182
this.flinkContextReference.noteObtainedReference();
8283
}
@@ -133,7 +134,7 @@ protected Tuple<List<ChannelInstance>, PartialExecution> execute(
133134
}else {
134135
try {
135136
//TODO validate the execute in different contexts
136-
//this.fee.execute();
137+
this.fee.execute();
137138
} catch (Exception e) {
138139
throw new WayangException(e);
139140
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.wayang.flink.operators;
19+
20+
import org.apache.flink.util.SplittableIterator;
21+
import java.util.Iterator;
22+
import java.util.List;
23+
import java.io.Serializable;
24+
25+
public class CollectionSplittableIterator<T> extends SplittableIterator<T> implements Serializable {
26+
private int numSplits;
27+
private int head;
28+
private final List<T> collection;
29+
30+
public CollectionSplittableIterator(List<T> collection, int numSplits) {
31+
this.collection = collection;
32+
this.numSplits = numSplits;
33+
}
34+
35+
@Override
36+
public Iterator<T>[] split(int numSplits) {
37+
// Split the collection into chunks
38+
int chunkSize = (int) Math.ceil((double) numElements() / this.numSplits);
39+
@SuppressWarnings("unchecked")
40+
Iterator<T>[] splits = new Iterator[this.numSplits];
41+
42+
43+
for (int i = 0; i < this.numSplits; i++) {
44+
int fromIndex = i * chunkSize;
45+
int toIndex = Math.min(fromIndex + chunkSize, numElements());
46+
splits[i] = new CollectionSplittableIterator<>(this.collection.subList(fromIndex, toIndex), 1);
47+
}
48+
49+
return splits;
50+
}
51+
52+
@Override
53+
public boolean hasNext() {
54+
return this.head < this.collection.size() - 1;
55+
}
56+
57+
@Override
58+
public T next() {
59+
T next = this.collection.get(this.head);
60+
this.head++;
61+
62+
return next;
63+
}
64+
65+
@Override
66+
public int getMaximumNumberOfSplits() {
67+
return this.numSplits;
68+
}
69+
70+
private int numElements() {
71+
return this.collection.size();
72+
}
73+
}
74+

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkCartesianOperator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
8181
(dataInput0, dataInput1) -> {
8282
return new Tuple2<>(dataInput0, dataInput1);
8383
}
84-
).returns(ReflectionUtils.specify(Tuple2.class));
84+
)
85+
.setParallelism(flinkExecutor.fee.getParallelism())
86+
.returns(ReflectionUtils.specify(Tuple2.class));
8587

8688
output.accept(datasetOutput, flinkExecutor);
8789

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkCoGroupOperator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,9 @@ public void coGroup (
127127
iterable1.forEach(list1::add);
128128
collector.collect( new Tuple2<>(list0, list1));
129129
}
130-
}).returns(ReflectionUtils.specify(Tuple2.class));
130+
})
131+
.setParallelism(flinkExecutor.fee.getParallelism())
132+
.returns(ReflectionUtils.specify(Tuple2.class));
131133

132134
output.accept(datasetOutput, flinkExecutor);
133135

@@ -159,5 +161,5 @@ public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
159161
public boolean containsAction() {
160162
return false;
161163
}
162-
164+
163165
}

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkCollectionSink.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import org.apache.commons.lang3.Validate;
2222
import org.apache.flink.api.java.DataSet;
23+
import org.apache.flink.util.Collector;
24+
import org.apache.flink.api.common.functions.MapPartitionFunction;
2325
import org.apache.wayang.core.api.Configuration;
2426
import org.apache.wayang.core.optimizer.OptimizationContext;
2527
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
@@ -34,10 +36,13 @@
3436
import org.apache.wayang.flink.channels.DataSetChannel;
3537
import org.apache.wayang.flink.execution.FlinkExecutor;
3638
import org.apache.wayang.java.channels.CollectionChannel;
39+
import com.esotericsoftware.kryo.Serializer;
40+
import org.apache.flink.api.common.typeinfo.TypeInformation;
3741

3842
import java.util.Collection;
3943
import java.util.Collections;
4044
import java.util.List;
45+
import java.util.ArrayList;
4146
import java.util.Optional;
4247

4348

@@ -60,8 +65,15 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
6065
final CollectionChannel.Instance output = (CollectionChannel.Instance) outputs[0];
6166

6267
final DataSet<Type> dataSetInput = input.provideDataSet();
68+
TypeInformation<Type> type = dataSetInput.getType();
6369

64-
output.accept(dataSetInput.filter(a -> true).setParallelism(1).collect());
70+
if (type.getTypeClass().getName().contains("scala.Tuple")) {
71+
flinkExecutor.fee.getConfig().registerTypeWithKryoSerializer(type.getTypeClass(), ScalaTupleSerializer.class);
72+
}
73+
74+
output.accept(dataSetInput
75+
.collect()
76+
);
6577

6678
return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
6779

@@ -95,4 +107,8 @@ public Optional<CardinalityEstimator> createCardinalityEstimator(
95107
public String getLoadProfileEstimatorConfigurationKey() {
96108
return "wayang.flink.collect.load";
97109
}
110+
111+
@Override public boolean isConversion() {
112+
return true;
113+
}
98114
}

0 commit comments

Comments
 (0)