Skip to content

Commit 6178847

Browse files
authored
[To dev/1.3] Optimized the error log for schema execution (#16982) (#16986)
* Optimized the error log for schema execution (#16982) * fix * sptls
1 parent f372e51 commit 6178847

File tree

12 files changed

+607
-180
lines changed

12 files changed

+607
-180
lines changed

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,18 @@ public static TSStatus getStatus(List<TSStatus> statusList) {
163163
return status;
164164
}
165165

166+
public static TSStatus extractFailureStatues(final TSStatus input) {
167+
return input.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
168+
? new TSStatus(input.getCode())
169+
.setMessage(input.getMessage())
170+
.setSubStatus(
171+
input.getSubStatus().stream()
172+
.filter(
173+
status -> status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
174+
.collect(Collectors.toList()))
175+
: input;
176+
}
177+
166178
/**
167179
* Convert from {@link TSStatusCode} to {@link TSStatus}, which has message appended with existing
168180
* status message

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterLogicalViewProcedure.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.Objects;
6060
import java.util.Set;
6161
import java.util.function.BiFunction;
62+
import java.util.stream.Collectors;
6263

6364
public class AlterLogicalViewProcedure
6465
extends StateMachineProcedure<ConfigNodeProcedureEnv, AlterLogicalViewState> {
@@ -72,6 +73,8 @@ public class AlterLogicalViewProcedure
7273
private transient PathPatternTree pathPatternTree;
7374
private transient ByteBuffer patternTreeBytes;
7475

76+
protected final Map<TDataNodeLocation, TSStatus> failureMap = new HashMap<>();
77+
7578
public AlterLogicalViewProcedure(final boolean isGeneratedByPipe) {
7679
super(isGeneratedByPipe);
7780
}
@@ -390,11 +393,14 @@ protected void onAllReplicasetFailure(
390393
new ProcedureException(
391394
new MetadataException(
392395
String.format(
393-
"Alter view %s failed when [%s] because failed to execute in all replicaset of schemaRegion %s. Failure nodes: %s",
396+
"Alter view %s failed when [%s] because failed to execute in all replicaset of schemaRegion %s. Failure nodes: %s, statuses: %s",
394397
viewPathToSourceMap.keySet(),
395398
taskName,
396399
consensusGroupId.id,
397-
dataNodeLocationSet))));
400+
dataNodeLocationSet.stream()
401+
.map(TDataNodeLocation::getDataNodeId)
402+
.collect(Collectors.toSet()),
403+
failureStatusList))));
398404
interruptTask();
399405
}
400406
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
2323
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
2424
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
25+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2526
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
2627
import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
2728
import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
2829
import org.apache.iotdb.confignode.manager.ConfigManager;
2930
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
31+
import org.apache.iotdb.rpc.TSStatusCode;
3032

3133
import java.util.ArrayList;
3234
import java.util.HashMap;
@@ -35,6 +37,7 @@
3537
import java.util.Map;
3638
import java.util.Set;
3739
import java.util.function.BiFunction;
40+
import java.util.stream.Collectors;
3841

3942
import static org.apache.iotdb.confignode.procedure.impl.schema.DataNodeRegionGroupUtil.getAllReplicaDataNodeRegionGroupMap;
4043
import static org.apache.iotdb.confignode.procedure.impl.schema.DataNodeRegionGroupUtil.getLeaderDataNodeRegionGroupMap;
@@ -48,7 +51,7 @@ public abstract class DataNodeRegionTaskExecutor<Q, R> {
4851
protected final CnToDnAsyncRequestType dataNodeRequestType;
4952
protected final BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q>
5053
dataNodeRequestGenerator;
51-
54+
protected final Map<TDataNodeLocation, TSStatus> failureMap = new HashMap<>();
5255
private boolean isInterrupted = false;
5356

5457
protected DataNodeRegionTaskExecutor(
@@ -221,5 +224,17 @@ protected abstract List<TConsensusGroupId> processResponseOfOneDataNode(
221224
* executed.
222225
*/
223226
protected abstract void onAllReplicasetFailure(
224-
TConsensusGroupId consensusGroupId, Set<TDataNodeLocation> dataNodeLocationSet);
227+
final TConsensusGroupId consensusGroupId, final Set<TDataNodeLocation> dataNodeLocationSet);
228+
229+
protected String printFailureMap() {
230+
return failureMap.entrySet().stream()
231+
.collect(
232+
Collectors.toMap(
233+
entry -> "DataNodeId: " + entry.getKey().getDataNodeId(),
234+
entry ->
235+
entry.getValue().getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
236+
? entry.getValue().getSubStatus()
237+
: entry.getValue()))
238+
.toString();
239+
}
225240
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.procedure.impl.schema;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23+
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
24+
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
25+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
26+
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
27+
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
28+
import org.apache.iotdb.rpc.RpcUtils;
29+
import org.apache.iotdb.rpc.TSStatusCode;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.function.BiFunction;
35+
36+
public abstract class DataNodeTSStatusTaskExecutor<Q>
37+
extends DataNodeRegionTaskExecutor<Q, TSStatus> {
38+
protected List<TSStatus> successResult = new ArrayList<>();
39+
40+
protected DataNodeTSStatusTaskExecutor(
41+
final ConfigNodeProcedureEnv env,
42+
final Map<TConsensusGroupId, TRegionReplicaSet> targetRegionGroup,
43+
final boolean executeOnAllReplicaset,
44+
final CnToDnAsyncRequestType dataNodeRequestType,
45+
final BiFunction<TDataNodeLocation, List<TConsensusGroupId>, Q> dataNodeRequestGenerator) {
46+
super(
47+
env,
48+
targetRegionGroup,
49+
executeOnAllReplicaset,
50+
dataNodeRequestType,
51+
dataNodeRequestGenerator);
52+
}
53+
54+
@Override
55+
protected List<TConsensusGroupId> processResponseOfOneDataNode(
56+
final TDataNodeLocation dataNodeLocation,
57+
final List<TConsensusGroupId> consensusGroupIdList,
58+
final TSStatus response) {
59+
final List<TConsensusGroupId> failedRegionList = new ArrayList<>();
60+
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
61+
failureMap.remove(dataNodeLocation);
62+
return failedRegionList;
63+
}
64+
65+
if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
66+
final List<TSStatus> subStatus = response.getSubStatus();
67+
for (int i = 0; i < subStatus.size(); i++) {
68+
if (subStatus.get(i).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
69+
failedRegionList.add(consensusGroupIdList.get(i));
70+
}
71+
}
72+
} else {
73+
failedRegionList.addAll(consensusGroupIdList);
74+
}
75+
if (!failedRegionList.isEmpty()) {
76+
failureMap.put(dataNodeLocation, RpcUtils.extractFailureStatues(response));
77+
} else {
78+
failureMap.remove(dataNodeLocation);
79+
}
80+
return failedRegionList;
81+
}
82+
83+
protected List<TConsensusGroupId> processResponseOfOneDataNodeWithSuccessResult(
84+
final TDataNodeLocation dataNodeLocation,
85+
final List<TConsensusGroupId> consensusGroupIdList,
86+
final TSStatus response) {
87+
final List<TConsensusGroupId> failedRegionList = new ArrayList<>();
88+
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
89+
successResult.add(response);
90+
} else if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
91+
List<TSStatus> subStatusList = response.getSubStatus();
92+
for (int i = 0; i < subStatusList.size(); i++) {
93+
if (subStatusList.get(i).getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
94+
successResult.add(subStatusList.get(i));
95+
} else {
96+
failedRegionList.add(consensusGroupIdList.get(i));
97+
}
98+
}
99+
} else {
100+
failedRegionList.addAll(consensusGroupIdList);
101+
}
102+
if (!failedRegionList.isEmpty()) {
103+
failureMap.put(dataNodeLocation, RpcUtils.extractFailureStatues(response));
104+
} else {
105+
failureMap.remove(dataNodeLocation);
106+
}
107+
return failedRegionList;
108+
}
109+
110+
public List<TSStatus> getSuccessResult() {
111+
return successResult;
112+
}
113+
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeactivateTemplateProcedure.java

Lines changed: 9 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ private long constructBlackList(ConfigNodeProcedureEnv env) {
143143
if (targetSchemaRegionGroup.isEmpty()) {
144144
return 0;
145145
}
146-
List<TSStatus> successResult = new ArrayList<>();
147146
DeactivateTemplateRegionTaskExecutor<TConstructSchemaBlackListWithTemplateReq>
148147
constructBlackListTask =
149148
new DeactivateTemplateRegionTaskExecutor<TConstructSchemaBlackListWithTemplateReq>(
@@ -156,26 +155,11 @@ private long constructBlackList(ConfigNodeProcedureEnv env) {
156155
consensusGroupIdList, dataNodeRequest))) {
157156
@Override
158157
protected List<TConsensusGroupId> processResponseOfOneDataNode(
159-
TDataNodeLocation dataNodeLocation,
160-
List<TConsensusGroupId> consensusGroupIdList,
161-
TSStatus response) {
162-
List<TConsensusGroupId> failedRegionList = new ArrayList<>();
163-
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
164-
successResult.add(response);
165-
} else if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
166-
List<TSStatus> subStatusList = response.getSubStatus();
167-
for (int i = 0; i < subStatusList.size(); i++) {
168-
if (subStatusList.get(i).getCode()
169-
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
170-
successResult.add(subStatusList.get(i));
171-
} else {
172-
failedRegionList.add(consensusGroupIdList.get(i));
173-
}
174-
}
175-
} else {
176-
failedRegionList.addAll(consensusGroupIdList);
177-
}
178-
return failedRegionList;
158+
final TDataNodeLocation dataNodeLocation,
159+
final List<TConsensusGroupId> consensusGroupIdList,
160+
final TSStatus response) {
161+
return processResponseOfOneDataNodeWithSuccessResult(
162+
dataNodeLocation, consensusGroupIdList, response);
179163
}
180164
};
181165
constructBlackListTask.execute();
@@ -185,7 +169,7 @@ protected List<TConsensusGroupId> processResponseOfOneDataNode(
185169
}
186170

187171
long preDeletedNum = 0;
188-
for (TSStatus resp : successResult) {
172+
for (TSStatus resp : constructBlackListTask.getSuccessResult()) {
189173
preDeletedNum += Long.parseLong(resp.getMessage());
190174
}
191175
return preDeletedNum;
@@ -428,8 +412,7 @@ public int hashCode() {
428412
getProcId(), getCurrentState(), getCycles(), isGeneratedByPipe, queryId, templateSetInfo);
429413
}
430414

431-
private class DeactivateTemplateRegionTaskExecutor<Q>
432-
extends DataNodeRegionTaskExecutor<Q, TSStatus> {
415+
private class DeactivateTemplateRegionTaskExecutor<Q> extends DataNodeTSStatusTaskExecutor<Q> {
433416

434417
private final String taskName;
435418

@@ -459,42 +442,19 @@ private class DeactivateTemplateRegionTaskExecutor<Q>
459442
this.taskName = taskName;
460443
}
461444

462-
@Override
463-
protected List<TConsensusGroupId> processResponseOfOneDataNode(
464-
TDataNodeLocation dataNodeLocation,
465-
List<TConsensusGroupId> consensusGroupIdList,
466-
TSStatus response) {
467-
List<TConsensusGroupId> failedRegionList = new ArrayList<>();
468-
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
469-
return failedRegionList;
470-
}
471-
472-
if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
473-
List<TSStatus> subStatus = response.getSubStatus();
474-
for (int i = 0; i < subStatus.size(); i++) {
475-
if (subStatus.get(i).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
476-
failedRegionList.add(consensusGroupIdList.get(i));
477-
}
478-
}
479-
} else {
480-
failedRegionList.addAll(consensusGroupIdList);
481-
}
482-
return failedRegionList;
483-
}
484-
485445
@Override
486446
protected void onAllReplicasetFailure(
487447
TConsensusGroupId consensusGroupId, Set<TDataNodeLocation> dataNodeLocationSet) {
488448
setFailure(
489449
new ProcedureException(
490450
new MetadataException(
491451
String.format(
492-
"Deactivate template of %s failed when [%s] because failed to execute in all replicaset of %s %s. Failure nodes: %s",
452+
"Deactivate template of %s failed when [%s] because failed to execute in all replicaset of %s %s. Failure: %s",
493453
requestMessage,
494454
taskName,
495455
consensusGroupId.type,
496456
consensusGroupId.id,
497-
dataNodeLocationSet))));
457+
printFailureMap()))));
498458
interruptTask();
499459
}
500460
}

0 commit comments

Comments
 (0)