@@ -287,32 +287,6 @@ public void validate(final PipeParameterValidator validator) throws Exception {
287287 Arrays .asList (EXTRACTOR_REALTIME_ENABLE_KEY , SOURCE_REALTIME_ENABLE_KEY ),
288288 EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE ));
289289
290- // Validate extractor.realtime.mode
291- if (validator
292- .getParameters ()
293- .getBooleanOrDefault (
294- Arrays .asList (EXTRACTOR_REALTIME_ENABLE_KEY , SOURCE_REALTIME_ENABLE_KEY ),
295- EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE )
296- || validator
297- .getParameters ()
298- .hasAnyAttributes (
299- SOURCE_START_TIME_KEY ,
300- EXTRACTOR_START_TIME_KEY ,
301- SOURCE_END_TIME_KEY ,
302- EXTRACTOR_END_TIME_KEY )) {
303- validator .validateAttributeValueRange (
304- validator .getParameters ().hasAttribute (EXTRACTOR_REALTIME_MODE_KEY )
305- ? EXTRACTOR_REALTIME_MODE_KEY
306- : SOURCE_REALTIME_MODE_KEY ,
307- true ,
308- EXTRACTOR_REALTIME_MODE_FILE_VALUE ,
309- EXTRACTOR_REALTIME_MODE_HYBRID_VALUE ,
310- EXTRACTOR_REALTIME_MODE_LOG_VALUE ,
311- EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE ,
312- EXTRACTOR_REALTIME_MODE_STREAM_MODE_VALUE ,
313- EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE );
314- }
315-
316290 checkInvalidParameters (validator );
317291
318292 constructHistoricalExtractor ();
@@ -481,7 +455,9 @@ private void constructRealtimeExtractor(final PipeParameters parameters)
481455 return ;
482456 }
483457
484- if (pipeName == null || !pipeName .startsWith (PipeStaticMeta .CONSENSUS_PIPE_PREFIX )) {
458+ if (!(pipeName != null
459+ && (pipeName .startsWith (PipeStaticMeta .SUBSCRIPTION_PIPE_PREFIX )
460+ || pipeName .startsWith (PipeStaticMeta .CONSENSUS_PIPE_PREFIX )))) {
485461 realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor ();
486462 return ;
487463 }
0 commit comments