Skip to content

Commit 8f2c4b2

Browse files
committed
Merge branch 'master' into IGNITE-26839
2 parents bf1bb0c + 3455af6 commit 8f2c4b2

34 files changed

+171
-173
lines changed

modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,9 @@ private void generateMethods(TypeElement type, List<VariableElement> fields) thr
183183
/**
184184
* Generates start of write/read methods:
185185
* <pre>
186-
* public boolean writeTo(Message m, ByteBuffer buf, MessageWriter writer) {
186+
* public boolean writeTo(Message m, MessageWriter writer) {
187187
* TestMessage msg = (TestMessage)m;
188188
*
189-
* writer.setBuffer(buf);
190-
*
191189
* if (!writer.isHeaderWritten()) {
192190
* if (!writer.writeHeader(msg.directType()))
193191
* return false;
@@ -204,16 +202,13 @@ private void start(TypeElement type, Collection<String> code, boolean write) {
204202

205203
code.add(line(METHOD_JAVADOC));
206204

207-
code.add(line("@Override public boolean %s(Message m, ByteBuffer buf, %s) {",
205+
code.add(line("@Override public boolean %s(Message m, %s) {",
208206
write ? "writeTo" : "readFrom",
209207
write ? "MessageWriter writer" : "MessageReader reader"));
210208

211209
indent++;
212210

213211
code.add(line("%s msg = (%s)m;", type.getSimpleName().toString(), type.getSimpleName().toString()));
214-
code.add(EMPTY);
215-
code.add(line("%s.setBuffer(buf);", write ? "writer" : "reader"));
216-
217212
code.add(EMPTY);
218213

219214
if (write) {
@@ -693,7 +688,6 @@ private void writeClassHeader(Writer writer, String pkgName, String serClsName)
693688
writer.write(NL);
694689
writer.write("package " + pkgName + ";" + NL + NL);
695690

696-
imports.add("java.nio.ByteBuffer");
697691
imports.add("org.apache.ignite.plugin.extensions.communication.Message");
698692
imports.add("org.apache.ignite.plugin.extensions.communication.MessageSerializer");
699693
imports.add("org.apache.ignite.plugin.extensions.communication.MessageWriter");

modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@
109109
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandler;
110110
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
111111
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsQuickVerifyHandler;
112-
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
112+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyResult;
113113
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
114114
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
115115
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
@@ -3256,7 +3256,7 @@ public void testCheckSnapshot() throws Exception {
32563256

32573257
StringBuilder sb = new StringBuilder();
32583258

3259-
((SnapshotPartitionsVerifyTaskResult)h.getLastOperationResult()).print(sb::append);
3259+
((SnapshotPartitionsVerifyResult)h.getLastOperationResult()).print(sb::append);
32603260

32613261
assertContains(log, sb.toString(), "The check procedure has finished, no conflicts have been found");
32623262
}

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ public class DirectMessageReader implements MessageReader {
4848
@GridToStringInclude
4949
private final DirectMessageState<StateItem> state;
5050

51+
/** Buffer for reading. */
52+
private ByteBuffer buf;
53+
5154
/** Whether last field was fully read. */
5255
private boolean lastRead;
5356

@@ -65,9 +68,20 @@ public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectPro
6568

6669
/** {@inheritDoc} */
6770
@Override public void setBuffer(ByteBuffer buf) {
71+
this.buf = buf;
72+
6873
state.item().stream.setBuffer(buf);
6974
}
7075

76+
/**
77+
* Gets but buffer to read from.
78+
*
79+
* @return Byte buffer.
80+
*/
81+
public ByteBuffer getBuffer() {
82+
return buf;
83+
}
84+
7185
/** {@inheritDoc} */
7286
@Override public byte readByte() {
7387
DirectByteBufferStream stream = state.item().stream;
@@ -395,6 +409,8 @@ public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectPro
395409
/** {@inheritDoc} */
396410
@Override public void beforeInnerMessageRead() {
397411
state.forward();
412+
413+
state.item().stream.setBuffer(buf);
398414
}
399415

400416
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public class DirectMessageWriter implements MessageWriter {
4747
@GridToStringInclude
4848
private final DirectMessageState<StateItem> state;
4949

50+
/** Buffer for writing. */
51+
private ByteBuffer buf;
52+
5053
/** */
5154
public DirectMessageWriter(final MessageFactory msgFactory) {
5255
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
@@ -58,9 +61,20 @@ public DirectMessageWriter(final MessageFactory msgFactory) {
5861

5962
/** {@inheritDoc} */
6063
@Override public void setBuffer(ByteBuffer buf) {
64+
this.buf = buf;
65+
6166
state.item().stream.setBuffer(buf);
6267
}
6368

69+
/**
70+
* Gets buffer to write to.
71+
*
72+
* @return Byte buffer.
73+
*/
74+
public ByteBuffer getBuffer() {
75+
return buf;
76+
}
77+
6478
/** {@inheritDoc} */
6579
@Override public boolean writeHeader(short type) {
6680
DirectByteBufferStream stream = state.item().stream;
@@ -364,6 +378,8 @@ public DirectMessageWriter(final MessageFactory msgFactory) {
364378
/** {@inheritDoc} */
365379
@Override public void beforeInnerMessageWrite() {
366380
state.forward();
381+
382+
state.item().stream.setBuffer(buf);
367383
}
368384

369385
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -884,7 +884,7 @@ public void writeMessage(Message msg, MessageWriter writer) {
884884
try {
885885
writer.beforeInnerMessageWrite();
886886

887-
lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, buf, writer);
887+
lastFinished = msgFactory.serializer(msg.directType()).writeTo(msg, writer);
888888
}
889889
finally {
890890
writer.afterInnerMessageWrite(lastFinished);
@@ -1536,7 +1536,7 @@ public <T extends Message> T readMessage(MessageReader reader) {
15361536
try {
15371537
reader.beforeInnerMessageRead();
15381538

1539-
lastFinished = msgFactory.serializer(msg.directType()).readFrom(msg, buf, reader);
1539+
lastFinished = msgFactory.serializer(msg.directType()).readFrom(msg, reader);
15401540
}
15411541
finally {
15421542
reader.afterInnerMessageRead(lastFinished);

modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotCheckCommand.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
package org.apache.ignite.internal.management.snapshot;
1919

2020
import java.util.function.Consumer;
21-
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
21+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyResult;
2222

2323
/** */
24-
public class SnapshotCheckCommand extends AbstractSnapshotCommand<SnapshotCheckCommandArg, SnapshotPartitionsVerifyTaskResult> {
24+
public class SnapshotCheckCommand extends AbstractSnapshotCommand<SnapshotCheckCommandArg, SnapshotPartitionsVerifyResult> {
2525
/** {@inheritDoc} */
2626
@Override public String description() {
2727
return "Check snapshot";
@@ -38,7 +38,7 @@ public class SnapshotCheckCommand extends AbstractSnapshotCommand<SnapshotCheckC
3838
}
3939

4040
/** {@inheritDoc} */
41-
@Override public void printResult(SnapshotCheckCommandArg arg, SnapshotPartitionsVerifyTaskResult res, Consumer<String> printer) {
41+
@Override public void printResult(SnapshotCheckCommandArg arg, SnapshotPartitionsVerifyResult res, Consumer<String> printer) {
4242
res.print(printer);
4343
}
4444
}

modules/core/src/main/java/org/apache/ignite/internal/management/snapshot/SnapshotCheckTask.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.ignite.IgniteException;
2121
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
22-
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyTaskResult;
22+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyResult;
2323
import org.apache.ignite.internal.processors.task.GridInternal;
2424
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
2525
import org.apache.ignite.internal.visor.VisorJob;
@@ -29,17 +29,17 @@
2929
* @see IgniteSnapshotManager#checkSnapshot(String, String)
3030
*/
3131
@GridInternal
32-
public class SnapshotCheckTask extends VisorOneNodeTask<SnapshotCheckCommandArg, SnapshotPartitionsVerifyTaskResult> {
32+
public class SnapshotCheckTask extends VisorOneNodeTask<SnapshotCheckCommandArg, SnapshotPartitionsVerifyResult> {
3333
/** Serial version uid. */
3434
private static final long serialVersionUID = 0L;
3535

3636
/** {@inheritDoc} */
37-
@Override protected VisorJob<SnapshotCheckCommandArg, SnapshotPartitionsVerifyTaskResult> job(SnapshotCheckCommandArg arg) {
37+
@Override protected VisorJob<SnapshotCheckCommandArg, SnapshotPartitionsVerifyResult> job(SnapshotCheckCommandArg arg) {
3838
return new SnapshotCheckJob(arg, debug);
3939
}
4040

4141
/** */
42-
private static class SnapshotCheckJob extends SnapshotJob<SnapshotCheckCommandArg, SnapshotPartitionsVerifyTaskResult> {
42+
private static class SnapshotCheckJob extends SnapshotJob<SnapshotCheckCommandArg, SnapshotPartitionsVerifyResult> {
4343
/** Serial version uid. */
4444
private static final long serialVersionUID = 0L;
4545

@@ -52,7 +52,7 @@ protected SnapshotCheckJob(SnapshotCheckCommandArg arg, boolean debug) {
5252
}
5353

5454
/** {@inheritDoc} */
55-
@Override protected SnapshotPartitionsVerifyTaskResult run(SnapshotCheckCommandArg arg) throws IgniteException {
55+
@Override protected SnapshotPartitionsVerifyResult run(SnapshotCheckCommandArg arg) throws IgniteException {
5656
IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr();
5757

5858
return new IgniteFutureImpl<>(snpMgr.checkSnapshot(arg.snapshotName(), arg.src(), arg.increment())).get();

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* <p>Because raw serialization of throwables is prohibited, you should use this message when it is necessary
3737
* to transfer some error as part of some message. See {@link MessageProcessor} for details.
3838
* <p>Currently, under the hood marshalling and unmarshalling is performed by {@link JdkMarshaller}.
39+
* <p>If the message serialization fails, wraps this error with own one.
3940
*/
4041
@SuppressWarnings({"NullableProblems", "unused"})
4142
public class ErrorMessage implements Message {
@@ -74,8 +75,22 @@ public ErrorMessage(@Nullable Throwable err) {
7475
try {
7576
return U.marshal(jdk(), err);
7677
}
77-
catch (IgniteCheckedException e) {
78-
throw new IgniteException("Unable to marshal the holding error.", e);
78+
catch (IgniteCheckedException e0) {
79+
IgniteCheckedException wrappedErr = new IgniteCheckedException(err.getMessage());
80+
81+
wrappedErr.setStackTrace(err.getStackTrace());
82+
wrappedErr.addSuppressed(e0);
83+
84+
try {
85+
return U.marshal(jdk(), wrappedErr);
86+
}
87+
catch (IgniteCheckedException e1) {
88+
IgniteException marshErr = new IgniteException("Unable to marshal the wrapping error.", e1);
89+
90+
marshErr.addSuppressed(wrappedErr);
91+
92+
throw marshErr;
93+
}
7994
}
8095
}
8196

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.function.Supplier;
2323

2424
import org.apache.ignite.IgniteException;
25+
import org.apache.ignite.internal.direct.DirectMessageReader;
26+
import org.apache.ignite.internal.direct.DirectMessageWriter;
2527
import org.apache.ignite.plugin.extensions.communication.Message;
2628
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2729
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
@@ -43,12 +45,22 @@ public class IgniteMessageFactoryImpl implements MessageFactory {
4345
/** Delegate serialization to {@code Message} methods. */
4446
private static final MessageSerializer DEFAULT_SERIALIZER = new MessageSerializer() {
4547
/** {@inheritDoc} */
46-
@Override public boolean writeTo(Message msg, ByteBuffer buf, MessageWriter writer) {
48+
@Override public boolean writeTo(Message msg, MessageWriter writer) {
49+
ByteBuffer buf = null;
50+
51+
if (writer instanceof DirectMessageWriter)
52+
buf = ((DirectMessageWriter)writer).getBuffer();
53+
4754
return msg.writeTo(buf, writer);
4855
}
4956

5057
/** {@inheritDoc} */
51-
@Override public boolean readFrom(Message msg, ByteBuffer buf, MessageReader reader) {
58+
@Override public boolean readFrom(Message msg, MessageReader reader) {
59+
ByteBuffer buf = null;
60+
61+
if (reader instanceof DirectMessageReader)
62+
buf = ((DirectMessageReader)reader).getBuffer();
63+
5264
return msg.readFrom(buf, reader);
5365
}
5466
};

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ static CacheInvokeDirectResult lazyResult(KeyCacheObject key, Object res) {
8787
*/
8888
public CacheInvokeDirectResult(KeyCacheObject key, Throwable err) {
8989
this.key = key;
90-
this.errMsg = new ErrorMessage(err);
90+
errMsg = new ErrorMessage(err);
9191
}
9292

9393
/**

0 commit comments

Comments
 (0)