@@ -120,6 +120,10 @@ public void init(int numThreads) {
120120 recover ();
121121 }
122122
123+ /***
124+ * Filter out pipe procedures that do not need to re-acquire lock and re-execute when there are multiple locked pipe procedures during restore.
125+ * @return non pipe procedures and one pipe procedure with max lock seq id (if there is.)
126+ */
123127 private List <Procedure <Env >> filteredProcedureList (final List <Procedure <Env >> procedures ) {
124128 List <Procedure <Env >> nonPipeOrLockedProcedures =
125129 procedures .stream ()
@@ -128,23 +132,23 @@ private List<Procedure<Env>> filteredProcedureList(final List<Procedure<Env>> pr
128132
129133 List <AbstractOperatePipeProcedureV2 > lockedPipeProcedures =
130134 procedures .stream ()
131- .filter (AbstractOperatePipeProcedureV2 .class ::isInstance )
132- .filter (Procedure ::isLockedWhenLoading )
135+ .filter (p -> p instanceof AbstractOperatePipeProcedureV2 && p .isLockedWhenLoading ())
133136 .map (AbstractOperatePipeProcedureV2 .class ::cast )
134137 .collect (Collectors .toList ());
135138 Optional <Procedure <Env >> maxPipeProcedure =
136139 lockedPipeProcedures .stream ()
137140 .max (Comparator .comparingLong (AbstractOperatePipeProcedureV2 ::getLockSeqId ))
138141 .map (p -> (Procedure <Env >) p );
142+
139143 if (lockedPipeProcedures .size () > 1 ) {
140144 LOG .warn (
141145 "[Procedure restore]Detected multiple locked pipe procedures in procedure executor {}, only keep last one {}" ,
142146 lockedPipeProcedures ,
143147 maxPipeProcedure .get ());
144148 }
145- List < Procedure < Env >> result = new ArrayList <>( nonPipeOrLockedProcedures );
146- maxPipeProcedure .ifPresent (result ::add );
147- return result ;
149+
150+ maxPipeProcedure .ifPresent (nonPipeOrLockedProcedures ::add );
151+ return nonPipeOrLockedProcedures ;
148152 }
149153
150154 private void recover () {
0 commit comments