Skip to content

Commit f0cdfe0

Browse files
authored
fix (#16702) (#16772)
1 parent a60d00e commit f0cdfe0

File tree

1 file changed

+64
-70
lines changed

1 file changed

+64
-70
lines changed

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java

Lines changed: 64 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
public class PipeReceiverStatusHandler {
4444

4545
private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
46+
private static final String NO_PERMISSION = "No permission";
47+
private static final String UNCLASSIFIED_EXCEPTION = "Unclassified exception";
48+
private static final String NO_PERMISSION_STR = "No permissions for this operation";
4649

4750
private static final int CONFLICT_RETRY_MAX_TIMES = 100;
4851

@@ -169,83 +172,74 @@ public void handle(
169172
if (skipIfNoPrivileges) {
170173
return;
171174
}
172-
173-
synchronized (this) {
174-
recordExceptionStatusIfNecessary(recordMessage);
175-
176-
if (exceptionEventHasBeenRetried.get()
177-
&& System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
178-
> retryMaxMillisWhenOtherExceptionsOccur) {
179-
LOGGER.warn(
180-
"No permission: retry timeout. will be ignored. event: {}. status: {}",
181-
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
182-
status);
183-
resetExceptionStatus();
175+
handleOtherExceptions(status, exceptionMessage, recordMessage, true);
176+
break;
177+
case 305:
178+
handleOtherExceptions(status, exceptionMessage, recordMessage, false);
179+
break;
180+
default:
181+
// Some auth error may be wrapped in other codes
182+
if (exceptionMessage.contains(NO_PERMISSION_STR)) {
183+
if (skipIfNoPrivileges) {
184184
return;
185185
}
186-
187-
// Reduce the log if retry forever
188-
if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
189-
PipeLogger.log(LOGGER::warn, "No permission: will retry forever. status: %s", status);
190-
} else {
191-
LOGGER.warn(
192-
"No permission: will retry for at least {} seconds. status: {}",
193-
(retryMaxMillisWhenOtherExceptionsOccur
194-
+ exceptionFirstEncounteredTime.get()
195-
- System.currentTimeMillis())
196-
/ 1000.0,
197-
status);
198-
}
199-
200-
exceptionEventHasBeenRetried.set(true);
201-
throw new PipeRuntimeSinkRetryTimesConfigurableException(
202-
exceptionMessage,
203-
(int)
204-
Math.max(
205-
PipeSubtask.MAX_RETRY_TIMES,
206-
Math.min(
207-
CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
186+
handleOtherExceptions(status, exceptionMessage, recordMessage, true);
187+
break;
208188
}
189+
// Other exceptions
190+
handleOtherExceptions(status, exceptionMessage, recordMessage, false);
191+
break;
192+
}
193+
}
209194

210-
default: // Other exceptions
211-
synchronized (this) {
212-
recordExceptionStatusIfNecessary(recordMessage);
195+
private synchronized void handleOtherExceptions(
196+
final TSStatus status,
197+
final String exceptionMessage,
198+
final String recordMessage,
199+
final boolean noPermission) {
200+
recordExceptionStatusIfNecessary(recordMessage);
201+
202+
if (exceptionEventHasBeenRetried.get()
203+
&& System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
204+
> retryMaxMillisWhenOtherExceptionsOccur) {
205+
LOGGER.warn(
206+
"{}: retry timeout. will be ignored. event: {}. status: {}",
207+
getNoPermission(noPermission),
208+
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
209+
status);
210+
resetExceptionStatus();
211+
return;
212+
}
213213

214-
if (exceptionEventHasBeenRetried.get()
215-
&& System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
216-
> retryMaxMillisWhenOtherExceptionsOccur) {
217-
LOGGER.warn(
218-
"Unclassified exception: retry timeout. will be ignored. event: {}. status: {}",
219-
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
220-
status);
221-
resetExceptionStatus();
222-
return;
223-
}
214+
// Reduce the log if retry forever
215+
if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
216+
PipeLogger.log(
217+
LOGGER::warn,
218+
"%s: will retry forever. status: %s",
219+
getNoPermission(noPermission),
220+
status);
221+
} else {
222+
LOGGER.warn(
223+
"{}: will retry for at least {} seconds. status: {}",
224+
getNoPermission(noPermission),
225+
(retryMaxMillisWhenOtherExceptionsOccur
226+
+ exceptionFirstEncounteredTime.get()
227+
- System.currentTimeMillis())
228+
/ 1000.0,
229+
status);
230+
}
224231

225-
// Reduce the log if retry forever
226-
if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
227-
PipeLogger.log(
228-
LOGGER::warn, "Unclassified exception: will retry forever. status: %s", status);
229-
} else {
230-
LOGGER.warn(
231-
"Unclassified exception: will retry for at least {} seconds. status: {}",
232-
(retryMaxMillisWhenOtherExceptionsOccur
233-
+ exceptionFirstEncounteredTime.get()
234-
- System.currentTimeMillis())
235-
/ 1000.0,
236-
status);
237-
}
232+
exceptionEventHasBeenRetried.set(true);
233+
throw new PipeRuntimeSinkRetryTimesConfigurableException(
234+
exceptionMessage,
235+
(int)
236+
Math.max(
237+
PipeSubtask.MAX_RETRY_TIMES,
238+
Math.min(CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
239+
}
238240

239-
exceptionEventHasBeenRetried.set(true);
240-
throw new PipeRuntimeSinkRetryTimesConfigurableException(
241-
exceptionMessage,
242-
(int)
243-
Math.max(
244-
PipeSubtask.MAX_RETRY_TIMES,
245-
Math.min(
246-
CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
247-
}
248-
}
241+
private static String getNoPermission(final boolean noPermission) {
242+
return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION;
249243
}
250244

251245
private void recordExceptionStatusIfNecessary(final String message) {

0 commit comments

Comments
 (0)