Skip to content

Commit 448592e

Browse files
authored
Pipe: Fixed the check for no permission 2 (apache#16804)
* edge * sonar * emperor-cloth * emperor-patch
1 parent 1c6a2b0 commit 448592e

File tree

8 files changed

+351
-15
lines changed

8 files changed

+351
-15
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ private void doTransfer(
191191
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
192192
.setMessage(errorMessage),
193193
errorMessage,
194-
pipeConfigRegionWritePlanEvent.toString());
194+
pipeConfigRegionWritePlanEvent.toString(),
195+
true);
195196
}
196197
}
197198

@@ -252,7 +253,8 @@ private void doTransfer(
252253
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
253254
.setMessage(errorMessage),
254255
errorMessage,
255-
pipeConfigRegionSnapshotEvent.toString());
256+
pipeConfigRegionSnapshotEvent.toString(),
257+
true);
256258
} else {
257259
LOGGER.info("Successfully transferred config region snapshot {}.", snapshot);
258260
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri
188188
String.format(
189189
"Transfer config region write plan %s error, result status %s.",
190190
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), status),
191-
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString());
191+
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().toString(),
192+
true);
192193
}
193194

194195
if (LOGGER.isDebugEnabled()) {
@@ -279,7 +280,8 @@ private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent)
279280
String.format(
280281
"Seal config region snapshot file %s error, result status %s.",
281282
snapshotFile, resp.getStatus()),
282-
snapshotFile.toString());
283+
snapshotFile.toString(),
284+
true);
283285
}
284286

285287
LOGGER.info("Successfully transferred config region snapshot {}.", snapshotFile);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ private void doTransfer(
122122
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
123123
.setMessage(errorMessage),
124124
errorMessage,
125-
pipeSchemaRegionWritePlanEvent.toString());
125+
pipeSchemaRegionWritePlanEvent.toString(),
126+
true);
126127
}
127128
}
128129

@@ -187,7 +188,8 @@ private void doTransfer(
187188
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
188189
.setMessage(errorMessage),
189190
errorMessage,
190-
pipeSchemaRegionSnapshotEvent.toString());
191+
pipeSchemaRegionSnapshotEvent.toString(),
192+
true);
191193
} else {
192194
LOGGER.info(
193195
"Successfully transferred schema region snapshot {}, {} and {}.",

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public void onComplete(TPipeConsensusTransferResp response) {
7979
// Only handle the failed statuses to avoid string format performance overhead
8080
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
8181
&& status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
82-
connector.statusHandler().handle(status, status.getMessage(), event.toString());
82+
connector.statusHandler().handle(status, status.getMessage(), event.toString(), true);
8383
}
8484
event.decreaseReferenceCount(PipeConsensusDeleteEventHandler.class.getName(), true);
8585

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ private void doTransfer(final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
253253
String.format(
254254
"Transfer deletion %s error, result status %s.",
255255
pipeDeleteDataNodeEvent.getDeleteDataNode().getType(), status),
256-
pipeDeleteDataNodeEvent.getDeletionResource().toString());
256+
pipeDeleteDataNodeEvent.getDeletionResource().toString(),
257+
true);
257258
}
258259

259260
if (LOGGER.isDebugEnabled()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ private void doTransfer(final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWri
127127
String.format(
128128
"Transfer data node write plan %s error, result status %s.",
129129
pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), status),
130-
pipeSchemaRegionWritePlanEvent.getPlanNode().toString());
130+
pipeSchemaRegionWritePlanEvent.getPlanNode().toString(),
131+
true);
131132
}
132133

133134
if (LOGGER.isDebugEnabled()) {
@@ -222,7 +223,8 @@ private void doTransfer(final PipeSchemaRegionSnapshotEvent snapshotEvent)
222223
String.format(
223224
"Seal file %s, %s and %s error, result status %s.",
224225
mTreeSnapshotFile, tagLogSnapshotFile, attributeSnapshotFile, resp.getStatus()),
225-
snapshotEvent.toString());
226+
snapshotEvent.toString(),
227+
true);
226228
}
227229

228230
LOGGER.info(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2828
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
2929
import org.apache.iotdb.commons.utils.RetryUtils;
30+
import org.apache.iotdb.commons.utils.TestOnly;
3031
import org.apache.iotdb.pipe.api.event.Event;
3132
import org.apache.iotdb.pipe.api.exception.PipeException;
3233
import org.apache.iotdb.rpc.TSStatusCode;
@@ -44,7 +45,7 @@
4445

4546
public class PipeReceiverStatusHandler {
4647

47-
private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
48+
private static Logger LOGGER = LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
4849
private static final String NO_PERMISSION = "No permission";
4950
private static final String UNCLASSIFIED_EXCEPTION = "Unclassified exception";
5051
private static final String NO_PERMISSION_STR = "No permissions for this operation";
@@ -86,6 +87,11 @@ public PipeReceiverStatusHandler(
8687
this.skipIfNoPrivileges = skipIfNoPrivileges;
8788
}
8889

90+
public void handle(
91+
final TSStatus status, final String exceptionMessage, final String recordMessage) {
92+
handle(status, exceptionMessage, recordMessage, false);
93+
}
94+
8995
/**
9096
* Handle {@link TSStatus} returned by receiver. Do nothing if ignore the {@link Event}, and throw
9197
* exception if retry the {@link Event}. Upper class must ensure that the method is invoked only
@@ -99,7 +105,10 @@ public PipeReceiverStatusHandler(
99105
* put any time-related info here
100106
*/
101107
public void handle(
102-
final TSStatus status, final String exceptionMessage, final String recordMessage) {
108+
final TSStatus status,
109+
final String exceptionMessage,
110+
final String recordMessage,
111+
final boolean log4NoPrivileges) {
103112

104113
if (RetryUtils.needRetryForWrite(status.getCode())) {
105114
LOGGER.info("IoTConsensusV2: will retry with increasing interval. status: {}", status);
@@ -184,17 +193,28 @@ public void handle(
184193

185194
case 803: // NO_PERMISSION
186195
if (skipIfNoPrivileges) {
196+
if (log4NoPrivileges && LOGGER.isWarnEnabled()) {
197+
LOGGER.warn(
198+
"{}: Skip if no privileges. will be ignored. event: {}. status: {}",
199+
getNoPermission(true),
200+
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
201+
status);
202+
}
187203
return;
188204
}
189205
handleOtherExceptions(status, exceptionMessage, recordMessage, true);
190206
break;
191-
case 305:
192-
handleOtherExceptions(status, exceptionMessage, recordMessage, false);
193-
break;
194207
default:
195208
// Some auth error may be wrapped in other codes
196209
if (exceptionMessage.contains(NO_PERMISSION_STR)) {
197210
if (skipIfNoPrivileges) {
211+
if (log4NoPrivileges && LOGGER.isWarnEnabled()) {
212+
LOGGER.warn(
213+
"{}: Skip if no privileges. will be ignored. event: {}. status: {}",
214+
getNoPermission(true),
215+
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
216+
status);
217+
}
198218
return;
199219
}
200220
handleOtherExceptions(status, exceptionMessage, recordMessage, true);
@@ -314,4 +334,9 @@ public static TSStatus getPriorStatus(final List<TSStatus> givenStatusList) {
314334
resultStatus.setSubStatus(givenStatusList);
315335
return resultStatus;
316336
}
337+
338+
@TestOnly
339+
public static void setLogger(final Logger logger) {
340+
LOGGER = logger;
341+
}
317342
}

0 commit comments

Comments
 (0)