Skip to content

Commit 0d640dd

Browse files
committed
updated to a centralized warning
1 parent afb8e41 commit 0d640dd

File tree

9 files changed

+24
-94
lines changed

9 files changed

+24
-94
lines changed

wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
package org.apache.wayang.basic.operators;
2020

2121
import java.util.Objects;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import org.apache.logging.log4j.LogManager;
24+
import org.apache.logging.log4j.Logger;
2225
import org.apache.wayang.core.function.TransformationDescriptor;
2326
import org.apache.wayang.core.optimizer.costs.DefaultLoadEstimator;
2427
import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator;
@@ -36,7 +39,11 @@ public class ObjectFileSink<T> extends UnarySink<T> {
3639

3740
protected final Class<T> tClass;
3841

39-
private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION;
42+
private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
43+
44+
private final Logger logger = LogManager.getLogger(this.getClass());
45+
46+
private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.JSON;
4047

4148
/**
4249
* Creates a new instance.
@@ -75,6 +82,11 @@ public ObjectFileSink(ObjectFileSink<T> that) {
7582
}
7683

7784
public ObjectFileSerializationMode getSerializationMode() {
85+
if (this.serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION
86+
&& LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
87+
this.logger.warn("ObjectFileSink is using deprecated legacy Java serialization. "
88+
+ "Please switch to the JSON serialization mode via ObjectFileSink#useJsonSerialization().");
89+
}
7890
return this.serializationMode;
7991
}
8092

wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Optional;
2323
import java.util.OptionalDouble;
2424
import java.util.OptionalLong;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2526
import org.apache.commons.lang3.Validate;
2627
import org.apache.logging.log4j.LogManager;
2728
import org.apache.logging.log4j.Logger;
@@ -44,7 +45,9 @@ public class ObjectFileSource<T> extends UnarySource<T> {
4445

4546
private final Class<T> tClass;
4647

47-
private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION;
48+
private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
49+
50+
private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.JSON;
4851

4952
public ObjectFileSource(String inputUrl, DataSetType<T> type) {
5053
super(type);
@@ -79,6 +82,11 @@ public Class<T> getTypeClass(){
7982
}
8083

8184
public ObjectFileSerializationMode getSerializationMode() {
85+
if (this.serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION
86+
&& LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
87+
this.logger.warn("ObjectFileSource is using deprecated legacy Java serialization. "
88+
+ "Please switch to the JSON serialization mode via ObjectFileSource#useJsonSerialization().");
89+
}
8290
return this.serializationMode;
8391
}
8492

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/WayangFileOutputFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public static void initDefaultsFromConfiguration(Configuration configuration) {
125125

126126
private transient DataOutputViewStreamWrapper outView;
127127

128-
private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION;
128+
private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.JSON;
129129
// --------------------------------------------------------------------------------------------
130130

131131
public WayangFileOutputFormat() {}

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,10 @@
3636
import org.apache.wayang.flink.compiler.WayangFileOutputFormat;
3737
import org.apache.wayang.flink.execution.FlinkExecutor;
3838
import org.apache.wayang.flink.platform.FlinkPlatform;
39-
import org.apache.logging.log4j.LogManager;
40-
import org.apache.logging.log4j.Logger;
4139

4240
import java.util.Collection;
4341
import java.util.Collections;
4442
import java.util.List;
45-
import java.util.concurrent.atomic.AtomicBoolean;
4643

4744
/**
4845
* {@link Operator} for the {@link FlinkPlatform} that creates a sequence file.
@@ -51,10 +48,6 @@
5148
*/
5249
public class FlinkObjectFileSink<Type> extends ObjectFileSink<Type> implements FlinkExecutionOperator {
5350

54-
private static final Logger LOGGER = LogManager.getLogger(FlinkObjectFileSink.class);
55-
56-
private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
57-
5851
public FlinkObjectFileSink(ObjectFileSink<Type> that) {
5952
super(that);
6053
}
@@ -89,9 +82,6 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
8982
//TODO: remove the set parallelism 1
9083
DataSetChannel.Instance input = (DataSetChannel.Instance) inputs[0];
9184
ObjectFileSerializationMode serializationMode = this.getSerializationMode();
92-
if (serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION) {
93-
logLegacyWarning();
94-
}
9585
WayangFileOutputFormat<Type> outputFormat = new WayangFileOutputFormat<>(targetPath);
9686
outputFormat.setSerializationMode(serializationMode);
9787
final DataSink<Type> tDataSink = input.<Type>provideDataSet()
@@ -131,10 +121,4 @@ public boolean containsAction() {
131121
return true;
132122
}
133123

134-
private static void logLegacyWarning() {
135-
if (LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
136-
LOGGER.warn("FlinkObjectFileSink is using deprecated legacy Java serialization. "
137-
+ "Please switch to the JSON serialization mode via ObjectFileSink#useJsonSerialization().");
138-
}
139-
}
140124
}

wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSource.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,10 @@
4242
import org.apache.wayang.flink.channels.DataSetChannel;
4343
import org.apache.wayang.flink.execution.FlinkExecutor;
4444
import org.apache.wayang.flink.platform.FlinkPlatform;
45-
import org.apache.logging.log4j.LogManager;
46-
import org.apache.logging.log4j.Logger;
4745

4846
import java.util.Collection;
4947
import java.util.Collections;
5048
import java.util.List;
51-
import java.util.concurrent.atomic.AtomicBoolean;
5249

5350

5451
/**
@@ -58,10 +55,6 @@
5855
*/
5956
public class FlinkObjectFileSource<Type> extends ObjectFileSource<Type> implements FlinkExecutionOperator {
6057

61-
private static final Logger LOGGER = LogManager.getLogger(FlinkObjectFileSource.class);
62-
63-
private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
64-
6558
public FlinkObjectFileSource(ObjectFileSource<Type> that) {
6659
super(that);
6760
}
@@ -94,9 +87,6 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
9487
}
9588
DataSetChannel.Instance output = (DataSetChannel.Instance) outputs[0];
9689
ObjectFileSerializationMode serializationMode = this.getSerializationMode();
97-
if (serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION) {
98-
logLegacyWarning();
99-
}
10090
final Class<Type> typeClass = this.getTypeClass();
10191

10292
HadoopInputFormat<NullWritable, BytesWritable> _file = HadoopInputs.readSequenceFile(NullWritable.class, BytesWritable.class, path);
@@ -151,11 +141,4 @@ public boolean containsAction() {
151141
@Override public boolean isConversion() {
152142
return true;
153143
}
154-
155-
private static void logLegacyWarning() {
156-
if (LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
157-
LOGGER.warn("FlinkObjectFileSource is using deprecated legacy Java serialization. "
158-
+ "Please switch to the JSON serialization mode via ObjectFileSource#useJsonSerialization().");
159-
}
160-
}
161144
}

wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.apache.wayang.java.execution.JavaExecutor;
4545
import org.apache.wayang.java.platform.JavaPlatform;
4646
import org.apache.logging.log4j.LogManager;
47-
import org.apache.logging.log4j.Logger;
4847

4948
import java.io.IOException;
5049
import java.io.UncheckedIOException;
@@ -54,7 +53,6 @@
5453
import java.util.List;
5554
import java.util.function.BiConsumer;
5655
import java.util.stream.Stream;
57-
import java.util.concurrent.atomic.AtomicBoolean;
5856

5957
/**
6058
* {@link Operator} for the {@link JavaPlatform} that creates a sequence file. Consistent with Spark's object files.
@@ -63,10 +61,6 @@
6361
*/
6462
public class JavaObjectFileSink<T> extends ObjectFileSink<T> implements JavaExecutionOperator {
6563

66-
private static final Logger LOGGER = LogManager.getLogger(JavaObjectFileSink.class);
67-
68-
private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
69-
7064
public JavaObjectFileSink(ObjectFileSink<T> that) {
7165
super(that);
7266
}
@@ -101,9 +95,6 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
10195
final SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(NullWritable.class);
10296
final SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(BytesWritable.class);
10397
final ObjectFileSerializationMode serializationMode = this.getSerializationMode();
104-
if (serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION) {
105-
logLegacyWarning();
106-
}
10798
try (SequenceFile.Writer writer = SequenceFile.createWriter(new Configuration(true), fileOption, keyClassOption, valueClassOption)) {
10899

109100
// Chunk the stream of data quanta and write the chunks into the sequence file.
@@ -190,11 +181,4 @@ public void fire() {
190181

191182

192183
}
193-
194-
private static void logLegacyWarning() {
195-
if (LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
196-
LOGGER.warn("JavaObjectFileSink is using deprecated legacy Java serialization. "
197-
+ "Please switch to the JSON serialization mode via ObjectFileSink#useJsonSerialization().");
198-
}
199-
}
200184
}

wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSource.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.apache.wayang.java.execution.JavaExecutor;
4545
import org.apache.wayang.java.platform.JavaPlatform;
4646
import org.apache.logging.log4j.LogManager;
47-
import org.apache.logging.log4j.Logger;
4847

4948
import java.io.Closeable;
5049
import java.io.IOException;
@@ -54,7 +53,6 @@
5453
import java.util.Iterator;
5554
import java.util.List;
5655
import java.util.Spliterators;
57-
import java.util.concurrent.atomic.AtomicBoolean;
5856
import java.util.stream.Stream;
5957
import java.util.stream.StreamSupport;
6058

@@ -65,10 +63,6 @@
6563
*/
6664
public class JavaObjectFileSource<T> extends ObjectFileSource<T> implements JavaExecutionOperator {
6765

68-
private static final Logger LOGGER = LogManager.getLogger(JavaObjectFileSource.class);
69-
70-
private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
71-
7266
public JavaObjectFileSource(ObjectFileSource<T> that) {
7367
super(that);
7468
}
@@ -100,9 +94,6 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
10094
try {
10195
final String actualInputPath = FileSystems.findActualSingleInputPath(path);
10296
ObjectFileSerializationMode serializationMode = this.getSerializationMode();
103-
if (serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION) {
104-
logLegacyWarning();
105-
}
10697
sequenceFileIterator = new SequenceFileIterator<>(actualInputPath, serializationMode, this.getTypeClass());
10798
Stream<?> sequenceFileStream =
10899
StreamSupport.stream(Spliterators.spliteratorUnknownSize(sequenceFileIterator, 0), false);
@@ -136,13 +127,6 @@ public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
136127
return Collections.singletonList(StreamChannel.DESCRIPTOR);
137128
}
138129

139-
private static void logLegacyWarning() {
140-
if (LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
141-
LOGGER.warn("JavaObjectFileSource is using deprecated legacy Java serialization. "
142-
+ "Please switch to the JSON serialization mode via ObjectFileSource#useJsonSerialization().");
143-
}
144-
}
145-
146130
private static class SequenceFileIterator<T> implements Iterator<T>, AutoCloseable, Closeable {
147131

148132
private SequenceFile.Reader sequenceFileReader;

wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.wayang.spark.execution.SparkExecutor;
4040
import org.apache.wayang.spark.platform.SparkPlatform;
4141
import org.apache.logging.log4j.LogManager;
42-
import org.apache.logging.log4j.Logger;
4342
import scala.Tuple2;
4443

4544
import java.io.IOException;
@@ -49,7 +48,6 @@
4948
import java.util.Collections;
5049
import java.util.Iterator;
5150
import java.util.List;
52-
import java.util.concurrent.atomic.AtomicBoolean;
5351

5452
/**
5553
* {@link Operator} for the {@link SparkPlatform} that creates a sequence file.
@@ -58,8 +56,6 @@
5856
*/
5957
public class SparkObjectFileSink<T> extends ObjectFileSink<T> implements SparkExecutionOperator {
6058

61-
private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
62-
6359
public SparkObjectFileSink(ObjectFileSink<T> that) {
6460
super(that);
6561
}
@@ -91,9 +87,6 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
9187

9288
RddChannel.Instance input = (RddChannel.Instance) inputs[0];
9389
ObjectFileSerializationMode serializationMode = this.getSerializationMode();
94-
if (serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION) {
95-
logLegacyWarning();
96-
}
9790

9891
final int chunkSize = 10;
9992
JavaPairRDD<NullWritable, BytesWritable> serializedRdd = input.provideRdd()
@@ -132,14 +125,6 @@ public boolean containsAction() {
132125
return true;
133126
}
134127

135-
private static void logLegacyWarning() {
136-
if (LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
137-
Logger logger = LogManager.getLogger(SparkObjectFileSink.class);
138-
logger.warn("SparkObjectFileSink is using deprecated legacy Java serialization. "
139-
+ "Please switch to the JSON serialization mode via ObjectFileSink#useJsonSerialization().");
140-
}
141-
}
142-
143128
private static Tuple2<NullWritable, BytesWritable> encodeBuffer(Object[] buffer,
144129
int validLength,
145130
ObjectFileSerializationMode mode) {

wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkObjectFileSource.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import java.util.Collection;
4949
import java.util.Collections;
5050
import java.util.List;
51-
import java.util.concurrent.atomic.AtomicBoolean;
5251

5352
/**
5453
* {@link Operator} for the {@link SparkPlatform} that creates a sequence file.
@@ -58,7 +57,6 @@
5857
public class SparkObjectFileSource<T> extends ObjectFileSource<T> implements SparkExecutionOperator {
5958

6059
private final Logger logger = LogManager.getLogger(this.getClass());
61-
private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
6260

6361
public SparkObjectFileSource(ObjectFileSource that) {
6462
super(that);
@@ -91,7 +89,7 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
9189
final String actualInputPath = FileSystems.findActualSingleInputPath(sourcePath);
9290
final ObjectFileSerializationMode serializationMode = this.getSerializationMode();
9391
if (serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION) {
94-
logLegacyWarning();
92+
// Warning is emitted by ObjectFileSource#getSerializationMode.
9593
}
9694
final JavaPairRDD<NullWritable, BytesWritable> rawRdd =
9795
sparkExecutor.sc.sequenceFile(actualInputPath, NullWritable.class, BytesWritable.class);
@@ -136,12 +134,4 @@ public boolean containsAction() {
136134
return false;
137135
}
138136

139-
private static void logLegacyWarning() {
140-
if (LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
141-
LogManager.getLogger(SparkObjectFileSource.class)
142-
.warn("SparkObjectFileSource is using deprecated legacy Java serialization. "
143-
+ "Please switch to the JSON serialization mode via ObjectFileSource#useJsonSerialization().");
144-
}
145-
}
146-
147137
}

0 commit comments

Comments
 (0)