Skip to content

Commit 542beb2

Browse files
committed
Added jackson smile
1 parent fa12233 commit 542beb2

File tree

9 files changed

+210
-37
lines changed

9 files changed

+210
-37
lines changed

codec-parent/codec-jackson-smile/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<version>2.4.10-SNAPSHOT</version>
1111
</parent>
1212

13-
<artifactId>codec-jackson-smile</artifactId>
13+
<artifactId>scalecube-codec-jackson-smile</artifactId>
1414
<name>ScaleCube/ClusterCodecJacksonSmile</name>
1515

1616
<dependencies>

codec-parent/codec-jackson-smile/src/main/java/io/scalecube/cluster/codec/jackson/smile/JacksonSmileMessageCodec.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,44 +6,25 @@
66
import java.io.InputStream;
77
import java.io.OutputStream;
88

9-
/** JacksonSmile based message codec. */
109
public class JacksonSmileMessageCodec implements MessageCodec {
1110

1211
private final ObjectMapper delegate;
1312

14-
/** Default constructor. */
1513
public JacksonSmileMessageCodec() {
1614
this(DefaultObjectMapper.OBJECT_MAPPER);
1715
}
1816

19-
/**
20-
* Create instance with external {@link ObjectMapper}.
21-
*
22-
* @param delegate jackson object mapper
23-
*/
2417
public JacksonSmileMessageCodec(ObjectMapper delegate) {
2518
this.delegate = delegate;
2619
}
2720

28-
/**
29-
* Deserializes message from given input stream.
30-
*
31-
* @param stream input stream
32-
* @return message from the input stream
33-
*/
3421
@Override
3522
public Message deserialize(InputStream stream) throws Exception {
3623
return this.delegate.readValue(stream, Message.class);
3724
}
3825

39-
/**
40-
* Serializes given message into given output stream.
41-
*
42-
* @param message message
43-
* @param stream output stream
44-
*/
4526
@Override
4627
public void serialize(Message message, OutputStream stream) throws Exception {
47-
this.delegate.writeValue(stream, message);
28+
stream.write(this.delegate.writeValueAsBytes(message));
4829
}
4930
}
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package io.scalecube.cluster.codec.jackson.smile;
2+
3+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
6+
import io.scalecube.cluster.transport.api.Message;
7+
import io.scalecube.cluster.transport.api.MessageCodec;
8+
import java.io.ByteArrayInputStream;
9+
import java.io.ByteArrayOutputStream;
10+
import java.nio.ByteBuffer;
11+
import java.util.Arrays;
12+
import java.util.Random;
13+
import org.junit.jupiter.api.Disabled;
14+
import org.junit.jupiter.api.Test;
15+
16+
class JacksonSmileMessageCodecTest {
17+
18+
private static final MessageCodec messageCodec = MessageCodec.INSTANCE;
19+
private static final Random random = new Random();
20+
21+
@Test
22+
void serializeAndDeserializeByteBuffer() throws Exception {
23+
byte[] bytes = "hello".getBytes();
24+
25+
Message to = Message.builder().data(new Entity(ByteBuffer.wrap(bytes))).build();
26+
ByteArrayOutputStream output = new ByteArrayOutputStream();
27+
28+
messageCodec.serialize(to, output);
29+
30+
Message from = messageCodec.deserialize(new ByteArrayInputStream(output.toByteArray()));
31+
Entity entity = from.data();
32+
33+
assertArrayEquals(bytes, entity.getMetadata().array());
34+
}
35+
36+
@Test
37+
void serializeAndDeserializeDirectByteBuffer() throws Exception {
38+
byte[] bytes = new byte[512];
39+
random.nextBytes(bytes);
40+
41+
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytes.length * 4);
42+
byteBuffer.put(bytes);
43+
byteBuffer.flip();
44+
45+
Message to = Message.builder().data(new Entity(byteBuffer)).build();
46+
ByteArrayOutputStream output = new ByteArrayOutputStream();
47+
48+
messageCodec.serialize(to, output);
49+
50+
Message from = messageCodec.deserialize(new ByteArrayInputStream(output.toByteArray()));
51+
Entity entity = from.data();
52+
53+
assertArrayEquals(bytes, entity.getMetadata().array());
54+
}
55+
56+
@Test
57+
void serializeAndDeserializeEmptyByteBuffer() throws Exception {
58+
byte[] bytes = new byte[0];
59+
60+
Message to = Message.builder().data(new Entity(ByteBuffer.wrap(bytes))).build();
61+
ByteArrayOutputStream output = new ByteArrayOutputStream();
62+
63+
messageCodec.serialize(to, output);
64+
65+
Message from = messageCodec.deserialize(new ByteArrayInputStream(output.toByteArray()));
66+
Entity entity = from.data();
67+
68+
assertArrayEquals(bytes, entity.getMetadata().array());
69+
}
70+
71+
@Test
72+
@Disabled("https://github.com/FasterXML/jackson-databind/issues/1662")
73+
void serializeAndDeserializeByteBufferWithOffset() throws Exception {
74+
byte[] bytes = new byte[512];
75+
random.nextBytes(bytes);
76+
int offset = random.nextInt(bytes.length / 2);
77+
78+
byte[] expected = Arrays.copyOfRange(bytes, offset, bytes.length);
79+
80+
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes, offset, bytes.length - offset);
81+
82+
assertEquals(offset, byteBuffer.position());
83+
assertEquals(bytes.length - offset, byteBuffer.remaining());
84+
85+
Message to = Message.builder().data(new Entity(byteBuffer)).build();
86+
ByteArrayOutputStream output = new ByteArrayOutputStream();
87+
88+
messageCodec.serialize(to, output);
89+
90+
Message from = messageCodec.deserialize(new ByteArrayInputStream(output.toByteArray()));
91+
Entity entity = from.data();
92+
93+
assertArrayEquals(expected, entity.getMetadata().array());
94+
}
95+
96+
@Test
97+
void serializeAndDeserializeByteBufferWithOffsetSlice() throws Exception {
98+
byte[] bytes = new byte[512];
99+
random.nextBytes(bytes);
100+
int offset = random.nextInt(bytes.length / 2);
101+
102+
byte[] expected = Arrays.copyOfRange(bytes, offset, bytes.length);
103+
104+
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes, offset, bytes.length - offset).slice();
105+
106+
assertEquals(0, byteBuffer.position());
107+
assertEquals(bytes.length - offset, byteBuffer.remaining());
108+
109+
Message to = Message.builder().data(new Entity(byteBuffer)).build();
110+
ByteArrayOutputStream output = new ByteArrayOutputStream();
111+
112+
messageCodec.serialize(to, output);
113+
114+
Message from = messageCodec.deserialize(new ByteArrayInputStream(output.toByteArray()));
115+
Entity entity = from.data();
116+
117+
assertArrayEquals(expected, entity.getMetadata().array());
118+
}
119+
120+
@Test
121+
@Disabled("https://github.com/FasterXML/jackson-databind/issues/1662")
122+
void serializeAndDeserializeDirectByteBufferWithOffset() throws Exception {
123+
byte[] bytes = new byte[512];
124+
random.nextBytes(bytes);
125+
int offset = random.nextInt(bytes.length / 2);
126+
127+
byte[] expected = Arrays.copyOfRange(bytes, offset, bytes.length);
128+
129+
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytes.length * 4);
130+
byteBuffer.put(bytes);
131+
byteBuffer.flip();
132+
byteBuffer.position(offset);
133+
134+
assertEquals(offset, byteBuffer.position());
135+
assertEquals(bytes.length - offset, byteBuffer.remaining());
136+
137+
Message to = Message.builder().data(new Entity(byteBuffer)).build();
138+
ByteArrayOutputStream output = new ByteArrayOutputStream();
139+
140+
messageCodec.serialize(to, output);
141+
142+
Message from = messageCodec.deserialize(new ByteArrayInputStream(output.toByteArray()));
143+
Entity entity = from.data();
144+
145+
assertArrayEquals(expected, entity.getMetadata().array());
146+
}
147+
148+
@Test
149+
void serializeAndDeserializeDirectByteBufferWithOffsetSlice() throws Exception {
150+
byte[] bytes = new byte[512];
151+
random.nextBytes(bytes);
152+
int offset = random.nextInt(bytes.length / 2);
153+
154+
byte[] expected = Arrays.copyOfRange(bytes, offset, bytes.length);
155+
156+
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytes.length * 4);
157+
byteBuffer.put(bytes);
158+
byteBuffer.flip();
159+
byteBuffer.position(offset);
160+
ByteBuffer slice = byteBuffer.slice();
161+
162+
assertEquals(0, slice.position());
163+
assertEquals(bytes.length - offset, slice.remaining());
164+
165+
Message to = Message.builder().data(new Entity(slice)).build();
166+
ByteArrayOutputStream output = new ByteArrayOutputStream();
167+
168+
messageCodec.serialize(to, output);
169+
170+
Message from = messageCodec.deserialize(new ByteArrayInputStream(output.toByteArray()));
171+
Entity entity = from.data();
172+
173+
assertArrayEquals(expected, entity.getMetadata().array());
174+
}
175+
176+
@Test
177+
@Disabled("Cannot construct instance of java.nio.HeapByteBuffer (no Creators, default construct)")
178+
void serializeAndDeserializeByteBufferWithoutEntity() throws Exception {
179+
byte[] bytes = "hello".getBytes();
180+
181+
Message to = Message.builder().data(ByteBuffer.wrap(bytes)).build();
182+
ByteArrayOutputStream output = new ByteArrayOutputStream();
183+
184+
messageCodec.serialize(to, output);
185+
186+
Message from = messageCodec.deserialize(new ByteArrayInputStream(output.toByteArray()));
187+
ByteBuffer byteBuffer = from.data();
188+
189+
assertArrayEquals(bytes, byteBuffer.array());
190+
}
191+
192+
static final class Entity {
193+
private ByteBuffer metadata;
194+
195+
Entity() {}
196+
197+
Entity(ByteBuffer metadata) {
198+
this.metadata = metadata;
199+
}
200+
201+
ByteBuffer getMetadata() {
202+
return metadata;
203+
}
204+
}
205+
}

codec-parent/codec-jackson/src/main/java/io/scalecube/cluster/codec/jackson/JacksonMessageCodec.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,11 @@ public JacksonMessageCodec(ObjectMapper delegate) {
1818
this.delegate = delegate;
1919
}
2020

21-
/**
22-
* Deserializes message from given input stream.
23-
*
24-
* @param stream input stream
25-
* @return message from the input stream
26-
*/
2721
@Override
2822
public Message deserialize(InputStream stream) throws Exception {
2923
return this.delegate.readValue(stream, Message.class);
3024
}
3125

32-
/**
33-
* Serializes given message into given output stream.
34-
*
35-
* @param message message
36-
* @param stream output stream
37-
*/
3826
@Override
3927
public void serialize(Message message, OutputStream stream) throws Exception {
4028
this.delegate.writeValue(stream, message);

examples/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
</dependency>
2424
<dependency>
2525
<groupId>io.scalecube</groupId>
26-
<artifactId>scalecube-codec-jackson</artifactId>
26+
<artifactId>scalecube-codec-jackson-smile</artifactId>
2727
<version>${project.version}</version>
2828
</dependency>
2929
<dependency>

transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ public Address sender() {
197197
@Override
198198
public String toString() {
199199
return new StringJoiner(", ", Message.class.getSimpleName() + "[", "]")
200-
.add("headers(" + headers.size() + ")")
201-
.add("data=" + (data != null ? data.getClass().getName() : null))
200+
.add("headers=" + headers)
201+
.add("data=" + data)
202202
.toString();
203203
}
204204

transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,6 @@ public final Flux<Message> listen() {
233233
public Mono<Void> send(Address address, Message message) {
234234
return connections
235235
.computeIfAbsent(address, this::connect0)
236-
.doOnSubscribe(s -> LOGGER.debug("[{}] Send {} to {}", this.address, message, address))
237236
.map(Connection::outbound)
238237
.flatMap(out -> out.send(Mono.just(message).map(this::toByteBuf), bb -> true).then())
239238
.then();

0 commit comments

Comments
 (0)