4141import org .apache .iotdb .db .pipe .event .common .schema .PipeSchemaRegionWritePlanEvent ;
4242import org .apache .iotdb .db .pipe .metric .overview .PipeDataNodeSinglePipeMetrics ;
4343import org .apache .iotdb .db .pipe .metric .schema .PipeSchemaRegionSourceMetrics ;
44+ import org .apache .iotdb .db .pipe .receiver .visitor .PipeTreeStatementToBatchVisitor ;
4445import org .apache .iotdb .db .queryengine .plan .planner .plan .node .PlanNode ;
4546import org .apache .iotdb .db .queryengine .plan .planner .plan .node .PlanNodeId ;
4647import org .apache .iotdb .db .queryengine .plan .planner .plan .node .PlanNodeType ;
4748import org .apache .iotdb .db .queryengine .plan .planner .plan .node .metadata .write .AlterTimeSeriesNode ;
4849import org .apache .iotdb .db .queryengine .plan .planner .plan .node .pipe .PipeOperateSchemaQueueNode ;
4950import org .apache .iotdb .db .queryengine .plan .relational .sql .ast .Node ;
51+ import org .apache .iotdb .db .queryengine .plan .statement .Statement ;
52+ import org .apache .iotdb .db .queryengine .plan .statement .StatementNode ;
5053import org .apache .iotdb .db .schemaengine .SchemaEngine ;
5154import org .apache .iotdb .db .tools .schema .SRStatementGenerator ;
5255import org .apache .iotdb .db .tools .schema .SchemaRegionSnapshotParser ;
6366import java .nio .file .Paths ;
6467import java .util .Collections ;
6568import java .util .HashSet ;
69+ import java .util .Iterator ;
6670import java .util .Objects ;
6771import java .util .Optional ;
6872import java .util .Set ;
@@ -76,16 +80,22 @@ public class IoTDBSchemaRegionSource extends IoTDBNonDataRegionSource {
7680 new PipePlanTablePatternParseVisitor ();
7781 public static final PipePlanTablePrivilegeParseVisitor TABLE_PRIVILEGE_PARSE_VISITOR =
7882 new PipePlanTablePrivilegeParseVisitor ();
79- private static final PipeStatementToPlanVisitor STATEMENT_TO_PLAN_VISITOR =
80- new PipeStatementToPlanVisitor ();
83+ private static final PipeTableStatementToPlanVisitor TABLE_STATEMENT_TO_PLAN_VISITOR =
84+ new PipeTableStatementToPlanVisitor ();
85+ private static final PipeTreeStatementToPlanVisitor TREE_STATEMENT_TO_PLAN_VISITOR =
86+ new PipeTreeStatementToPlanVisitor ();
87+ private final PipeTreeStatementToBatchVisitor batchVisitor =
88+ new PipeTreeStatementToBatchVisitor ();
8189
8290 // Local for exception
8391 private PipePlanTreePrivilegeParseVisitor treePrivilegeParseVisitor ;
8492 private SchemaRegionId schemaRegionId ;
8593
8694 private Set <PlanNodeType > listenedTypeSet = new HashSet <>();
8795 private String database ;
96+ private boolean isTableModel ;
8897 private SRStatementGenerator generator ;
98+ private Iterator <Statement > remainBatches ;
8999
90100 @ Override
91101 public void customize (
@@ -132,6 +142,7 @@ public void start() throws Exception {
132142 }
133143
134144 database = SchemaEngine .getInstance ().getSchemaRegion (schemaRegionId ).getDatabaseFullPath ();
145+ isTableModel = PathUtils .isTableModelDatabase (database );
135146 super .start ();
136147 }
137148
@@ -170,7 +181,7 @@ protected long getMaxBlockingTimeMs() {
170181 @ Override
171182 protected boolean canSkipSnapshotPrivilegeCheck (final PipeSnapshotEvent event ) {
172183 try {
173- if (PathUtils . isTableModelDatabase ( database ) ) {
184+ if (isTableModel ) {
174185 AuthorityChecker .getAccessControl ()
175186 .checkCanSelectFromDatabase4Pipe (userName , database , userEntity );
176187 return true ;
@@ -209,14 +220,40 @@ protected void initSnapshotGenerator(final PipeSnapshotEvent event)
209220
210221 @ Override
211222 protected boolean hasNextEventInCurrentSnapshot () {
212- return Objects .nonNull (generator ) && generator .hasNext ();
223+ return Objects .nonNull (generator ) && generator .hasNext ()
224+ || Objects .nonNull (remainBatches ) && remainBatches .hasNext ();
213225 }
214226
215227 @ Override
216228 protected PipeWritePlanEvent getNextEventInCurrentSnapshot () {
217- // Currently only support table model event
229+ if (isTableModel ) {
230+ return new PipeSchemaRegionWritePlanEvent (
231+ TABLE_STATEMENT_TO_PLAN_VISITOR .process ((Node ) generator .next ()), false );
232+ }
233+ while (generator .hasNext ()) {
234+ final Optional <Statement > statement =
235+ batchVisitor .process ((StatementNode ) generator .next (), null );
236+ if (statement .isPresent ()) {
237+ if (!generator .hasNext ()) {
238+ remainBatches =
239+ batchVisitor .getRemainBatches ().stream ()
240+ .filter (Optional ::isPresent )
241+ .map (Optional ::get )
242+ .iterator ();
243+ }
244+ return new PipeSchemaRegionWritePlanEvent (
245+ TREE_STATEMENT_TO_PLAN_VISITOR .process (statement .get (), null ), false );
246+ }
247+ }
248+ if (Objects .isNull (remainBatches )) {
249+ remainBatches =
250+ batchVisitor .getRemainBatches ().stream ()
251+ .filter (Optional ::isPresent )
252+ .map (Optional ::get )
253+ .iterator ();
254+ }
218255 return new PipeSchemaRegionWritePlanEvent (
219- STATEMENT_TO_PLAN_VISITOR .process (( Node ) generator .next ()), false );
256+ TREE_STATEMENT_TO_PLAN_VISITOR .process (remainBatches .next (), null ), false );
220257 }
221258
222259 @ Override
0 commit comments