Skip to content

Commit 3a63220

Browse files
authored
fix (#16702)
1 parent 1ebb951 commit 3a63220

File tree

1 file changed

+64
-70
lines changed

1 file changed

+64
-70
lines changed

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java

Lines changed: 64 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
public class PipeReceiverStatusHandler {
4646

4747
private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
48+
private static final String NO_PERMISSION = "No permission";
49+
private static final String UNCLASSIFIED_EXCEPTION = "Unclassified exception";
50+
private static final String NO_PERMISSION_STR = "No permissions for this operation";
4851

4952
private static final int CONFLICT_RETRY_MAX_TIMES = 100;
5053

@@ -183,83 +186,74 @@ public void handle(
183186
if (skipIfNoPrivileges) {
184187
return;
185188
}
186-
187-
synchronized (this) {
188-
recordExceptionStatusIfNecessary(recordMessage);
189-
190-
if (exceptionEventHasBeenRetried.get()
191-
&& System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
192-
> retryMaxMillisWhenOtherExceptionsOccur) {
193-
LOGGER.warn(
194-
"No permission: retry timeout. will be ignored. event: {}. status: {}",
195-
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
196-
status);
197-
resetExceptionStatus();
189+
handleOtherExceptions(status, exceptionMessage, recordMessage, true);
190+
break;
191+
case 305:
192+
handleOtherExceptions(status, exceptionMessage, recordMessage, false);
193+
break;
194+
default:
195+
// Some auth error may be wrapped in other codes
196+
if (exceptionMessage.contains(NO_PERMISSION_STR)) {
197+
if (skipIfNoPrivileges) {
198198
return;
199199
}
200-
201-
// Reduce the log if retry forever
202-
if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
203-
PipeLogger.log(LOGGER::warn, "No permission: will retry forever. status: %s", status);
204-
} else {
205-
LOGGER.warn(
206-
"No permission: will retry for at least {} seconds. status: {}",
207-
(retryMaxMillisWhenOtherExceptionsOccur
208-
+ exceptionFirstEncounteredTime.get()
209-
- System.currentTimeMillis())
210-
/ 1000.0,
211-
status);
212-
}
213-
214-
exceptionEventHasBeenRetried.set(true);
215-
throw new PipeRuntimeSinkRetryTimesConfigurableException(
216-
exceptionMessage,
217-
(int)
218-
Math.max(
219-
PipeSubtask.MAX_RETRY_TIMES,
220-
Math.min(
221-
CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
200+
handleOtherExceptions(status, exceptionMessage, recordMessage, true);
201+
break;
222202
}
203+
// Other exceptions
204+
handleOtherExceptions(status, exceptionMessage, recordMessage, false);
205+
break;
206+
}
207+
}
223208

224-
default: // Other exceptions
225-
synchronized (this) {
226-
recordExceptionStatusIfNecessary(recordMessage);
209+
private synchronized void handleOtherExceptions(
210+
final TSStatus status,
211+
final String exceptionMessage,
212+
final String recordMessage,
213+
final boolean noPermission) {
214+
recordExceptionStatusIfNecessary(recordMessage);
215+
216+
if (exceptionEventHasBeenRetried.get()
217+
&& System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
218+
> retryMaxMillisWhenOtherExceptionsOccur) {
219+
LOGGER.warn(
220+
"{}: retry timeout. will be ignored. event: {}. status: {}",
221+
getNoPermission(noPermission),
222+
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
223+
status);
224+
resetExceptionStatus();
225+
return;
226+
}
227227

228-
if (exceptionEventHasBeenRetried.get()
229-
&& System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
230-
> retryMaxMillisWhenOtherExceptionsOccur) {
231-
LOGGER.warn(
232-
"Unclassified exception: retry timeout. will be ignored. event: {}. status: {}",
233-
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
234-
status);
235-
resetExceptionStatus();
236-
return;
237-
}
228+
// Reduce the log if retry forever
229+
if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
230+
PipeLogger.log(
231+
LOGGER::warn,
232+
"%s: will retry forever. status: %s",
233+
getNoPermission(noPermission),
234+
status);
235+
} else {
236+
LOGGER.warn(
237+
"{}: will retry for at least {} seconds. status: {}",
238+
getNoPermission(noPermission),
239+
(retryMaxMillisWhenOtherExceptionsOccur
240+
+ exceptionFirstEncounteredTime.get()
241+
- System.currentTimeMillis())
242+
/ 1000.0,
243+
status);
244+
}
238245

239-
// Reduce the log if retry forever
240-
if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
241-
PipeLogger.log(
242-
LOGGER::warn, "Unclassified exception: will retry forever. status: %s", status);
243-
} else {
244-
LOGGER.warn(
245-
"Unclassified exception: will retry for at least {} seconds. status: {}",
246-
(retryMaxMillisWhenOtherExceptionsOccur
247-
+ exceptionFirstEncounteredTime.get()
248-
- System.currentTimeMillis())
249-
/ 1000.0,
250-
status);
251-
}
246+
exceptionEventHasBeenRetried.set(true);
247+
throw new PipeRuntimeSinkRetryTimesConfigurableException(
248+
exceptionMessage,
249+
(int)
250+
Math.max(
251+
PipeSubtask.MAX_RETRY_TIMES,
252+
Math.min(CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
253+
}
252254

253-
exceptionEventHasBeenRetried.set(true);
254-
throw new PipeRuntimeSinkRetryTimesConfigurableException(
255-
exceptionMessage,
256-
(int)
257-
Math.max(
258-
PipeSubtask.MAX_RETRY_TIMES,
259-
Math.min(
260-
CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
261-
}
262-
}
255+
private static String getNoPermission(final boolean noPermission) {
256+
return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION;
263257
}
264258

265259
private void recordExceptionStatusIfNecessary(final String message) {

0 commit comments

Comments
 (0)