@@ -78,7 +78,7 @@ public boolean isNeedListenToInsertNode() {
7878 }
7979
8080 private void extractTabletInsertion (final PipeRealtimeEvent event ) {
81- if (canNotUseTabletAnyMore ()) {
81+ if (canNotUseTabletAnyMore (event )) {
8282 event
8383 .getTsFileEpoch ()
8484 .migrateState (
@@ -201,7 +201,7 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
201201 }
202202 }
203203
204- private boolean canNotUseTabletAnyMore () {
204+ private boolean canNotUseTabletAnyMore (final PipeRealtimeEvent event ) {
205205 // In the following 7 cases, we should not extract any more tablet events. all the data
206206 // represented by the tablet events should be carried by the following tsfile event:
207207 // 0. If the pipe task is currently restarted.
@@ -212,19 +212,19 @@ private boolean canNotUseTabletAnyMore() {
212212 // 4. The number of realtime tsfile events to transfer has exceeded the limit.
213213 // 5. The number of linked tsfiles has reached the dangerous threshold.
214214 // 6. The shallow memory usage of the insert node has reached the dangerous threshold.
215- return isPipeTaskCurrentlyRestarted ()
216- || mayWalSizeReachThrottleThreshold ()
217- || mayMemTablePinnedCountReachDangerousThreshold ()
218- || isHistoricalTsFileEventCountExceededLimit ()
219- || isRealtimeTsFileEventCountExceededLimit ()
220- || mayTsFileLinkedCountReachDangerousThreshold ()
221- || mayInsertNodeMemoryReachDangerousThreshold ();
215+ return isPipeTaskCurrentlyRestarted (event )
216+ || mayWalSizeReachThrottleThreshold (event )
217+ || mayMemTablePinnedCountReachDangerousThreshold (event )
218+ || isHistoricalTsFileEventCountExceededLimit (event )
219+ || isRealtimeTsFileEventCountExceededLimit (event )
220+ || mayTsFileLinkedCountReachDangerousThreshold (event )
221+ || mayInsertNodeMemoryReachDangerousThreshold (event );
222222 }
223223
224- private boolean isPipeTaskCurrentlyRestarted () {
224+ private boolean isPipeTaskCurrentlyRestarted (final PipeRealtimeEvent event ) {
225225 final boolean isPipeTaskCurrentlyRestarted =
226226 PipeDataNodeAgent .task ().isPipeTaskCurrentlyRestarted (pipeName );
227- if (isPipeTaskCurrentlyRestarted ) {
227+ if (isPipeTaskCurrentlyRestarted && event . mayExtractorUseTablets ( this ) ) {
228228 LOGGER .info (
229229 "Pipe task {}@{} canNotUseTabletAnyMore1: Pipe task is currently restarted" ,
230230 pipeName ,
@@ -233,11 +233,11 @@ private boolean isPipeTaskCurrentlyRestarted() {
233233 return isPipeTaskCurrentlyRestarted ;
234234 }
235235
236- private boolean mayWalSizeReachThrottleThreshold () {
236+ private boolean mayWalSizeReachThrottleThreshold (final PipeRealtimeEvent event ) {
237237 final boolean mayWalSizeReachThrottleThreshold =
238238 3 * WALManager .getInstance ().getTotalDiskUsage ()
239239 > IoTDBDescriptor .getInstance ().getConfig ().getThrottleThreshold ();
240- if (mayWalSizeReachThrottleThreshold ) {
240+ if (mayWalSizeReachThrottleThreshold && event . mayExtractorUseTablets ( this ) ) {
241241 LOGGER .info (
242242 "Pipe task {}@{} canNotUseTabletAnyMore2: Wal size {} has reached throttle threshold {}" ,
243243 pipeName ,
@@ -248,11 +248,11 @@ private boolean mayWalSizeReachThrottleThreshold() {
248248 return mayWalSizeReachThrottleThreshold ;
249249 }
250250
251- private boolean mayMemTablePinnedCountReachDangerousThreshold () {
251+ private boolean mayMemTablePinnedCountReachDangerousThreshold (final PipeRealtimeEvent event ) {
252252 final boolean mayMemTablePinnedCountReachDangerousThreshold =
253253 PipeDataNodeResourceManager .wal ().getPinnedWalCount ()
254254 >= PipeConfig .getInstance ().getPipeMaxAllowedPinnedMemTableCount ();
255- if (mayMemTablePinnedCountReachDangerousThreshold ) {
255+ if (mayMemTablePinnedCountReachDangerousThreshold && event . mayExtractorUseTablets ( this ) ) {
256256 LOGGER .info (
257257 "Pipe task {}@{} canNotUseTabletAnyMore3: The number of pinned memtables {} has reached the dangerous threshold {}" ,
258258 pipeName ,
@@ -263,14 +263,14 @@ private boolean mayMemTablePinnedCountReachDangerousThreshold() {
263263 return mayMemTablePinnedCountReachDangerousThreshold ;
264264 }
265265
266- private boolean isHistoricalTsFileEventCountExceededLimit () {
266+ private boolean isHistoricalTsFileEventCountExceededLimit (final PipeRealtimeEvent event ) {
267267 final IoTDBDataRegionExtractor extractor =
268268 PipeDataRegionExtractorMetrics .getInstance ().getExtractorMap ().get (getTaskID ());
269269 final boolean isHistoricalTsFileEventCountExceededLimit =
270270 Objects .nonNull (extractor )
271271 && extractor .getHistoricalTsFileInsertionEventCount ()
272272 >= PipeConfig .getInstance ().getPipeMaxAllowedHistoricalTsFilePerDataRegion ();
273- if (isHistoricalTsFileEventCountExceededLimit ) {
273+ if (isHistoricalTsFileEventCountExceededLimit && event . mayExtractorUseTablets ( this ) ) {
274274 LOGGER .info (
275275 "Pipe task {}@{} canNotUseTabletAnyMore4: The number of historical tsFile events {} has exceeded the limit {}" ,
276276 pipeName ,
@@ -281,11 +281,11 @@ private boolean isHistoricalTsFileEventCountExceededLimit() {
281281 return isHistoricalTsFileEventCountExceededLimit ;
282282 }
283283
284- private boolean isRealtimeTsFileEventCountExceededLimit () {
284+ private boolean isRealtimeTsFileEventCountExceededLimit (final PipeRealtimeEvent event ) {
285285 final boolean isRealtimeTsFileEventCountExceededLimit =
286286 pendingQueue .getTsFileInsertionEventCount ()
287287 >= PipeConfig .getInstance ().getPipeMaxAllowedPendingTsFileEpochPerDataRegion ();
288- if (isRealtimeTsFileEventCountExceededLimit ) {
288+ if (isRealtimeTsFileEventCountExceededLimit && event . mayExtractorUseTablets ( this ) ) {
289289 LOGGER .info (
290290 "Pipe task {}@{} canNotUseTabletAnyMore5: The number of realtime tsFile events {} has exceeded the limit {}" ,
291291 pipeName ,
@@ -296,11 +296,11 @@ private boolean isRealtimeTsFileEventCountExceededLimit() {
296296 return isRealtimeTsFileEventCountExceededLimit ;
297297 }
298298
299- private boolean mayTsFileLinkedCountReachDangerousThreshold () {
299+ private boolean mayTsFileLinkedCountReachDangerousThreshold (final PipeRealtimeEvent event ) {
300300 final boolean mayTsFileLinkedCountReachDangerousThreshold =
301301 PipeDataNodeResourceManager .tsfile ().getLinkedTsfileCount ()
302302 >= PipeConfig .getInstance ().getPipeMaxAllowedLinkedTsFileCount ();
303- if (mayTsFileLinkedCountReachDangerousThreshold ) {
303+ if (mayTsFileLinkedCountReachDangerousThreshold && event . mayExtractorUseTablets ( this ) ) {
304304 LOGGER .info (
305305 "Pipe task {}@{} canNotUseTabletAnyMore6: The number of linked tsfiles {} has reached the dangerous threshold {}" ,
306306 pipeName ,
@@ -311,15 +311,15 @@ private boolean mayTsFileLinkedCountReachDangerousThreshold() {
311311 return mayTsFileLinkedCountReachDangerousThreshold ;
312312 }
313313
314- private boolean mayInsertNodeMemoryReachDangerousThreshold () {
314+ private boolean mayInsertNodeMemoryReachDangerousThreshold (final PipeRealtimeEvent event ) {
315315 final long floatingMemoryUsageInByte =
316316 PipeDataNodeAgent .task ().getFloatingMemoryUsageInByte (pipeName );
317317 final long pipeCount = PipeDataNodeAgent .task ().getPipeCount ();
318318 final long freeMemorySizeInBytes =
319319 PipeDataNodeResourceManager .memory ().getFreeMemorySizeInBytes ();
320320 final boolean mayInsertNodeMemoryReachDangerousThreshold =
321321 3 * floatingMemoryUsageInByte * pipeCount >= 2 * freeMemorySizeInBytes ;
322- if (mayInsertNodeMemoryReachDangerousThreshold ) {
322+ if (mayInsertNodeMemoryReachDangerousThreshold && event . mayExtractorUseTablets ( this ) ) {
323323 LOGGER .info (
324324 "Pipe task {}@{} canNotUseTabletAnyMore7: The shallow memory usage of the insert node {} has reached the dangerous threshold {}" ,
325325 pipeName ,
@@ -379,7 +379,7 @@ private Event supplyTabletInsertion(final PipeRealtimeEvent event) {
379379 return state ;
380380 }
381381
382- return canNotUseTabletAnyMore ()
382+ return canNotUseTabletAnyMore (event )
383383 ? TsFileEpoch .State .USING_TSFILE
384384 : TsFileEpoch .State .USING_TABLET ;
385385 });
0 commit comments