Skip to content

Commit 90dcbd3

Browse files
committed
Merge branch 'master' of https://github.com/apache/iotdb into fix-audit-logger
2 parents 0df40e3 + a7f1527 commit 90dcbd3

File tree

11 files changed

+120
-95
lines changed

11 files changed

+120
-95
lines changed

iotdb-core/ainode/iotdb/ainode/core/ai_node.py

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -163,7 +163,11 @@ def stop(self):
163163
self._rpc_handler.stop()
164164
if self._rpc_service:
165165
self._rpc_service.stop()
166-
self._rpc_service.join(1)
167-
if self._rpc_service.is_alive():
168-
logger.warning("RPC service thread failed to stop in time.")
166+
for retry in range(30):
167+
self._rpc_service.join(2)
168+
if not self._rpc_service.is_alive():
169+
logger.warning(
170+
"RPC service thread failed to stop in time, retrying..."
171+
)
172+
break
169173
logger.info("IoTDB-AINode has successfully stopped.")

iotdb-core/ainode/iotdb/ainode/core/inference/pool_controller.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -504,7 +504,7 @@ def get_load(self, model_id, device_id, pool_id) -> int:
504504
return pool_group.get_load(pool_id)
505505
return -1
506506

507-
def shutdown(self):
507+
def stop(self):
508508
self._stop_event.set()
509509

510510
# shutdown pool controller

iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -366,9 +366,9 @@ def inference(self, req: TInferenceReq):
366366
single_output=False,
367367
)
368368

369-
def shutdown(self):
369+
def stop(self):
370370
self._stop_event.set()
371-
self._pool_controller.shutdown()
371+
self._pool_controller.stop()
372372
while not self._result_queue.empty():
373373
self._result_queue.get_nowait()
374374
self._result_queue.close()

iotdb-core/ainode/iotdb/ainode/core/rpc/handler.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,7 @@ def __init__(self, ainode):
6969

7070
def stop(self) -> None:
7171
logger.info("Stopping the RPC service handler of IoTDB-AINode...")
72-
self._inference_manager.shutdown()
72+
self._inference_manager.stop()
7373

7474
def stopAINode(self) -> TSStatus:
7575
self._ainode.stop()

iotdb-core/ainode/iotdb/ainode/core/rpc/service.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -125,3 +125,4 @@ def stop(self) -> None:
125125
logger.info("Stopping the RPC service of IoTDB-AINode...")
126126
self._stop_event.set()
127127
self.__pool_server.stop()
128+
self._handler.stop()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -164,7 +164,8 @@ protected void handleException(Logger logger, Exception e) {
164164
} else if (e instanceof InterruptedException
165165
|| Thread.interrupted()
166166
|| e instanceof StopReadTsFileByInterruptException
167-
|| !tsFileManager.isAllowCompaction()) {
167+
|| !tsFileManager.isAllowCompaction()
168+
|| CompactionTaskManager.getInstance().isStopAllCompactionWorker()) {
168169
logger.warn(
169170
"{}-{} [Compaction] {} task interrupted",
170171
storageGroupName,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -116,14 +116,14 @@ public InnerSpaceCompactionTask(
116116
}
117117

118118
protected static class InnerCompactionTaskFilesView {
119-
protected List<TsFileResource> sortedAllSourceFilesInTask;
120-
protected List<TsFileResource> sourceFilesInCompactionPerformer;
121-
protected List<TsFileResource> skippedSourceFiles;
119+
protected List<TsFileResource> sortedAllSourceFilesInTask = Collections.emptyList();
120+
protected List<TsFileResource> sourceFilesInCompactionPerformer = Collections.emptyList();
121+
protected List<TsFileResource> skippedSourceFiles = Collections.emptyList();
122122
protected boolean sequence;
123-
protected List<TsFileResource> sourceFilesInLog;
124-
protected List<TsFileResource> targetFilesInLog;
125-
protected List<TsFileResource> targetFilesInPerformer;
126-
protected List<TsFileResource> renamedTargetFiles;
123+
protected List<TsFileResource> sourceFilesInLog = Collections.emptyList();
124+
protected List<TsFileResource> targetFilesInLog = Collections.emptyList();
125+
protected List<TsFileResource> targetFilesInPerformer = Collections.emptyList();
126+
protected List<TsFileResource> renamedTargetFiles = Collections.emptyList();
127127

128128
protected long selectedFileSize;
129129
protected int sumOfCompactionCount;

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTestFileWriter.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.compaction.utils;
2121

22+
import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
23+
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
2224
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
2325

2426
import org.apache.tsfile.enums.TSDataType;
@@ -49,7 +51,9 @@ public class CompactionTestFileWriter implements Closeable {
4951

5052
public CompactionTestFileWriter(TsFileResource emptyFile) throws IOException {
5153
this.resource = emptyFile;
52-
fileWriter = new TsFileIOWriter(emptyFile.getTsFile());
54+
fileWriter =
55+
new CompactionTsFileWriter(
56+
emptyFile.getTsFile(), 1024 * 1024 * 1024, CompactionType.INNER_SEQ_COMPACTION);
5357
}
5458

5559
public IDeviceID startChunkGroup(String deviceNameWithoutParentPath) throws IOException {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java

Lines changed: 64 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
public class PipeReceiverStatusHandler {
4646

4747
private static final Logger LOGGER = LoggerFactory.getLogger(PipeReceiverStatusHandler.class);
48+
private static final String NO_PERMISSION = "No permission";
49+
private static final String UNCLASSIFIED_EXCEPTION = "Unclassified exception";
50+
private static final String NO_PERMISSION_STR = "No permissions for this operation";
4851

4952
private static final int CONFLICT_RETRY_MAX_TIMES = 100;
5053

@@ -183,83 +186,74 @@ public void handle(
183186
if (skipIfNoPrivileges) {
184187
return;
185188
}
186-
187-
synchronized (this) {
188-
recordExceptionStatusIfNecessary(recordMessage);
189-
190-
if (exceptionEventHasBeenRetried.get()
191-
&& System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
192-
> retryMaxMillisWhenOtherExceptionsOccur) {
193-
LOGGER.warn(
194-
"No permission: retry timeout. will be ignored. event: {}. status: {}",
195-
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
196-
status);
197-
resetExceptionStatus();
189+
handleOtherExceptions(status, exceptionMessage, recordMessage, true);
190+
break;
191+
case 305:
192+
handleOtherExceptions(status, exceptionMessage, recordMessage, false);
193+
break;
194+
default:
195+
// Some auth error may be wrapped in other codes
196+
if (exceptionMessage.contains(NO_PERMISSION_STR)) {
197+
if (skipIfNoPrivileges) {
198198
return;
199199
}
200-
201-
// Reduce the log if retry forever
202-
if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
203-
PipeLogger.log(LOGGER::warn, "No permission: will retry forever. status: %s", status);
204-
} else {
205-
LOGGER.warn(
206-
"No permission: will retry for at least {} seconds. status: {}",
207-
(retryMaxMillisWhenOtherExceptionsOccur
208-
+ exceptionFirstEncounteredTime.get()
209-
- System.currentTimeMillis())
210-
/ 1000.0,
211-
status);
212-
}
213-
214-
exceptionEventHasBeenRetried.set(true);
215-
throw new PipeRuntimeSinkRetryTimesConfigurableException(
216-
exceptionMessage,
217-
(int)
218-
Math.max(
219-
PipeSubtask.MAX_RETRY_TIMES,
220-
Math.min(
221-
CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
200+
handleOtherExceptions(status, exceptionMessage, recordMessage, true);
201+
break;
222202
}
203+
// Other exceptions
204+
handleOtherExceptions(status, exceptionMessage, recordMessage, false);
205+
break;
206+
}
207+
}
223208

224-
default: // Other exceptions
225-
synchronized (this) {
226-
recordExceptionStatusIfNecessary(recordMessage);
209+
private synchronized void handleOtherExceptions(
210+
final TSStatus status,
211+
final String exceptionMessage,
212+
final String recordMessage,
213+
final boolean noPermission) {
214+
recordExceptionStatusIfNecessary(recordMessage);
215+
216+
if (exceptionEventHasBeenRetried.get()
217+
&& System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
218+
> retryMaxMillisWhenOtherExceptionsOccur) {
219+
LOGGER.warn(
220+
"{}: retry timeout. will be ignored. event: {}. status: {}",
221+
getNoPermission(noPermission),
222+
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
223+
status);
224+
resetExceptionStatus();
225+
return;
226+
}
227227

228-
if (exceptionEventHasBeenRetried.get()
229-
&& System.currentTimeMillis() - exceptionFirstEncounteredTime.get()
230-
> retryMaxMillisWhenOtherExceptionsOccur) {
231-
LOGGER.warn(
232-
"Unclassified exception: retry timeout. will be ignored. event: {}. status: {}",
233-
shouldRecordIgnoredDataWhenOtherExceptionsOccur ? recordMessage : "not recorded",
234-
status);
235-
resetExceptionStatus();
236-
return;
237-
}
228+
// Reduce the log if retry forever
229+
if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
230+
PipeLogger.log(
231+
LOGGER::warn,
232+
"%s: will retry forever. status: %s",
233+
getNoPermission(noPermission),
234+
status);
235+
} else {
236+
LOGGER.warn(
237+
"{}: will retry for at least {} seconds. status: {}",
238+
getNoPermission(noPermission),
239+
(retryMaxMillisWhenOtherExceptionsOccur
240+
+ exceptionFirstEncounteredTime.get()
241+
- System.currentTimeMillis())
242+
/ 1000.0,
243+
status);
244+
}
238245

239-
// Reduce the log if retry forever
240-
if (retryMaxMillisWhenOtherExceptionsOccur == Long.MAX_VALUE) {
241-
PipeLogger.log(
242-
LOGGER::warn, "Unclassified exception: will retry forever. status: %s", status);
243-
} else {
244-
LOGGER.warn(
245-
"Unclassified exception: will retry for at least {} seconds. status: {}",
246-
(retryMaxMillisWhenOtherExceptionsOccur
247-
+ exceptionFirstEncounteredTime.get()
248-
- System.currentTimeMillis())
249-
/ 1000.0,
250-
status);
251-
}
246+
exceptionEventHasBeenRetried.set(true);
247+
throw new PipeRuntimeSinkRetryTimesConfigurableException(
248+
exceptionMessage,
249+
(int)
250+
Math.max(
251+
PipeSubtask.MAX_RETRY_TIMES,
252+
Math.min(CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
253+
}
252254

253-
exceptionEventHasBeenRetried.set(true);
254-
throw new PipeRuntimeSinkRetryTimesConfigurableException(
255-
exceptionMessage,
256-
(int)
257-
Math.max(
258-
PipeSubtask.MAX_RETRY_TIMES,
259-
Math.min(
260-
CONFLICT_RETRY_MAX_TIMES, retryMaxMillisWhenOtherExceptionsOccur * 1.1)));
261-
}
262-
}
255+
private static String getNoPermission(final boolean noPermission) {
256+
return noPermission ? NO_PERMISSION : UNCLASSIFIED_EXCEPTION;
263257
}
264258

265259
private void recordExceptionStatusIfNecessary(final String message) {

scripts/sbin/start-ainode.sh

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,14 @@ export IOTDB_AINODE_HOME
2727
echo "IOTDB_AINODE_HOME: $IOTDB_AINODE_HOME"
2828

2929
# fetch parameters with names
30-
while getopts "i:rn" opt; do
30+
daemon_mode=false
31+
while getopts "i:rnd" opt; do
3132
case $opt in
3233
n)
3334
;;
35+
d)
36+
daemon_mode=true
37+
;;
3438
\?) echo "Invalid option -$OPTARG" >&2
3539
exit 1
3640
;;
@@ -41,6 +45,11 @@ ain_ainode_executable="$IOTDB_AINODE_HOME/lib/ainode"
4145

4246
echo Script got ainode executable: "$ain_ainode_executable"
4347

44-
echo Starting AINode...
45-
46-
$ain_ainode_executable start
48+
if [ "$daemon_mode" = true ]; then
49+
echo Starting AINode in daemon mode...
50+
nohup $ain_ainode_executable start > /dev/null 2>&1 &
51+
echo AINode started in background
52+
else
53+
echo Starting AINode...
54+
$ain_ainode_executable start
55+
fi

0 commit comments

Comments
 (0)