Skip to content

Commit ded2664

Browse files
authored
Pipe: Further reduce the pipe logs & Added configurations for tsFile segment lock (apache#16315)
* logger-fix * add-ons * fix * fix * fix * warn-fix * further-fix * fix
1 parent 01d88b7 commit ded2664

File tree

9 files changed

+158
-105
lines changed

9 files changed

+158
-105
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,7 @@ public TSStatus handleMetaChanges(final PipeHandleMetaChangePlan plan) {
655655
}
656656

657657
private TSStatus handleMetaChangesInternal(final PipeHandleMetaChangePlan plan) {
658-
LOGGER.info("Handling pipe meta changes ...");
658+
LOGGER.debug("Handling pipe meta changes ...");
659659

660660
pipeMetaKeeper.clear();
661661

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ protected AtomicReference<PipeTaskInfo> acquireLockInternal(
109109

110110
@Override
111111
protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
112-
LOGGER.info("ProcedureId {} try to acquire pipe lock.", getProcId());
112+
LOGGER.debug("ProcedureId {} try to acquire pipe lock.", getProcId());
113113
pipeTaskInfo = acquireLockInternal(configNodeProcedureEnv);
114114
if (pipeTaskInfo == null) {
115115
LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
116116
} else {
117-
LOGGER.info("ProcedureId {} acquired pipe lock.", getProcId());
117+
LOGGER.debug("ProcedureId {} acquired pipe lock.", getProcId());
118118
}
119119

120120
final ProcedureLockState procedureLockState = super.acquireLock(configNodeProcedureEnv);
@@ -125,7 +125,7 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced
125125
"ProcedureId {}: LOCK_ACQUIRED. The following procedure should not be executed without pipe lock.",
126126
getProcId());
127127
} else {
128-
LOGGER.info(
128+
LOGGER.debug(
129129
"ProcedureId {}: LOCK_ACQUIRED. The following procedure should be executed with pipe lock.",
130130
getProcId());
131131
}
@@ -134,7 +134,7 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced
134134
if (pipeTaskInfo == null) {
135135
LOGGER.warn("ProcedureId {}: LOCK_EVENT_WAIT. Without acquiring pipe lock.", getProcId());
136136
} else {
137-
LOGGER.info("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be released.", getProcId());
137+
LOGGER.debug("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be released.", getProcId());
138138
configNodeProcedureEnv
139139
.getConfigManager()
140140
.getPipeManager()
@@ -173,7 +173,7 @@ protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
173173
if (pipeTaskInfo == null) {
174174
LOGGER.warn("ProcedureId {} release lock. No need to release pipe lock.", getProcId());
175175
} else {
176-
LOGGER.info("ProcedureId {} release lock. Pipe lock will be released.", getProcId());
176+
LOGGER.debug("ProcedureId {} release lock. Pipe lock will be released.", getProcId());
177177
if (this instanceof PipeMetaSyncProcedure) {
178178
configNodeProcedureEnv
179179
.getConfigManager()

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced
8787
if (System.currentTimeMillis() - LAST_EXECUTION_TIME.get() < MIN_EXECUTION_INTERVAL_MS) {
8888
// Skip by setting the pipeTaskInfo to null
8989
pipeTaskInfo = null;
90-
LOGGER.info(
90+
LOGGER.debug(
9191
"PipeMetaSyncProcedure: acquireLock, skip the procedure due to the last execution time {}",
9292
LAST_EXECUTION_TIME.get());
9393
return ProcedureLockState.LOCK_ACQUIRED;
@@ -103,15 +103,15 @@ protected PipeTaskOperation getOperation() {
103103

104104
@Override
105105
public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
106-
LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
106+
LOGGER.debug("PipeMetaSyncProcedure: executeFromValidateTask");
107107

108108
LAST_EXECUTION_TIME.set(System.currentTimeMillis());
109109
return true;
110110
}
111111

112112
@Override
113113
public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
114-
LOGGER.info("PipeMetaSyncProcedure: executeFromCalculateInfoForTask");
114+
LOGGER.debug("PipeMetaSyncProcedure: executeFromCalculateInfoForTask");
115115

116116
// Re-balance the external source tasks here in case of any changes in the dataRegion
117117
pipeTaskInfo
@@ -170,7 +170,7 @@ public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
170170

171171
@Override
172172
public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
173-
LOGGER.info("PipeMetaSyncProcedure: executeFromWriteConfigNodeConsensus");
173+
LOGGER.debug("PipeMetaSyncProcedure: executeFromWriteConfigNodeConsensus");
174174

175175
final List<PipeMeta> pipeMetaList = new ArrayList<>();
176176
for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
@@ -196,7 +196,7 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
196196
@Override
197197
public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
198198
throws PipeException, IOException {
199-
LOGGER.info("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
199+
LOGGER.debug("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
200200

201201
Map<Integer, TPushPipeMetaResp> respMap = pushPipeMetaToDataNodes(env);
202202
if (pipeTaskInfo.get().recordDataNodePushPipeMetaExceptions(respMap)) {
@@ -209,28 +209,28 @@ public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
209209

210210
@Override
211211
public void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
212-
LOGGER.info("PipeMetaSyncProcedure: rollbackFromValidateTask");
212+
LOGGER.debug("PipeMetaSyncProcedure: rollbackFromValidateTask");
213213

214214
// Do nothing
215215
}
216216

217217
@Override
218218
public void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
219-
LOGGER.info("PipeMetaSyncProcedure: rollbackFromCalculateInfoForTask");
219+
LOGGER.debug("PipeMetaSyncProcedure: rollbackFromCalculateInfoForTask");
220220

221221
// Do nothing
222222
}
223223

224224
@Override
225225
public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
226-
LOGGER.info("PipeMetaSyncProcedure: rollbackFromWriteConfigNodeConsensus");
226+
LOGGER.debug("PipeMetaSyncProcedure: rollbackFromWriteConfigNodeConsensus");
227227

228228
// Do nothing
229229
}
230230

231231
@Override
232232
public void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
233-
LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
233+
LOGGER.debug("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
234234

235235
// Do nothing
236236
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
3030
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
3131
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
32+
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
3233
import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
3334
import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler;
3435
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
@@ -449,7 +450,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
449450
} catch (final Exception e) {
450451
final String error =
451452
String.format("Exception %s encountered while handling request %s.", e.getMessage(), req);
452-
LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
453+
PipeLogger.log(LOGGER::warn, e, "Receiver id = %s: %s", receiverId.get(), error);
453454
return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, error));
454455
}
455456
}
@@ -661,7 +662,8 @@ private TPipeTransferResp handleTransferSchemaPlan(final PipeTransferPlanNodeReq
661662
final TSStatus status =
662663
((AlterLogicalViewNode) req.getPlanNode()).checkPermissionBeforeProcess(username);
663664
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
664-
LOGGER.warn(
665+
PipeLogger.log(
666+
LOGGER::warn,
665667
"Receiver id = {}: Failed to check authority for statement {}, username = {}, response = {}.",
666668
receiverId.get(),
667669
StatementType.ALTER_LOGICAL_VIEW.name(),
@@ -812,15 +814,17 @@ private TSStatus executeStatementAndClassifyExceptions(
812814
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
813815
return result;
814816
} else {
815-
LOGGER.warn(
817+
PipeLogger.log(
818+
LOGGER::warn,
816819
"Receiver id = {}: Failure status encountered while executing statement {}: {}",
817820
receiverId.get(),
818821
statement,
819822
result);
820823
return statement.accept(STATEMENT_STATUS_VISITOR, result);
821824
}
822825
} catch (final Exception e) {
823-
LOGGER.warn(
826+
PipeLogger.log(
827+
LOGGER::warn,
824828
"Receiver id = {}: Exception encountered while executing statement {}: ",
825829
receiverId.get(),
826830
statement,
@@ -873,7 +877,8 @@ private TSStatus executeStatementWithPermissionCheckAndRetryOnDataTypeMismatch(
873877
final TSStatus permissionCheckStatus =
874878
AuthorityChecker.checkAuthority(statement, clientSession);
875879
if (permissionCheckStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
876-
LOGGER.warn(
880+
PipeLogger.log(
881+
LOGGER::warn,
877882
"Receiver id = {}: Failed to check authority for statement {}, username = {}, response = {}.",
878883
receiverId.get(),
879884
statement.getType().name(),
@@ -1054,19 +1059,21 @@ private TSStatus executeStatementForTableModelWithPermissionCheck(
10541059
// No strong need to handle the failure result
10551060
if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
10561061
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) {
1057-
LOGGER.warn(
1058-
"Receiver id = {}: Failure status encountered while executing statement {}: {}",
1062+
PipeLogger.log(
1063+
LOGGER::warn,
1064+
"Receiver id = %s: Failure status encountered while executing statement %s: %s",
10591065
receiverId.get(),
10601066
statement,
10611067
result);
10621068
}
10631069
return result;
10641070
} catch (final Exception e) {
1065-
LOGGER.warn(
1066-
"Receiver id = {}: Exception encountered while executing statement {}: ",
1071+
PipeLogger.log(
1072+
LOGGER::warn,
1073+
e,
1074+
"Receiver id = %s: Exception encountered while executing statement %s: ",
10671075
receiverId.get(),
1068-
statement,
1069-
e);
1076+
statement);
10701077
return new TSStatus(TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR.getStatusCode())
10711078
.setMessage(e.getMessage());
10721079
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java

Lines changed: 22 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.pipe.resource.tsfile;
2121

22+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2223
import org.apache.iotdb.db.storageengine.StorageEngine;
2324

2425
import org.slf4j.Logger;
@@ -31,25 +32,29 @@
3132
public class PipeTsFileResourceSegmentLock {
3233

3334
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileResourceSegmentLock.class);
34-
3535
private static final int SEGMENT_LOCK_MIN_SIZE = 32;
3636
private static final int SEGMENT_LOCK_MAX_SIZE = 128;
37-
3837
private volatile ReentrantLock[] locks;
3938

4039
private void initIfNecessary() {
4140
if (locks == null) {
4241
synchronized (this) {
42+
int lockSegmentSize = PipeConfig.getInstance().getPipeTsFileResourceSegmentLockNum();
4343
if (locks == null) {
44-
int lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
45-
try {
46-
lockSegmentSize = StorageEngine.getInstance().getAllDataRegionIds().size();
47-
} catch (final Exception e) {
48-
LOGGER.warn(
49-
"Cannot get data region ids, use default lock segment size: {}", lockSegmentSize);
44+
if (lockSegmentSize <= 0) {
45+
try {
46+
lockSegmentSize =
47+
Math.min(
48+
Math.max(
49+
StorageEngine.getInstance().getAllDataRegionIds().size(),
50+
SEGMENT_LOCK_MIN_SIZE),
51+
SEGMENT_LOCK_MAX_SIZE);
52+
} catch (final Exception e) {
53+
LOGGER.warn(
54+
"Cannot get data region ids, use default lock segment size: {}", lockSegmentSize);
55+
lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
56+
}
5057
}
51-
lockSegmentSize = Math.min(SEGMENT_LOCK_MAX_SIZE, lockSegmentSize);
52-
lockSegmentSize = Math.max(SEGMENT_LOCK_MIN_SIZE, lockSegmentSize);
5358

5459
final ReentrantLock[] tmpLocks = new ReentrantLock[lockSegmentSize];
5560
for (int i = 0; i < tmpLocks.length; i++) {
@@ -58,6 +63,13 @@ private void initIfNecessary() {
5863

5964
// publish this variable
6065
locks = tmpLocks;
66+
} else if (locks.length < lockSegmentSize) {
67+
final ReentrantLock[] tmpLocks = new ReentrantLock[lockSegmentSize];
68+
System.arraycopy(locks, 0, tmpLocks, 0, locks.length);
69+
for (int i = locks.length; i < lockSegmentSize; ++i) {
70+
tmpLocks[i] = new ReentrantLock();
71+
}
72+
locks = tmpLocks;
6173
}
6274
}
6375
}
@@ -74,39 +86,8 @@ public boolean tryLock(final File file, final long timeout, final TimeUnit timeU
7486
return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout, timeUnit);
7587
}
7688

77-
public boolean tryLockAll(final long timeout, final TimeUnit timeUnit)
78-
throws InterruptedException {
79-
initIfNecessary();
80-
int alreadyLocked = 0;
81-
for (final ReentrantLock lock : locks) {
82-
if (lock.tryLock(timeout, timeUnit)) {
83-
alreadyLocked++;
84-
} else {
85-
break;
86-
}
87-
}
88-
89-
if (alreadyLocked == locks.length) {
90-
return true;
91-
} else {
92-
unlockUntil(alreadyLocked);
93-
return false;
94-
}
95-
}
96-
97-
private void unlockUntil(final int index) {
98-
for (int i = 0; i < index; i++) {
99-
locks[i].unlock();
100-
}
101-
}
102-
10389
public void unlock(final File file) {
10490
initIfNecessary();
10591
locks[Math.abs(file.hashCode()) % locks.length].unlock();
10692
}
107-
108-
public void unlockAll() {
109-
initIfNecessary();
110-
unlockUntil(locks.length);
111-
}
11293
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ public class CommonConfig {
329329
private volatile double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = 0.8d;
330330
private volatile boolean pipeTransferTsFileSync = false;
331331
private volatile long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 * 1000L; // 5 minutes
332+
private int pipeTsFileResourceSegmentLockNum = -1;
332333

333334
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 minutes
334335
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; // 3 minutes
@@ -1942,14 +1943,28 @@ public long getPipeCheckAllSyncClientLiveTimeIntervalMs() {
19421943
}
19431944

19441945
public void setPipeCheckAllSyncClientLiveTimeIntervalMs(
1945-
long pipeCheckSyncAllClientLiveTimeIntervalMs) {
1946-
if (this.pipeCheckAllSyncClientLiveTimeIntervalMs == pipeCheckSyncAllClientLiveTimeIntervalMs) {
1946+
long pipeCheckAllSyncClientLiveTimeIntervalMs) {
1947+
if (this.pipeCheckAllSyncClientLiveTimeIntervalMs == pipeCheckAllSyncClientLiveTimeIntervalMs) {
19471948
return;
19481949
}
1949-
this.pipeCheckAllSyncClientLiveTimeIntervalMs = pipeCheckSyncAllClientLiveTimeIntervalMs;
1950+
this.pipeCheckAllSyncClientLiveTimeIntervalMs = pipeCheckAllSyncClientLiveTimeIntervalMs;
19501951
logger.info(
19511952
"pipeCheckSyncAllClientLiveTimeIntervalMs is set to {}",
1952-
pipeCheckSyncAllClientLiveTimeIntervalMs);
1953+
pipeCheckAllSyncClientLiveTimeIntervalMs);
1954+
}
1955+
1956+
public int getPipeTsFileResourceSegmentLockNum() {
1957+
return pipeTsFileResourceSegmentLockNum;
1958+
}
1959+
1960+
public void setPipeTsFileResourceSegmentLockNum(int pipeTsFileResourceSegmentLockNum) {
1961+
if (this.pipeTsFileResourceSegmentLockNum == pipeTsFileResourceSegmentLockNum) {
1962+
return;
1963+
}
1964+
this.pipeTsFileResourceSegmentLockNum = pipeTsFileResourceSegmentLockNum;
1965+
logger.info(
1966+
"pipeCheckSyncAllClientLiveTimeIntervalMs is set to {}",
1967+
pipeCheckAllSyncClientLiveTimeIntervalMs);
19531968
}
19541969

19551970
public double getPipeSendTsFileRateLimitBytesPerSecond() {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,10 @@ public long getPipeCheckAllSyncClientLiveTimeIntervalMs() {
293293
return COMMON_CONFIG.getPipeCheckAllSyncClientLiveTimeIntervalMs();
294294
}
295295

296+
public int getPipeTsFileResourceSegmentLockNum() {
297+
return COMMON_CONFIG.getPipeTsFileResourceSegmentLockNum();
298+
}
299+
296300
/////////////////////////////// Meta Consistency ///////////////////////////////
297301

298302
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -525,6 +529,9 @@ public void printAllConfigs() {
525529
LOGGER.info(
526530
"PipeCheckAllSyncClientLiveTimeIntervalMs: {}",
527531
getPipeCheckAllSyncClientLiveTimeIntervalMs());
532+
LOGGER.info(
533+
"PipeCheckAllSyncClientLiveTimeIntervalMs: {}",
534+
getPipeCheckAllSyncClientLiveTimeIntervalMs());
528535

529536
LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", getPipeDynamicMemoryHistoryWeight());
530537
LOGGER.info(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
498498
properties.getProperty(
499499
"pipe_check_all_sync_client_live_time_interval_ms",
500500
String.valueOf(config.getPipeCheckAllSyncClientLiveTimeIntervalMs()))));
501+
config.setPipeTsFileResourceSegmentLockNum(
502+
Integer.parseInt(
503+
properties.getProperty(
504+
"pipe_tsfile_resource_segment_lock_num",
505+
String.valueOf(config.getPipeTsFileResourceSegmentLockNum()))));
501506

502507
config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
503508
Long.parseLong(

0 commit comments

Comments
 (0)