Skip to content

Commit afb8e41

Browse files
committed
Ported object-file IO onto a safe codec to mitigate gadget chain attack that runs arbitrary bytecode inside the Wayang JVM
1 parent d4fa092 commit afb8e41

File tree

12 files changed

+527
-61
lines changed

12 files changed

+527
-61
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.operators;
20+
21+
import com.fasterxml.jackson.databind.DeserializationFeature;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.databind.type.CollectionType;
24+
import org.apache.commons.lang3.Validate;
25+
26+
import java.io.ByteArrayInputStream;
27+
import java.io.ByteArrayOutputStream;
28+
import java.io.IOException;
29+
import java.io.ObjectInputStream;
30+
import java.io.ObjectOutputStream;
31+
import java.util.ArrayList;
32+
import java.util.Arrays;
33+
import java.util.Collection;
34+
import java.util.Collections;
35+
import java.util.List;
36+
37+
/**
38+
* Utility methods that convert between Java objects and the on-disk representation used by {@link ObjectFileSink}.
39+
*/
40+
public final class ObjectFileSerialization {
41+
42+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
43+
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
44+
45+
static {
46+
OBJECT_MAPPER.findAndRegisterModules();
47+
}
48+
49+
private ObjectFileSerialization() {
50+
}
51+
52+
/**
53+
* Serialize a chunk of objects using the provided {@link ObjectFileSerializationMode}.
54+
*
55+
* @param chunk buffer that contains the objects to serialize
56+
* @param validLength number of valid entries inside {@code chunk}
57+
* @param mode the serialization mode
58+
* @return serialized bytes
59+
* @throws IOException if serialization fails
60+
*/
61+
public static byte[] serializeChunk(Object[] chunk, int validLength, ObjectFileSerializationMode mode) throws IOException {
62+
Validate.notNull(mode, "Serialization mode must be provided.");
63+
switch (mode) {
64+
case JSON:
65+
return serializeJson(chunk, validLength);
66+
case LEGACY_JAVA_SERIALIZATION:
67+
return serializeLegacy(chunk, validLength);
68+
default:
69+
throw new IllegalArgumentException("Unknown serialization mode: " + mode);
70+
}
71+
}
72+
73+
/**
74+
* Deserialize a chunk of objects.
75+
*
76+
* @param payload the serialized data
77+
* @param mode the serialization mode
78+
* @param elementType the expected element type
79+
* @return list of deserialized objects (never {@code null})
80+
* @throws IOException if deserialization fails
81+
* @throws ClassNotFoundException if a class cannot be resolved in legacy mode
82+
*/
83+
public static List<Object> deserializeChunk(byte[] payload,
84+
ObjectFileSerializationMode mode,
85+
Class<?> elementType) throws IOException, ClassNotFoundException {
86+
Validate.notNull(mode, "Serialization mode must be provided.");
87+
switch (mode) {
88+
case JSON:
89+
return deserializeJson(payload, elementType);
90+
case LEGACY_JAVA_SERIALIZATION:
91+
return deserializeLegacy(payload);
92+
default:
93+
throw new IllegalArgumentException("Unknown serialization mode: " + mode);
94+
}
95+
}
96+
97+
private static byte[] serializeLegacy(Object[] chunk, int validLength) throws IOException {
98+
Object[] payload = Arrays.copyOf(chunk, validLength);
99+
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
100+
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
101+
oos.writeObject(payload);
102+
oos.flush();
103+
return bos.toByteArray();
104+
}
105+
}
106+
107+
private static List<Object> deserializeLegacy(byte[] payload) throws IOException, ClassNotFoundException {
108+
try (ByteArrayInputStream bis = new ByteArrayInputStream(payload);
109+
ObjectInputStream ois = new ObjectInputStream(bis)) {
110+
Object tmp = ois.readObject();
111+
if (tmp == null) {
112+
return Collections.emptyList();
113+
}
114+
if (tmp instanceof Collection) {
115+
return new ArrayList<>((Collection<?>) tmp);
116+
}
117+
if (tmp.getClass().isArray()) {
118+
return Arrays.asList((Object[]) tmp);
119+
}
120+
return new ArrayList<>(Collections.singletonList(tmp));
121+
}
122+
}
123+
124+
private static byte[] serializeJson(Object[] chunk, int validLength) throws IOException {
125+
Object[] payload = Arrays.copyOf(chunk, validLength);
126+
return OBJECT_MAPPER.writeValueAsBytes(payload);
127+
}
128+
129+
private static List<Object> deserializeJson(byte[] payload, Class<?> elementType) throws IOException {
130+
Validate.notNull(elementType, "Element type must be provided for JSON deserialization.");
131+
CollectionType type = OBJECT_MAPPER.getTypeFactory()
132+
.constructCollectionType(List.class, elementType);
133+
List<?> list = OBJECT_MAPPER.readValue(payload, type);
134+
if (list == null) {
135+
return Collections.emptyList();
136+
}
137+
return new ArrayList<>(list);
138+
}
139+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.operators;
20+
21+
/**
22+
* Supported serialization modes for {@link ObjectFileSource} and {@link ObjectFileSink}.
23+
*/
24+
public enum ObjectFileSerializationMode {
25+
26+
/**
27+
* Legacy Java serialization using {@link java.io.ObjectOutputStream}/{@link java.io.ObjectInputStream}.
28+
* This mode is deprecated and will be removed in a future release.
29+
*/
30+
@Deprecated
31+
LEGACY_JAVA_SERIALIZATION,
32+
33+
/**
34+
* JSON-based serialization that avoids Java serialization gadget chains.
35+
*/
36+
JSON
37+
}

wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class ObjectFileSink<T> extends UnarySink<T> {
3636

3737
protected final Class<T> tClass;
3838

39+
private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION;
40+
3941
/**
4042
* Creates a new instance.
4143
*
@@ -69,5 +71,30 @@ public ObjectFileSink(ObjectFileSink<T> that) {
6971
super(that);
7072
this.textFileUrl = that.textFileUrl;
7173
this.tClass = that.tClass;
74+
this.serializationMode = that.getSerializationMode();
75+
}
76+
77+
public ObjectFileSerializationMode getSerializationMode() {
78+
return this.serializationMode;
79+
}
80+
81+
public ObjectFileSink<T> withSerializationMode(ObjectFileSerializationMode serializationMode) {
82+
this.serializationMode = Objects.requireNonNull(serializationMode, "serializationMode");
83+
return this;
84+
}
85+
86+
/**
87+
* Configure this sink to use the deprecated legacy Java serialization.
88+
*/
89+
@Deprecated
90+
public ObjectFileSink<T> useLegacySerialization() {
91+
return this.withSerializationMode(ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION);
92+
}
93+
94+
/**
95+
* Configure this sink to use the JSON-based serialization.
96+
*/
97+
public ObjectFileSink<T> useJsonSerialization() {
98+
return this.withSerializationMode(ObjectFileSerializationMode.JSON);
7299
}
73100
}

wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.wayang.basic.operators;
2020

21+
import java.util.Objects;
2122
import java.util.Optional;
2223
import java.util.OptionalDouble;
2324
import java.util.OptionalLong;
@@ -43,6 +44,8 @@ public class ObjectFileSource<T> extends UnarySource<T> {
4344

4445
private final Class<T> tClass;
4546

47+
private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION;
48+
4649
public ObjectFileSource(String inputUrl, DataSetType<T> type) {
4750
super(type);
4851
this.inputUrl = inputUrl;
@@ -64,6 +67,7 @@ public ObjectFileSource(ObjectFileSource that) {
6467
super(that);
6568
this.inputUrl = that.getInputUrl();
6669
this.tClass = that.getTypeClass();
70+
this.serializationMode = that.getSerializationMode();
6771
}
6872

6973
public String getInputUrl() {
@@ -74,6 +78,30 @@ public Class<T> getTypeClass(){
7478
return this.tClass;
7579
}
7680

81+
public ObjectFileSerializationMode getSerializationMode() {
82+
return this.serializationMode;
83+
}
84+
85+
public ObjectFileSource<T> withSerializationMode(ObjectFileSerializationMode serializationMode) {
86+
this.serializationMode = Objects.requireNonNull(serializationMode, "serializationMode");
87+
return this;
88+
}
89+
90+
/**
91+
* Configure this source to use the deprecated legacy Java serialization.
92+
*/
93+
@Deprecated
94+
public ObjectFileSource<T> useLegacySerialization() {
95+
return this.withSerializationMode(ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION);
96+
}
97+
98+
/**
99+
* Configure this source to use the JSON-based serialization.
100+
*/
101+
public ObjectFileSource<T> useJsonSerialization() {
102+
return this.withSerializationMode(ObjectFileSerializationMode.JSON);
103+
}
104+
77105
@Override
78106
public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
79107
final int outputIndex,
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.operators;
20+
21+
import org.junit.Assert;
22+
import org.junit.Test;
23+
24+
import java.io.IOException;
25+
import java.io.Serializable;
26+
import java.util.List;
27+
28+
public class ObjectFileSerializationTest {
29+
30+
@Test
31+
public void jsonRoundtrip() throws Exception {
32+
Object[] chunk = new Object[]{"alpha", "beta", "gamma"};
33+
byte[] payload = ObjectFileSerialization.serializeChunk(
34+
chunk,
35+
chunk.length,
36+
ObjectFileSerializationMode.JSON);
37+
38+
List<Object> result = ObjectFileSerialization.deserializeChunk(
39+
payload,
40+
ObjectFileSerializationMode.JSON,
41+
String.class);
42+
43+
Assert.assertEquals(3, result.size());
44+
Assert.assertEquals("alpha", result.get(0));
45+
Assert.assertEquals("beta", result.get(1));
46+
Assert.assertEquals("gamma", result.get(2));
47+
}
48+
49+
@Test
50+
public void legacyRoundtrip() throws Exception {
51+
SerializablePayload payload = new SerializablePayload("data", 42);
52+
Object[] chunk = new Object[]{payload};
53+
54+
byte[] bytes = ObjectFileSerialization.serializeChunk(
55+
chunk,
56+
chunk.length,
57+
ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION);
58+
59+
List<Object> result = ObjectFileSerialization.deserializeChunk(
60+
bytes,
61+
ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION,
62+
SerializablePayload.class);
63+
64+
Assert.assertEquals(1, result.size());
65+
SerializablePayload deserialized = (SerializablePayload) result.get(0);
66+
Assert.assertEquals("data", deserialized.text);
67+
Assert.assertEquals(42, deserialized.value);
68+
}
69+
70+
private static class SerializablePayload implements Serializable {
71+
final String text;
72+
final int value;
73+
74+
private SerializablePayload(String text, int value) {
75+
this.text = text;
76+
this.value = value;
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)