Skip to content

Commit 7f5aa0b

Browse files
authored
IGNITE-26508 Encapsulate ByteBuffer into DirectMessageWriter/Reader (#12447)
1 parent 54f3339 commit 7f5aa0b

File tree

19 files changed

+102
-123
lines changed

19 files changed

+102
-123
lines changed

modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,9 @@ private void generateMethods(TypeElement type, List<VariableElement> fields) thr
183183
/**
184184
* Generates start of write/read methods:
185185
* <pre>
186-
* public boolean writeTo(Message m, ByteBuffer buf, MessageWriter writer) {
186+
* public boolean writeTo(Message m, MessageWriter writer) {
187187
* TestMessage msg = (TestMessage)m;
188188
*
189-
* writer.setBuffer(buf);
190-
*
191189
* if (!writer.isHeaderWritten()) {
192190
* if (!writer.writeHeader(msg.directType()))
193191
* return false;
@@ -204,16 +202,13 @@ private void start(TypeElement type, Collection<String> code, boolean write) {
204202

205203
code.add(line(METHOD_JAVADOC));
206204

207-
code.add(line("@Override public boolean %s(Message m, ByteBuffer buf, %s) {",
205+
code.add(line("@Override public boolean %s(Message m, %s) {",
208206
write ? "writeTo" : "readFrom",
209207
write ? "MessageWriter writer" : "MessageReader reader"));
210208

211209
indent++;
212210

213211
code.add(line("%s msg = (%s)m;", type.getSimpleName().toString(), type.getSimpleName().toString()));
214-
code.add(EMPTY);
215-
code.add(line("%s.setBuffer(buf);", write ? "writer" : "reader"));
216-
217212
code.add(EMPTY);
218213

219214
if (write) {
@@ -693,7 +688,6 @@ private void writeClassHeader(Writer writer, String pkgName, String serClsName)
693688
writer.write(NL);
694689
writer.write("package " + pkgName + ";" + NL + NL);
695690

696-
imports.add("java.nio.ByteBuffer");
697691
imports.add("org.apache.ignite.plugin.extensions.communication.Message");
698692
imports.add("org.apache.ignite.plugin.extensions.communication.MessageSerializer");
699693
imports.add("org.apache.ignite.plugin.extensions.communication.MessageWriter");

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ public class DirectMessageReader implements MessageReader {
4848
@GridToStringInclude
4949
private final DirectMessageState<StateItem> state;
5050

51+
/** Buffer for reading. */
52+
private ByteBuffer buf;
53+
5154
/** Whether last field was fully read. */
5255
private boolean lastRead;
5356

@@ -65,9 +68,20 @@ public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectPro
6568

6669
/** {@inheritDoc} */
6770
@Override public void setBuffer(ByteBuffer buf) {
71+
this.buf = buf;
72+
6873
state.item().stream.setBuffer(buf);
6974
}
7075

76+
/**
77+
* Gets but buffer to read from.
78+
*
79+
* @return Byte buffer.
80+
*/
81+
public ByteBuffer getBuffer() {
82+
return buf;
83+
}
84+
7185
/** {@inheritDoc} */
7286
@Override public byte readByte() {
7387
DirectByteBufferStream stream = state.item().stream;
@@ -395,6 +409,8 @@ public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectPro
395409
/** {@inheritDoc} */
396410
@Override public void beforeInnerMessageRead() {
397411
state.forward();
412+
413+
state.item().stream.setBuffer(buf);
398414
}
399415

400416
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public class DirectMessageWriter implements MessageWriter {
4747
@GridToStringInclude
4848
private final DirectMessageState<StateItem> state;
4949

50+
/** Buffer for writing. */
51+
private ByteBuffer buf;
52+
5053
/** */
5154
public DirectMessageWriter(final MessageFactory msgFactory) {
5255
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
@@ -58,9 +61,20 @@ public DirectMessageWriter(final MessageFactory msgFactory) {
5861

5962
/** {@inheritDoc} */
6063
@Override public void setBuffer(ByteBuffer buf) {
64+
this.buf = buf;
65+
6166
state.item().stream.setBuffer(buf);
6267
}
6368

69+
/**
70+
* Gets buffer to write to.
71+
*
72+
* @return Byte buffer.
73+
*/
74+
public ByteBuffer getBuffer() {
75+
return buf;
76+
}
77+
6478
/** {@inheritDoc} */
6579
@Override public boolean writeHeader(short type) {
6680
DirectByteBufferStream stream = state.item().stream;
@@ -364,6 +378,8 @@ public DirectMessageWriter(final MessageFactory msgFactory) {
364378
/** {@inheritDoc} */
365379
@Override public void beforeInnerMessageWrite() {
366380
state.forward();
381+
382+
state.item().stream.setBuffer(buf);
367383
}
368384

369385
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -884,7 +884,7 @@ public void writeMessage(Message msg, MessageWriter writer) {
884884
try {
885885
writer.beforeInnerMessageWrite();
886886

887-
lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, buf, writer);
887+
lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer);
888888
}
889889
finally {
890890
writer.afterInnerMessageWrite(lastFinished);
@@ -1536,7 +1536,7 @@ public <T extends Message> T readMessage(MessageReader reader) {
15361536
try {
15371537
reader.beforeInnerMessageRead();
15381538

1539-
lastFinished = msgFactory.serializer(msg.directType()).readFrom(msg, buf, reader);
1539+
lastFinished = msgFactory.serializer(msg.directType()).readFrom(msg, reader);
15401540
}
15411541
finally {
15421542
reader.afterInnerMessageRead(lastFinished);

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.function.Supplier;
2323

2424
import org.apache.ignite.IgniteException;
25+
import org.apache.ignite.internal.direct.DirectMessageReader;
26+
import org.apache.ignite.internal.direct.DirectMessageWriter;
2527
import org.apache.ignite.plugin.extensions.communication.Message;
2628
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2729
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
@@ -43,12 +45,22 @@ public class IgniteMessageFactoryImpl implements MessageFactory {
4345
/** Delegate serialization to {@code Message} methods. */
4446
private static final MessageSerializer DEFAULT_SERIALIZER = new MessageSerializer() {
4547
/** {@inheritDoc} */
46-
@Override public boolean writeTo(Message msg, ByteBuffer buf, MessageWriter writer) {
48+
@Override public boolean writeTo(Message msg, MessageWriter writer) {
49+
ByteBuffer buf = null;
50+
51+
if (writer instanceof DirectMessageWriter)
52+
buf = ((DirectMessageWriter)writer).getBuffer();
53+
4754
return msg.writeTo(buf, writer);
4855
}
4956

5057
/** {@inheritDoc} */
51-
@Override public boolean readFrom(Message msg, ByteBuffer buf, MessageReader reader) {
58+
@Override public boolean readFrom(Message msg, MessageReader reader) {
59+
ByteBuffer buf = null;
60+
61+
if (reader instanceof DirectMessageReader)
62+
buf = ((DirectMessageReader)reader).getBuffer();
63+
5264
return msg.readFrom(buf, reader);
5365
}
5466
};

modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,11 @@
2222
import java.io.ObjectInput;
2323
import java.io.ObjectOutput;
2424
import java.nio.ByteBuffer;
25-
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
2625
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
2726
import org.apache.ignite.internal.util.typedef.internal.U;
2827
import org.apache.ignite.plugin.extensions.communication.Message;
29-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
30-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
3128

3229
/** */
33-
@IgniteCodeGeneratingFail
3430
public class ClientMessage implements Message, Externalizable {
3531
/** */
3632
private static final long serialVersionUID = -4609408156037304495L;
@@ -85,8 +81,13 @@ public ClientMessage(BinaryOutputStream stream) {
8581
isFirstMessage = false;
8682
}
8783

88-
/** {@inheritDoc} */
89-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter ignored) {
84+
/**
85+
* Writes this message to provided byte buffer.
86+
*
87+
* @param buf Byte buffer.
88+
* @return Whether message was fully written.
89+
*/
90+
public boolean writeTo(ByteBuffer buf) {
9091
assert stream != null || data != null;
9192

9293
byte[] data = stream != null ? stream.array() : this.data;
@@ -132,11 +133,6 @@ public ClientMessage(BinaryOutputStream stream) {
132133
return false;
133134
}
134135

135-
/** {@inheritDoc} */
136-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
137-
throw new UnsupportedOperationException();
138-
}
139-
140136
/**
141137
* Reads this message from provided byte buffer.
142138
*

modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,6 @@
222222
import org.apache.ignite.marshaller.Marshaller;
223223
import org.apache.ignite.marshaller.Marshallers;
224224
import org.apache.ignite.plugin.PluginProvider;
225-
import org.apache.ignite.plugin.extensions.communication.Message;
226-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
227225
import org.apache.ignite.spi.IgniteSpi;
228226
import org.apache.ignite.spi.IgniteSpiException;
229227
import org.apache.ignite.spi.discovery.DiscoverySpi;
@@ -6613,39 +6611,6 @@ public static <T extends R, R> List<R> arrayList(Iterator<T> c, int cap,
66136611
return list;
66146612
}
66156613

6616-
/**
6617-
* Fully writes communication message to provided stream.
6618-
*
6619-
* @param msg Message.
6620-
* @param out Stream to write to.
6621-
* @param buf Byte buffer that will be passed to {@link Message#writeTo(ByteBuffer, MessageWriter)} method.
6622-
* @param writer Message writer.
6623-
* @return Number of written bytes.
6624-
* @throws IOException In case of error.
6625-
*/
6626-
public static int writeMessageFully(Message msg, OutputStream out, ByteBuffer buf,
6627-
MessageWriter writer) throws IOException {
6628-
assert msg != null;
6629-
assert out != null;
6630-
assert buf != null;
6631-
assert buf.hasArray();
6632-
6633-
boolean finished = false;
6634-
int cnt = 0;
6635-
6636-
while (!finished) {
6637-
finished = msg.writeTo(buf, writer);
6638-
6639-
out.write(buf.array(), 0, buf.position());
6640-
6641-
cnt += buf.position();
6642-
6643-
buf.clear();
6644-
}
6645-
6646-
return cnt;
6647-
}
6648-
66496614
/**
66506615
* Throws exception with uniform error message if given parameter's assertion condition
66516616
* is {@code false}.

modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import java.nio.ByteBuffer;
2323
import org.apache.ignite.IgniteCheckedException;
2424
import org.apache.ignite.IgniteLogger;
25+
import org.apache.ignite.internal.direct.DirectMessageReader;
2526
import org.apache.ignite.internal.util.typedef.internal.U;
2627
import org.apache.ignite.plugin.extensions.communication.Message;
2728
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
28-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
2929
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
3030
import org.jetbrains.annotations.Nullable;
3131

@@ -67,10 +67,10 @@ public GridDirectParser(IgniteLogger log, MessageFactory msgFactory, GridNioMess
6767
/** {@inheritDoc} */
6868
@Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf)
6969
throws IOException, IgniteCheckedException {
70-
MessageReader reader = ses.meta(READER_META_KEY);
70+
DirectMessageReader reader = ses.meta(READER_META_KEY);
7171

7272
if (reader == null)
73-
ses.addMeta(READER_META_KEY, reader = readerFactory.reader(ses, msgFactory));
73+
ses.addMeta(READER_META_KEY, reader = (DirectMessageReader)readerFactory.reader(ses, msgFactory));
7474

7575
Message msg = ses.removeMeta(MSG_META_KEY);
7676

@@ -87,7 +87,9 @@ public GridDirectParser(IgniteLogger log, MessageFactory msgFactory, GridNioMess
8787
if (msg != null && buf.hasRemaining()) {
8888
MessageSerializer msgSer = msgFactory.serializer(msg.directType());
8989

90-
finished = msgSer.readFrom(msg, buf, reader);
90+
reader.setBuffer(buf);
91+
92+
finished = msgSer.readFrom(msg, reader);
9193
}
9294

9395
if (finished) {

modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1611,12 +1611,14 @@ private boolean writeToBuffer(
16111611
if (messageFactory() == null) {
16121612
assert msg instanceof ClientMessage; // TODO: Will refactor in IGNITE-26554.
16131613

1614-
finished = msg.writeTo(buf, writer);
1614+
finished = ((ClientMessage)msg).writeTo(buf);
16151615
}
16161616
else {
16171617
MessageSerializer msgSer = messageFactory().serializer(msg.directType());
16181618

1619-
finished = msgSer.writeTo(msg, buf, writer);
1619+
writer.setBuffer(buf);
1620+
1621+
finished = msgSer.writeTo(msg, writer);
16201622
}
16211623

16221624
span.addTag(SOCKET_WRITE_BYTES, () -> Integer.toString(buf.position() - startPos));
@@ -1810,12 +1812,14 @@ private boolean writeToBuffer(GridSelectorNioSessionImpl ses, ByteBuffer buf, Se
18101812
if (msgFactory == null) {
18111813
assert msg instanceof ClientMessage; // TODO: Will refactor in IGNITE-26554.
18121814

1813-
finished = msg.writeTo(buf, writer);
1815+
finished = ((ClientMessage)msg).writeTo(buf);
18141816
}
18151817
else {
18161818
MessageSerializer msgSer = msgFactory.serializer(msg.directType());
18171819

1818-
finished = msgSer.writeTo(msg, buf, writer);
1820+
writer.setBuffer(buf);
1821+
1822+
finished = msgSer.writeTo(msg, writer);
18191823
}
18201824

18211825
span.addTag(SOCKET_WRITE_BYTES, () -> Integer.toString(buf.position() - startPos));

modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ public interface MessageReader {
4040
*
4141
* @param buf Byte buffer.
4242
*/
43-
public void setBuffer(ByteBuffer buf);
43+
@Deprecated
44+
public default void setBuffer(ByteBuffer buf) {
45+
// No-op.
46+
}
4447

4548
/**
4649
* Reads {@code byte} value.

0 commit comments

Comments
 (0)