Skip to content

Commit d008388

Browse files
authored
IGNITE-26736 Use MessageSerializer for Calcite's ErrorMessage (#12457)
1 parent a431294 commit d008388

File tree

6 files changed

+93
-174
lines changed

6 files changed

+93
-174
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
3333
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
3434
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
35-
import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
35+
import org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
3636
import org.apache.ignite.internal.processors.query.calcite.message.InboxCloseMessage;
3737
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
3838
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
@@ -171,7 +171,7 @@ public void queryRegistry(QueryRegistry qryRegistry) {
171171

172172
/** {@inheritDoc} */
173173
@Override public void sendError(UUID nodeId, UUID qryId, long fragmentId, Throwable err) throws IgniteCheckedException {
174-
messageService().send(nodeId, new ErrorMessage(qryId, fragmentId, err));
174+
messageService().send(nodeId, new CalciteErrorMessage(qryId, fragmentId, err));
175175
}
176176

177177
/** {@inheritDoc} */

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
7575
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.PerformanceStatisticsIoTracker;
7676
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
77-
import org.apache.ignite.internal.processors.query.calcite.message.ErrorMessage;
77+
import org.apache.ignite.internal.processors.query.calcite.message.CalciteErrorMessage;
7878
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
7979
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
8080
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
@@ -472,7 +472,7 @@ public void injectService(InjectResourcesService injectSvc) {
472472
@Override public void init() {
473473
messageService().register((n, m) -> onMessage(n, (QueryStartRequest)m), MessageType.QUERY_START_REQUEST);
474474
messageService().register((n, m) -> onMessage(n, (QueryStartResponse)m), MessageType.QUERY_START_RESPONSE);
475-
messageService().register((n, m) -> onMessage(n, (ErrorMessage)m), MessageType.QUERY_ERROR_MESSAGE);
475+
messageService().register((n, m) -> onMessage(n, (CalciteErrorMessage)m), MessageType.QUERY_ERROR_MESSAGE);
476476

477477
eventManager().addDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
478478

@@ -945,7 +945,7 @@ private void onMessage(UUID nodeId, QueryStartResponse msg) {
945945
}
946946

947947
/** */
948-
private void onMessage(UUID nodeId, ErrorMessage msg) {
948+
private void onMessage(UUID nodeId, CalciteErrorMessage msg) {
949949
assert nodeId != null && msg != null;
950950

951951
Query<?> qry = qryReg.query(msg.queryId());
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.query.calcite.message;
19+
20+
import java.util.UUID;
21+
import org.apache.ignite.internal.Order;
22+
import org.apache.ignite.internal.managers.communication.ErrorMessage;
23+
24+
/**
25+
*
26+
*/
27+
public class CalciteErrorMessage extends ErrorMessage implements CalciteMessage {
28+
/** */
29+
@Order(value = 1, method = "queryId")
30+
private UUID qryId;
31+
32+
/** */
33+
@Order(2)
34+
private long fragmentId;
35+
36+
/** */
37+
public CalciteErrorMessage() {
38+
// No-op.
39+
}
40+
41+
/** */
42+
public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) {
43+
super(err);
44+
45+
assert err != null;
46+
47+
this.qryId = qryId;
48+
this.fragmentId = fragmentId;
49+
}
50+
51+
/**
52+
* @return Query ID.
53+
*/
54+
public UUID queryId() {
55+
return qryId;
56+
}
57+
58+
/** */
59+
public void queryId(UUID qryId) {
60+
this.qryId = qryId;
61+
}
62+
63+
/**
64+
* @return Fragment ID.
65+
*/
66+
public long fragmentId() {
67+
return fragmentId;
68+
}
69+
70+
/** */
71+
public void fragmentId(long fragmentId) {
72+
this.fragmentId = fragmentId;
73+
}
74+
75+
/** {@inheritDoc} */
76+
@Override public MessageType type() {
77+
return MessageType.QUERY_ERROR_MESSAGE;
78+
}
79+
80+
/** {@inheritDoc} */
81+
@Override public short directType() {
82+
return MessageType.QUERY_ERROR_MESSAGE.directType();
83+
}
84+
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/ErrorMessage.java

Lines changed: 0 additions & 167 deletions
This file was deleted.

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

2020
import java.util.function.Supplier;
21+
import org.apache.ignite.internal.codegen.CalciteErrorMessageSerializer;
2122
import org.apache.ignite.internal.codegen.ColocationGroupSerializer;
2223
import org.apache.ignite.internal.codegen.FragmentDescriptionSerializer;
2324
import org.apache.ignite.internal.codegen.FragmentMappingSerializer;
@@ -42,7 +43,7 @@ public enum MessageType {
4243
QUERY_START_RESPONSE(301, QueryStartResponse::new, new QueryStartResponseSerializer()),
4344

4445
/** */
45-
QUERY_ERROR_MESSAGE(302, ErrorMessage::new),
46+
QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new CalciteErrorMessageSerializer()),
4647

4748
/** */
4849
QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new),

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/ErrorMessage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.ignite.internal.MessageProcessor;
2323
import org.apache.ignite.internal.Order;
2424
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
25+
import org.apache.ignite.internal.util.typedef.F;
2526
import org.apache.ignite.internal.util.typedef.internal.S;
2627
import org.apache.ignite.internal.util.typedef.internal.U;
2728
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
@@ -101,7 +102,7 @@ public ErrorMessage(@Nullable Throwable err) {
101102
* @see MessageWriter
102103
*/
103104
public void errorBytes(@Nullable byte[] errBytes) {
104-
if (errBytes == null)
105+
if (F.isEmpty(errBytes))
105106
err = null;
106107
else {
107108
try {

0 commit comments

Comments
 (0)