2020package org .apache .iotdb .db .pipe .extractor .dataregion .realtime ;
2121
2222import org .apache .iotdb .commons .exception .pipe .PipeRuntimeNonCriticalException ;
23- import org .apache .iotdb .commons .pipe .config .PipeConfig ;
2423import org .apache .iotdb .commons .pipe .event .ProgressReportEvent ;
2524import org .apache .iotdb .db .pipe .agent .PipeDataNodeAgent ;
2625import org .apache .iotdb .db .pipe .event .common .deletion .PipeDeleteDataNodeEvent ;
2726import org .apache .iotdb .db .pipe .event .common .heartbeat .PipeHeartbeatEvent ;
2827import org .apache .iotdb .db .pipe .event .common .tsfile .PipeTsFileInsertionEvent ;
2928import org .apache .iotdb .db .pipe .event .realtime .PipeRealtimeEvent ;
30- import org .apache .iotdb .db .pipe .extractor .dataregion .IoTDBDataRegionExtractor ;
3129import org .apache .iotdb .db .pipe .extractor .dataregion .realtime .assigner .PipeTsFileEpochProgressIndexKeeper ;
3230import org .apache .iotdb .db .pipe .extractor .dataregion .realtime .epoch .TsFileEpoch ;
3331import org .apache .iotdb .db .pipe .metric .overview .PipeDataNodeRemainingEventAndTimeOperator ;
3432import org .apache .iotdb .db .pipe .metric .overview .PipeDataNodeSinglePipeMetrics ;
35- import org .apache .iotdb .db .pipe .metric .source .PipeDataRegionExtractorMetrics ;
3633import org .apache .iotdb .db .pipe .resource .PipeDataNodeResourceManager ;
3734import org .apache .iotdb .pipe .api .event .Event ;
3835import org .apache .iotdb .pipe .api .event .dml .insertion .TabletInsertionEvent ;
4138import org .slf4j .Logger ;
4239import org .slf4j .LoggerFactory ;
4340
44- import java .util .Objects ;
4541import java .util .Optional ;
4642
4743public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor {
@@ -82,7 +78,7 @@ public boolean isNeedListenToInsertNode() {
8278 private void extractTabletInsertion (final PipeRealtimeEvent event ) {
8379 TsFileEpoch .State state ;
8480
85- if (canNotUseTabletAnyMore (event )) {
81+ if (canNotUseTabletAnymore (event )) {
8682 event .getTsFileEpoch ().migrateState (this , curState -> TsFileEpoch .State .USING_TSFILE );
8783 PipeTsFileEpochProgressIndexKeeper .getInstance ()
8884 .registerProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getResource ());
@@ -162,7 +158,7 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
162158 return TsFileEpoch .State .USING_TSFILE ;
163159 case USING_BOTH :
164160 default :
165- return canNotUseTabletAnyMore (event )
161+ return canNotUseTabletAnymore (event )
166162 ? TsFileEpoch .State .USING_TSFILE
167163 : TsFileEpoch .State .USING_BOTH ;
168164 }
@@ -171,9 +167,10 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
171167 final TsFileEpoch .State state = event .getTsFileEpoch ().getState (this );
172168 switch (state ) {
173169 case USING_TABLET :
174- // Though the data in tsfile event has been extracted in tablet mode, we still need to
175- // extract the tsfile event to help to determine isTsFileEventCountInQueueExceededLimit().
176- // The extracted tsfile event will be discarded in supplyTsFileInsertion.
170+ // If the state is USING_TABLET, discard the event
171+ PipeTsFileEpochProgressIndexKeeper .getInstance ()
172+ .eliminateProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getFilePath ());
173+ return ;
177174 case EMPTY :
178175 case USING_TSFILE :
179176 case USING_BOTH :
@@ -202,17 +199,9 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
202199 }
203200 }
204201
205- private boolean canNotUseTabletAnyMore (final PipeRealtimeEvent event ) {
206- // In the following 4 cases, we should not extract this tablet event. all the data
207- // represented by the tablet event should be carried by the following tsfile event:
208- // the write operation will be throttled, so we should not extract any more tablet events.
209- // 1. The shallow memory usage of the insert node has reached the dangerous threshold.
210- // 2. Deprecated logics (unused by default)
211- return mayInsertNodeMemoryReachDangerousThreshold (event )
212- || canNotUseTabletAnymoreDeprecated (event );
213- }
214-
215- private boolean mayInsertNodeMemoryReachDangerousThreshold (final PipeRealtimeEvent event ) {
202+ // If the insertNode's memory has reached the dangerous threshold, we should not extract any
203+ // tablets.
204+ private boolean canNotUseTabletAnymore (final PipeRealtimeEvent event ) {
216205 final long floatingMemoryUsageInByte =
217206 PipeDataNodeAgent .task ().getFloatingMemoryUsageInByte (pipeName );
218207 final long pipeCount = PipeDataNodeAgent .task ().getPipeCount ();
@@ -224,7 +213,7 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold(final PipeRealtimeEve
224213 final PipeDataNodeRemainingEventAndTimeOperator operator =
225214 PipeDataNodeSinglePipeMetrics .getInstance ().remainingEventAndTimeOperatorMap .get (pipeID );
226215 LOGGER .info (
227- "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}" ,
216+ "Pipe task {}@{} canNotUseTabletAnyMore for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}" ,
228217 pipeName ,
229218 dataRegionId ,
230219 event .getTsFileEpoch ().getFilePath (),
@@ -237,83 +226,6 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold(final PipeRealtimeEve
237226 return mayInsertNodeMemoryReachDangerousThreshold ;
238227 }
239228
240- /**
241- * These judgements are deprecated, and are only reserved for manual operation and compatibility.
242- */
243- @ Deprecated
244- private boolean canNotUseTabletAnymoreDeprecated (final PipeRealtimeEvent event ) {
245- // In the following 5 cases, we should not extract any more tablet events. all the data
246- // represented by the tablet events should be carried by the following tsfile event:
247- // 1. The number of historical tsFile events to transfer has exceeded the limit.
248- // 2. The number of realtime tsfile events to transfer has exceeded the limit.
249- // 3. The number of linked tsFiles has reached the dangerous threshold.
250- return isHistoricalTsFileEventCountExceededLimit (event )
251- || isRealtimeTsFileEventCountExceededLimit (event )
252- || mayTsFileLinkedCountReachDangerousThreshold (event );
253- }
254-
255- private boolean isHistoricalTsFileEventCountExceededLimit (final PipeRealtimeEvent event ) {
256- if (PipeConfig .getInstance ().getPipeMaxAllowedHistoricalTsFilePerDataRegion ()
257- == Integer .MAX_VALUE ) {
258- return false ;
259- }
260- final IoTDBDataRegionExtractor extractor =
261- PipeDataRegionExtractorMetrics .getInstance ().getExtractorMap ().get (getTaskID ());
262- final boolean isHistoricalTsFileEventCountExceededLimit =
263- Objects .nonNull (extractor )
264- && extractor .getHistoricalTsFileInsertionEventCount ()
265- >= PipeConfig .getInstance ().getPipeMaxAllowedHistoricalTsFilePerDataRegion ();
266- if (isHistoricalTsFileEventCountExceededLimit && event .mayExtractorUseTablets (this )) {
267- LOGGER .info (
268- "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(1) for tsFile {}: The number of historical tsFile events {} has exceeded the limit {}" ,
269- pipeName ,
270- dataRegionId ,
271- event .getTsFileEpoch ().getFilePath (),
272- extractor .getHistoricalTsFileInsertionEventCount (),
273- PipeConfig .getInstance ().getPipeMaxAllowedHistoricalTsFilePerDataRegion ());
274- }
275- return isHistoricalTsFileEventCountExceededLimit ;
276- }
277-
278- private boolean isRealtimeTsFileEventCountExceededLimit (final PipeRealtimeEvent event ) {
279- if (PipeConfig .getInstance ().getPipeMaxAllowedPendingTsFileEpochPerDataRegion ()
280- == Integer .MAX_VALUE ) {
281- return false ;
282- }
283- final boolean isRealtimeTsFileEventCountExceededLimit =
284- pendingQueue .getTsFileInsertionEventCount ()
285- >= PipeConfig .getInstance ().getPipeMaxAllowedPendingTsFileEpochPerDataRegion ();
286- if (isRealtimeTsFileEventCountExceededLimit && event .mayExtractorUseTablets (this )) {
287- LOGGER .info (
288- "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(2) for tsFile {}: The number of realtime tsFile events {} has exceeded the limit {}" ,
289- pipeName ,
290- dataRegionId ,
291- event .getTsFileEpoch ().getFilePath (),
292- pendingQueue .getTsFileInsertionEventCount (),
293- PipeConfig .getInstance ().getPipeMaxAllowedPendingTsFileEpochPerDataRegion ());
294- }
295- return isRealtimeTsFileEventCountExceededLimit ;
296- }
297-
298- private boolean mayTsFileLinkedCountReachDangerousThreshold (final PipeRealtimeEvent event ) {
299- if (PipeConfig .getInstance ().getPipeMaxAllowedLinkedTsFileCount () == Long .MAX_VALUE ) {
300- return false ;
301- }
302- final boolean mayTsFileLinkedCountReachDangerousThreshold =
303- PipeDataNodeResourceManager .tsfile ().getLinkedTsFileCount (pipeName )
304- >= PipeConfig .getInstance ().getPipeMaxAllowedLinkedTsFileCount ();
305- if (mayTsFileLinkedCountReachDangerousThreshold && event .mayExtractorUseTablets (this )) {
306- LOGGER .info (
307- "Pipe task {}@{} canNotUseTabletAnymoreDeprecated(3) for tsFile {}: The number of linked tsFiles {} has reached the dangerous threshold {}" ,
308- pipeName ,
309- dataRegionId ,
310- event .getTsFileEpoch ().getFilePath (),
311- PipeDataNodeResourceManager .tsfile ().getLinkedTsFileCount (pipeName ),
312- PipeConfig .getInstance ().getPipeMaxAllowedLinkedTsFileCount ());
313- }
314- return mayTsFileLinkedCountReachDangerousThreshold ;
315- }
316-
317229 @ Override
318230 public Event supply () {
319231 PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent ) pendingQueue .directPoll ();
@@ -355,103 +267,40 @@ public Event supply() {
355267 }
356268
357269 private Event supplyTabletInsertion (final PipeRealtimeEvent event ) {
358- event
359- .getTsFileEpoch ()
360- .migrateState (
361- this ,
362- state -> {
363- switch (state ) {
364- case EMPTY :
365- return canNotUseTabletAnyMore (event )
366- ? TsFileEpoch .State .USING_TSFILE
367- : TsFileEpoch .State .USING_TABLET ;
368- case USING_TSFILE :
369- return canNotUseTabletAnyMore (event )
370- ? TsFileEpoch .State .USING_TSFILE
371- : TsFileEpoch .State .USING_BOTH ;
372- case USING_TABLET :
373- case USING_BOTH :
374- default :
375- return state ;
376- }
377- });
378-
379- final TsFileEpoch .State state = event .getTsFileEpoch ().getState (this );
380- if (state == TsFileEpoch .State .USING_TSFILE ) {
381- PipeTsFileEpochProgressIndexKeeper .getInstance ()
382- .registerProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getResource ());
383- }
384-
385- switch (state ) {
386- case USING_TSFILE :
387- // If the state is USING_TSFILE, discard the event and poll the next one.
388- return null ;
389- case EMPTY :
390- case USING_TABLET :
391- case USING_BOTH :
392- default :
393- if (event .increaseReferenceCount (PipeRealtimeDataRegionHybridExtractor .class .getName ())) {
394- return event .getEvent ();
395- } else {
396- // If the event's reference count can not be increased, it means the data represented by
397- // this event is not reliable anymore. but the data represented by this event
398- // has been carried by the following tsfile event, so we can just discard this event.
399- event .getTsFileEpoch ().migrateState (this , s -> TsFileEpoch .State .USING_BOTH );
400- LOGGER .warn (
401- "Discard tablet event {} because it is not reliable anymore. "
402- + "Change the state of TsFileEpoch to USING_TSFILE." ,
403- event );
404- return null ;
405- }
270+ if (event .increaseReferenceCount (PipeRealtimeDataRegionHybridExtractor .class .getName ())) {
271+ return event .getEvent ();
272+ } else {
273+ // If the event's reference count can not be increased, it means the data represented by
274+ // this event is not reliable anymore. but the data represented by this event
275+ // has been carried by the following tsfile event, so we can just discard this event.
276+ event .getTsFileEpoch ().migrateState (this , s -> TsFileEpoch .State .USING_BOTH );
277+ LOGGER .warn (
278+ "Discard tablet event {} because it is not reliable anymore. "
279+ + "Change the state of TsFileEpoch to USING_BOTH." ,
280+ event );
281+ return null ;
406282 }
407283 }
408284
409285 private Event supplyTsFileInsertion (final PipeRealtimeEvent event ) {
410- event
411- .getTsFileEpoch ()
412- .migrateState (
413- this ,
414- state -> {
415- // This would not happen, but just in case.
416- if (state .equals (TsFileEpoch .State .EMPTY )) {
417- LOGGER .error (
418- String .format ("EMPTY TsFileEpoch when supplying TsFile Event %s" , event ));
419- return TsFileEpoch .State .USING_TSFILE ;
420- }
421- return state ;
422- });
423-
424- final TsFileEpoch .State state = event .getTsFileEpoch ().getState (this );
425- switch (state ) {
426- case USING_TABLET :
427- // If the state is USING_TABLET, discard the event and poll the next one.
428- PipeTsFileEpochProgressIndexKeeper .getInstance ()
429- .eliminateProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getFilePath ());
430- return null ;
431- case EMPTY :
432- case USING_TSFILE :
433- case USING_BOTH :
434- default :
435- if (event .increaseReferenceCount (PipeRealtimeDataRegionHybridExtractor .class .getName ())) {
436- return event .getEvent ();
437- } else {
438- // If the event's reference count can not be increased, it means the data represented by
439- // this event is not reliable anymore. the data has been lost. we simply discard this
440- // event
441- // and report the exception to PipeRuntimeAgent.
442- final String errorMessage =
443- String .format (
444- "TsFile Event %s can not be supplied because "
445- + "the reference count can not be increased, "
446- + "the data represented by this event is lost" ,
447- event .getEvent ());
448- LOGGER .error (errorMessage );
449- PipeDataNodeAgent .runtime ()
450- .report (pipeTaskMeta , new PipeRuntimeNonCriticalException (errorMessage ));
451- PipeTsFileEpochProgressIndexKeeper .getInstance ()
452- .eliminateProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getFilePath ());
453- return null ;
454- }
286+ if (event .increaseReferenceCount (PipeRealtimeDataRegionHybridExtractor .class .getName ())) {
287+ return event .getEvent ();
288+ } else {
289+ // If the event's reference count can not be increased, it means the data represented by
290+ // this event is not reliable anymore. the data has been lost. we simply discard this
291+ // event and report the exception to PipeRuntimeAgent.
292+ final String errorMessage =
293+ String .format (
294+ "TsFile Event %s can not be supplied because "
295+ + "the reference count can not be increased, "
296+ + "the data represented by this event is lost" ,
297+ event .getEvent ());
298+ LOGGER .error (errorMessage );
299+ PipeDataNodeAgent .runtime ()
300+ .report (pipeTaskMeta , new PipeRuntimeNonCriticalException (errorMessage ));
301+ PipeTsFileEpochProgressIndexKeeper .getInstance ()
302+ .eliminateProgressIndex (dataRegionId , pipeName , event .getTsFileEpoch ().getFilePath ());
303+ return null ;
455304 }
456305 }
457306}
0 commit comments