Skip to content

Commit 809b3aa

Browse files
Pipe: Fixed the bug that a failed checksum may permanently disable an air gap receiver & Optimized the air gap file offset logic & Fixed the bug that a socket that is not connected may die forever in "send" (#14384)
Co-authored-by: Steve Yurong Su <[email protected]>
1 parent c310cb0 commit 809b3aa

File tree

16 files changed

+132
-127
lines changed

16 files changed

+132
-127
lines changed

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeConnectionException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ public class PipeConnectionException extends PipeException {
2424
public static final String CONNECTION_ERROR_FORMATTER =
2525
"Error occurred while connecting to receiver %s:%s, please check network connectivity or SSL configurations when enable SSL transmission";
2626

27-
public PipeConnectionException(String message) {
27+
public PipeConnectionException(final String message) {
2828
super(message);
2929
}
3030

31-
public PipeConnectionException(String message, Throwable cause) {
31+
public PipeConnectionException(final String message, final Throwable cause) {
3232
super(message, cause);
3333
}
3434
}

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/exception/PipeException.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,17 @@ public class PipeException extends RuntimeException {
2323

2424
private final long timeStamp;
2525

26-
public PipeException(String message) {
26+
public PipeException(final String message) {
2727
super(message);
2828
this.timeStamp = System.currentTimeMillis();
2929
}
3030

31-
public PipeException(String message, long timeStamp) {
31+
public PipeException(final String message, final long timeStamp) {
3232
super(message);
3333
this.timeStamp = timeStamp;
3434
}
3535

36-
public PipeException(String message, Throwable cause) {
36+
public PipeException(final String message, final Throwable cause) {
3737
super(message, cause);
3838
this.timeStamp = System.currentTimeMillis();
3939
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,6 @@
4444
import org.slf4j.Logger;
4545
import org.slf4j.LoggerFactory;
4646

47-
import java.util.Objects;
48-
import java.util.concurrent.atomic.AtomicInteger;
49-
5047
public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
5148

5249
private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
@@ -295,37 +292,6 @@ public int getAsyncConnectorRetryEventQueueSize() {
295292
: 0;
296293
}
297294

298-
// For performance, this will not acquire lock and does not guarantee the correct
299-
// result. However, this shall not cause any exceptions when concurrently read & written.
300-
public int getEventCount(final String pipeName) {
301-
final AtomicInteger count = new AtomicInteger(0);
302-
try {
303-
inputPendingQueue.forEach(
304-
event -> {
305-
if (event instanceof EnrichedEvent
306-
&& pipeName.equals(((EnrichedEvent) event).getPipeName())) {
307-
count.incrementAndGet();
308-
}
309-
});
310-
} catch (final Exception e) {
311-
if (LOGGER.isDebugEnabled()) {
312-
LOGGER.debug(
313-
"Exception occurred when counting event of pipe {}, root cause: {}",
314-
pipeName,
315-
ErrorHandlingUtils.getRootCause(e).getMessage(),
316-
e);
317-
}
318-
}
319-
// Avoid potential NPE in "getPipeName"
320-
final EnrichedEvent event =
321-
lastEvent instanceof EnrichedEvent ? (EnrichedEvent) lastEvent : null;
322-
return count.get()
323-
+ (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
324-
? ((IoTDBDataRegionAsyncConnector) outputPipeConnector).getRetryEventCount(pipeName)
325-
: 0)
326-
+ (Objects.nonNull(event) && pipeName.equals(event.getPipeName()) ? 1 : 0);
327-
}
328-
329295
//////////////////////////// Error report ////////////////////////////
330296

331297
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,8 +406,8 @@ public void transfer(Event event) throws Exception {
406406

407407
// Transfer deletion
408408
if (event instanceof PipeDeleteDataNodeEvent) {
409-
PipeDeleteDataNodeEvent deleteDataNodeEvent = (PipeDeleteDataNodeEvent) event;
410-
boolean enqueueResult = addEvent2Buffer(deleteDataNodeEvent);
409+
final PipeDeleteDataNodeEvent deleteDataNodeEvent = (PipeDeleteDataNodeEvent) event;
410+
final boolean enqueueResult = addEvent2Buffer(deleteDataNodeEvent);
411411
if (!enqueueResult) {
412412
throw new PipeRuntimeConnectorRetryTimesConfigurableException(
413413
ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ public class PipeConsensusSyncConnector extends IoTDBConnector {
8282
private PipeConsensusSyncBatchReqBuilder tabletBatchBuilder;
8383

8484
public PipeConsensusSyncConnector(
85-
List<TEndPoint> peers,
86-
int consensusGroupId,
87-
int thisDataNodeId,
88-
PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) {
85+
final List<TEndPoint> peers,
86+
final int consensusGroupId,
87+
final int thisDataNodeId,
88+
final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) {
8989
// In PipeConsensus, one pipeConsensusTask corresponds to a pipeConsensusConnector. Thus,
9090
// `peers` here actually is a singletonList that contains one peer's TEndPoint. But here we
9191
// retain the implementation of list to cope with possible future expansion
@@ -98,7 +98,8 @@ public PipeConsensusSyncConnector(
9898
}
9999

100100
@Override
101-
public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration)
101+
public void customize(
102+
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
102103
throws Exception {
103104
super.customize(parameters, configuration);
104105
if (isTabletBatchModeEnabled) {
@@ -125,7 +126,7 @@ public void heartbeat() throws Exception {
125126
}
126127

127128
@Override
128-
public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
129+
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
129130
// Note: we don't need to do type judgment here, because PipeConsensus uses
130131
// PIPE_CONSENSUS_PROCESSOR and will not change the event type like
131132
// org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
@@ -135,12 +136,12 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
135136
doTransfer();
136137
}
137138
} else {
138-
long startTime = System.nanoTime();
139+
final long startTime = System.nanoTime();
139140
doTransferWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
140-
long duration = System.nanoTime() - startTime;
141+
final long duration = System.nanoTime() - startTime;
141142
pipeConsensusConnectorMetrics.recordRetryWALTransferTimer(duration);
142143
}
143-
} catch (Exception e) {
144+
} catch (final Exception e) {
144145
throw new PipeConnectionException(
145146
String.format(
146147
"Failed to transfer tablet insertion event %s, because %s.",
@@ -150,18 +151,18 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
150151
}
151152

152153
@Override
153-
public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
154+
public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
154155
// Note: we don't need to do type judgment here, because PipeConsensus uses DO_NOTHING
155156
// processor and will not change the event type like
156157
// org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
157158
try {
158-
long startTime = System.nanoTime();
159+
final long startTime = System.nanoTime();
159160
// In order to commit in order
160161
if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
161162
doTransfer();
162163
}
163164
doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
164-
long duration = System.nanoTime() - startTime;
165+
final long duration = System.nanoTime() - startTime;
165166
pipeConsensusConnectorMetrics.recordRetryTsFileTransferTimer(duration);
166167
} catch (Exception e) {
167168
throw new PipeConnectionException(
@@ -173,7 +174,7 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception
173174
}
174175

175176
@Override
176-
public void transfer(Event event) throws Exception {
177+
public void transfer(final Event event) throws Exception {
177178
// in order to commit in order
178179
if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
179180
doTransfer();
@@ -209,7 +210,7 @@ private void doTransfer() {
209210
// }
210211

211212
tabletBatchBuilder.onSuccess();
212-
} catch (Exception e) {
213+
} catch (final Exception e) {
213214
throw new PipeConnectionException(
214215
String.format(
215216
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -240,12 +241,12 @@ private void doTransfer(final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
240241
throws PipeException {
241242
final ProgressIndex progressIndex;
242243
final TPipeConsensusTransferResp resp;
243-
TCommitId tCommitId =
244+
final TCommitId tCommitId =
244245
new TCommitId(
245246
pipeDeleteDataNodeEvent.getCommitId(),
246247
pipeDeleteDataNodeEvent.getCommitterKey().getRestartTimes(),
247248
pipeDeleteDataNodeEvent.getRebootTimes());
248-
TConsensusGroupId tConsensusGroupId =
249+
final TConsensusGroupId tConsensusGroupId =
249250
new TConsensusGroupId(TConsensusGroupType.DataRegion, consensusGroupId);
250251

251252
try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
@@ -259,7 +260,7 @@ private void doTransfer(final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
259260
tConsensusGroupId,
260261
progressIndex,
261262
thisDataNodeId));
262-
} catch (Exception e) {
263+
} catch (final Exception e) {
263264
throw new PipeConnectionException(
264265
String.format(
265266
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -310,12 +311,12 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletI
310311
final InsertNode insertNode;
311312
final ProgressIndex progressIndex;
312313
final TPipeConsensusTransferResp resp;
313-
TCommitId tCommitId =
314+
final TCommitId tCommitId =
314315
new TCommitId(
315316
pipeInsertNodeTabletInsertionEvent.getCommitId(),
316317
pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
317318
pipeInsertNodeTabletInsertionEvent.getRebootTimes());
318-
TConsensusGroupId tConsensusGroupId =
319+
final TConsensusGroupId tConsensusGroupId =
319320
new TConsensusGroupId(TConsensusGroupType.DataRegion, consensusGroupId);
320321

321322
try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
@@ -338,7 +339,7 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletI
338339
progressIndex,
339340
thisDataNodeId));
340341
}
341-
} catch (Exception e) {
342+
} catch (final Exception e) {
342343
throw new PipeConnectionException(
343344
String.format(
344345
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -362,7 +363,8 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletI
362363
}
363364
}
364365

365-
private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException {
366+
private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
367+
throws PipeException {
366368
final File tsFile = pipeTsFileInsertionEvent.getTsFile();
367369
final File modFile = pipeTsFileInsertionEvent.getModFile();
368370
final TPipeConsensusTransferResp resp;
@@ -411,7 +413,7 @@ private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throw
411413
pipeTsFileInsertionEvent.getProgressIndex(),
412414
thisDataNodeId));
413415
}
414-
} catch (Exception e) {
416+
} catch (final Exception e) {
415417
throw new PipeConnectionException(
416418
String.format(
417419
PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
@@ -436,11 +438,11 @@ private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throw
436438
}
437439

438440
protected void transferFilePieces(
439-
File file,
440-
SyncPipeConsensusServiceClient syncPipeConsensusServiceClient,
441-
boolean isMultiFile,
442-
TCommitId tCommitId,
443-
TConsensusGroupId tConsensusGroupId)
441+
final File file,
442+
final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient,
443+
final boolean isMultiFile,
444+
final TCommitId tCommitId,
445+
final TConsensusGroupId tConsensusGroupId)
444446
throws PipeException, IOException {
445447
final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
446448
final byte[] readBuffer = new byte[readFileBufferSize];

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,20 @@ private void receive() throws IOException {
109109
try {
110110
final byte[] data = readData(inputStream);
111111

112+
// If the checksum fails, the length we read may be incorrect.
113+
// Namely, there may be remaining bytes in the socket stream, which will fail any subsequent
114+
// attempts to read from it.
115+
// Therefore, we directly close the socket here.
112116
if (!checkSum(data)) {
113-
LOGGER.warn("Checksum failed, receiverId: {}", receiverId);
114-
fail();
117+
LOGGER.warn(
118+
"Pipe air gap receiver {} closed because of checksum failed. Socket: {}",
119+
receiverId,
120+
socket);
121+
try {
122+
fail();
123+
} finally {
124+
socket.close();
125+
}
115126
return;
116127
}
117128

@@ -134,21 +145,31 @@ private void receive() throws IOException {
134145
|| status.getCode()
135146
== TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
136147
LOGGER.info(
137-
"TSStatus:{} is encountered at the air gap receiver, will ignore.", resp.getStatus());
148+
"Pipe air gap receiver {}: TSStatus {} is encountered at the air gap receiver, will ignore.",
149+
receiverId,
150+
resp.getStatus());
138151
ok();
139152
} else {
140153
LOGGER.warn(
141-
"Handle data failed, receiverId: {}, status: {}, req: {}",
154+
"Pipe air gap receiver {}: Handle data failed, status: {}, req: {}",
142155
receiverId,
143156
resp.getStatus(),
144157
req);
145158
fail();
146159
}
147160
} catch (final PipeConnectionException e) {
148-
LOGGER.info("Socket closed when listening to data. Because: {}", e.getMessage());
161+
LOGGER.info(
162+
"Pipe air gap receiver {}: Socket {} closed when listening to data. Because: {}",
163+
receiverId,
164+
socket,
165+
e.getMessage());
149166
socket.close();
150167
} catch (final Exception e) {
151-
LOGGER.warn("Exception during handling receiving, receiverId: {}", receiverId, e);
168+
LOGGER.warn(
169+
"Pipe air gap receiver {}: Exception during handling receiving. Socket: {}",
170+
receiverId,
171+
socket,
172+
e);
152173
fail();
153174
}
154175
}
@@ -169,7 +190,17 @@ private boolean checkSum(byte[] bytes) {
169190
try {
170191
final CRC32 crc32 = new CRC32();
171192
crc32.update(bytes, LONG_LEN, bytes.length - LONG_LEN);
172-
return BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN)) == crc32.getValue();
193+
194+
final long expectedChecksum = BytesUtils.bytesToLong(BytesUtils.subBytes(bytes, 0, LONG_LEN));
195+
final long actualChecksum = crc32.getValue();
196+
if (expectedChecksum != actualChecksum) {
197+
LOGGER.warn(
198+
"Pipe air gap receiver {}: checksum failed, expected: {}, actual: {}",
199+
receiverId,
200+
expectedChecksum,
201+
actualChecksum);
202+
}
203+
return expectedChecksum == actualChecksum;
173204
} catch (final Exception e) {
174205
// ArrayIndexOutOfBoundsException when bytes.length < LONG_LEN
175206
return false;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,12 @@ public class IoTDBAirGapReceiverAgent implements IService {
5252
public void listen() {
5353
try {
5454
final Socket socket = serverSocket.accept();
55-
new Thread(new IoTDBAirGapReceiver(socket, receiverId.incrementAndGet())).start();
55+
final long airGapReceiverId = receiverId.incrementAndGet();
56+
final Thread airGapReceiverThread =
57+
new Thread(new IoTDBAirGapReceiver(socket, airGapReceiverId));
58+
airGapReceiverThread.setName(
59+
ThreadName.PIPE_AIR_GAP_RECEIVER.getName() + "-" + airGapReceiverId);
60+
airGapReceiverThread.start();
5661
} catch (final IOException e) {
5762
LOGGER.warn("Unhandled exception during pipe air gap receiver listening", e);
5863
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,4 @@ protected boolean executeOnce() {
9393

9494
return SubscriptionAgent.broker().executePrefetch(consumerGroupId, topicName);
9595
}
96-
97-
//////////////////////////// APIs provided for metric framework ////////////////////////////
98-
99-
@Override
100-
public int getEventCount(final String pipeName) {
101-
// count the number of pipe events in sink queue and prefetching queue, note that can safely
102-
// ignore lastEvent
103-
return SubscriptionAgent.broker().getPipeEventCount(consumerGroupId, topicName);
104-
}
10596
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public enum ThreadName {
144144
"Pipe-Runtime-Periodical-Phantom-Reference-Cleaner"),
145145
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
146146
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
147+
PIPE_AIR_GAP_RECEIVER("Pipe-Air-Gap-Receiver"),
147148
SUBSCRIPTION_EXECUTOR_POOL("Subscription-Executor-Pool"),
148149
SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"),
149150
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
@@ -295,9 +296,12 @@ public enum ThreadName {
295296
PIPE_RUNTIME_HEARTBEAT,
296297
PIPE_RUNTIME_PROCEDURE_SUBMITTER,
297298
PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
299+
PIPE_RUNTIME_PERIODICAL_PHANTOM_REFERENCE_CLEANER,
298300
PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
299301
PIPE_RECEIVER_AIR_GAP_AGENT,
302+
PIPE_AIR_GAP_RECEIVER,
300303
SUBSCRIPTION_EXECUTOR_POOL,
304+
SUBSCRIPTION_RUNTIME_META_SYNCER,
301305
WINDOW_EVALUATION_SERVICE,
302306
STATEFUL_TRIGGER_INFORMATION_UPDATER));
303307

0 commit comments

Comments
 (0)